Kafka Producer Performance Optimization

When we talk about the performance of the Kafka producer, we are really talking about two different things:

  • latency: how much time passes from the time KafkaProducer.send() was called until the message shows up in a Kafka broker.
  • throughput: how many messages can the producer send to Kafka each second.

Many years ago, I was in a storage class taught by scalability expert James Morle. One of the students asked why we need to worry about both latency and throughput – after all, if processing a message takes 10ms (latency), then clearly throughput is limited to 100 messages per second. When looking at things this way, it may seem like latency and throughput are just two views of the same number – higher latency meaning lower throughput and vice versa. However, the relation between latency and throughput is not this trivial.

Let's start our discussion by agreeing that we are only talking about the new Kafka producer (the one in the org.apache.kafka.clients package). It makes things simpler, and there's no reason to use the old producer at this point.

The Kafka producer can send messages in batches. Suppose that due to network round-trip times, it takes 2ms to send a single Kafka message. By sending one message at a time, we have a latency of 2ms and a throughput of 500 messages per second. But suppose that we are in no big hurry, and are willing to wait a few milliseconds to send a larger batch – let's say we decided to wait 8ms and managed to accumulate 1000 messages. Our latency is now 10ms, but our throughput is up to 100,000 messages per second! That's the main reason I love micro-batches so much. By adding a tiny delay – and 10ms is usually acceptable even for financial applications – our throughput is 200 times greater. This type of trade-off is not unique to Kafka, by the way. Network and storage subsystems use this kind of "micro-batching" all the time.

Sometimes latency and throughput interact in even funnier ways. One day Ted Malaska complained that with Flafka, he could get 20ms latency when sending 100,000 messages per second, but a huge 1-3s latency when sending just 100 messages a second. This made no sense at all, until we remembered that, to save CPU, Flafka backs off and retries later if it doesn't find messages to read from Kafka. Backoff times started at 0.5s and steadily increased. Ted kindly improved Flume to avoid this issue in FLUME-2729.

Anyway, back to the Kafka producer. There are a few settings you can modify to improve latency or throughput in the Kafka producer:

  • batch.size – This is an upper limit on how much data the Kafka producer will attempt to batch before sending – specified in bytes (the default is 16K bytes, so 16 messages if each message is 1K in size). Kafka may send batches before this limit is reached (so latency doesn't change by modifying this parameter), but will always send when this limit is reached. Therefore setting this limit too low will hurt throughput without improving latency. The main reason to set it low is lack of memory – Kafka will always allocate enough memory for the entire batch size, even if latency requirements cause it to send half-empty batches.
  • linger.ms – How long the producer will wait before sending, in order to allow more messages to accumulate in the same batch (see the configuration sketch after this list). Normally the producer will not wait at all, and will simply send all the messages that accumulated while the previous send was in progress (2 ms in the example above), but as we've discussed, sometimes we are willing to wait a bit longer in order to improve overall throughput at the expense of a little higher latency. In that case, tuning linger.ms to a higher value makes sense. Note that if batch.size is low and the batch is full before linger.ms time passes, the batch will be sent early, so it makes sense to tune batch.size and linger.ms together.
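To make these two settings concrete, here is a minimal configuration sketch; the broker address, the serializers, and the exact values (64KB, 10ms) are my own assumptions to illustrate the idea, not recommendations:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");   // assumption: your broker list
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536);                   // 64KB batches instead of the 16KB default
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);                       // wait up to 10ms for a batch to fill
KafkaProducer<String, String> producer = new KafkaProducer<>(props);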

Other than tuning these parameters, you will want to avoid waiting on the future of the send method (i.e. the result from the Kafka brokers), and instead send data to Kafka continuously. You can simply ignore the result (if the success of sending messages is not critical), but it's probably better to use a callback. You can find an example of how to do this in my GitHub (look at the produceAsync method).
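I can't reproduce the produceAsync method from the repository here, but a rough sketch of a non-blocking send with a callback looks like this; the topic name, key, value, and error handling are assumptions:

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Fire the send and move on instead of blocking on the returned Future;
// "producer" is the instance from the configuration sketch above.
producer.send(new ProducerRecord<>("my-topic", "key", "value"), new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            exception.printStackTrace();   // assumption: replace with real error handling / retries
        }
    }
});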

If sending is still slow and you are trying to understand what is going on, check whether the send thread is fully utilized through jvisualvm (the thread is called kafka-producer-network-thread), or keep an eye on the average batch size metric. If you find that you can't fill the buffer fast enough and the sender is idle, you can try adding application threads that share the same producer and increase throughput this way.
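Since KafkaProducer instances are safe to share between threads, one way to add producing threads is simply to hand the same instance to several workers. A minimal sketch, assuming four threads, a topic called my-topic, and the producer from the configuration sketch above:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.kafka.clients.producer.ProducerRecord;

ExecutorService pool = Executors.newFixedThreadPool(4);    // assumption: 4 producing threads
for (int t = 0; t < 4; t++) {
    pool.submit(() -> {
        for (int i = 0; i < 1_000_000; i++) {              // assumption: each thread sends 1M messages
            producer.send(new ProducerRecord<>("my-topic", "message-" + i));
        }
    });
}
pool.shutdown();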

Another concern is that the producer sends all the batches that go to the same broker together whenever at least one of them is full – so if you have one very busy topic and others that are less busy, you may see some skew in throughput this way.

Sometimes you will notice that the producer performance doesn’t scale as you add more partitions to a topic. This can happen because, as we mentioned, there is a send buffer for each partition. When you add more partitions, you have more send buffers, so perhaps the configuration you set to keep the buffers full before (# of threads, linger.ms) is no longer sufficient and buffers are sent half-empty (check the batch sizes). In this case you will need to add threads or increase linger.ms to improve utilization and scale your throughput.

Got more tips on ingesting data into Kafka? Comments are welcome!

Using a regular expression to replace the src of img tags

Requirement: due to a system migration, the src attribute of the img tags in the page content stored in the database needs to be patched. For example:

Content="<p><img title=\"122444234\" src=\"/files/post/122444234.jpg\"/><p>Other characters";

After the replacement it should be:

Content="<p><img title=\"122444234\" src=\"http://xxx.xxx.com/files/post/122444234_500.jpg\" /><p>Other characters";

We can solve this with a regular expression; the code is as follows (a static method of ApiUtil.java):


import java.util.regex.Matcher;
import java.util.regex.Pattern;

    /**
     * Patch the src attribute inside img tags.
     * @param content     the page content
     * @param replaceHttp the domain name to prepend to relative src paths
     * @param size        the size suffix (_size) to insert into the file name in src
     * @return the patched content
     */
    public static String repairContent(String content, String replaceHttp, int size) {
        String patternStr = "<img\\s*([^>]*)\\s*src=\\\"(.*?)\\\"\\s*([^>]*)>";
        Pattern pattern = Pattern.compile(patternStr, Pattern.CASE_INSENSITIVE);
        Matcher matcher = pattern.matcher(content);
        String result = content;
        while (matcher.find()) {
            String src = matcher.group(2);
            logger.debug("pattern string:" + src);
            String replaceSrc = src;
            if (src.lastIndexOf(".") > 0) {
                // insert _size before the file extension
                replaceSrc = src.substring(0, src.lastIndexOf(".")) + "_" + size + src.substring(src.lastIndexOf("."));
            }
            if (!src.startsWith("http://") && !src.startsWith("https://")) {
                // prepend the new domain when src is a relative path
                replaceSrc = replaceHttp + replaceSrc;
            }
            // use replace() instead of replaceAll() so src is treated as a literal string, not a regex
            result = result.replace(src, replaceSrc);
        }
        logger.debug(" content == " + content);
        logger.debug(" result == " + result);
        return result;
    }

 Test code:


public static void main(String[] args) {
        String content = "<p><img title=\"10010001\" src=\"/files/post/10010001.gif\" width=\"200\" height=\"300\" />" +
                "</p><p><img title=\"10010002\" src=\"/files/post/10010002.gif\" width=\"500\" height=\"300\" /><p>  </p>"+
                "</p><p><img title=\"10010003\" src=\"/files/post/10010003.gif\" width=\"600\" height=\"300\" /><p>  </p>";
        String replaceHttp = "http://www.baidu.com";
        int size = 500;
        String result = ApiUtil.repairContent(content, replaceHttp, size);
        System.out.println(result);
    }

The key is the regular expression:

<img\\s*([^>]*)\\s*src=\\\"(.*?)\\\"\\s*([^>]*)>

In particular, the [^>] character class cannot be replaced with ., otherwise the pattern would match from the first <img all the way to the last ">" symbol in the string; since the src of each img tag is different, only the last src would then be captured and replaced.

A jQuery component for browsing large images on the mobile web

Recently I built a mobile image page. The main page shows a series of preview thumbnails; after tapping a thumbnail, the large image needs to be displayed full screen, with the ability to swipe left and right to browse the other large images. I found the fotorama component online and it works well, so here are my notes on using it.

First download the fotorama package from http://www.fotorama.io/set-up/ and include its CSS and JS on the page. fotorama also depends on the jQuery framework, so jQuery needs to be included as well.

Include the CSS in the head:

<link href="js/fotorama.css" type="text/css" rel="stylesheet">
Include the JS in the body:
<script src="js/jquery-1.11.2.min.js"></script>
<script src="js/fotorama.js"></script>

Put the preview thumbnails on the page:

<div class="box">
<div class="pic"><img src="images/1_s.jpg" /></div>
<div class="pic"><img src="images/2_s.jpg" /></div>
<div class="pic"><img src="images/3_s.jpg" /></div>
<div class="pic"><img src="images/4_s.jpg" /></div>
<div class="pic"><img src="images/5_s.jpg" /></div>
<div class="pic"><img src="images/6_s.jpg" /></div>
</div>
<div class="fotorama" data-max-width="100%"></div>

The <div class="fotorama"> element is what the component uses to display the large images.

Zookeeper log cleanup script

The following shell script cleans up the logs of three ZooKeeper nodes (set it up as a scheduled task):

# zookeeper home
zkdir=/server/app/zookeeper-3.4.6

# snapshot file dirs
snapDir=/mnt/data/zookeeper/n_1
snapDir2=/mnt/data/zookeeper/n_2
snapDir3=/mnt/data/zookeeper/n_3

# transaction log dirs
dataDir=/mnt/data/zookeeper/n_1
dataDir2=/mnt/data/zookeeper/n_2
dataDir3=/mnt/data/zookeeper/n_3

# number of snapshot/log files to keep
count=15

cd /server/app/zookeeper-3.4.6
java -cp ${zkdir}/zookeeper-3.4.6.jar:${zkdir}/lib/slf4j-api-1.6.1.jar:${zkdir}/lib/slf4j-log4j12-1.6.1.jar:${zkdir}/lib/log4j-1.2.16.jar:${zkdir}/conf org.apache.zookeeper.server.PurgeTxnLog ${dataDir} ${snapDir} -n ${count}
java -cp ${zkdir}/zookeeper-3.4.6.jar:${zkdir}/lib/slf4j-api-1.6.1.jar:${zkdir}/lib/slf4j-log4j12-1.6.1.jar:${zkdir}/lib/log4j-1.2.16.jar:${zkdir}/conf org.apache.zookeeper.server.PurgeTxnLog ${dataDir2} ${snapDir2} -n ${count}
java -cp ${zkdir}/zookeeper-3.4.6.jar:${zkdir}/lib/slf4j-api-1.6.1.jar:${zkdir}/lib/slf4j-log4j12-1.6.1.jar:${zkdir}/lib/log4j-1.2.16.jar:${zkdir}/conf org.apache.zookeeper.server.PurgeTxnLog ${dataDir3} ${snapDir3} -n ${count}

ZooKeeper log files cannot simply be deleted at will: a znode that has not been updated for a long time may only exist in a log file from several days ago. So how do you delete old files safely? You could write your own program, but ZooKeeper already provides a handy utility for this: PurgeTxnLog. Usage:
java -Djava.ext.dirs=lib org.apache.zookeeper.server.PurgeTxnLog log_path snap_path -n 10
Here -n indicates how many files to keep (it cannot be less than 3). log_path and snap_path must be the root paths of ZooKeeper's transaction logs and snapshots, i.e. the directories that contain the version-x folder; the tool looks for the version-x directory under the given paths and then purges the log files beneath it.
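If you would rather call this from Java than from a cron-driven shell script, PurgeTxnLog also exposes the same logic as a static purge(File dataDir, File snapDir, int num) method (at least in the 3.4.x line). A minimal sketch, reusing the paths and count from the script above and assuming the ZooKeeper jar and its dependencies are on the classpath:

import java.io.File;
import org.apache.zookeeper.server.PurgeTxnLog;

public class PurgeZkLogs {
    public static void main(String[] args) throws Exception {
        File dataDir = new File("/mnt/data/zookeeper/n_1");   // transaction log dir (contains version-x)
        File snapDir = new File("/mnt/data/zookeeper/n_1");   // snapshot dir (contains version-x)
        PurgeTxnLog.purge(dataDir, snapDir, 15);              // keep the 15 most recent snapshots and their logs
    }
}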

iOS enterprise app installation and HTTPS certificate generation

In addition to publishing on the App Store, an iOS application can also be distributed with an enterprise certificate by deploying it to your own server and offering it for download. However, since iOS 7.1, enterprise applications must be downloaded over trusted HTTPS; otherwise the device shows an error such as "server could not be found".

First, the developer needs to package an ipa and provide a plist file; the plist file can look like the following:

Plist file (test.plist):

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
<dict>
<key>items</key>
<array>
<dict>
<key>assets</key>
<array>
<dict>
<key>kind</key>
<string>software-package</string>
<key>url</key>
<string>https://192.168.0.8/test.ipa</string>
</dict>
</array>
<key>metadata</key>
<dict>
<key>bundle-identifier</key>
<string>com.test</string>
<key>bundle-version</key>
<string>1.0</string>
<key>kind</key>
<string>software</string>
<key>title</key>
<string>test</string>
</dict>
</dict>
</array>
</dict>
</plist>
Create a new page, say test.html, as follows:
<script>
location.href="itms-services:///?action=download-manifest&url=https://192.168.0.8/test.plist";
</script>

OR

<a href="itms-services:///?action=download-manifest&url=https://192.168.0.8/test.plist">Click to download</a>
Visit https://192.168.0.8/test.html (open it with Safari).

Note: the url inside the plist file does not have to be https, but the url parameter after the itms-services protocol on the page must be https, and it must be trusted https. If you obtained a certificate from a recognized authority and configured it on the server, it is trusted directly; otherwise, with a self-generated certificate, the client must also install the CA certificate before it is trusted. Also remember to remind users that the download only works in Safari; other browsers do not recognize the itms-services protocol.

If the download only needs to work on a LAN, you will have to generate the certificate yourself. The steps are as follows:

1. Generate the server's private key

openssl genrsa -out server.key 1024

2. Generate a certificate signing request (note that every field except Common Name can be left empty; Common Name must be the server's IP or domain name)

openssl req -new -key server.key -out server.csr

3. Generate the CA's private key

openssl genrsa -out ca.key 1024

4. Use the CA's private key to generate a self-signed certificate for the CA

openssl req -new -x509 -days 365 -key ca.key -out ca.crt

5. Create a demoCA directory in the current directory; inside it create the files index.txt (empty) and serial (containing 01), plus a newcerts folder, then sign the server certificate:

openssl ca -in server.csr -out server.crt -cert ca.crt -keyfile ca.key

This produces server.crt, server.key, and ca.crt.

Configure server.crt and server.key on the web server (the configuration differs between Apache and nginx; there are plenty of instructions online, so I won't repeat them here). In addition, put ca.crt in the server's web root so users can access and install it.

After the certificate is configured and the service restarted, https://192.168.0.8/test.html becomes reachable, but it is still not fully trusted: the client must install ca.crt before the download works. Point the client at http://192.168.0.8/ca.crt; Safari will jump straight to the certificate installation screen. Once the certificate is installed, the app can be downloaded normally via https://192.168.0.8/test.html.

The Apache ab tool

ab is a very good stress-testing tool that ships with Apache; once Apache is installed, you can find ab under its bin directory.

We can simulate 100 concurrent users sending 1000 requests to a page, as follows:

./ab -n1000 -c100 http://mytest.com/a.html

Here -n is the total number of requests and -c is the concurrency level.
 

The results:

First comes the ApacheBench version information:

This is ApacheBench, Version 2.3 <$Revision: 655654 $>
Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
Licensed to The Apache Software Foundation, http://www.apache.org/

Benchmarking vm1.jianfeng.com (be patient)

Server Software: Apache/2.2.19 ## Apache version
Server Hostname: mytest.com ## requested host
Server Port: 80 ## requested port

Document Path: /a.html
Document Length: 25 bytes ## page length

Concurrency Level: 100 ## number of concurrent users
Time taken for tests: 0.273 seconds ## total time taken for the test
Complete requests: 1000 ## completed requests
Failed requests: 0 ## failed requests
Write errors: 0
Total transferred: 275000 bytes ## total bytes transferred, including HTTP headers
HTML transferred: 25000 bytes ## HTML bytes, the actual page content transferred
Requests per second: 3661.60 [#/sec] ## requests per second – a very important value, the server throughput
Time per request: 27.310 [ms] ## average time a user waits per request
Time per request: 0.273 [ms] (mean, across all concurrent requests) ## average server processing time, the reciprocal of the throughput
Transfer rate: 983.34 [Kbytes/sec] received ## data received per second

Connection Times (ms)
              Min mean[+/-sd] median max
Connect: 0 1 2.3 0 16
Processing: 6 25 3.2 25 32
Waiting: 5 24 3.2 25 32
Total: 6 25 4.0 25 48

Percentage of the requests served within a certain time (ms)
  50% 25 ## 50% of requests returned within 25ms
  66% 26 ## 66% of requests returned within 26ms
  75% 26
  80% 26
  90% 27
  95% 31
  98% 38
  99% 43
100% 48 (longest request)

MySQL deadlocks and lock waits: "Lock wait timeout exceeded; try restarting transaction"

As we already know about InnoDB, when a lock wait occurs it decides whether to time out the waiting statement according to the innodb_lock_wait_timeout parameter. This section describes how to view and analyze what is going on when a lock wait occurs.

Before the InnoDB Plugin, current database requests were generally inspected with SHOW FULL PROCESSLIST (which makes it hard to find the locked row) and SHOW ENGINE INNODB STATUS, and from that the locks held by the current transactions were worked out. As MySQL evolved, a more convenient way to monitor lock waits in the database was provided.

There are three tables under information_schema – INNODB_TRX, INNODB_LOCKS, and INNODB_LOCK_WAITS – through which you can monitor current transactions and analyze lock problems much more easily (a query joining them is sketched after the column list below).

The more commonly used columns of INNODB_TRX are:

trx_id: the unique transaction ID inside the InnoDB storage engine
trx_state: the state of the current transaction
trx_started: the start time of the transaction
trx_requested_lock_id: the ID of the lock the transaction is waiting for
trx_wait_started: the time the transaction started waiting
trx_weight: the weight of the transaction, reflecting the number of rows modified and locked by the transaction; when a deadlock is found and a rollback is needed, the transaction with the smaller weight is rolled back
trx_mysql_thread_id: the process ID in MySQL, corresponding to the ID value in SHOW PROCESSLIST
trx_query: the SQL statement the transaction is running
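To see who is blocking whom, you can join these three tables. Below is a minimal JDBC sketch using the well-known join between INNODB_LOCK_WAITS and INNODB_TRX; the connection URL, user, and password are placeholders, and the MySQL JDBC driver is assumed to be on the classpath:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class LockWaitMonitor {
    public static void main(String[] args) throws Exception {
        String sql = "SELECT r.trx_mysql_thread_id AS waiting_thread, r.trx_query AS waiting_query, "
                   + "b.trx_mysql_thread_id AS blocking_thread, b.trx_query AS blocking_query "
                   + "FROM information_schema.innodb_lock_waits w "
                   + "JOIN information_schema.innodb_trx b ON b.trx_id = w.blocking_trx_id "
                   + "JOIN information_schema.innodb_trx r ON r.trx_id = w.requesting_trx_id";
        try (Connection conn = DriverManager.getConnection(
                     "jdbc:mysql://localhost:3306/test", "root", "password");   // placeholders
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(sql)) {
            while (rs.next()) {
                // print each waiting thread and the thread that is blocking it
                System.out.println("thread " + rs.getLong("waiting_thread")
                        + " is blocked by thread " + rs.getLong("blocking_thread"));
            }
        }
    }
}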

You can then KILL the blocking process ID. At work I ran into this with a vehicle record: a SELECT * FROM car FOR UPDATE, or an update of one of its fields, failed with "Lock wait timeout exceeded; try restarting transaction". Killing the blocking thread resolves the immediate problem, but the fundamental fix is to optimize how the business logic accesses the database. I had hit this situation before, for example when modifying a record and then immediately modifying it again; try to avoid that pattern at the code and business level.

JSTL JasperException error

Today my JSP project threw an error that took me a long time to track down. The solution is as follows.

1. Error description
org.apache.jasper.JasperException: The absolute uri: http://java.sun.com/jsp/jstl/core cannot be resolved in either web.xml or the jar files deployed with this application

2. Solution
Copy all the tld files from the META-INF directory of the jstl.jar package and put them into WEB-INF of the JSP project.

You can create a tlds directory under WEB-INF/ and put all the tld files there; the directory name can be anything.

MyBatis annotation-based mapped statements

MyBatis provides a variety of mapping annotations such as @Select, @Update, @Insert, and @Delete. Let's take a closer look at how these mappings are used.

1.@Insert
We can use the @Insert annotation to declare an INSERT mapping.

package com.owen.mybatis.mappers;

public interface StudentMapper
{
    @Insert("INSERT INTO STUDENTS(STUD_ID, NAME, EMAIL, ADDR_ID, PHONE) "
        + "VALUES(#{studId},#{name},#{email},#{address.addrId},#{phone})")
    int insertStudent(Student student);
}
The insertStudent() method is annotated with @Insert and returns the number of rows affected by the insert statement.

When we used XML mappings earlier, we could declare that the statement auto-generates the primary key. With annotations we can do the same using @Options, which takes the useGeneratedKeys and keyProperty parameters: together they tell MyBatis that the database generates the auto_increment column value and which property of the object should receive it.

@Insert("INSERT INTO STUDENTS(NAME,EMAIL,ADDR_ID, PHONE) "
    + "VALUES(#{name},#{email},#{address.addrId},#{phone})")
@Options(useGeneratedKeys=true, keyProperty="studId")
int insertStudent(Student student);
Here the STUD_ID column value will be generated automatically by the MySQL database, and it will be copied into the studId property.

StudentMapper mapper = sqlSession.getMapper(StudentMapper.class);
mapper.insertStudent(student);
int studentId = student.getStudId();
Some databases, such as Oracle, do not provide AUTO_INCREMENT columns, and we need to use a SEQUENCE to generate the primary key. We can use the @SelectKey annotation to specify any SQL statement whose result will be used as the primary key value.

@Insert("INSERT INTO STUDENTS(STUD_ID, NAME, EMAIL, ADDR_ID, PHONE) "
    + "VALUES(#{studId},#{name},#{email},#{address.addrId},#{phone})")
@SelectKey(statement="SELECT STUD_ID_SEQ.NEXTVAL FROM DUAL",
    keyProperty="studId", resultType=int.class, before=true)
int insertStudent(Student student);
Here @SelectKey generates the primary key value and stores it in the student's studId property. Because we set before=true, the primary key is generated before the insert is executed.

If a trigger uses the SEQUENCE to generate the primary key, we can instead execute the insert first and then read the generated key from sequence_name.currval.

@Insert("INSERT INTO STUDENTS(NAME,EMAIL,ADDR_ID, PHONE) "
    + "VALUES(#{name},#{email},#{address.addrId},#{phone})")
@SelectKey(statement="SELECT STUD_ID_SEQ.CURRVAL FROM DUAL",
    keyProperty="studId", resultType=int.class, before=false)
int insertStudent(Student student);

2.@Update
We can use the @Update annotation to declare an UPDATE.

@Update("UPDATE STUDENTS SET NAME=#{name}, EMAIL=#{email}, "
    + "PHONE=#{phone} WHERE STUD_ID=#{studId}")
int updateStudent(Student student);
The updateStudent() method is annotated with @Update and returns the number of affected rows.

StudentMapper mapper = sqlSession.getMapper(StudentMapper.class);
int noOfRowsUpdated = mapper.updateStudent(student);

3.@Delete
We can use the @Delete annotation to declare a DELETE.

@Delete("DELETE FROM STUDENTS WHERE STUD_ID=#{studId}")
int deleteStudent(int studId);
The deleteStudent() method returns the number of affected rows.

4.@Select
We can use the @Select annotation to implement the SELECT mapping.

package com.owen.mybatis.mappers;

public interface StudentMapper
{
    @Select("SELECT STUD_ID AS STUDID, NAME, EMAIL, PHONE FROM "
        + "STUDENTS WHERE STUD_ID=#{studId}")
    Student findStudentById(Integer studId);
}
To map the columns onto the Student object, we alias STUD_ID as STUDID, matching the studId property. Note that if the query returns more than one row into a method that declares a single Student, MyBatis throws a TooManyResultsException (see the sketch below for returning multiple rows).
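If a query can legitimately return more than one row, declare the mapper method to return a List so MyBatis collects all rows instead of throwing TooManyResultsException. A minimal sketch, following the table and columns of the earlier examples:

import java.util.List;
import org.apache.ibatis.annotations.Select;

public interface StudentMapper
{
    @Select("SELECT STUD_ID AS STUDID, NAME, EMAIL, PHONE FROM STUDENTS")
    List<Student> findAllStudents();
}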