@@ -96,6 +96,21 @@ public static PrioritizedEsThreadPoolExecutor newSinglePrioritizing(
96
96
return new PrioritizedEsThreadPoolExecutor (name , 1 , 1 , 0L , TimeUnit .MILLISECONDS , threadFactory , contextHolder , timer );
97
97
}
98
98
99
+ /**
100
+ * Creates a scaling {@link EsThreadPoolExecutor} using an unbounded work queue.
101
+ * <p>
102
+ * The {@link EsThreadPoolExecutor} scales the same way as a regular {@link ThreadPoolExecutor} until the core pool size
103
+ * (and at least 1) is reached: each time a task is submitted a new worker is added regardless of whether an idle worker is available.
104
+ * <p>
105
+ * Once having reached the core pool size, a {@link ThreadPoolExecutor} will only add a new worker if the work queue rejects
106
+ * a task offer. Typically, using a regular unbounded queue, task offers won't ever be rejected, meaning the worker pool would never
107
+ * scale beyond the core pool size.
108
+ * <p>
109
+ * Scaling {@link EsThreadPoolExecutor}s use a customized unbounded {@link LinkedTransferQueue}, which rejects every task offer unless
110
+ * it can be immediately transferred to an available idle worker. If no such worker is available, the executor will add
111
+ * a new worker if capacity remains, otherwise the task is rejected and then appended to the work queue via the {@link ForceQueuePolicy}
112
+ * rejection handler.
113
+ */
99
114
public static EsThreadPoolExecutor newScaling (
100
115
String name ,
101
116
int min ,
@@ -107,10 +122,12 @@ public static EsThreadPoolExecutor newScaling(
107
122
ThreadContext contextHolder ,
108
123
TaskTrackingConfig config
109
124
) {
110
- ExecutorScalingQueue <Runnable > queue = new ExecutorScalingQueue <>();
111
- EsThreadPoolExecutor executor ;
125
+ LinkedTransferQueue <Runnable > queue = newUnboundedScalingLTQueue (min , max );
126
+ // Force queued work via ForceQueuePolicy might starve if no worker is available (if the core pool size is 0),
127
+ // probing the worker pool prevents this.
128
+ boolean probeWorkerPool = min == 0 && queue instanceof ExecutorScalingQueue ;
112
129
if (config .trackExecutionTime ()) {
113
- executor = new TaskExecutionTimeTrackingEsThreadPoolExecutor (
130
+ return new TaskExecutionTimeTrackingEsThreadPoolExecutor (
114
131
name ,
115
132
min ,
116
133
max ,
@@ -119,27 +136,40 @@ public static EsThreadPoolExecutor newScaling(
119
136
queue ,
120
137
TimedRunnable ::new ,
121
138
threadFactory ,
122
- new ForceQueuePolicy (rejectAfterShutdown ),
139
+ new ForceQueuePolicy (rejectAfterShutdown , probeWorkerPool ),
123
140
contextHolder ,
124
141
config
125
142
);
126
143
} else {
127
- executor = new EsThreadPoolExecutor (
144
+ return new EsThreadPoolExecutor (
128
145
name ,
129
146
min ,
130
147
max ,
131
148
keepAliveTime ,
132
149
unit ,
133
150
queue ,
134
151
threadFactory ,
135
- new ForceQueuePolicy (rejectAfterShutdown ),
152
+ new ForceQueuePolicy (rejectAfterShutdown , probeWorkerPool ),
136
153
contextHolder
137
154
);
138
155
}
139
- queue .executor = executor ;
140
- return executor ;
141
156
}
142
157
158
+ /**
159
+ * Creates a scaling {@link EsThreadPoolExecutor} using an unbounded work queue.
160
+ * <p>
161
+ * The {@link EsThreadPoolExecutor} scales the same way as a regular {@link ThreadPoolExecutor} until the core pool size
162
+ * (and at least 1) is reached: each time a task is submitted a new worker is added regardless of whether an idle worker is available.
163
+ * <p>
164
+ * Once having reached the core pool size, a {@link ThreadPoolExecutor} will only add a new worker if the work queue rejects
165
+ * a task offer. Typically, using a regular unbounded queue, task offers won't ever be rejected, meaning the worker pool would never
166
+ * scale beyond the core pool size.
167
+ * <p>
168
+ * Scaling {@link EsThreadPoolExecutor}s use a customized unbounded {@link LinkedTransferQueue}, which rejects every task offer unless
169
+ * it can be immediately transferred to an available idle worker. If no such worker is available, the executor will add
170
+ * a new worker if capacity remains, otherwise the task is rejected and then appended to the work queue via the {@link ForceQueuePolicy}
171
+ * rejection handler.
172
+ */
143
173
public static EsThreadPoolExecutor newScaling (
144
174
String name ,
145
175
int min ,
@@ -366,32 +396,58 @@ public Thread newThread(Runnable r) {
366
396
*/
367
397
private EsExecutors () {}
368
398
369
- static class ExecutorScalingQueue <E > extends LinkedTransferQueue <E > {
399
+ private static <E > LinkedTransferQueue <E > newUnboundedScalingLTQueue (int corePoolSize , int maxPoolSize ) {
400
+ if (maxPoolSize == 1 || maxPoolSize == corePoolSize ) {
401
+ // scaling beyond core pool size (or 1) not required, use a regular unbounded LinkedTransferQueue
402
+ return new LinkedTransferQueue <>();
403
+ }
404
+ // scaling beyond core pool size with an unbounded queue requires ExecutorScalingQueue
405
+ // note, reconfiguration of core / max pool size not supported in EsThreadPoolExecutor
406
+ return new ExecutorScalingQueue <>();
407
+ }
370
408
371
- ThreadPoolExecutor executor ;
409
+ /**
410
+ * Customized {@link LinkedTransferQueue} to allow a {@link ThreadPoolExecutor} to scale beyond its core pool size despite having an
411
+ * unbounded queue.
412
+ * <p>
413
+ * Note, usage of unbounded work queues is a problem by itself. For one, it makes error-prone customizations necessary so that
414
+ * thread pools can scale up adequately. But worse, infinite queues prevent backpressure and impose a high risk of causing OOM errors.
415
+ * <a href="https://github.com/elastic/elasticsearch/issues/18613">Github #18613</a> captures various long outstanding, but important
416
+ * improvements to thread pools.
417
+ * <p>
418
+ * Once having reached its core pool size, a {@link ThreadPoolExecutor} will only add more workers if capacity remains and
419
+ * the task offer is rejected by the work queue. Typically that's never the case using a regular unbounded queue.
420
+ * <p>
421
+ * This customized implementation rejects every task offer unless it can be immediately transferred to an available idle worker.
422
+ * It relies on {@link ForceQueuePolicy} rejection handler to append the task to the work queue if no additional worker can be added
423
+ * and the task is rejected by the executor.
424
+ * <p>
425
+ * Note, {@link ForceQueuePolicy} cannot guarantee there will be available workers when appending tasks directly to the queue.
426
+ * For that reason {@link ExecutorScalingQueue} cannot be used with executors with empty core and max pool size of 1:
427
+ * the only available worker could time out just about at the same time as the task is appended, see
428
+ * <a href="https://github.com/elastic/elasticsearch/issues/124667">Github #124667</a> for more details.
429
+ * <p>
430
+ * Note, configuring executors using core = max size in combination with {@code allowCoreThreadTimeOut} could be an alternative to
431
+ * {@link ExecutorScalingQueue}. However, the scaling behavior would be very different: Using {@link ExecutorScalingQueue}
432
+ * we are able to reuse idle workers if available by means of {@link ExecutorScalingQueue#tryTransfer(Object)}.
433
+ * If setting core = max size, the executor will add a new worker for every task submitted until reaching the core/max pool size
434
+ * even if there's idle workers available.
435
+ */
436
+ static class ExecutorScalingQueue <E > extends LinkedTransferQueue <E > {
372
437
373
438
// Default constructor: the queue carries no state beyond what LinkedTransferQueue provides.
ExecutorScalingQueue () {}
374
439
375
440
@ Override
376
441
public boolean offer (E e ) {
377
- // first try to transfer to a waiting worker thread
378
- if (tryTransfer (e ) == false ) {
379
- // check if there might be spare capacity in the thread
380
- // pool executor
381
- int left = executor .getMaximumPoolSize () - executor .getCorePoolSize ();
382
- if (left > 0 ) {
383
- // reject queuing the task to force the thread pool
384
- // executor to add a worker if it can; combined
385
- // with ForceQueuePolicy, this causes the thread
386
- // pool to always scale up to max pool size and we
387
- // only queue when there is no spare capacity
388
- return false ;
389
- } else {
390
- return super .offer (e );
391
- }
392
- } else {
393
- return true ;
442
+ if (e == EsThreadPoolExecutor .WORKER_PROBE ) { // referential equality
443
+ // this probe ensures a worker is available after force queueing a task via ForceQueuePolicy
444
+ return super .offer (e );
394
445
}
446
+ // try to transfer to a waiting worker thread
447
+ // otherwise reject queuing the task to force the thread pool executor to add a worker if it can;
448
+ // combined with ForceQueuePolicy, this causes the thread pool to always scale up to max pool size
449
+ // so that we only queue when there is no spare capacity
450
+ return tryTransfer (e );
395
451
}
396
452
397
453
// Overridden to workaround a JDK bug introduced in JDK 21.0.2
@@ -433,15 +489,24 @@ static class ForceQueuePolicy extends EsRejectedExecutionHandler {
433
489
*/
434
490
private final boolean rejectAfterShutdown ;
435
491
492
+ /**
493
+ * Flag to indicate if the worker pool needs to be probed after force queuing a task to guarantee a worker is available.
494
+ */
495
+ private final boolean probeWorkerPool ;
496
+
436
497
/**
 * @param rejectAfterShutdown indicates if {@link Runnable} should be rejected once the thread pool is shutting down
 * @param probeWorkerPool indicates if the worker pool should be probed after force-queuing a task,
 *                        to guarantee a worker is available to pick it up
 */
ForceQueuePolicy(boolean rejectAfterShutdown, boolean probeWorkerPool) {
    this.probeWorkerPool = probeWorkerPool;
    this.rejectAfterShutdown = rejectAfterShutdown;
}
442
504
443
505
@ Override
444
506
public void rejectedExecution (Runnable task , ThreadPoolExecutor executor ) {
507
+ if (task == EsThreadPoolExecutor .WORKER_PROBE ) { // referential equality
508
+ return ;
509
+ }
445
510
if (rejectAfterShutdown ) {
446
511
if (executor .isShutdown ()) {
447
512
reject (executor , task );
@@ -458,12 +523,19 @@ public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
458
523
}
459
524
}
460
525
461
- private static void put (ThreadPoolExecutor executor , Runnable task ) {
526
+ private void put (ThreadPoolExecutor executor , Runnable task ) {
462
527
final BlockingQueue <Runnable > queue = executor .getQueue ();
463
- // force queue policy should only be used with a scaling queue
464
- assert queue instanceof ExecutorScalingQueue ;
528
+ // force queue policy should only be used with a scaling queue (ExecutorScalingQueue / LinkedTransferQueue)
529
+ assert queue instanceof LinkedTransferQueue ;
465
530
try {
466
531
queue .put (task );
532
+ if (probeWorkerPool && task == queue .peek ()) { // referential equality
533
+ // If the task is at the head of the queue, we can assume the queue was previously empty. In this case available workers
534
+ // might have timed out in the meanwhile. To prevent the task from starving, we submit a noop probe to the executor.
535
+ // Note, this deliberately doesn't check getPoolSize()==0 to avoid potential race conditions,
536
+ // as the count in the atomic state (used by workerCountOf) is decremented first.
537
+ executor .execute (EsThreadPoolExecutor .WORKER_PROBE );
538
+ }
467
539
} catch (final InterruptedException e ) {
468
540
assert false : "a scaling queue never blocks so a put to it can never be interrupted" ;
469
541
throw new AssertionError (e );
0 commit comments