Linux Install Kafka

Kafka is a distributed message queue with high performance, persistence, multi-replica backup, and horizontal scalability. Producers write messages to the queue, and consumers fetch messages from it to run business logic. In architecture design it typically plays the roles of decoupling, peak shaving, and asynchronous processing. Kafka exposes the concept of a topic: producers write messages to topics, and consumers read messages from them. To allow horizontal scaling, a topic is actually composed of multiple partitions, so when a bottleneck is hit you can scale out by increasing the number of partitions. Message order is guaranteed only within a single partition. Every new message is appended to the end of the corresponding partition file, which is why write performance is very high.
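As a minimal sketch of this produce/consume flow (assuming a broker is already running on localhost:9092 with ZooKeeper on localhost:2181; the topic name demo-topic is our own choice), the console tools shipped with Kafka 2.0 can play both roles:

# Create a topic with 3 partitions (Kafka 2.0's tooling still addresses ZooKeeper here)
bin/kafka-topics.sh --create --zookeeper localhost:2181 \
  --replication-factor 1 --partitions 3 --topic demo-topic

# Producer: each line typed becomes a message appended to one of the partitions
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic demo-topic

# Consumer: fetch all messages from the beginning of the topic
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic demo-topic --from-beginning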

Basic knowledge

What is Message Queue?

Message

Data transmitted between two computers or communication devices over a network, for example text, music, or video.

Queue

A special linear structure (its data elements are connected end to end) in which elements may only be removed from the head and appended at the tail: first in, first out. Adding an element is called enqueueing; removing one is called dequeueing.

Message Queue (MQ)

Message + Queue: a queue that stores messages, acting as the container messages pass through in transit. It mainly provides produce and consume interfaces for external callers to store and fetch data.

MQ classification

MQ falls into two main categories: point-to-point (P2P) and publish/subscribe (Pub/Sub).

What they have in common:

A message producer produces messages and sends them to the queue; a message consumer then reads messages from the queue and consumes them.

Where they differ:

The P2P model consists of a message queue (Queue), a sender (Sender), and a receiver (Receiver). A message produced by a producer has exactly one consumer: once it has been consumed, the message is no longer in the queue. An analogy is making a phone call.

The Pub/Sub model consists of a message queue (Queue), a topic (Topic), a publisher (Publisher), and a subscriber (Subscriber). Each message can have multiple consumers, and they do not affect one another. An analogy is posting on Weibo: everyone who follows me can see the post.

So in the big data field, to keep up with ever-growing data volumes, a distributed, durable, and stable product emerged that can handle producing and consuming millions of messages: Kafka.

Kafka concepts

To understand Kafka, you must first understand its main terms: topic, broker, producer, and consumer. The figure below illustrates these terms. If you already know them, you can skip this part.

[Figure: a Kafka topic with three partitions and their per-partition offsets]

In the figure above, the topic is configured with three partitions. Partition 1 has two offsets, 0 and 1. Partition 2 has four offsets: 0, 1, 2, and 3. Partition 3 has a single offset, 0. The id of a replica is the same as the id of the server that hosts it.

Assuming the topic's replication factor is set to 3, Kafka creates 3 identical replicas of each partition and places them across the cluster, making them available for all of its operations. To balance the load in the cluster, each broker stores one or more of these partitions. Multiple producers and consumers can publish and retrieve messages at the same time.
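To see such a layout on a live cluster, the describe command prints the leader, replica ids, and in-sync replicas per partition (a sketch: the topic name and the exact ids in the sample output are illustrative):

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic demo-topic
# Topic:demo-topic  PartitionCount:3  ReplicationFactor:3  Configs:
#   Topic: demo-topic  Partition: 0  Leader: 1  Replicas: 1,2,0  Isr: 1,2,0
#   Topic: demo-topic  Partition: 1  Leader: 2  Replicas: 2,0,1  Isr: 2,0,1
#   Topic: demo-topic  Partition: 2  Leader: 0  Replicas: 0,1,2  Isr: 0,1,2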

Topics: every message published to the Kafka cluster has a category called a topic. (Physically, messages of different topics are stored separately. Logically, although a topic's messages are stored on one or more brokers, users only need to specify the topic to produce or consume data, without caring where the data is stored.)

Partition: a partition is a physical concept. Each topic contains one or more partitions, and you can specify the partition count when creating the topic. Each partition corresponds to a folder in which that partition's data and index files are stored.
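For instance (a sketch: the data directory matches the log.dirs value used later in this article, and the topic name is an assumption), a topic demo-topic with three partitions appears on disk as one folder per partition, each holding that partition's log and index files:

ls /home/uplooking/data/kafka
# demo-topic-0  demo-topic-1  demo-topic-2
ls /home/uplooking/data/kafka/demo-topic-0
# 00000000000000000000.index  00000000000000000000.log  00000000000000000000.timeindex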

Partition offset: each message within a partition carries a unique sequential id called the offset.

Replicas of partition: a replica is simply a backup of a partition. Replicas are never read from or written to by clients; they exist to prevent data loss.

Broker: a Kafka cluster contains one or more servers, each of which is called a broker.

Brokers: brokers are the simple systems responsible for maintaining the published data. Each broker may hold zero or more partitions per topic. Suppose a topic has N partitions and the cluster has N brokers: each broker holds one partition. With N partitions and more than N brokers (N + M), the first N brokers hold one partition each and the remaining M brokers hold none for that topic (for example, with 3 partitions and 5 brokers, 3 brokers get one partition each and 2 get none). With N partitions and fewer than N brokers (N − M), the partitions are shared among the brokers, each holding one or more. This last layout is not recommended because the load is distributed unequally across brokers.

Kafka Cluster: a Kafka deployment with multiple brokers is called a Kafka cluster. A Kafka cluster can be expanded without downtime, and clusters manage the persistence and replication of message data.

Producers: producers publish messages to one or more Kafka topics by sending data to Kafka brokers. Whenever a producer publishes a message, the broker simply appends it to the last segment file; the message is thus appended to a partition. Producers can also send messages to a partition of their choice.

Consumers: consumers read messages. Each consumer belongs to a specific consumer group (a group name can be set per consumer; if none is set, the consumer belongs to the default group). With the consumer high-level API, a given message on a topic is consumed by only one consumer within a consumer group, but multiple consumer groups can each consume the message at the same time.

Consumer Group: a logical concept, and the means by which Kafka implements both the unicast and broadcast message models. The data of one topic is broadcast to the different groups, but within a group only one worker receives each piece of data. In other words, for the same topic, every group receives all of the data, but once data enters a group it is consumed by exactly one worker in that group. The workers in a group can be threads or processes, and the processes can be spread across machines. The number of workers usually should not exceed the number of partitions, and it is best to keep the two in an integer-multiple relationship, because Kafka is designed on the assumption that a partition is consumed by only one worker within a group. Put simply, consumers sharing a group.id form a queue: each message is gone once consumed, as the sketch below shows.
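A quick way to observe both models with the console consumer (a sketch; the group names work-group and audit-group are assumptions):

# Queue semantics: two consumers sharing one group id divide the partitions,
# so each message is delivered to only one of them
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic demo-topic --group work-group    # terminal 1
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic demo-topic --group work-group    # terminal 2

# Broadcast semantics: a consumer in a different group receives every message again
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic demo-topic --group audit-group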

Leader: the node responsible for all reads and writes for a given partition. Each partition has one server acting as its leader.

Follower: a node that follows the leader's instructions is called a follower. If the leader fails, one of the followers automatically becomes the new leader. A follower acts like a normal consumer: it pulls messages and updates its own data store.

Kafka features:

  • Reliability: Kafka is distributed, partitioned, replicated, and fault tolerant.
  • Scalability: the Kafka messaging system scales easily without downtime.
  • Durability/persistence: Kafka uses a distributed commit log, which means messages are persisted to disk as quickly as possible, so it is durable.
  • Performance: Kafka has high throughput for both publishing and subscribing, and it maintains stable performance even with many terabytes of stored messages.
  • High concurrency: supports thousands of clients reading and writing at the same time.

Use cases:

  • Metrics: Kafka is often used for operational monitoring data. This involves aggregating statistics from distributed applications and producing centralized feeds of operational data, for purposes such as alerts and reports.
  • Log aggregation solution: Kafka can be used to collect logs from multiple services across organizations and make them available to multiple servers in a standard format.
  • Message system: decouples producers from consumers, buffers unprocessed messages, and so on.
  • Stream processing: popular frameworks (such as Storm and Spark Streaming) read data from a topic, process it, and write the processed data to a new topic for users and applications to consume. Kafka's strong durability is also very useful in the context of stream processing.

Before installing Kafka, first confirm that Java and ZooKeeper are installed.

If you don’t have the Java JDK installed, see “CentOS Install Java JDK”.

If you haven’t installed ZooKeeper yet, see “Install ZooKeeper”.
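A quick sanity check before proceeding (a sketch: it assumes ZooKeeper is on its default port 2181 and that the stat four-letter command is enabled, as it is by default in the older ZooKeeper releases commonly paired with Kafka 2.0):

java -version                  # should report JDK 1.8 or newer
echo stat | nc localhost 2181  # should print ZooKeeper server stats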

Install Kafka

2.1 Download

wget http://mirrors.hust.edu.cn/apache/kafka/2.0.0/kafka_2.12-2.0.0.tgz

If the download is slow or inconvenient, you can also use this pre-downloaded package. Link: https://pan.baidu.com/s/1u8mSfubwZupFqKtK6PH6Qw Extraction code: v5em

2.2 Unzip

tar -xzf kafka_2.12-2.0.0.tgz

Note: kafka_2.12-2.0.0.tgz is a pre-built binary release; it is ready to use once unpacked.
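After unpacking, the directory already contains the runnable scripts and configuration:

cd kafka_2.12-2.0.0
ls
# bin  config  libs  LICENSE  NOTICE  site-docs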

2.3 Configure server.properties

Change the default advertised.listeners=PLAINTEXT://your.host.name:9092 to advertised.listeners=PLAINTEXT://ip:9092, where ip is the server's IP address.

This is the hostname and port the broker advertises to producers and consumers. If it is not set, the value of listeners is used; if listeners is not configured either, the broker falls back to java.net.InetAddress.getCanonicalHostName() to obtain the hostname and port, which for IPv4 is basically localhost.

“PLAINTEXT” denotes the protocol; the optional values are PLAINTEXT and SSL. The hostname can be an IP address; “0.0.0.0” means listen on all network interfaces, while an empty hostname means only the default interface. In short, if advertised.listeners is not configured, the listeners configuration is what gets announced to producers and consumers when they fetch metadata from the broker.
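A minimal sketch of the edit (the IP 192.168.1.100 is a placeholder for your server's address):

sed -i 's|^#advertised.listeners=.*|advertised.listeners=PLAINTEXT://192.168.1.100:9092|' config/server.properties
grep advertised.listeners config/server.properties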

A more detailed look at server.properties:

# Licensed to the Apache Software Foundation (ASF) under one or more

# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

############################################################################
# A broker is a deployment instance of Kafka. In a Kafka cluster, each Kafka instance must have a broker.id
# and the id must be a unique integer
############################################################################
broker.id=10

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = security_protocol://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092

# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092

############################################################################
# The number of threads handling network requests
# The default number of threads for processing network requests is 3
############################################################################
num.network.threads=3
############################################################################
# The number of threads doing disk I/O
# The default number of threads performing disk I/O operations is 8
############################################################################
num.io.threads=8

############################################################################
# The send buffer (SO_SNDBUF) used by the socket server
# The buffer size used by the socket service for sending data; the default is 100 KB
############################################################################
socket.send.buffer.bytes=102400

############################################################################
# The receive buffer (SO_RCVBUF) used by the socket server
# The buffer size used by the socket service for receiving data; the default is 100 KB
############################################################################
socket.receive.buffer.bytes=102400

############################################################################
# The maximum size of a request that the socket server will accept (protection against OOM)
# The maximum request size the socket service will accept, protecting against OOM (out of memory); the default is 100 MB
############################################################################
socket.request.max.bytes=104857600

############################# Log Basics (the data-related part; Kafka calls its data the log) #############################

############################################################################
# A comma separated list of directories under which to store log files
# A comma-separated list of directories used to store the data received by Kafka
############################################################################
log.dirs=/home/uplooking/data/kafka

############################################################################
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
# The number of partitions per topic's log; the default is 1. More partitions increase consumer
# parallelism, but also result in more files across the Kafka cluster
# (a partition is distributed storage: a body of data is split into several pieces for storage, i.e. blocks/partitions)
############################################################################
num.partitions=1

############################################################################
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
# The number of threads each data directory uses to recover data when Kafka starts and to flush data when it shuts down.
# If the Kafka data is stored on a disk array, it is recommended to increase this value.
############################################################################
num.recovery.threads.per.data.dir=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
# 1. Durability: Unflushed data may be lost if you are not using replication.
# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.
# In Kafka, the flush strategy can be based on a message count or on a time interval; there is no size-based option.
# You can configure either one of the two options below, or both.

# The number of messages to accept before forcing a flush of data to disk
# The message-count threshold that triggers a flush to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
# The maximum time a message can sit in the log before being flushed to disk
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
# The settings below control log-segment cleanup: a segment is deleted as soon as either the time-based or the size-based policy is met

# The minimum age of a log file to be eligible for deletion
# Time-based strategy: how long log data is kept before deletion; the default is 7 days (168 hours)
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
# Size-based strategy; 1 GB
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
# Segment strategy: the maximum size of a single log segment file (1 GB)
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies; 5 minutes
# How often to check whether the data meets the deletion conditions
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=uplooking01:2181,uplooking02:2181,uplooking03:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
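With the configuration in place, a minimal startup-and-verify sequence looks like this (a sketch: it assumes ZooKeeper is already running and reachable at the addresses configured in zookeeper.connect; the -daemon flag backgrounds the process):

bin/kafka-server-start.sh -daemon config/server.properties
jps   # a process named "Kafka" should appear in the list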
