@@ -96,6 +96,21 @@ public static PrioritizedEsThreadPoolExecutor newSinglePrioritizing(
9696 return new PrioritizedEsThreadPoolExecutor (name , 1 , 1 , 0L , TimeUnit .MILLISECONDS , threadFactory , contextHolder , timer );
9797 }
9898
99+ /**
100+ * Creates a scaling {@link EsThreadPoolExecutor} using an unbounded work queue.
101+ * <p>
102+ * The {@link EsThreadPoolExecutor} scales the same way as a regular {@link ThreadPoolExecutor} until the core pool size
103+ * (and at least 1) is reached: each time a task is submitted a new worker is added regardless of whether an idle worker is available.
104+ * <p>
105+ * Once having reached the core pool size, a {@link ThreadPoolExecutor} will only add a new worker if the work queue rejects
106+ * a task offer. Typically, using a regular unbounded queue, task offers won't ever be rejected, meaning the worker pool would never
107+ * scale beyond the core pool size.
108+ * <p>
109+ * Scaling {@link EsThreadPoolExecutor}s use a customized unbounded {@link LinkedTransferQueue}, which rejects every task offer unless
110+ * it can be immediately transferred to an available idle worker. If no such worker is available, the executor will add
111+ * a new worker if capacity remains, otherwise the task is rejected and then appended to the work queue via the {@link ForceQueuePolicy}
112+ * rejection handler.
113+ */
99114 public static EsThreadPoolExecutor newScaling (
100115 String name ,
101116 int min ,
@@ -107,10 +122,12 @@ public static EsThreadPoolExecutor newScaling(
107122 ThreadContext contextHolder ,
108123 TaskTrackingConfig config
109124 ) {
110- ExecutorScalingQueue <Runnable > queue = new ExecutorScalingQueue <>();
111- EsThreadPoolExecutor executor ;
125+ LinkedTransferQueue <Runnable > queue = newUnboundedScalingLTQueue (min , max );
126+ // Force queued work via ForceQueuePolicy might starve if no worker is available (if core size is empty),
127+ // probing the worker pool prevents this.
128+ boolean probeWorkerPool = min == 0 && queue instanceof ExecutorScalingQueue ;
112129 if (config .trackExecutionTime ()) {
113- executor = new TaskExecutionTimeTrackingEsThreadPoolExecutor (
130+ return new TaskExecutionTimeTrackingEsThreadPoolExecutor (
114131 name ,
115132 min ,
116133 max ,
@@ -119,27 +136,40 @@ public static EsThreadPoolExecutor newScaling(
119136 queue ,
120137 TimedRunnable ::new ,
121138 threadFactory ,
122- new ForceQueuePolicy (rejectAfterShutdown ),
139+ new ForceQueuePolicy (rejectAfterShutdown , probeWorkerPool ),
123140 contextHolder ,
124141 config
125142 );
126143 } else {
127- executor = new EsThreadPoolExecutor (
144+ return new EsThreadPoolExecutor (
128145 name ,
129146 min ,
130147 max ,
131148 keepAliveTime ,
132149 unit ,
133150 queue ,
134151 threadFactory ,
135- new ForceQueuePolicy (rejectAfterShutdown ),
152+ new ForceQueuePolicy (rejectAfterShutdown , probeWorkerPool ),
136153 contextHolder
137154 );
138155 }
139- queue .executor = executor ;
140- return executor ;
141156 }
142157
158+ /**
159+ * Creates a scaling {@link EsThreadPoolExecutor} using an unbounded work queue.
160+ * <p>
161+ * The {@link EsThreadPoolExecutor} scales the same way as a regular {@link ThreadPoolExecutor} until the core pool size
162+ * (and at least 1) is reached: each time a task is submitted a new worker is added regardless of whether an idle worker is available.
163+ * <p>
164+ * Once having reached the core pool size, a {@link ThreadPoolExecutor} will only add a new worker if the work queue rejects
165+ * a task offer. Typically, using a regular unbounded queue, task offers won't ever be rejected, meaning the worker pool would never
166+ * scale beyond the core pool size.
167+ * <p>
168+ * Scaling {@link EsThreadPoolExecutor}s use a customized unbounded {@link LinkedTransferQueue}, which rejects every task offer unless
169+ * it can be immediately transferred to an available idle worker. If no such worker is available, the executor will add
170+ * a new worker if capacity remains, otherwise the task is rejected and then appended to the work queue via the {@link ForceQueuePolicy}
171+ * rejection handler.
172+ */
143173 public static EsThreadPoolExecutor newScaling (
144174 String name ,
145175 int min ,
@@ -389,32 +419,58 @@ public boolean isSystem() {
389419 */
390420 private EsExecutors () {}
391421
392- static class ExecutorScalingQueue <E > extends LinkedTransferQueue <E > {
422+ private static <E > LinkedTransferQueue <E > newUnboundedScalingLTQueue (int corePoolSize , int maxPoolSize ) {
423+ if (maxPoolSize == 1 || maxPoolSize == corePoolSize ) {
424+ // scaling beyond core pool size (or 1) not required, use a regular unbounded LinkedTransferQueue
425+ return new LinkedTransferQueue <>();
426+ }
427+ // scaling beyond core pool size with an unbounded queue requires ExecutorScalingQueue
428+ // note, reconfiguration of core / max pool size not supported in EsThreadPoolExecutor
429+ return new ExecutorScalingQueue <>();
430+ }
393431
394- ThreadPoolExecutor executor ;
432+ /**
433+ * Customized {@link LinkedTransferQueue} to allow a {@link ThreadPoolExecutor} to scale beyond its core pool size despite having an
434+ * unbounded queue.
435+ * <p>
436+ * Note, usage of unbounded work queues is a problem in itself. For one, it makes error-prone customizations necessary so that
437+ * thread pools can scale up adequately. But worse, infinite queues prevent backpressure and impose a high risk of causing OOM errors.
438+ * <a href="https://github.com/elastic/elasticsearch/issues/18613">Github #18613</a> captures various long-outstanding but important
439+ * improvements to thread pools.
440+ * <p>
441+ * Once having reached its core pool size, a {@link ThreadPoolExecutor} will only add more workers if capacity remains and
442+ * the task offer is rejected by the work queue. Typically that's never the case when using a regular unbounded queue.
443+ * <p>
444+ * This customized implementation rejects every task offer unless it can be immediately transferred to an available idle worker.
445+ * It relies on {@link ForceQueuePolicy} rejection handler to append the task to the work queue if no additional worker can be added
446+ * and the task is rejected by the executor.
447+ * <p>
448+ * Note, {@link ForceQueuePolicy} cannot guarantee there will be available workers when appending tasks directly to the queue.
449+ * For that reason {@link ExecutorScalingQueue} cannot be used with executors with empty core and max pool size of 1:
450+ * the only available worker could time out just about at the same time as the task is appended, see
451+ * <a href="https://github.com/elastic/elasticsearch/issues/124667">Github #124667</a> for more details.
452+ * <p>
453+ * Note, configuring executors using core = max size in combination with {@code allowCoreThreadTimeOut} could be an alternative to
454+ * {@link ExecutorScalingQueue}. However, the scaling behavior would be very different: Using {@link ExecutorScalingQueue}
455+ * we are able to reuse idle workers if available by means of {@link ExecutorScalingQueue#tryTransfer(Object)}.
456+ * If setting core = max size, the executor will add a new worker for every task submitted until reaching the core/max pool size
457+ * even if there are idle workers available.
458+ */
459+ static class ExecutorScalingQueue <E > extends LinkedTransferQueue <E > {
395460
396461 ExecutorScalingQueue () {}
397462
398463 @ Override
399464 public boolean offer (E e ) {
400- // first try to transfer to a waiting worker thread
401- if (tryTransfer (e ) == false ) {
402- // check if there might be spare capacity in the thread
403- // pool executor
404- int left = executor .getMaximumPoolSize () - executor .getCorePoolSize ();
405- if (left > 0 ) {
406- // reject queuing the task to force the thread pool
407- // executor to add a worker if it can; combined
408- // with ForceQueuePolicy, this causes the thread
409- // pool to always scale up to max pool size and we
410- // only queue when there is no spare capacity
411- return false ;
412- } else {
413- return super .offer (e );
414- }
415- } else {
416- return true ;
465+ if (e == EsThreadPoolExecutor .WORKER_PROBE ) { // referential equality
466+ // this probe ensures a worker is available after force queueing a task via ForceQueuePolicy
467+ return super .offer (e );
417468 }
469+ // try to transfer to a waiting worker thread
470+ // otherwise reject queuing the task to force the thread pool executor to add a worker if it can;
471+ // combined with ForceQueuePolicy, this causes the thread pool to always scale up to max pool size
472+ // so that we only queue when there is no spare capacity
473+ return tryTransfer (e );
418474 }
419475
420476 // Overridden to workaround a JDK bug introduced in JDK 21.0.2
@@ -456,15 +512,24 @@ static class ForceQueuePolicy extends EsRejectedExecutionHandler {
456512 */
457513 private final boolean rejectAfterShutdown ;
458514
515+ /**
516+ * Flag to indicate if the worker pool needs to be probed after force queuing a task to guarantee a worker is available.
517+ */
518+ private final boolean probeWorkerPool ;
519+
459520 /**
460521 * @param rejectAfterShutdown indicates if {@link Runnable} should be rejected once the thread pool is shutting down
461522 */
462- ForceQueuePolicy (boolean rejectAfterShutdown ) {
523+ ForceQueuePolicy (boolean rejectAfterShutdown , boolean probeWorkerPool ) {
463524 this .rejectAfterShutdown = rejectAfterShutdown ;
525+ this .probeWorkerPool = probeWorkerPool ;
464526 }
465527
466528 @ Override
467529 public void rejectedExecution (Runnable task , ThreadPoolExecutor executor ) {
530+ if (task == EsThreadPoolExecutor .WORKER_PROBE ) { // referential equality
531+ return ;
532+ }
468533 if (rejectAfterShutdown ) {
469534 if (executor .isShutdown ()) {
470535 reject (executor , task );
@@ -481,12 +546,19 @@ public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
481546 }
482547 }
483548
484- private static void put (ThreadPoolExecutor executor , Runnable task ) {
549+ private void put (ThreadPoolExecutor executor , Runnable task ) {
485550 final BlockingQueue <Runnable > queue = executor .getQueue ();
486- // force queue policy should only be used with a scaling queue
487- assert queue instanceof ExecutorScalingQueue ;
551+ // force queue policy should only be used with a scaling queue (ExecutorScalingQueue / LinkedTransferQueue)
552+ assert queue instanceof LinkedTransferQueue ;
488553 try {
489554 queue .put (task );
555+ if (probeWorkerPool && task == queue .peek ()) { // referential equality
556+ // If the task is at the head of the queue, we can assume the queue was previously empty. In this case available workers
557+ // might have timed out in the meanwhile. To prevent the task from starving, we submit a noop probe to the executor.
558+ // Note, this deliberately doesn't check getPoolSize()==0 to avoid potential race conditions,
559+ // as the count in the atomic state (used by workerCountOf) is decremented first.
560+ executor .execute (EsThreadPoolExecutor .WORKER_PROBE );
561+ }
490562 } catch (final InterruptedException e ) {
491563 assert false : "a scaling queue never blocks so a put to it can never be interrupted" ;
492564 throw new AssertionError (e );
0 commit comments