Zookeeper Source Code Reading (Fourth) Single Single Server

Foreword

The previous two articles mainly talked about the client-server session-related content, so here is the content of the client and the content of the client-server connection. Basically come to an end, the remaining part is the internal structure of the server part, the election of zk and the working mechanism of the server part, etc. This article mainly talks about the startup process of a single server, which will involve some internal working mechanisms and institutions of the server.

Server architecture

share picture

It can be seen that the server side of Zookeeper is mainly divided into several large modules. ZKDatabase is the internal memory database of the zk server. It maintains key data such as node data internally. It is responsible for the recording of snapshots and logs. It manages session timeouts and cluster elections. The other parts are mainly sessiontracker responsible for session management, request chain and Learner module responsible for processing requests (cluster communication? It is not particularly clear at this time).

Starting the stand-alone server

Main process of the stand-alone server:

share picture

From the figure above, you can see that the stand-alone server startup can be divided into two parts: pre-start and initialization.

Pre-launch

1. QuorumPeerMain is used as the startup class

Regardless of stand-alone or Cluster, QuorumPeerMain is configured as the startup entry class in both zkServer.cmd and zkServer.sh.

2. Analyze zoo.cfg

Students who have used ZK know that zoo.cfg is the zookeeper core configuration file configured by the user, ticktime, dataDir , DataLogDir, cluster ip:port, etc. are all configured in it. After instantiating the QuorumPeerMain object, the zoo.cfg file will be parsed.

QuorumPeerMain(Main)->QuorumPeerMain(initializeAndRun)->QuorumPeerConfig(parse)->QuorumPeerConfig(parseProperties)

The parseProperties function is too long. . . And they are all very simple property file value operations, you can simply look at them.

3. Create and start the historical file cleaner DatadirCleanupManager

The start method of DatadirCleanupManager is responsible for automatically cleaning up historical snapshots and transaction logs.

