Building a ZooKeeper and Kafka cluster

This post skips the theory and explains in detail how to set up a ZooKeeper and Kafka cluster, which requires three Linux servers.

  • Java environment variable settings
  • ZooKeeper cluster construction
  • Kafka cluster construction

Java environment variable settings

Set the Java environment variables on every server.

Java is installed here from a downloaded tarball:

Download the package, unpack it under /usr/local/, and rename the extracted directory to jdk. Then add the Java settings to the Linux environment variables.

[root@host jdk]# cat /etc/profile.d/java.sh
[root@host jdk]#

# Then run the source command to load the newly added java.sh script
[root@host bin]# source /etc/profile.d/java.sh

# Verify that the Java environment variables are set correctly
[root@host bin]# java -version
java version "1.7.0_79"
Java(TM) SE Runtime Environment (build 1.7.0_79-b15)
Java HotSpot(TM) 64-Bit Server VM (build 24.79-b02, mixed mode)
[root@host bin]# echo $JAVA_HOME    # Check that JAVA_HOME is the path set above
[root@host bin]#

If the output above looks normal, the Java environment is set up.

ZooKeeper cluster construction

A ZooKeeper cluster is called a group (an "ensemble" in the official documentation). ZooKeeper uses a consensus protocol, so each group should contain an odd number of nodes: ZooKeeper can serve external requests only while a majority of the nodes in the group are available. In other words, a group of 3 nodes tolerates the failure of 1 node, and a group of 5 nodes tolerates the failure of 2. [A group of more than 7 nodes is not recommended: because of the consensus protocol, too many nodes reduce the performance of the group.]
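
The majority rule above comes down to a little arithmetic. The following is only an illustrative sketch: the function names quorum_size and tolerated_failures are made up for this post and are not part of ZooKeeper.

```python
def quorum_size(nodes: int) -> int:
    """Smallest number of nodes that forms a majority of the group."""
    if nodes < 1:
        raise ValueError("a group needs at least one node")
    return nodes // 2 + 1


def tolerated_failures(nodes: int) -> int:
    """How many nodes may fail while a majority is still available."""
    return nodes - quorum_size(nodes)


if __name__ == "__main__":
    for n in (3, 4, 5, 7):
        print(n, "nodes: quorum", quorum_size(n), "tolerates", tolerated_failures(n))
```

By this arithmetic a group of 4 tolerates only 1 failure, the same as a group of 3, which is one way to see why odd sizes are recommended.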

Each server in the group needs a myid file in its data directory that states its own ID.

ZooKeeper is installed here from the release tarball:

[root@host src]# tar zxvf zookeeper-3.4.6.tar.gz -C ../    # Unpack
[root@host src]# cd ..
[root@host local]# mv zookeeper-3.4.6 zookeeper    # Rename
[root@host local]# cd zookeeper
[root@host zookeeper]# ls
bin        CHANGES.txt  contrib     docs             ivy.xml  LICENSE.txt  README_packaging.txt  recipes  zookeeper-3.4.6.jar      zookeeper-3.4.6.jar.md5   zookeeper.out
build.xml  conf         dist-maven  ivysettings.xml  lib      NOTICE.txt   README.txt            src      zookeeper-3.4.6.jar.asc  zookeeper-3.4.6.jar.sha1
[root@host zookeeper]# mkdir -p /data/zookeeper/data    # Create the data directory
[root@host zookeeper]# mkdir -p /data/zookeeper/logs    # Create the log directory
[root@host zookeeper]# cd conf
[root@host conf]# ls
configuration.xsl  log4j.properties  zoo_sample.cfg
[root@host conf]# mv zoo_sample.cfg zoo.cfg    # Rename the configuration file; backing up the original first is recommended

The configuration file of zookeeper is as follows:

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
# (upper limit on the time a follower may take to establish its initial connection to the leader)
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
# (upper limit on the time a follower may be out of sync with the leader)
# initLimit and syncLimit are both multiples of tickTime: the actual time is the value multiplied by tickTime.
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
# Data directory
dataDir=/data/zookeeper/data
# Log directory
dataLogDir=/data/zookeeper/logs
# the port at which the clients will connect
# (the port ZooKeeper listens on)
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
# http:
# The number of snapshots to retain in dataDir
autopurge.snapRetainCount=3

# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1

# ZooKeeper group settings, one line per server in the group
server.X=hostname:2888:3888

The format of a group entry is: server.X=hostname:peerPort:leaderPort
X: The ID of the server. It must be an integer, but it does not have to start from 0 or be contiguous.
peerPort: TCP port used for communication between nodes.
leaderPort: TCP port used for leader election.

Clients only need to connect to the group through clientPort, while the nodes in the group must be able to reach one another on all three ports.
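
To make the server.X=hostname:peerPort:leaderPort format concrete, here is a small sketch that splits one such line into its parts. It is an illustration only: parse_server_line and ServerEntry are hypothetical names invented for this post (not a ZooKeeper API), the host name in the usage note is made up, and IPv6 addresses are not handled.

```python
from typing import NamedTuple


class ServerEntry(NamedTuple):
    server_id: int      # the X in server.X
    hostname: str
    peer_port: int      # communication between nodes
    leader_port: int    # leader election


def parse_server_line(line: str) -> ServerEntry:
    """Parse 'server.X=hostname:peerPort:leaderPort' into its four parts."""
    key, _, value = line.strip().partition("=")
    prefix, _, raw_id = key.strip().partition(".")
    if prefix != "server" or not raw_id.isdigit():
        raise ValueError(f"not a server.X entry: {line!r}")
    # Unpacking raises ValueError if there are not exactly three fields.
    hostname, peer_port, leader_port = value.strip().split(":")
    return ServerEntry(int(raw_id), hostname, int(peer_port), int(leader_port))
```

For example, parse_server_line("server.3=zk3.example.internal:2888:3888") yields ID 3, peer port 2888 and leader port 3888; zk3.example.internal is a placeholder host name, not one of the servers used in this post.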

[Everything above amounts to unpacking the tarball, creating the data and log directories, and editing the configuration file.

Next, repeat the same steps on the other two machines, or copy the files to the same locations on those machines with the scp command.]

As in a MySQL cluster, every server in the cluster needs a unique identifier; ZooKeeper uses the myid file to identify each server.

[root@host data]# pwd
[root@host data]# cat myid

# Create a myid file in the data directory of every server in the cluster and write an integer into it; this integer is the ID of that server.
# This ID must match the X of the corresponding server.X entry in the configuration file above. [When the two did not match, ZooKeeper failed to start; after making them match, it started successfully.]
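
Since a mismatch between myid and the server.X entries prevented startup, it can be worth checking the two before starting ZooKeeper. The sketch below shows one way to do that on the text of the two files; server_ids and myid_is_consistent are hypothetical helper names, and the host names in the sample are made up.

```python
import re


def server_ids(zoo_cfg_text: str) -> set:
    """Collect every X from the uncommented server.X=... lines of a zoo.cfg."""
    pattern = re.compile(r"^\s*server\.(\d+)\s*=", re.MULTILINE)
    return {int(match.group(1)) for match in pattern.finditer(zoo_cfg_text)}


def myid_is_consistent(zoo_cfg_text: str, myid_text: str) -> bool:
    """True if the integer in myid matches one of the server.X entries."""
    try:
        my_id = int(myid_text.strip())
    except ValueError:
        return False  # myid must contain a single integer
    return my_id in server_ids(zoo_cfg_text)


if __name__ == "__main__":
    # Made-up sample content; real files would be read from disk.
    sample_cfg = (
        "tickTime=2000\n"
        "server.1=zk1.example.internal:2888:3888\n"
        "server.2=zk2.example.internal:2888:3888\n"
        "server.3=zk3.example.internal:2888:3888\n"
    )
    print(myid_is_consistent(sample_cfg, "3\n"))
```

In practice the two strings would come from zoo.cfg and from the myid file in the data directory of the server being checked.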

The myid files of the other two servers are as follows:

[root@host data]# pwd
[root@host data]# cat myid

[root@host data]# pwd
[root@host data]# cat myid

Then start the zookeeper cluster:

# Start all three servers this way. zkServer.sh writes its output to zookeeper.out in the directory the command is run from; if an error is reported, check that file.
[root@host bin]# pwd
[root@host bin]# ./zkServer.sh start
JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

# Note: after starting, always check that the process is really running; sometimes no error is reported but the start has still failed.
[root@host bin]# netstat -alntp | grep 2181    # Check that the listening port exists
tcp        0      0 :::2181        :::*        LISTEN
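
The same "is the port really listening" check can be scripted so that it is easy to repeat on all three servers. This is a minimal sketch using only the Python standard library; port_is_open is a hypothetical helper name, and a successful TCP connection only shows that something is listening, not that the ZooKeeper group is healthy.

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port can be established."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and name-resolution errors.
        return False
```

Calling port_is_open("localhost", 2181) on each server right after zkServer.sh start gives a quick yes or no answer equivalent to the netstat check above.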

After all three servers are started, perform the following verification:

[root@host bin]# ./zkCli.sh --server10.0.102.204
Connecting to localhost:2181
2018-12-21 10:54:38,286 [myid:] - INFO  [main:Environment@100] - Client environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT
2018-12-21 10:54:38,289 [myid:] - INFO  [main:Environment@100] - Client environment:host.name=test3
2018-12-21 10:54:38,289 [myid:] - INFO  [main:Environment@100] - Client environment:java.version=1.7.0_79
2018-12-21 10:54:38,291 [myid:] - INFO  [main:Environment@100] - Client environment:java.vendor=Oracle Corporation
2018-12-21 10:54:38,291 [myid:] - INFO  [main:Environment@100] - Client environment:java.home=/usr/local/jdk/jdk1.7.0_79/jre
2018-12-21 10:54:38,291 [myid:] - INFO  [main:Environment@100] - Client environment:java.class.path=/usr/local/zookeeper/bin/../build/classes:/usr/local/zookeeper/bin/../build/lib/*.jar:/usr/local/zookeeper/bin/../lib/slf4j-log4j12-1.6.1.jar:/usr/local/zookeeper/bin/../lib/slf4j-api-1.6.1.jar:/usr/local/zookeeper/bin/../lib/netty-3.7.0.Final.jar:/usr/local/zookeeper/bin/../lib/log4j-1.2.16.jar:/usr/local/zookeeper/bin/../lib/jline-0.9.94.jar:/usr/local/zookeeper/bin/../zookeeper-3.4.6.jar:/usr/local/zookeeper/bin/../src/java/lib/*.jar:/usr/local/zookeeper/bin/../conf:
2018-12-21 10:54:38,291 [myid:] - INFO  [main:Environment@100] - Client environment:java.library.path=/usr/local/mysql/lib:/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
2018-12-21 10:54:38,291 [myid:] - INFO  [main:Environment@100] - Client environment:java.io.tmpdir=/tmp
2018-12-21 10:54:38,292 [myid:] - INFO  [main:Environment@100] - Client environment:java.compiler=<NA>
2018-12-21 10:54:38,292 [myid:] - INFO  [main:Environment@100] - Client environment:os.name=Linux
2018-12-21 10:54:38,292 [myid:] - INFO  [main:Environment@100] - Client environment:os.arch=amd64
2018-12-21 10:54:38,292 [myid:] - INFO  [main:Environment@100] - Client environment:os.version=2.6.32-504.el6.x86_64
2018-12-21 10:54:38,292 [myid:] - INFO  [main:Environment@100] - Client environment:user.name=root
2018-12-21 10:54:38,292 [myid:] - INFO  [main:Environment@100] - Client environment:user.home=/root
2018-12-21 10:54:38,292 [myid:] - INFO  [main:Environment@100] - Client environment:user.dir=/usr/local/zookeeper/bin
2018-12-21 10:54:38,293 [myid:] - INFO  [main:ZooKeeper@438] - Initiating client connection, connectString=localhost:2181 sessionTimeout=30000 watcher=org.apache.zookeeper.ZooKeeperMain$MyWatcher@…
Welcome to ZooKeeper
2018-12-21 10:54:38,315 [myid:] - INFO  [main-SendThread(localhost:2181):ClientCnxn$SendThread@975] - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
JLine support is enabled
2018-12-21 10:54:38,320 [myid:] - INFO  [main-SendThread(localhost:2181):ClientCnxn$SendThread@852] - Socket connection established to localhost/127.0.0.1:2181, initiating session
2018-12-21 10:54:38,330 [myid:] - INFO  [main-SendThread(localhost:2181):ClientCnxn$SendThread@1235] - Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x367ce7db1fa0003, negotiated timeout = 30000

WatchedEvent state:SyncConnected type:None path:null
[zk: localhost:2181(CONNECTED) 0]

# If the output above is displayed, the ZooKeeper cluster has been built successfully

Kafka cluster construction

Kafka is again installed from the release tarball. The process is similar to ZooKeeper: unpack, edit the configuration file, and create the corresponding message directory.

[root@host src]# tar -zxvf kafka_2.10- -C ../    # Unpack
[root@host src]# cd ..
[root@host local]# mv kafka_2.10- kafka    # Rename
[root@host local]# cd kafka
[root@host kafka]# ls
bin  config  libs  LICENSE  logs  NOTICE  site-docs
[root@host kafka]# cd config
[root@host config]# ls    # Kafka has many configuration files, but for now only server.properties matters
connect-console-sink.properties    connect-file-sink.properties    connect-standalone.properties  producer.properties  zookeeper.properties
connect-console-source.properties  connect-file-source.properties  consumer.properties            server.properties
connect-distributed.properties     connect-log4j.properties        log4j.properties               tools-log4j.properties

The configuration file, with the comments removed, is as follows:

# Must be an integer that is unique to each Kafka server
broker.id=3
auto.create.topics.enable=false
delete.topic.enable=true
socket.send.buffer.bytes=…
socket.receive.buffer.bytes=102400
log.dirs=/data/kafka/logs
log.retention.hours=168
zookeeper.connect=…:2181,…:2181,…

Description of configuration file parameters:

  • broker.id: Every broker needs an identifier, which is given by broker.id. The default is 0; it can be set to any integer, but it must be unique within the Kafka cluster.
  • port: The port that Kafka listens on. The default is 9092.
  • zookeeper.connect: The address of the ZooKeeper used to store broker metadata. As in the file above, each entry is a host and port number, and multiple addresses are separated by commas. The three addresses above form one ZooKeeper cluster: if the ZooKeeper server the broker is connected to goes down, the broker can connect to another server in the group.
  • log.dirs: Kafka saves all messages on disk, and the directories that hold the log segments are specified by log.dirs, a comma-separated list of local file system paths. If several paths are specified, the broker places partitions according to the "least used" principle and keeps all log segments of one partition under the same path. Note that the broker adds a new partition to the path with the fewest partitions, not to the path with the least disk space used.
  • num.recovery.threads.per.data.dir: Kafka uses a configurable number of threads to handle log segments in the following three situations:
    • The server starts normally, and the threads open the log segments of each partition.
    • The server restarts after a crash, and the threads check and truncate the log segments of each partition.
    • The server shuts down normally, and the threads close the log segments.

By default, each log directory uses only one thread. Because these threads are used only when the server starts and shuts down, a larger number can safely be configured so that the work runs in parallel. Especially on a server with a large number of partitions, parallel recovery after a crash can save a lot of time. The configured number applies to each single log directory specified by log.dirs. For example, if this value is set to 8 and log.dirs specifies 3 paths, a total of 24 threads is needed.

  • auto.create.topics.enable: By default, Kafka creates a topic automatically in the following situations:
    • When a producer starts writing messages to the topic.
    • When a consumer starts reading messages from the topic.
    • When any client sends a metadata request for the topic.

     This automatic behaviour can be confusing, so try to set the parameter to false!
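
Two of the rules above are easy to express as code: the total number of recovery threads, and the choice of the log.dirs path for a new partition. The sketch below only mirrors the rules as described in this post; it is not Kafka's own implementation, the function names are hypothetical, and the directory names and partition counts in the usage note are made up.

```python
def recovery_threads(threads_per_dir: int, log_dirs: list) -> int:
    """Total threads = num.recovery.threads.per.data.dir x number of log.dirs paths."""
    return threads_per_dir * len(log_dirs)


def pick_log_dir(partitions_per_dir: dict) -> str:
    """Choose the path that currently holds the fewest partitions.

    Disk usage is deliberately ignored, matching the rule described above.
    On a tie, the first path in the mapping wins (a choice made by this sketch).
    """
    return min(partitions_per_dir, key=partitions_per_dir.get)


if __name__ == "__main__":
    dirs = ["/data/kafka/logs1", "/data/kafka/logs2", "/data/kafka/logs3"]
    print("threads:", recovery_threads(8, dirs))
    print("next partition goes to:", pick_log_dir({dirs[0]: 12, dirs[1]: 7, dirs[2]: 9}))
```

With a value of 8 and three paths, recovery_threads reproduces the figure of 24 threads mentioned above.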

The Kafka log directory is specified in the configuration above, so the message directory needs to be created:

[root@host ~]# mkdir -p /data/kafka/logs

As with ZooKeeper, copy the unpacked files to the other two servers and create the log directory on each of them, taking care to change broker.id on the other two servers.

Then start kafka:

[root@host bin]# ls
connect-distributed.sh     kafka-consumer-groups.sh             kafka-reassign-partitions.sh   kafka-simple-consumer-shell.sh   zookeeper-server-start.sh
connect-standalone.sh      kafka-consumer-offset-checker.sh     kafka-replay-log-producer.sh   kafka-topics.sh                  zookeeper-server-stop.sh
kafka-acls.sh              kafka-consumer-perf-test.sh          kafka-replica-verification.sh  kafka-verifiable-consumer.sh     zookeeper-shell.sh
kafka-configs.sh           kafka-mirror-maker.sh                kafka-run-class.sh             kafka-verifiable-producer.sh
kafka-console-consumer.sh  kafka-preferred-replica-election.sh  kafka-server-start.sh          windows
kafka-console-producer.sh  kafka-producer-perf-test.sh          kafka-server-stop.sh           zookeeper-security-migration.sh


[root@host bin]# ./kafka-server-start.sh /usr/local/kafka/config/server.properties 1>/dev/null 2>&1 &
[1] 8757
[root@host bin]#


[root@host bin]# netstat -lntp | grep 9092
tcp        0      0 ::ffff:…:9092        :::*                        LISTEN      8757/java
[root@host bin]#


[root@host bin]# ./kafka-console-producer.sh --broker-list … --topci science       # Create a topic
topci is not a recognized option
[root@host bin]#

[root@host bin]# ./kafka-console-producer.sh --broker-list … --topic science
test message
[root@host bin]# ./kafka-console-consumer.sh --zookeeper … --topic science --from-beginning
test message



[root@host bin]# ./kafka-topics.sh --zookeeper … --describe --topic science         # View the topic's information
Topic:science   PartitionCount:1        ReplicationFactor:1     Configs:
        Topic: science  Partition: 0    Leader: 2       Replicas: 2     Isr: 2
[root@host bin]#



[root@host bin]# ./kafka-console-producer.sh --broker-list localhost:9092 --topic lianxi
[2018-12-20 17:33:27,112] ERROR Error when sending message to topic lianxi with key: null, value: 5 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.



[root@host ~]# java -version
java version "1.7.0_79"
Java(TM) SE Runtime Environment (build 1.7.0_79-b15)
Java HotSpot(TM) 64-Bit Server VM (build 24.79-b02, mixed mode)

[root@host libs]# pwd
[root@host libs]# ls kafka_2.10-


[root@host zookeeper]# ls
bin        CHANGES.txt  contrib     docs             ivy.xml  LICENSE.txt  README_packaging.txt  recipes  zookeeper-3.4.6.jar      zookeeper-3.4.6.jar.md5
build.xml  conf         dist-maven  ivysettings.xml  lib      NOTICE.txt   README.txt            src      zookeeper-3.4.6.jar.asc  zookeeper-3.4.6.jar.sha1
[root@host zookeeper]# # The ZooKeeper version is 3.4.6

