Jetty’s NIO thread model

Overview

Jetty's NIO layer follows a typical reactor model, as shown below:

That is, the mainReactor monitors the server socket, accepts new connections, and assigns each established socket to a subReactor. The subReactor demultiplexes the connected sockets, reads and writes network data, and hands the work to the worker thread pool for processing. This article explains how Jetty implements the mainReactor, the subReactor, and the thread pool.
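Before going into Jetty's own classes, the flow described above can be sketched with plain java.nio. This is only a minimal illustration under stated assumptions, not Jetty's code: the class MiniReactor and its methods roundTrip, selectLoop and echoAndClose are hypothetical names, the "processing" is a simple echo, and each connection is served once and then closed to keep the sketch short.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch, not Jetty code: one accept job, one select loop, one shared pool.
final class MiniReactor {

    /** Starts the reactor on a loopback port, sends one message, returns the echoed reply. */
    static String roundTrip(String message) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        Queue<SocketChannel> accepted = new ConcurrentLinkedQueue<>();
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            InetAddress loopback = InetAddress.getLoopbackAddress();
            server.bind(new InetSocketAddress(loopback, 0)); // port 0: any free port
            int port = ((InetSocketAddress) server.getLocalAddress()).getPort();

            // mainReactor: the accept runs as an ordinary job in the pool.
            pool.execute(() -> {
                try {
                    SocketChannel channel = server.accept(); // blocks until a client connects
                    channel.configureBlocking(false);
                    accepted.add(channel);                   // hand the socket to the subReactor
                    selector.wakeup();
                } catch (IOException e) {
                    // server closed before a client arrived; nothing to hand over
                }
            });

            // subReactor: demultiplex connected sockets, dispatch work to the same pool.
            pool.execute(() -> selectLoop(selector, accepted, pool));

            try (SocketChannel client = SocketChannel.open(new InetSocketAddress(loopback, port))) {
                byte[] bytes = message.getBytes(StandardCharsets.UTF_8);
                client.write(ByteBuffer.wrap(bytes));
                ByteBuffer reply = ByteBuffer.allocate(bytes.length);
                while (reply.hasRemaining() && client.read(reply) >= 0) {
                    // keep reading until the whole echo has arrived or the peer closes
                }
                return new String(reply.array(), 0, reply.position(), StandardCharsets.UTF_8);
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    private static void selectLoop(Selector selector, Queue<SocketChannel> accepted,
                                   ExecutorService pool) {
        try {
            while (selector.isOpen()) {
                SocketChannel channel;
                while ((channel = accepted.poll()) != null) {
                    channel.register(selector, SelectionKey.OP_READ);
                }
                selector.select();
                for (SelectionKey key : selector.selectedKeys()) {
                    key.cancel(); // one-shot in this sketch: the worker owns the channel now
                    SocketChannel ready = (SocketChannel) key.channel();
                    pool.execute(() -> echoAndClose(ready));
                }
                selector.selectedKeys().clear();
            }
        } catch (IOException | ClosedSelectorException e) {
            // selector closed: the reactor is shutting down
        }
    }

    private static void echoAndClose(SocketChannel channel) {
        try (SocketChannel ch = channel) {
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            if (ch.read(buffer) > 0) {
                buffer.flip();
                while (buffer.hasRemaining()) {
                    ch.write(buffer);
                }
            }
        } catch (IOException e) {
            // peer went away; nothing to echo
        }
    }
}
```

Calling MiniReactor.roundTrip("ping") exercises all three roles once: the accept job, the select loop, and a worker job that echoes the bytes back.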

mainReactor

The Server in Jetty acts as a container: a Jetty container holds multiple connectors and a thread pool. Each connector implements the LifeCycle interface and starts when the container starts. The following figure shows how a connector, once started, listens on the server socket and establishes connections:

As the figure shows, Jetty uses the thread pool to establish connections. Each accept task is treated as a job and placed in the job queue. A pool thread takes the task out of the queue, executes it, and hands the accepted socket to the subReactor. Let's look at how the subReactor is implemented.

subReactor

An important class in Jetty's NIO layer is SelectorManager, which is responsible for channel registration, select, wakeup, and related operations. SelectorManager holds an array of SelectSet objects. A SelectSet can be thought of as a delegate of SelectorManager, because the SelectSet is what does the real work. Keeping the SelectSets in an array is presumably a divide-and-conquer choice: each selector then monitors fewer selection keys.
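The divide-and-conquer idea can be illustrated with a small sketch. It assumes new channels are spread over the sets in round-robin order, and it reduces each set to a counter of the keys it would watch. SelectSetArray, register and keysIn are hypothetical names, not Jetty's API.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: each "select set" is reduced to a count of the keys it would monitor.
final class SelectSetArray {
    private final AtomicInteger[] keysPerSet;
    private final AtomicLong nextSet = new AtomicLong();

    SelectSetArray(int setCount) {
        keysPerSet = new AtomicInteger[setCount];
        for (int i = 0; i < setCount; i++) {
            keysPerSet[i] = new AtomicInteger();
        }
    }

    /** Assigns a new channel to the next set in round-robin order; returns that set's index. */
    int register() {
        int index = (int) (nextSet.getAndIncrement() % keysPerSet.length);
        keysPerSet[index].incrementAndGet();
        return index;
    }

    /** Number of keys the given set is responsible for. */
    int keysIn(int index) {
        return keysPerSet[index].get();
    }
}
```

With three sets and seven registrations, no set ends up watching more than three keys, whereas a single selector would watch all seven.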

SelectSet has an important member, changes, which stores every changed channel, endpoint, and attachment. The addChange method is called in the following situations: when a new channel is added, when a new event arrives, and when data arrives.

The execution flow of subReactor is as follows:

Besides selectorManager.register, the calls that lead to addChange include endpoint.updateKey() and changes to SelectionKey data, among others.
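The change-queue pattern can be sketched as follows: other threads only append to a queue and wake the selector, and the selector thread applies the queued changes itself before it selects. This is a simplified illustration, not Jetty's SelectSet. ChangeQueueSelectSet, doSelect, registeredKeys and demo are hypothetical names, and the queue here holds only channels, while the text above says the real one also holds endpoints and attachments.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch of a select set that applies changes on the selector thread.
final class ChangeQueueSelectSet implements AutoCloseable {
    private final Selector selector;
    private final Queue<SelectableChannel> changes = new ConcurrentLinkedQueue<>();

    ChangeQueueSelectSet() throws IOException {
        selector = Selector.open();
    }

    /** Callable from any thread: queue the change, then wake the selector thread. */
    void addChange(SelectableChannel channel) {
        changes.add(channel);
        selector.wakeup();
    }

    /** One pass of the select loop: apply queued changes, then select. Returns the ready count. */
    int doSelect(long timeoutMs) throws IOException {
        SelectableChannel channel;
        while ((channel = changes.poll()) != null) {
            channel.configureBlocking(false);
            channel.register(selector, SelectionKey.OP_READ);
        }
        int ready = selector.select(timeoutMs);
        selector.selectedKeys().clear();
        return ready;
    }

    int registeredKeys() {
        return selector.keys().size();
    }

    @Override
    public void close() throws IOException {
        selector.close();
    }

    /** Returns {keys before the change is applied, keys after, ready count once data arrives}. */
    static int[] demo() {
        try (ChangeQueueSelectSet set = new ChangeQueueSelectSet()) {
            Pipe pipe = Pipe.open(); // an in-process channel pair, so no network is needed
            try {
                set.addChange(pipe.source());
                int before = set.registeredKeys();   // change is queued, not yet registered
                set.doSelect(50);                    // applies the change
                pipe.sink().write(ByteBuffer.wrap(new byte[] {1}));
                int ready = set.doSelect(1000);      // the source is now readable
                return new int[] {before, set.registeredKeys(), ready};
            } finally {
                pipe.sink().close();
                pipe.source().close();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Applying registrations on the selector thread keeps other threads from touching the selector directly; they only add to the queue and call wakeup.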

ThreadPool

Jetty's thread pool is quite simple. In fact, the mainReactor and the subReactor share the same thread pool. The implementation class is QueuedThreadPool; you can also configure your own thread pool class in jetty.xml. Here is the run method of the thread pool:

    private Runnable _runnable = new Runnable()
    {
        public void run()
        {
            boolean shrink=false;
            try
            {
                Runnable job=_jobs.poll();
                while (isRunning())
                {
                    // Job loop
                    while (job!=null && isRunning())
                    {
                        runJob(job);
                        job=_jobs.poll();
                    }

                    // Idle loop
                    try
                    {
                        _threadsIdle.incrementAndGet();

                        while (isRunning() && job==null)
                        {
                            if (_maxIdleTimeMs<=0)
                                job=_jobs.take();
                            else
                            {
                                // maybe we should shrink?
                                final int size=_threadsStarted.get();
                                if (size>_minThreads)
                                {
                                    long last=_lastShrink.get();
                                    long now=System.currentTimeMillis();
                                    if (last==0 || (now-last)>_maxIdleTimeMs)
                                    {
                                        shrink=_lastShrink.compareAndSet(last,now) &&
                                        _threadsStarted.compareAndSet(size,size-1);
                                        if (shrink)
                                            return;
                                    }
                                }
                                job=idleJobPoll();
                            }
                        }
                    }
                    finally
                    {
                        _threadsIdle.decrementAndGet();
                    }
                }
            }
            catch(InterruptedException e)
            {
                …
            }
        }
    };

1. The thread pool has a minimum thread count, _minThreads=8. When the pool starts, it creates and starts _minThreads threads. In the job loop (the inner while on line 12 of the listing), a thread takes a task from the task queue and executes it. Because runJob is called inside the loop, the thread is occupied until each task finishes, and the loop exits only when the task queue has no more tasks.

2. After leaving the job loop the thread is idle, so a reclamation strategy is needed: after waiting for _maxIdleTimeMs, if the current number of threads is greater than _minThreads, the thread is reclaimed.
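The shrink decision in the idle loop can be isolated into a small sketch. It mirrors the conditions in the listing (size above the minimum, enough time since the last shrink, two compare-and-set steps), but ShrinkPolicy, tryShrink and threads are hypothetical names, and the clock is passed in as a parameter so the behaviour is easy to follow.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of the "maybe we should shrink?" decision from the idle loop.
final class ShrinkPolicy {
    private final int minThreads;
    private final long maxIdleTimeMs;
    private final AtomicInteger threadsStarted;
    private final AtomicLong lastShrink = new AtomicLong(); // 0 means "never shrunk yet"

    ShrinkPolicy(int minThreads, long maxIdleTimeMs, int initialThreads) {
        this.minThreads = minThreads;
        this.maxIdleTimeMs = maxIdleTimeMs;
        this.threadsStarted = new AtomicInteger(initialThreads);
    }

    /** Returns true if the calling idle thread should exit; nowMs is the current time. */
    boolean tryShrink(long nowMs) {
        int size = threadsStarted.get();
        if (size <= minThreads) {
            return false; // never drop below the minimum
        }
        long last = lastShrink.get();
        if (last != 0 && (nowMs - last) <= maxIdleTimeMs) {
            return false; // another thread was retired too recently
        }
        // Both updates must win their race, otherwise this thread stays alive.
        return lastShrink.compareAndSet(last, nowMs)
                && threadsStarted.compareAndSet(size, size - 1);
    }

    int threads() {
        return threadsStarted.get();
    }
}
```

With a minimum of 2, an idle time of 1000 ms and 4 started threads, the first idle thread to ask is retired at once, and the next one only after more than 1000 ms have passed since that shrink. The pool therefore gives up at most one thread per idle interval rather than all surplus threads at once.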

When does the number of threads exceed _minThreads? Let's look at the core code in the dispatch() method:

    // If we had no idle threads or the jobQ is greater than the idle threads
    if (idle==0 || jobQ>idle)
    {
        int threads=_threadsStarted.get();
        if (threads<_maxThreads)
            startThread(threads);
    }

If there are no idle threads, or too few idle threads for the queued jobs, a new thread is created as long as the thread count is still below _maxThreads.
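The growth rule can be sketched the same way. The condition is taken from the dispatch() fragment above. Claiming the new thread slot with a compare-and-set is an assumption about what startThread does, and GrowPolicy, onDispatch and threads are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the decision dispatch() makes after queueing a job.
final class GrowPolicy {
    private final int maxThreads;
    private final AtomicInteger threadsStarted;

    GrowPolicy(int maxThreads, int initialThreads) {
        this.maxThreads = maxThreads;
        this.threadsStarted = new AtomicInteger(initialThreads);
    }

    /**
     * idle is the number of idle threads and jobQ the number of queued jobs.
     * Returns true if a slot for one more thread was claimed.
     */
    boolean onDispatch(int idle, int jobQ) {
        if (idle == 0 || jobQ > idle) {
            int threads = threadsStarted.get();
            if (threads < maxThreads) {
                // Assumption: the slot is claimed atomically so two dispatchers cannot both take it.
                return threadsStarted.compareAndSet(threads, threads + 1);
            }
        }
        return false;
    }

    int threads() {
        return threadsStarted.get();
    }
}
```

With a maximum of 3 and 2 threads started, a dispatch that sees 2 idle threads and 1 queued job adds nothing, a dispatch that sees no idle thread grows the pool to 3, and any later dispatch is refused because the maximum has been reached.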