public void start() {if (PurgeTaskStatus.STARTED == purgeTaskStatus) {LOG.warn("Purge task is already running."); return;} // Don't schedule the purge task with zero or negative purge interval. if (purgeInterval <= 0) {LOG.info("Purge task is not scheduled."); return;} timer = new Timer("PurgeTask", true);// Use the Timer class of java to do timing tasks TimerTask task = new PurgeTask(dataLogDir, snapDir, snapRetainCount);//PurgeTask is a timertask timer.scheduleAtFixedRate(task, 0, TimeUnit.HOURS.toMillis(purgeInterval));//Set Frequency purgeTaskStatus = PurgeTaskStatus.STARTED;}
4. Judging start mode
if (args.length == 1 && config. servers.size()> 0) {//Judging whether it is a cluster by analyzing the number of zoo.cfg servers. runFromConfig(config);//If it is a cluster, use the cluster startup method in QuorumPeerMain directly} else {LOG.warn("Either no config or no quorum defined in config, running "+" in standalone mode"); // there is only server in the quorum - run as stan dalone ZooKeeperServerMain.main(args);//If it is a stand-alone machine, use the stand-alone startup method}
5. parse zoo.cfg again

ZooKeeperServerMain (Main)->ZooKeeperServerMain(initializeAndRun)->ServerConfig(parse)->QuorumPeerConfig(parse)->QuorumPeerConfig(parseProperties).

The reason why there is another analysis here is because the main method of zookeeperserver is called, and the parameters that were originally parsed cannot be passed in. Moreover, the configuration file is relatively small, and the analysis is not particularly resource-intensive, which is acceptable.

6. Create an instance of ZookeeperServer

ZookeeperServer is the core class on the server side, and an instance of zookeeperserver will be created at startup.

final ZooKeeperServer zkServer = new ZooKeeperServer();

At this point, the so-called pre-start is completed. It can be seen that the Zookeeper server is in the pre-start phase I did the following things:

  1. Clean up historical snapshots and log files;
  2. Analyze configuration files and perform preliminary analysis to determine the server status (Standalone/Cluster);
  3. Instantiate ZookeeperServer.

Initialization

After instantiating the zookeeper server, the startup process of the zookeeper server comes to the initialization phase, which is also relatively long.

First in the runfromconfig method:

public void runFromConfig(ServerConfig config) throws IOException {LOG.info("Starting server"); FileTxnSnapLog txnLog = null ; try {// Note that this thread isn't going to be doing anything else, // so rather than spawning another thread, we will just call // run() in this thread. // create a file logger url from the command line args final ZooKeeperServer zkServer = new ZooKeeperServer();//1 // Registers shutdown handler which will be used to know the // server error or shutdown state changes. final CountDownLatch shutdownLatch = new CountDownLatch(1);//2 zkServer .registerServerShutdownHandler( new ZooKeeperServerShutdownHandler(shutdownLatch)); txnLog = new FileTxnSnapLog(new File(config.dataLogDir), new File(//3 config.dataDir)); zkServer.setTxnLogFactory(txnLog); zkServer.setTickTime(config.tickTime) ; zkServer.setMinSessionTimeout(config.minSessionTimeout); zkServer.setMaxSessionTimeout(config.maxSessionTimeout); cnxnFactory = ServerCnxnFactory.createFactory(); cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns()); cnxnServerFactory.startup(zkn // Watch status of ZooKeeper server. It will do a graceful shutdown // if the server is not running or hits an internal error. shutdownLatch.await(); shutdown(); cnxnFactory.join(); if (zkServer.canShutdown( )) {zkServer.shutdown(true);}} catch (InterruptedException e) {// warn, but generally this is ok LOG.warn("Server interrupted", e);} finally {if (txnLog != null) { txnLog.close();} }}
1. Create a server statistic ServerStats

In the above code at 1, which is zookeeperserver The Server statistic ServerStats is instantiated in the constructor.

public ZooKeeperServer() {serverStats = new ServerStats(this); //ServerStats counts basic server data such as the number of packets sent and received, delay information, etc. listener = new ZooKeeperServerListenerImpl(this);}

A brief introduction to ServerStats:

/** * Basic Server Statistics */ //You can also know ServerStats from the comments It is the basic class of server data public class ServerStats {private long packetsSent;//After zkserver is started, or after the latest recharge server statistics information, the number of responses sent by the server -> client private long packetsReceived;//after zkserver is started , Or the number of responses sent by the client to the server after the last recharge server statistics information private long maxLatency;//After zkserver is started, or after the last recharge server statistics information, the server request processing the maximum Delay private long minLatency = Long.MAX_VALUE;//After zkserver is started, or after the latest recharge server statistics, the minimum delay for server-side request processing private long totalLatency = 0;//Zkserver is started, or recently After a recharge of server-side statistics, the total delay of server-side request processing private long count = 0;//After zkserver is started, or after the last recharge of server-side statistics, the total number of client requests processed by the server-side private final Provider provider;//provider object provides some statistical data, as follows public interface Provider {public long getOutstandingRequests();//Get the number of requests in the queue that have not been processed, in zookeeperserver and finalrequestprocessor public long getLastProcessedZxid();//Get The latest zxid public String getState();//Get server status public int getNumAliveConnecti ons();//Get the total number of surviving client connections} public ServerStats(Provider provider) {//Constructor this.provider = provider; }
2. Create a data manager FileTxnSnapLog

FileTxnSnapLog is the docking layer between the Zookeeper upper server and the underlying data storage. It provides a series of interfaces for operating data files, such as transaction log files and snapshot data files. Zookeeper creates FileTxnSnapLog based on the snapshot data directory dataDir and the transaction log directory dataLogDir parsed in the zoo.cfg file.

In fact, FileTxnSnapLog here contains the snapshot and log class FileTxnLog mentioned in Chapter 3 and 4, the function class of FileSnap, FileTxnLog, dataDir and snapDir (dataLogDir) since the creation of FileSnap generate.

3. Set server tickTime and session timeout limit
zkServer.setTickTime(config.tickTime); zkServer.setMinSessionTimeout(config.minSessionTimeout);zkServer.setMaxSessionTimeout(config.maxSessionTimeout);
4. Create ServerCnxnFactory

By configuring the system property zookeper.serverCnxnFactory Specify whether to use Zookeeper's own implementation of NIO or use the Netty framework as the Zookeeper server network connection factory.

cnxnFactory = ServerCnxnFactory.createFactory();//Create ServerCnxnFactorycnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns());//Initialize ServerCnxnFactorycnxnFactory.startup(zkServer); //Start ServerCnxnFactory

As mentioned before, reflection is used here.

static public ServerCnxnFactory createFactory() throws IOException {String serverCnxnFactoryName = System.getProperty(ZOOKEEPER_SERVER_CNXN_FACTORY);//Read configuration if (serverCnxnFactoryName == null) {serverCnxnFactoryName = NIOServerCnxnFactory.class. getName();//The default is NIO implementation} try {ServerCnxnFactory serverCnxnFactory = (ServerCnxnFactory) Class.forName(serverCnxnFactoryName)//If configured, it will be instantiated according to the configuration. Reflection.getDeclaredConstructor().newInstance(); LOG.info("Using {} as server connection factory", serverCnxnFactoryName); return serverCnxnFactory;} catch (Exception e) {IOException ioe = new IOException("Couldn't instantiate "+ serverCnxnFactoryName); ioe.initCause(e); throw ioe; }}
5. Initialize ServerCnxnFactory

NIOServerCnxnFactory(configure)

@Overridepublic void configure(InetSocketAddress addr, int maxcc) throws IOException {configureSaslLogin(); thread = new ZooKeeperThread(this, "NIOServerCxn.Factory:" + addr);//The runnable object passed in is ServerCnxnFactory Implementation class thread.setDaemon(true);//Set to daemon thread maxClientCnxns = maxcc;//NIO related settings this.ss = ServerSocketChannel.open(); ss.socket().setReuseAddress(true); LOG.info( "binding to port "+ addr); ss.socket().bind(addr); ss.configureBlocking(false); ss.register(selector, SelectionKey.OP_ACCEPT);} 

As you can see, Zookeeper will initialize Thread as the main thread of ServerCnxnFactory, and then initialize the NIO server. The runnable object passed in by the thread initialized by zookeeper is still the implementation class of ServerCnxnFactory, which means that ServerCnxnFactory is still executed during run.

6. Start ServerCnxnFactory main thread
@Overridepublic void startup(ZooKeeperServer zks) throws IOException, InterruptedException {start(); //Start the main thread setZooKeeperServer(zks);//Set the server-side object zks.startdata();//Restore data, etc. zks.startup();}

The start method starts the thread.

public void start() {// ensure thread is started once and only once if (thread.getState() == Thread.State.NEW) {//If it is just started thread.start();//Start thread}}

The run method of NIOServerCnxnFactory is some basic settings of NIO asynchronous connection, such as connection establishment.

7. Recovering local data

Every time zk is started, you need to find data files and transaction log files from the local block for data recovery. NIOServerCnxnFactory(startdata)->ZooKeeperServer(startdata) has performed the operation of restoring data.

8. Create and start session manager

The so-called session manager is the sessiontracker mentioned earlier.

public synchronized void startup() {if (sessionTracker == null) {createSessionTracker();//Create session manager} startSessionTracker();//Start session manager setupRequestProcessors( );//Initialize the zookeeper request processing chain registerJMX();//Register JMX service setState(State.RUNNING); notifyAll();}
protected void createSessionTracker() {sessionTracker = new SessionTrackerImpl(this, zkDb.getSessionWithTimeOuts(), tickTime, 1, getZooKeeperServerListener());//Create a sessiontrack, initialize the variables such as sessionsWithTimeout, expirationInterval in the manager, and in particular, initialize the sessionId together here}< /pre> 
9. Initiate Zookeeper's request processing chain
protected void setupRequestProcessors() {RequestProcessor finalProcessor = new FinalRequestProcessor(this) ; RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor); ((SyncRequestProcessor)syncProcessor).start(); firstProcessor = new PrepRequestProcessor(this, syncProcessor); ((PrepRequestProcessor)firstProcessor).start();}

Share a picture

The zookeeper request processing method is based on the chain of responsibility model, which means that there are multiple request processors on the server side to process a client request at a time. When the server starts, these processors are connected in series to form a processing chain. The picture above is the processor of a stand-alone server.

10. Register JMX service

The information of the ZK server will be exposed to the outside in the form of JXM. I don't know much about it here.

11. Register ZK server instance
public void submitRequest(Request si) {if (firstProcessor == null) {synchronized (this) {try {// Since all requests are passed to the request // processor it should wait for setting up the request // processor chain. The state will be updated to RUNNING // after the setup. while (state == State.INITIAL) {//If you will wait while initializing wait(1000);}} catch (InterruptedException e) {LOG.warn("Une

The comment of the above code can know that it is at initialization Request submission processing will wait, and this function is called in NIOServerCnxnFactory(run)->NIOServerCnxn(doIO)->ZookeeperServer(processConnectRequest)->ZookeeperServer(createSession)->ZookeeperServer(submitRequest), which is also called in the previous startup ServerCnxnFactory will wait here when it is the main thread.

setState(State.RUNNING);notifyAll();

And here it will notify when the thread can Fully working.

Thinking

JMX needs to learn more

request chain and log/snapshot cleanup process

reference

http://www.cnblogs.com/ leesf456/p/6105276.html

https://www.jianshu.com/p/47cb9e6d309d
https://my.oschina.net/pingpangkuangmo/blog/491673

< p>https://my.oschina.net/xianggao/blog/537902

https://www.jianshu.com/p/76d6d674530b

From paxos to zk

p>

Leave a Comment

Your email address will not be published.