@@ -96,6 +96,21 @@ public static PrioritizedEsThreadPoolExecutor newSinglePrioritizing(
         return new PrioritizedEsThreadPoolExecutor(name, 1, 1, 0L, TimeUnit.MILLISECONDS, threadFactory, contextHolder, timer);
     }
 
+    /**
+     * Creates a scaling {@link EsThreadPoolExecutor} using an unbounded work queue.
+     * <p>
+     * The {@link EsThreadPoolExecutor} scales the same way as a regular {@link ThreadPoolExecutor} until the core pool size
+     * (and at least 1) is reached: each time a task is submitted a new worker is added regardless if an idle worker is available.
+     * <p>
+     * Once having reached the core pool size, a {@link ThreadPoolExecutor} will only add a new worker if the work queue rejects
+     * a task offer. Typically, using a regular unbounded queue, task offers won't ever be rejected, meaning the worker pool would never
+     * scale beyond the core pool size.
+     * <p>
+     * Scaling {@link EsThreadPoolExecutor}s use a customized unbounded {@link LinkedTransferQueue}, which rejects every task offer unless
+     * it can be immediately transferred to an available idle worker. If no such worker is available, the executor will add
+     * a new worker if capacity remains, otherwise the task is rejected and then appended to the work queue via the {@link ForceQueuePolicy}
+     * rejection handler.
+     */
     public static EsThreadPoolExecutor newScaling(
         String name,
         int min,
@@ -107,10 +122,12 @@ public static EsThreadPoolExecutor newScaling(
         ThreadContext contextHolder,
         TaskTrackingConfig config
     ) {
-        ExecutorScalingQueue<Runnable> queue = new ExecutorScalingQueue<>();
-        EsThreadPoolExecutor executor;
+        LinkedTransferQueue<Runnable> queue = newUnboundedScalingLTQueue(min, max);
+        // Force queued work via ForceQueuePolicy might starve if no worker is available (if core size is empty),
+        // probing the worker pool prevents this.
+        boolean probeWorkerPool = min == 0 && queue instanceof ExecutorScalingQueue;
         if (config.trackExecutionTime()) {
-            executor = new TaskExecutionTimeTrackingEsThreadPoolExecutor(
+            return new TaskExecutionTimeTrackingEsThreadPoolExecutor(
                 name,
                 min,
                 max,
@@ -119,27 +136,40 @@ public static EsThreadPoolExecutor newScaling(
                 queue,
                 TimedRunnable::new,
                 threadFactory,
-                new ForceQueuePolicy(rejectAfterShutdown),
+                new ForceQueuePolicy(rejectAfterShutdown, probeWorkerPool),
                 contextHolder,
                 config
             );
         } else {
-            executor = new EsThreadPoolExecutor(
+            return new EsThreadPoolExecutor(
                 name,
                 min,
                 max,
                 keepAliveTime,
                 unit,
                 queue,
                 threadFactory,
-                new ForceQueuePolicy(rejectAfterShutdown),
+                new ForceQueuePolicy(rejectAfterShutdown, probeWorkerPool),
                 contextHolder
             );
         }
-        queue.executor = executor;
-        return executor;
     }
 
+    /**
+     * Creates a scaling {@link EsThreadPoolExecutor} using an unbounded work queue.
+     * <p>
+     * The {@link EsThreadPoolExecutor} scales the same way as a regular {@link ThreadPoolExecutor} until the core pool size
+     * (and at least 1) is reached: each time a task is submitted a new worker is added regardless if an idle worker is available.
+     * <p>
+     * Once having reached the core pool size, a {@link ThreadPoolExecutor} will only add a new worker if the work queue rejects
+     * a task offer. Typically, using a regular unbounded queue, task offers won't ever be rejected, meaning the worker pool would never
+     * scale beyond the core pool size.
+     * <p>
+     * Scaling {@link EsThreadPoolExecutor}s use a customized unbounded {@link LinkedTransferQueue}, which rejects every task offer unless
+     * it can be immediately transferred to an available idle worker. If no such worker is available, the executor will add
+     * a new worker if capacity remains, otherwise the task is rejected and then appended to the work queue via the {@link ForceQueuePolicy}
+     * rejection handler.
+     */
     public static EsThreadPoolExecutor newScaling(
         String name,
         int min,
@@ -366,32 +396,58 @@ public Thread newThread(Runnable r) {
      */
     private EsExecutors() {}
 
-    static class ExecutorScalingQueue<E> extends LinkedTransferQueue<E> {
+    private static <E> LinkedTransferQueue<E> newUnboundedScalingLTQueue(int corePoolSize, int maxPoolSize) {
+        if (maxPoolSize == 1 || maxPoolSize == corePoolSize) {
+            // scaling beyond core pool size (or 1) not required, use a regular unbounded LinkedTransferQueue
+            return new LinkedTransferQueue<>();
+        }
+        // scaling beyond core pool size with an unbounded queue requires ExecutorScalingQueue
+        // note, reconfiguration of core / max pool size not supported in EsThreadPoolExecutor
+        return new ExecutorScalingQueue<>();
+    }
 
-        ThreadPoolExecutor executor;
+    /**
+     * Customized {@link LinkedTransferQueue} to allow a {@link ThreadPoolExecutor} to scale beyond its core pool size despite having an
+     * unbounded queue.
+     * <p>
+     * Note, usage of unbounded work queues is a problem by itself. For once, it makes error-prone customizations necessary so that
+     * thread pools can scale up adequately. But worse, infinite queues prevent backpressure and impose a high risk of causing OOM errors.
+     * <a href="https://github.com/elastic/elasticsearch/issues/18613">Github #18613</a> captures various long outstanding, but important
+     * improvements to thread pools.
+     * <p>
+     * Once having reached its core pool size, a {@link ThreadPoolExecutor} will only add more workers if capacity remains and
+     * the task offer is rejected by the work queue. Typically that's never the case using a regular unbounded queue.
+     * <p>
+     * This customized implementation rejects every task offer unless it can be immediately transferred to an available idle worker.
+     * It relies on {@link ForceQueuePolicy} rejection handler to append the task to the work queue if no additional worker can be added
+     * and the task is rejected by the executor.
+     * <p>
+     * Note, {@link ForceQueuePolicy} cannot guarantee there will be available workers when appending tasks directly to the queue.
+     * For that reason {@link ExecutorScalingQueue} cannot be used with executors with empty core and max pool size of 1:
+     * the only available worker could time out just about at the same time as the task is appended, see
+     * <a href="https://github.com/elastic/elasticsearch/issues/124667">Github #124667</a> for more details.
+     * <p>
+     * Note, configuring executors using core = max size in combination with {@code allowCoreThreadTimeOut} could be an alternative to
+     * {@link ExecutorScalingQueue}. However, the scaling behavior would be very different: Using {@link ExecutorScalingQueue}
+     * we are able to reuse idle workers if available by means of {@link ExecutorScalingQueue#tryTransfer(Object)}.
+     * If setting core = max size, the executor will add a new worker for every task submitted until reaching the core/max pool size
+     * even if there's idle workers available.
+     */
+    static class ExecutorScalingQueue<E> extends LinkedTransferQueue<E> {
 
         ExecutorScalingQueue() {}
 
         @Override
         public boolean offer(E e) {
-            // first try to transfer to a waiting worker thread
-            if (tryTransfer(e) == false) {
-                // check if there might be spare capacity in the thread
-                // pool executor
-                int left = executor.getMaximumPoolSize() - executor.getCorePoolSize();
-                if (left > 0) {
-                    // reject queuing the task to force the thread pool
-                    // executor to add a worker if it can; combined
-                    // with ForceQueuePolicy, this causes the thread
-                    // pool to always scale up to max pool size and we
-                    // only queue when there is no spare capacity
-                    return false;
-                } else {
-                    return super.offer(e);
-                }
-            } else {
-                return true;
+            if (e == EsThreadPoolExecutor.WORKER_PROBE) { // referential equality
+                // this probe ensures a worker is available after force queueing a task via ForceQueuePolicy
+                return super.offer(e);
             }
+            // try to transfer to a waiting worker thread
+            // otherwise reject queuing the task to force the thread pool executor to add a worker if it can;
+            // combined with ForceQueuePolicy, this causes the thread pool to always scale up to max pool size
+            // so that we only queue when there is no spare capacity
+            return tryTransfer(e);
         }
 
         // Overridden to workaround a JDK bug introduced in JDK 21.0.2
@@ -433,15 +489,24 @@ static class ForceQueuePolicy extends EsRejectedExecutionHandler {
          */
         private final boolean rejectAfterShutdown;
 
+        /**
+         * Flag to indicate if the worker pool needs to be probed after force queuing a task to guarantee a worker is available.
+         */
+        private final boolean probeWorkerPool;
+
         /**
          * @param rejectAfterShutdown indicates if {@link Runnable} should be rejected once the thread pool is shutting down
          */
-        ForceQueuePolicy(boolean rejectAfterShutdown) {
+        ForceQueuePolicy(boolean rejectAfterShutdown, boolean probeWorkerPool) {
             this.rejectAfterShutdown = rejectAfterShutdown;
+            this.probeWorkerPool = probeWorkerPool;
         }
 
         @Override
         public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
+            if (task == EsThreadPoolExecutor.WORKER_PROBE) { // referential equality
+                return;
+            }
             if (rejectAfterShutdown) {
                 if (executor.isShutdown()) {
                     reject(executor, task);
@@ -458,12 +523,19 @@ public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
             }
         }
 
-        private static void put(ThreadPoolExecutor executor, Runnable task) {
+        private void put(ThreadPoolExecutor executor, Runnable task) {
             final BlockingQueue<Runnable> queue = executor.getQueue();
-            // force queue policy should only be used with a scaling queue
-            assert queue instanceof ExecutorScalingQueue;
+            // force queue policy should only be used with a scaling queue (ExecutorScalingQueue / LinkedTransferQueue)
+            assert queue instanceof LinkedTransferQueue;
             try {
                 queue.put(task);
+                if (probeWorkerPool && task == queue.peek()) { // referential equality
+                    // If the task is at the head of the queue, we can assume the queue was previously empty. In this case available workers
+                    // might have timed out in the meanwhile. To prevent the task from starving, we submit a noop probe to the executor.
+                    // Note, this deliberately doesn't check getPoolSize()==0 to avoid potential race conditions,
+                    // as the count in the atomic state (used by workerCountOf) is decremented first.
+                    executor.execute(EsThreadPoolExecutor.WORKER_PROBE);
+                }
             } catch (final InterruptedException e) {
                 assert false : "a scaling queue never blocks so a put to it can never be interrupted";
                 throw new AssertionError(e);