Commit be15267

Revert "Prevent work starvation bug if using scaling EsThreadPoolExecutor with core pool size = 0 (elastic#124732)"
This reverts commit 36874e8.
1 parent 59a55c8 commit be15267

9 files changed: +45 additions, −305 deletions


docs/changelog/124732.yaml

Lines changed: 0 additions & 6 deletions
This file was deleted.

server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java

Lines changed: 31 additions & 103 deletions
@@ -96,21 +96,6 @@ public static PrioritizedEsThreadPoolExecutor newSinglePrioritizing(
         return new PrioritizedEsThreadPoolExecutor(name, 1, 1, 0L, TimeUnit.MILLISECONDS, threadFactory, contextHolder, timer);
     }

-    /**
-     * Creates a scaling {@link EsThreadPoolExecutor} using an unbounded work queue.
-     * <p>
-     * The {@link EsThreadPoolExecutor} scales the same way as a regular {@link ThreadPoolExecutor} until the core pool size
-     * (and at least 1) is reached: each time a task is submitted a new worker is added regardless if an idle worker is available.
-     * <p>
-     * Once having reached the core pool size, a {@link ThreadPoolExecutor} will only add a new worker if the work queue rejects
-     * a task offer. Typically, using a regular unbounded queue, task offers won't ever be rejected, meaning the worker pool would never
-     * scale beyond the core pool size.
-     * <p>
-     * Scaling {@link EsThreadPoolExecutor}s use a customized unbounded {@link LinkedTransferQueue}, which rejects every task offer unless
-     * it can be immediately transferred to an available idle worker. If no such worker is available, the executor will add
-     * a new worker if capacity remains, otherwise the task is rejected and then appended to the work queue via the {@link ForceQueuePolicy}
-     * rejection handler.
-     */
     public static EsThreadPoolExecutor newScaling(
         String name,
         int min,
@@ -122,12 +107,10 @@ public static EsThreadPoolExecutor newScaling(
         ThreadContext contextHolder,
         TaskTrackingConfig config
     ) {
-        LinkedTransferQueue<Runnable> queue = newUnboundedScalingLTQueue(min, max);
-        // Force queued work via ForceQueuePolicy might starve if no worker is available (if core size is empty),
-        // probing the worker pool prevents this.
-        boolean probeWorkerPool = min == 0 && queue instanceof ExecutorScalingQueue;
+        ExecutorScalingQueue<Runnable> queue = new ExecutorScalingQueue<>();
+        EsThreadPoolExecutor executor;
         if (config.trackExecutionTime()) {
-            return new TaskExecutionTimeTrackingEsThreadPoolExecutor(
+            executor = new TaskExecutionTimeTrackingEsThreadPoolExecutor(
                 name,
                 min,
                 max,
@@ -136,40 +119,27 @@ public static EsThreadPoolExecutor newScaling(
                 queue,
                 TimedRunnable::new,
                 threadFactory,
-                new ForceQueuePolicy(rejectAfterShutdown, probeWorkerPool),
+                new ForceQueuePolicy(rejectAfterShutdown),
                 contextHolder,
                 config
             );
         } else {
-            return new EsThreadPoolExecutor(
+            executor = new EsThreadPoolExecutor(
                 name,
                 min,
                 max,
                 keepAliveTime,
                 unit,
                 queue,
                 threadFactory,
-                new ForceQueuePolicy(rejectAfterShutdown, probeWorkerPool),
+                new ForceQueuePolicy(rejectAfterShutdown),
                 contextHolder
             );
         }
+        queue.executor = executor;
+        return executor;
     }

-    /**
-     * Creates a scaling {@link EsThreadPoolExecutor} using an unbounded work queue.
-     * <p>
-     * The {@link EsThreadPoolExecutor} scales the same way as a regular {@link ThreadPoolExecutor} until the core pool size
-     * (and at least 1) is reached: each time a task is submitted a new worker is added regardless if an idle worker is available.
-     * <p>
-     * Once having reached the core pool size, a {@link ThreadPoolExecutor} will only add a new worker if the work queue rejects
-     * a task offer. Typically, using a regular unbounded queue, task offers won't ever be rejected, meaning the worker pool would never
-     * scale beyond the core pool size.
-     * <p>
-     * Scaling {@link EsThreadPoolExecutor}s use a customized unbounded {@link LinkedTransferQueue}, which rejects every task offer unless
-     * it can be immediately transferred to an available idle worker. If no such worker is available, the executor will add
-     * a new worker if capacity remains, otherwise the task is rejected and then appended to the work queue via the {@link ForceQueuePolicy}
-     * rejection handler.
-     */
     public static EsThreadPoolExecutor newScaling(
         String name,
         int min,
@@ -419,58 +389,32 @@ public boolean isSystem() {
      */
    private EsExecutors() {}

-    private static <E> LinkedTransferQueue<E> newUnboundedScalingLTQueue(int corePoolSize, int maxPoolSize) {
-        if (maxPoolSize == 1 || maxPoolSize == corePoolSize) {
-            // scaling beyond core pool size (or 1) not required, use a regular unbounded LinkedTransferQueue
-            return new LinkedTransferQueue<>();
-        }
-        // scaling beyond core pool size with an unbounded queue requires ExecutorScalingQueue
-        // note, reconfiguration of core / max pool size not supported in EsThreadPoolExecutor
-        return new ExecutorScalingQueue<>();
-    }
-
-    /**
-     * Customized {@link LinkedTransferQueue} to allow a {@link ThreadPoolExecutor} to scale beyond its core pool size despite having an
-     * unbounded queue.
-     * <p>
-     * Note, usage of unbounded work queues is a problem by itself. For once, it makes error-prone customizations necessary so that
-     * thread pools can scale up adequately. But worse, infinite queues prevent backpressure and impose a high risk of causing OOM errors.
-     * <a href="https://github.com/elastic/elasticsearch/issues/18613">Github #18613</a> captures various long outstanding, but important
-     * improvements to thread pools.
-     * <p>
-     * Once having reached its core pool size, a {@link ThreadPoolExecutor} will only add more workers if capacity remains and
-     * the task offer is rejected by the work queue. Typically that's never the case using a regular unbounded queue.
-     * <p>
-     * This customized implementation rejects every task offer unless it can be immediately transferred to an available idle worker.
-     * It relies on {@link ForceQueuePolicy} rejection handler to append the task to the work queue if no additional worker can be added
-     * and the task is rejected by the executor.
-     * <p>
-     * Note, {@link ForceQueuePolicy} cannot guarantee there will be available workers when appending tasks directly to the queue.
-     * For that reason {@link ExecutorScalingQueue} cannot be used with executors with empty core and max pool size of 1:
-     * the only available worker could time out just about at the same time as the task is appended, see
-     * <a href="https://github.com/elastic/elasticsearch/issues/124667">Github #124667</a> for more details.
-     * <p>
-     * Note, configuring executors using core = max size in combination with {@code allowCoreThreadTimeOut} could be an alternative to
-     * {@link ExecutorScalingQueue}. However, the scaling behavior would be very different: Using {@link ExecutorScalingQueue}
-     * we are able to reuse idle workers if available by means of {@link ExecutorScalingQueue#tryTransfer(Object)}.
-     * If setting core = max size, the executor will add a new worker for every task submitted until reaching the core/max pool size
-     * even if there's idle workers available.
-     */
     static class ExecutorScalingQueue<E> extends LinkedTransferQueue<E> {

+        ThreadPoolExecutor executor;
+
         ExecutorScalingQueue() {}

         @Override
         public boolean offer(E e) {
-            if (e == EsThreadPoolExecutor.WORKER_PROBE) { // referential equality
-                // this probe ensures a worker is available after force queueing a task via ForceQueuePolicy
-                return super.offer(e);
+            // first try to transfer to a waiting worker thread
+            if (tryTransfer(e) == false) {
+                // check if there might be spare capacity in the thread
+                // pool executor
+                int left = executor.getMaximumPoolSize() - executor.getCorePoolSize();
+                if (left > 0) {
+                    // reject queuing the task to force the thread pool
+                    // executor to add a worker if it can; combined
+                    // with ForceQueuePolicy, this causes the thread
+                    // pool to always scale up to max pool size and we
+                    // only queue when there is no spare capacity
+                    return false;
+                } else {
+                    return super.offer(e);
+                }
+            } else {
+                return true;
             }
-            // try to transfer to a waiting worker thread
-            // otherwise reject queuing the task to force the thread pool executor to add a worker if it can;
-            // combined with ForceQueuePolicy, this causes the thread pool to always scale up to max pool size
-            // so that we only queue when there is no spare capacity
-            return tryTransfer(e);
         }

         // Overridden to workaround a JDK bug introduced in JDK 21.0.2
@@ -512,24 +456,15 @@ static class ForceQueuePolicy extends EsRejectedExecutionHandler {
          */
         private final boolean rejectAfterShutdown;

-        /**
-         * Flag to indicate if the worker pool needs to be probed after force queuing a task to guarantee a worker is available.
-         */
-        private final boolean probeWorkerPool;
-
         /**
         * @param rejectAfterShutdown indicates if {@link Runnable} should be rejected once the thread pool is shutting down
         */
-        ForceQueuePolicy(boolean rejectAfterShutdown, boolean probeWorkerPool) {
+        ForceQueuePolicy(boolean rejectAfterShutdown) {
             this.rejectAfterShutdown = rejectAfterShutdown;
-            this.probeWorkerPool = probeWorkerPool;
         }

         @Override
         public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
-            if (task == EsThreadPoolExecutor.WORKER_PROBE) { // referential equality
-                return;
-            }
             if (rejectAfterShutdown) {
                 if (executor.isShutdown()) {
                     reject(executor, task);
@@ -546,19 +481,12 @@ public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
            }
        }

-        private void put(ThreadPoolExecutor executor, Runnable task) {
+        private static void put(ThreadPoolExecutor executor, Runnable task) {
             final BlockingQueue<Runnable> queue = executor.getQueue();
-            // force queue policy should only be used with a scaling queue (ExecutorScalingQueue / LinkedTransferQueue)
-            assert queue instanceof LinkedTransferQueue;
+            // force queue policy should only be used with a scaling queue
+            assert queue instanceof ExecutorScalingQueue;
             try {
                 queue.put(task);
-                if (probeWorkerPool && task == queue.peek()) { // referential equality
-                    // If the task is at the head of the queue, we can assume the queue was previously empty. In this case available workers
-                    // might have timed out in the meanwhile. To prevent the task from starving, we submit a noop probe to the executor.
-                    // Note, this deliberately doesn't check getPoolSize()==0 to avoid potential race conditions,
-                    // as the count in the atomic state (used by workerCountOf) is decremented first.
-                    executor.execute(EsThreadPoolExecutor.WORKER_PROBE);
-                }
             } catch (final InterruptedException e) {
                 assert false : "a scaling queue never blocks so a put to it can never be interrupted";
                 throw new AssertionError(e);
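
Both the deleted Javadoc and the restored inline comments above describe the same scaling trick: an unbounded LinkedTransferQueue that rejects offers while the pool can still grow, paired with a rejection handler that force-queues the task once it cannot. The following is a minimal standalone sketch of that idea using only JDK types and hypothetical names (not the Elasticsearch classes themselves, and without the starvation guard this commit reverts):

import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical names; a sketch of the scaling-queue idea, not the Elasticsearch implementation.
class ScalingQueueSketch<E> extends LinkedTransferQueue<E> {
    ThreadPoolExecutor executor; // must be wired up after the executor is constructed

    @Override
    public boolean offer(E e) {
        if (tryTransfer(e)) {
            return true; // handed straight to an idle worker
        }
        if (executor.getMaximumPoolSize() - executor.getCorePoolSize() > 0) {
            return false; // reject so ThreadPoolExecutor adds a worker (up to max pool size)
        }
        return super.offer(e); // no room to grow: queue normally
    }
}

// On rejection (pool already at max, or a race while growing), append to the unbounded queue anyway.
class ForceQueueSketch implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
        executor.getQueue().add(task);
    }
}

class ScalingSketchDemo {
    public static void main(String[] args) {
        ScalingQueueSketch<Runnable> queue = new ScalingQueueSketch<>();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(0, 4, 30, TimeUnit.SECONDS, queue, new ForceQueueSketch());
        queue.executor = pool; // wire the back-reference before submitting work
        pool.execute(() -> System.out.println("ran on " + Thread.currentThread().getName()));
        pool.shutdown();
    }
}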

server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java

Lines changed: 1 addition & 20 deletions
@@ -29,15 +29,6 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor {

     private static final Logger logger = LogManager.getLogger(EsThreadPoolExecutor.class);

-    // noop probe to prevent starvation of work in the work queue due to ForceQueuePolicy
-    // https://github.com/elastic/elasticsearch/issues/124667
-    // note, this is intentionally not a lambda to avoid this ever be turned into a compile time constant
-    // matching similar lambdas coming from other places
-    static final Runnable WORKER_PROBE = new Runnable() {
-        @Override
-        public void run() {}
-    };
-
     private final ThreadContext contextHolder;

     /**
@@ -75,19 +66,9 @@ public void run() {}
         this.contextHolder = contextHolder;
     }

-    @Override
-    public void setCorePoolSize(int corePoolSize) {
-        throw new UnsupportedOperationException("reconfiguration at runtime is not supported");
-    }
-
-    @Override
-    public void setMaximumPoolSize(int maximumPoolSize) {
-        throw new UnsupportedOperationException("reconfiguration at runtime is not supported");
-    }
-
     @Override
     public void execute(Runnable command) {
-        final Runnable wrappedRunnable = command != WORKER_PROBE ? wrapRunnable(command) : WORKER_PROBE;
+        final Runnable wrappedRunnable = wrapRunnable(command);
         try {
             super.execute(wrappedRunnable);
         } catch (Exception e) {
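
For reference, the starvation guard removed by this revert combined the pieces deleted above: a no-op WORKER_PROBE constant in EsThreadPoolExecutor and a probe submission after force-queueing in ForceQueuePolicy. Below is a condensed sketch of that mechanism, reassembled from the deleted lines (hypothetical class and method names, not a drop-in replacement for either class):

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;

// Condensed from the deleted lines: with core pool size 0, a task force-queued onto an
// empty queue can starve if the last idle worker times out concurrently
// (https://github.com/elastic/elasticsearch/issues/124667). The reverted fix submitted a
// no-op probe through execute() so the executor ends up with a worker that also drains
// the queued task.
class WorkerProbeSketch {
    // intentionally not a lambda so referential-equality checks stay reliable
    static final Runnable WORKER_PROBE = new Runnable() {
        @Override
        public void run() {}
    };

    static void forceQueue(ThreadPoolExecutor executor, Runnable task) throws InterruptedException {
        BlockingQueue<Runnable> queue = executor.getQueue();
        queue.put(task);
        if (task == queue.peek()) { // task at the head implies the queue was previously empty
            executor.execute(WORKER_PROBE); // wake or create a worker; the probe itself does nothing
        }
    }
}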

server/src/main/java/org/elasticsearch/threadpool/ScalingExecutorBuilder.java

Lines changed: 4 additions & 8 deletions
@@ -105,14 +105,9 @@ public ScalingExecutorBuilder(
         final EsExecutors.TaskTrackingConfig trackingConfig
     ) {
         super(name, false);
-        this.coreSetting = Setting.intSetting(settingsKey(prefix, "core"), core, 0, Setting.Property.NodeScope);
-        this.maxSetting = Setting.intSetting(settingsKey(prefix, "max"), max, 1, Setting.Property.NodeScope);
-        this.keepAliveSetting = Setting.timeSetting(
-            settingsKey(prefix, "keep_alive"),
-            keepAlive,
-            TimeValue.ZERO,
-            Setting.Property.NodeScope
-        );
+        this.coreSetting = Setting.intSetting(settingsKey(prefix, "core"), core, Setting.Property.NodeScope);
+        this.maxSetting = Setting.intSetting(settingsKey(prefix, "max"), max, Setting.Property.NodeScope);
+        this.keepAliveSetting = Setting.timeSetting(settingsKey(prefix, "keep_alive"), keepAlive, Setting.Property.NodeScope);
         this.rejectAfterShutdown = rejectAfterShutdown;
         this.trackingConfig = trackingConfig;
     }
@@ -177,4 +172,5 @@ static class ScalingExecutorSettings extends ExecutorBuilder.ExecutorSettings {
             this.keepAlive = keepAlive;
         }
     }
+
 }
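
A side effect of this revert is that the explicit lower bounds on the scaling pool settings go away. A rough sketch of what the bounded Setting.intSetting overload (deleted above) buys, using a hypothetical setting key purely for illustration:

import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;

public class SettingBoundsSketch {
    public static void main(String[] args) {
        // Bounded form (as deleted above): values below the minimum should fail validation when read.
        Setting<Integer> boundedMax = Setting.intSetting("thread_pool.example.max", 4, 1, Setting.Property.NodeScope);
        // Unbounded form (as restored by this revert): 0 parses fine and is only caught, if at all, downstream.
        Setting<Integer> unboundedMax = Setting.intSetting("thread_pool.example.max", 4, Setting.Property.NodeScope);

        Settings settings = Settings.builder().put("thread_pool.example.max", 0).build();
        System.out.println(unboundedMax.get(settings)); // 0
        try {
            boundedMax.get(settings); // expected to throw: value is below the declared minimum of 1
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}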

server/src/test/java/org/elasticsearch/action/support/RefCountingRunnableTests.java

Lines changed: 1 addition & 1 deletion
@@ -100,7 +100,7 @@ public void testAsyncAcquire() throws InterruptedException {
         final var completionLatch = new CountDownLatch(1);
         final var executorService = EsExecutors.newScaling(
             "test",
-            1,
+            0,
             between(1, 10),
             10,
             TimeUnit.SECONDS,
