Using Kazoo to work with ZooKeeper for service governance

The reliability and scalability of a stand-alone service are limited, and the downtime of a single service may affect the normal operation of the entire system. Distributed services can effectively solve this problem, but they also bring new problems of their own, such as service discovery (how to ensure that clients become aware of newly added or removed services) and disaster tolerance (when some services fail, how to ensure that clients only access the healthy ones). ZooKeeper was proposed mainly to solve this governance problem of distributed services: it coordinates and manages services in a distributed environment.

The process by which ZooKeeper coordinates and manages services is as follows:

share picture

Server: Each server must register with ZooKeeper (the registration center) and maintain a connection with it. If a server is disconnected from ZooKeeper, ZooKeeper deletes that server's address.
Client: When a service is needed, the client first subscribes to the server address information in ZooKeeper. ZooKeeper returns the list of registered servers, and the client selects a server from that list to call the service. If the server information recorded by ZooKeeper changes, ZooKeeper notifies the client of the change event, and the client can then obtain the latest server information.
The ZooKeeper file system is organized as a tree. Each node (znode) is identified by a name, and the levels of a path are separated by `/`:

share picture

ZooKeeper node types:

1. Persistent node (the default node type in ZooKeeper; a persistent node still exists after the client that created it disconnects)

2. Sequence node (a 10-digit serial number is appended to the original name to form the node's path, e.g. /Server0000000001)

3. Ephemeral (temporary) node (the node is deleted automatically when the client that created it disconnects from ZooKeeper)

Register the RPC service to ZooKeeper

Server:

import threading
import json
import socket
import sys
from kazoo.client import KazooClient
from divide_rpc import ServerStub
from divide_rpc import InvalidOperation


class ThreadServer(object):
    """Thread-per-connection TCP RPC server that registers itself in ZooKeeper.

    The listening socket is created and bound in the constructor; ``serve``
    publishes the address in ZooKeeper and then accepts connections forever.
    """

    def __init__(self, host, port, handlers, zk_hosts='127.0.0.1:2181'):
        """Create and bind the listening socket.

        :param host: address to bind to; also published to ZooKeeper
        :param port: TCP port to bind to; also published to ZooKeeper
        :param handlers: object handed to ``ServerStub`` for every connection
        :param zk_hosts: ZooKeeper connection string used by ``register_zk``
        """
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.host = host
        self.port = port
        self.zk_hosts = zk_hosts
        self.sock.bind((host, port))
        self.handlers = handlers

    def serve(self):
        """Start listening, register in ZooKeeper and accept clients forever.

        Every accepted connection is served by ``handle`` on its own thread.
        """
        self.sock.listen(128)
        self.register_zk()
        print("Start listening")
        while True:
            conn, addr = self.sock.accept()
            print("Create link %s" % str(addr))
            t = threading.Thread(target=self.handle, args=(conn,))
            t.start()

    def handle(self, client):
        """Process requests on one client connection until it is closed.

        :param client: the connected socket returned by ``accept``
        """
        stub = ServerStub(client, self.handlers)
        try:
            while True:
                stub.process()
        except EOFError:
            print("The client closes the connection")
        finally:
            # Close the socket on every exit path, not only on a clean EOF,
            # so an unexpected error in a handler does not leak the connection.
            client.close()

    def register_zk(self):
        """Publish this server's address in ZooKeeper.

        Ensures the ``/rpc`` root node exists, then creates an ephemeral,
        sequential child ``/rpc/server<seq>`` whose data is the JSON-encoded
        ``{"host": ..., "port": ...}`` of this server.
        """
        self.zk = KazooClient(hosts=self.zk_hosts)
        self.zk.start()
        # Create the root node if it does not exist yet.
        self.zk.ensure_path('/rpc')
        value = json.dumps({'host': self.host, 'port': self.port})
        # Create the child node that represents this server instance.
        self.zk.create('/rpc/server', value.encode(), ephemeral=True, sequence=True)


class Handlers:
    """Namespace of the procedures that the RPC server exposes."""

    @staticmethod
    def divide(num1, num2=1):
        """Divide ``num1`` by ``num2``.

        :param num1: dividend
        :param num2: divisor, defaults to 1
        :return: the quotient ``num1 / num2``
        :raises InvalidOperation: if ``num2`` is 0
        """
        if num2 == 0:
            raise InvalidOperation()
        return num1 / num2


if __name__ == '__main__':
    # Script entry point: python server.py <host> <port>
    if len(sys.argv) < 3:
        print("usage:python server.py [host] [port]")
        # sys.exit rather than the interactive-only ``exit`` helper, which is
        # injected by the ``site`` module and is not guaranteed to exist.
        sys.exit(1)
    host = sys.argv[1]
    port = sys.argv[2]
    server = ThreadServer(host, int(port), Handlers)
    server.serve()

The server connects to ZooKeeper through Kazoo, and creates first the root node and then the child node for the service. Each time the multi-threaded server is started, a separate node is created for its IP address and port. Start two servers in turn (on ports 8001 and 8002), then view the ZooKeeper node information:

>>> from kazoo.client import KazooClient
>>> zk = KazooClient(hosts='127.0.0.1:2181')
>>> zk.start()
>>> children = zk.get_children("/rpc")
>>> print(children)
['server0000000001', 'server0000000000']

Client:

import random
import time
import json
import socket
from divide_rpc import (
    ClientStub, InvalidOperation
)
from kazoo.client import KazooClient


class DistributedChannel(object):
    """Client-side channel that discovers RPC servers through ZooKeeper.

    The children of ``/rpc`` are read once at construction time and re-read
    whenever the watch registered on ``/rpc`` fires.
    """

    def __init__(self, hosts='127.0.0.1:2181'):
        """Connect to ZooKeeper and load the initial server list.

        :param hosts: ZooKeeper connection string
        """
        self._zk = KazooClient(hosts=hosts)
        self._zk.start()
        self._get_servers()

    def _get_servers(self, event=None):
        """Fetch the list of server addresses from ZooKeeper.

        Passes itself as the watch callback, so the list is refreshed (and the
        watch re-registered) each time the children of ``/rpc`` change.

        :param event: watch event supplied when invoked as a callback; unused
        """
        children = self._zk.get_children('/rpc', watch=self._get_servers)
        print(children)
        servers = []
        for child in children:
            data = self._zk.get('/rpc/' + child)[0]
            if data:
                servers.append(json.loads(data.decode()))
        # Publish the finished list in one assignment so that a concurrent
        # _get_server() never observes an empty or half-filled list while a
        # refresh is in progress.
        self._servers = servers

    def _get_server(self):
        """Return one randomly chosen server address dict.

        :raises IndexError: if no server is currently registered
        """
        return random.choice(self._servers)

    def get_connection(self):
        """Return a TCP socket connected to one of the registered servers.

        Picks a random server; if the connection is refused, waits one second
        and tries again with a new random pick, indefinitely.
        """
        while True:
            server = self._get_server()
            print(server)
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                sock.connect((server['host'], server['port']))
            except ConnectionRefusedError:
                # Release the failed socket before retrying so that repeated
                # refusals do not leak file descriptors.
                sock.close()
                time.sleep(1)
            except Exception:
                sock.close()
                raise
            else:
                return sock


# Demo driver: issue 50 divide calls, one per second, through the channel.
channel = DistributedChannel()

for i in range(50):
    try:
        stub = ClientStub(channel)
        val = stub.divide(i)
    except InvalidOperation as e:
        print(e.message)
    else:
        print(val)
    time.sleep(1)


The client connects to ZooKeeper, obtains the server information through get_children, and watches for changes to the server list. After starting the client, you will see that it calls both the server on port 8001 and the server on port 8002:

share picture

At this point a new server node, 8003, is added on the server side. The client output changes as follows:

share picture

It can be seen that ZooKeeper now has three nodes in total. The servers called before were 8001 and 8002; once 8003 joins, the client discovers it through ZooKeeper and starts calling it as well.

At this point one server, 8001, is disconnected on the server side, and the client output changes as follows:

share picture

Before the server is disconnected, the client calls all three servers: 8001, 8002, and 8003. After server 8001 is shut down, the client only calls the 8002 and 8003 servers.

import threading
import json
import socket
import sys
from kazoo.client import KazooClient
from divide_rpc import ServerStub
from divide_rpc import InvalidOperation


class ThreadServer(object):
    """Thread-per-connection TCP RPC server that registers itself in ZooKeeper.

    The listening socket is created and bound in the constructor; ``serve``
    publishes the address in ZooKeeper and then accepts connections forever.
    """

    def __init__(self, host, port, handlers, zk_hosts='127.0.0.1:2181'):
        """Create and bind the listening socket.

        :param host: address to bind to; also published to ZooKeeper
        :param port: TCP port to bind to; also published to ZooKeeper
        :param handlers: object handed to ``ServerStub`` for every connection
        :param zk_hosts: ZooKeeper connection string used by ``register_zk``
        """
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.host = host
        self.port = port
        self.zk_hosts = zk_hosts
        self.sock.bind((host, port))
        self.handlers = handlers

    def serve(self):
        """Start listening, register in ZooKeeper and accept clients forever.

        Every accepted connection is served by ``handle`` on its own thread.
        """
        self.sock.listen(128)
        self.register_zk()
        print("Start listening")
        while True:
            conn, addr = self.sock.accept()
            print("Create link %s" % str(addr))
            t = threading.Thread(target=self.handle, args=(conn,))
            t.start()

    def handle(self, client):
        """Process requests on one client connection until it is closed.

        :param client: the connected socket returned by ``accept``
        """
        stub = ServerStub(client, self.handlers)
        try:
            while True:
                stub.process()
        except EOFError:
            print("The client closes the connection")
        finally:
            # Close the socket on every exit path, not only on a clean EOF,
            # so an unexpected error in a handler does not leak the connection.
            client.close()

    def register_zk(self):
        """Publish this server's address in ZooKeeper.

        Ensures the ``/rpc`` root node exists, then creates an ephemeral,
        sequential child ``/rpc/server<seq>`` whose data is the JSON-encoded
        ``{"host": ..., "port": ...}`` of this server.
        """
        self.zk = KazooClient(hosts=self.zk_hosts)
        self.zk.start()
        # Create the root node if it does not exist yet.
        self.zk.ensure_path('/rpc')
        value = json.dumps({'host': self.host, 'port': self.port})
        # Create the child node that represents this server instance.
        self.zk.create('/rpc/server', value.encode(), ephemeral=True, sequence=True)


class Handlers:
    """Namespace of the procedures that the RPC server exposes."""

    @staticmethod
    def divide(num1, num2=1):
        """Divide ``num1`` by ``num2``.

        :param num1: dividend
        :param num2: divisor, defaults to 1
        :return: the quotient ``num1 / num2``
        :raises InvalidOperation: if ``num2`` is 0
        """
        if num2 == 0:
            raise InvalidOperation()
        return num1 / num2


if __name__ == '__main__':
    # Script entry point: python server.py <host> <port>
    if len(sys.argv) < 3:
        print("usage:python server.py [host] [port]")
        # sys.exit rather than the interactive-only ``exit`` helper, which is
        # injected by the ``site`` module and is not guaranteed to exist.
        sys.exit(1)
    host = sys.argv[1]
    port = sys.argv[2]
    server = ThreadServer(host, int(port), Handlers)
    server.serve()

>>> from kazoo.client import KazooClient
>>> zk = KazooClient(hosts='127.0.0.1:2181')
>>> zk.start()
>>> children = zk.get_children("/rpc")
>>> print(children)
['server0000000001', 'server0000000000']

import random
import time
import json
import socket
from divide_rpc import (
    ClientStub, InvalidOperation
)
from kazoo.client import KazooClient


class DistributedChannel(object):
    """Client-side channel that discovers RPC servers through ZooKeeper.

    The children of ``/rpc`` are read once at construction time and re-read
    whenever the watch registered on ``/rpc`` fires.
    """

    def __init__(self, hosts='127.0.0.1:2181'):
        """Connect to ZooKeeper and load the initial server list.

        :param hosts: ZooKeeper connection string
        """
        self._zk = KazooClient(hosts=hosts)
        self._zk.start()
        self._get_servers()

    def _get_servers(self, event=None):
        """Fetch the list of server addresses from ZooKeeper.

        Passes itself as the watch callback, so the list is refreshed (and the
        watch re-registered) each time the children of ``/rpc`` change.

        :param event: watch event supplied when invoked as a callback; unused
        """
        children = self._zk.get_children('/rpc', watch=self._get_servers)
        print(children)
        servers = []
        for child in children:
            data = self._zk.get('/rpc/' + child)[0]
            if data:
                servers.append(json.loads(data.decode()))
        # Publish the finished list in one assignment so that a concurrent
        # _get_server() never observes an empty or half-filled list while a
        # refresh is in progress.
        self._servers = servers

    def _get_server(self):
        """Return one randomly chosen server address dict.

        :raises IndexError: if no server is currently registered
        """
        return random.choice(self._servers)

    def get_connection(self):
        """Return a TCP socket connected to one of the registered servers.

        Picks a random server; if the connection is refused, waits one second
        and tries again with a new random pick, indefinitely.
        """
        while True:
            server = self._get_server()
            print(server)
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                sock.connect((server['host'], server['port']))
            except ConnectionRefusedError:
                # Release the failed socket before retrying so that repeated
                # refusals do not leak file descriptors.
                sock.close()
                time.sleep(1)
            except Exception:
                sock.close()
                raise
            else:
                return sock


# Demo driver: issue 50 divide calls, one per second, through the channel.
channel = DistributedChannel()

for i in range(50):
    try:
        stub = ClientStub(channel)
        val = stub.divide(i)
    except InvalidOperation as e:
        print(e.message)
    else:
        print(val)
    time.sleep(1)

Leave a Comment

Your email address will not be published.