Overview
jetty NIO is a typical reactor model, As shown below:
That is: mainReactor is responsible for monitoring the server socket, accepting new connections, and assigning the established socket to the subReactor. subReactor is responsible for demultiplexing connected sockets, reading and writing network data, and throwing it to the worker thread pool for processing. This article mainly explains the realization of mainReactor, subReactor and thread pool in jetty.
mainReactor
The server in jetty is equivalent to A container, a jetty container contains multiple connectors and a thread pool. The connector implements the LifeCycle interface and starts when the container is started. The following figure shows the process of listening to the server socket and establishing a connection after the connector is started:
It can be seen that jetty uses the thread pool to establish connections, Each connection task is treated as a job and placed in the job queue. The thread responsible for the connection will take the task out of the queue for execution, and hand over the obtained ServerSocket to the subReactor. Let’s look at the implementation of subReactor.
subReactor
I need to mention that jetty nio is very An important class SelectorManager, which is responsible for channel registration, select, wakeup and other operations. There is an array of SelectSet in SelectorManager. SelectSet can be understood as a proxy of SelectorManager, because the real thing is SelectSet. SelectSet is designed as an array. It should also be a divide-and-conquer idea, allowing a selector to monitor fewer selectionkeys.
SelectSet has a very important member changes, which stores all the changed channels, endpoints, attachments. The addChannel method is triggered in the following situations: when a new channel is added, when a new event arrives, and when data arrives.
The execution flow of subReactor is as follows:
here leads to addChange in addition to selectorManager.register, there are also endpoint.updatekey() and selectionkey data changes and so on.
ThreadPool
Jetty’s thread pool is quite simple, In fact, mainReactor and subReactor share the same thread pool. The implementation class of thread pool is QueuedThreadPool. Of course, you can set your own thread pool class in jetty.xml. Simply look at the run method of the thread pool
- privateRunnable_runnable=newRunnable()
- {
- publicvoid run()
- {
- booleanshrink=false;
- try< /li>
- {
- Runnable job=_jobs.poll();
- while(isRunning())
- {
- Job loop
- while (job!=null&& isRunning()) span>
- runJob(job);
- < span style="margin: 0px; padding: 0px; border: none; color: black; background-color: inherit;"> Job=_jobs.poll();
- // Idle loop
- try
- {
- _threadsIdle.incre mentAndGet();
- li>
- while( isRunning()&&job==null)
- {
- if(_maxIdleTimeMs<=0)
- < span style="margin: 0px; padding: 0px; border: none; color: black; background-color: inherit;"> Job=_jobs.take();
- else< /li>
- {
- // maybe we should shrink?
- finalintsize=_threadsStarted.get();
- if (size>_minThreads)
- < span style="margin: 0px; padding: 0px; border: none; color: black; background-color: inherit;">
- longlast=_lastShrink.get();
- longnow=System.currentTimeMillis();
- if (last==0||(now-last)>_maxIdleTimeMs) span>
- Shrink=_lastShrink.compareAndSet(last,now)&&
- =”margin: 0px; padding: 0px; border: none; color: black; background-color: inherit;”> ThreadsStarted.compareAndSet(size,size-1);
- if (shrink)
- Job=idleJobPoll();
- finally
- {
- _threadsIdle.decrementAndGet();
- < span style="margin: 0px; padding: 0px; border: none; color: black; background-color: inherit;">
- catch(InterruptedException e)
- {
- }
- };
< li> color: black; background-color: inherit;”> color: border none blue; font-weight: bold; background-color: inherit;”>return;
1, the thread pool has a minimum The number of threads _minThreads=8, when the thread pool is started, _minThreads threads will be created and started. On line 12, the thread takes a task from the task queue and executes it. The while loop is used here to indicate that it will block and wait for the task to be executed. When there is no task in the task queue, the while loop will exit;
2. After exiting the while loop, this thread is idle, and there needs to be a recovery here Strategy, after waiting for _maxIdleTimeMs time, if the current number of threads is greater than _minThreads, this thread will be recycled.
When will the number of threads be greater than _minThreads? Let’s take a look at the core code in the dispatch() method
< span> print?
- // If we had no idle threads or the jobQ is greater than the idle threads span>
- if(idle==0 || jobQ>idle)
- {
- intthreads=_threadsStarted.get();
- if(threads<_maxThreads)
- startThread(threads);
< p>If there are no idle threads or there are too few idle threads, a new thread will be created when the number of threads does not exceed _maxThreads.