Merged
Commits (27)
c01a600 Reproduction by DCT (mosche, Jan 23, 2025)
ab05173 Use new EsExecutors.newSingleScalingToZero for masterService#updateTa… (mosche, Mar 13, 2025)
a59bdc7 increase min core size for various test executors from 0 to 1 (mosche, Mar 13, 2025)
090e104 Improve validation in ScalingExecutorBuilder and use newSingleScaling… (mosche, Mar 13, 2025)
6ce4fb7 Don't allow EsExecutors.newScaling for min=0 / max=1 to not risk star… (mosche, Mar 13, 2025)
59ccab1 Update docs/changelog/124732.yaml (mosche, Mar 13, 2025)
5b9be6c remove newSingleScalingToZero / usage of allowCoreThreadTimeOut (mosche, Mar 13, 2025)
77a44bb Update docs/changelog/124732.yaml (mosche, Mar 13, 2025)
b7ff07d revert obsolete changes (mosche, Mar 13, 2025)
7954b6b revert obsolete changes (mosche, Mar 13, 2025)
8fe724c fix rawtype (mosche, Mar 13, 2025)
3d9e590 reduce noisy test logger to trace (mosche, Mar 13, 2025)
7607e52 add Java docs (mosche, Mar 13, 2025)
dd33e6d fix test timeouts (mosche, Mar 13, 2025)
30d4a50 Implement worker pool probing to prevent #124667 if max pool size > 1. (mosche, Mar 13, 2025)
3279628 fix thread pool configuration in testSlicingBehaviourForParallelColle… (mosche, Mar 13, 2025)
b77ea90 deterministic search pool size in SearchServiceSingleNodeTests (mosche, Mar 14, 2025)
0bf9c96 Merge branch 'main' into ktlo/esExecutorBug (mosche, Mar 14, 2025)
5bf3b12 changelog (mosche, Mar 14, 2025)
1f4fb2d rename worker probe, add comment (mosche, Mar 14, 2025)
a831436 more java docs (mosche, Mar 14, 2025)
bd2eac8 more java docs (mosche, Mar 14, 2025)
d53d5c3 Merge branch 'main' into ktlo/esExecutorBug (mosche, Mar 14, 2025)
48364c6 add comment (mosche, Mar 14, 2025)
0f1f95d PR comments (mosche, Mar 14, 2025)
97fefa6 PR comments (mosche, Mar 14, 2025)
19d1765 Merge branch 'main' into ktlo/esExecutorBug (mosche, Mar 14, 2025)
6 changes: 6 additions & 0 deletions docs/changelog/124732.yaml
@@ -0,0 +1,6 @@
pr: 124732
summary: Prevent rare starvation bug when using scaling `EsThreadPoolExecutor` with empty core pool size.
area: Infra/Core
type: bug
issues:
- 124667
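
The starvation bug (#124667) comes from how a plain ThreadPoolExecutor interacts with an unbounded work queue, as the new Javadoc below explains. A minimal, self-contained sketch using only JDK classes (hypothetical demo class, not part of this PR): with an unbounded queue, offer() always succeeds, so the executor never scales past its core pool size, and with a core size of 0 the pool never grows beyond a single worker.

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo, not part of the PR: shows why an unbounded queue defeats scaling.
public class UnboundedQueueDemo {
    public static void main(String[] args) {
        // core=0, max=4: one might expect the pool to scale towards 4 workers under load
        ThreadPoolExecutor pool = new ThreadPoolExecutor(0, 4, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        for (int i = 0; i < 100; i++) {
            pool.execute(() -> {
                try {
                    Thread.sleep(100); // simulate work so tasks pile up in the queue
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        // The unbounded queue accepts every offer, so execute() never adds more workers;
        // the JDK only guarantees a single worker when the pool is otherwise empty.
        System.out.println("pool size: " + pool.getPoolSize()); // prints 1, not 4
        pool.shutdown();
    }
}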
EsExecutors.java
@@ -96,6 +96,21 @@ public static PrioritizedEsThreadPoolExecutor newSinglePrioritizing(
return new PrioritizedEsThreadPoolExecutor(name, 1, 1, 0L, TimeUnit.MILLISECONDS, threadFactory, contextHolder, timer);
}

/**
* Creates a scaling {@link EsThreadPoolExecutor} using an unbounded work queue.
* <p>
* The {@link EsThreadPoolExecutor} scales the same way as a regular {@link ThreadPoolExecutor} until the core pool size
* (and at least 1) is reached: each time a task is submitted, a new worker is added regardless of whether an idle worker
* is available.
* <p>
* Once the core pool size is reached, a {@link ThreadPoolExecutor} will only add a new worker if the work queue rejects
* a task offer. With a regular unbounded queue, task offers are typically never rejected, meaning the worker pool would
* never scale beyond the core pool size.
* <p>
* Scaling {@link EsThreadPoolExecutor}s use a customized unbounded {@link LinkedTransferQueue}, which rejects every task offer
* unless it can be immediately transferred to an available idle worker. If no such worker is available, the executor adds
* a new worker if capacity remains; otherwise the task is rejected and then appended to the work queue via the
* {@link ForceQueuePolicy} rejection handler.
*/
public static EsThreadPoolExecutor newScaling(
String name,
int min,
@@ -107,10 +122,12 @@ public static EsThreadPoolExecutor newScaling(
ThreadContext contextHolder,
TaskTrackingConfig config
) {
ExecutorScalingQueue<Runnable> queue = new ExecutorScalingQueue<>();
EsThreadPoolExecutor executor;
LinkedTransferQueue<Runnable> queue = newUnboundedScalingLTQueue(min, max);
// Work force-queued via ForceQueuePolicy might starve if no worker is available (when the core pool size is 0);
// probing the worker pool prevents this.
boolean probeWorkerPool = min == 0 && queue instanceof ExecutorScalingQueue;
if (config.trackExecutionTime()) {
executor = new TaskExecutionTimeTrackingEsThreadPoolExecutor(
return new TaskExecutionTimeTrackingEsThreadPoolExecutor(
name,
min,
max,
@@ -119,27 +136,40 @@ public static EsThreadPoolExecutor newScaling(
queue,
TimedRunnable::new,
threadFactory,
new ForceQueuePolicy(rejectAfterShutdown),
new ForceQueuePolicy(rejectAfterShutdown, probeWorkerPool),
contextHolder,
config
);
} else {
executor = new EsThreadPoolExecutor(
return new EsThreadPoolExecutor(
name,
min,
max,
keepAliveTime,
unit,
queue,
threadFactory,
new ForceQueuePolicy(rejectAfterShutdown),
new ForceQueuePolicy(rejectAfterShutdown, probeWorkerPool),
contextHolder
);
}
queue.executor = executor;
return executor;
}

/**
* Creates a scaling {@link EsThreadPoolExecutor} using an unbounded work queue.
* <p>
* The {@link EsThreadPoolExecutor} scales the same way as a regular {@link ThreadPoolExecutor} until the core pool size
* (and at least 1) is reached: each time a task is submitted, a new worker is added regardless of whether an idle worker
* is available.
* <p>
* Once the core pool size is reached, a {@link ThreadPoolExecutor} will only add a new worker if the work queue rejects
* a task offer. With a regular unbounded queue, task offers are typically never rejected, meaning the worker pool would
* never scale beyond the core pool size.
* <p>
* Scaling {@link EsThreadPoolExecutor}s use a customized unbounded {@link LinkedTransferQueue}, which rejects every task offer
* unless it can be immediately transferred to an available idle worker. If no such worker is available, the executor adds
* a new worker if capacity remains; otherwise the task is rejected and then appended to the work queue via the
* {@link ForceQueuePolicy} rejection handler.
*/
public static EsThreadPoolExecutor newScaling(
String name,
int min,
@@ -389,32 +419,58 @@ public boolean isSystem() {
*/
private EsExecutors() {}

static class ExecutorScalingQueue<E> extends LinkedTransferQueue<E> {
private static <E> LinkedTransferQueue<E> newUnboundedScalingLTQueue(int corePoolSize, int maxPoolSize) {
if (maxPoolSize == 1 || maxPoolSize == corePoolSize) {
// scaling beyond core pool size (or 1) not required, use a regular unbounded LinkedTransferQueue
return new LinkedTransferQueue<>();
}
// scaling beyond core pool size with an unbounded queue requires ExecutorScalingQueue
// note, reconfiguration of core / max pool size not supported in EsThreadPoolExecutor
return new ExecutorScalingQueue<>();
}

ThreadPoolExecutor executor;
/**
* Customized {@link LinkedTransferQueue} to allow a {@link ThreadPoolExecutor} to scale beyond its core pool size despite having an
* unbounded queue.
* <p>
* Note, usage of unbounded work queues is a problem by itself. For one, it makes error-prone customizations necessary so that
* thread pools can scale up adequately. But worse, unbounded queues prevent backpressure and impose a high risk of causing OOM errors.
* <a href="https://github.com/elastic/elasticsearch/issues/18613">GitHub #18613</a> captures various long-outstanding, but important,
* improvements to thread pools.
* <p>
* Once its core pool size is reached, a {@link ThreadPoolExecutor} will only add more workers if capacity remains and
* the task offer is rejected by the work queue. With a regular unbounded queue, that is typically never the case.
* <p>
* This customized implementation rejects every task offer unless it can be immediately transferred to an available idle worker.
* It relies on the {@link ForceQueuePolicy} rejection handler to append the task to the work queue if no additional worker can be
* added and the task is rejected by the executor.
* <p>
* Note, {@link ForceQueuePolicy} cannot guarantee there will be available workers when appending tasks directly to the queue.
* For that reason {@link ExecutorScalingQueue} cannot be used with executors that have an empty core pool and a max pool size
* of 1: the only available worker could time out at just about the same time as the task is appended, see
* <a href="https://github.com/elastic/elasticsearch/issues/124667">GitHub #124667</a> for more details.
* <p>
* Note, configuring executors with core = max size in combination with {@code allowCoreThreadTimeOut} could be an alternative to
* {@link ExecutorScalingQueue}. However, the scaling behavior would be very different: using {@link ExecutorScalingQueue},
* we are able to reuse idle workers if available by means of {@link ExecutorScalingQueue#tryTransfer(Object)}.
* With core = max size, the executor adds a new worker for every task submitted until reaching the core/max pool size,
* even if there are idle workers available.
*/
static class ExecutorScalingQueue<E> extends LinkedTransferQueue<E> {

ExecutorScalingQueue() {}

@Override
public boolean offer(E e) {
// first try to transfer to a waiting worker thread
if (tryTransfer(e) == false) {
// check if there might be spare capacity in the thread
// pool executor
int left = executor.getMaximumPoolSize() - executor.getCorePoolSize();
if (left > 0) {
// reject queuing the task to force the thread pool
// executor to add a worker if it can; combined
// with ForceQueuePolicy, this causes the thread
// pool to always scale up to max pool size and we
// only queue when there is no spare capacity
return false;
} else {
return super.offer(e);
}
} else {
return true;
if (e == EsThreadPoolExecutor.WORKER_PROBE) { // referential equality
// this probe ensures a worker is available after force queueing a task via ForceQueuePolicy
return super.offer(e);
}
// try to transfer to a waiting worker thread
// otherwise reject queuing the task to force the thread pool executor to add a worker if it can;
// combined with ForceQueuePolicy, this causes the thread pool to always scale up to max pool size
// so that we only queue when there is no spare capacity
return tryTransfer(e);
}

// Overridden to work around a JDK bug introduced in JDK 21.0.2
@@ -456,15 +512,24 @@ static class ForceQueuePolicy extends EsRejectedExecutionHandler {
*/
private final boolean rejectAfterShutdown;

/**
* Flag to indicate if the worker pool needs to be probed after force queuing a task to guarantee a worker is available.
*/
private final boolean probeWorkerPool;

/**
* @param rejectAfterShutdown indicates if {@link Runnable} should be rejected once the thread pool is shutting down
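* @param probeWorkerPool indicates if the worker pool needs to be probed after force-queuing a task to guarantee a worker is available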
*/
ForceQueuePolicy(boolean rejectAfterShutdown) {
ForceQueuePolicy(boolean rejectAfterShutdown, boolean probeWorkerPool) {
this.rejectAfterShutdown = rejectAfterShutdown;
this.probeWorkerPool = probeWorkerPool;
}

@Override
public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
if (task == EsThreadPoolExecutor.WORKER_PROBE) { // referential equality
return;
}
if (rejectAfterShutdown) {
if (executor.isShutdown()) {
reject(executor, task);
Expand All @@ -481,12 +546,17 @@ public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
}
}

private static void put(ThreadPoolExecutor executor, Runnable task) {
private void put(ThreadPoolExecutor executor, Runnable task) {
final BlockingQueue<Runnable> queue = executor.getQueue();
// force queue policy should only be used with a scaling queue
assert queue instanceof ExecutorScalingQueue;
// force queue policy should only be used with a scaling queue (ExecutorScalingQueue / LinkedTransferQueue)
assert queue instanceof LinkedTransferQueue;
try {
queue.put(task);
if (probeWorkerPool && task == queue.peek()) { // referential equality
// If the task is at the head of the queue, we can assume the queue was previously empty. In this case, available workers
// might have timed out in the meantime. To prevent the task from starving, we submit a noop probe to the executor.
executor.execute(EsThreadPoolExecutor.WORKER_PROBE);
}
} catch (final InterruptedException e) {
assert false : "a scaling queue never blocks so a put to it can never be interrupted";
throw new AssertionError(e);
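
Taken together, the EsExecutors.java changes work as follows: the scaling queue rejects offers it cannot hand directly to an idle worker (forcing the executor to add workers), the rejection handler force-queues tasks once the pool is at max, and a no-op probe guards against the race where the last worker times out just as a task is appended to the empty queue. A minimal sketch under assumed, hypothetical names (ScalingDemo, ScalingQueue, ForceQueue), not the actual Elasticsearch implementation:

import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ScalingDemo {
    // no-op sentinel, recognized by reference (==) and never wrapped
    static final Runnable WORKER_PROBE = () -> {};

    static class ScalingQueue<E> extends LinkedTransferQueue<E> {
        @Override
        public boolean offer(E e) {
            if (e == WORKER_PROBE) {
                return super.offer(e); // queue the probe; execute() then ensures a worker exists
            }
            // hand the task to an idle polling worker if possible; otherwise report "full"
            // so that ThreadPoolExecutor.execute() tries to add a worker instead of queuing
            return tryTransfer(e);
        }
    }

    static class ForceQueue implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
            if (task == WORKER_PROBE) {
                return; // a rejected probe needs no handling
            }
            try {
                // max pool size reached: force the task into the unbounded queue after all
                executor.getQueue().put(task);
                if (task == executor.getQueue().peek()) {
                    // the queue was empty just before: the last worker may have timed out
                    // concurrently, so probe the pool to make execute() start a worker
                    executor.execute(WORKER_PROBE);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(0, 4, 30, TimeUnit.SECONDS, new ScalingQueue<>(), new ForceQueue());
        pool.execute(() -> System.out.println("ran on " + Thread.currentThread().getName()));
        pool.shutdown();
    }
}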
EsThreadPoolExecutor.java
@@ -29,6 +29,10 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor {

private static final Logger logger = LogManager.getLogger(EsThreadPoolExecutor.class);

// noop probe to prevent starvation of work in the work queue due to ForceQueuePolicy
// https://github.com/elastic/elasticsearch/issues/124667
static final Runnable WORKER_PROBE = () -> {};

private final ThreadContext contextHolder;

/**
@@ -66,9 +70,19 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor {
this.contextHolder = contextHolder;
}

@Override
public void setCorePoolSize(int corePoolSize) {
throw new UnsupportedOperationException("reconfiguration at runtime is not supported");
}

@Override
public void setMaximumPoolSize(int maximumPoolSize) {
throw new UnsupportedOperationException("reconfiguration at runtime is not supported");
}

@Override
public void execute(Runnable command) {
final Runnable wrappedRunnable = wrapRunnable(command);
final Runnable wrappedRunnable = command != WORKER_PROBE ? wrapRunnable(command) : WORKER_PROBE;
try {
super.execute(wrappedRunnable);
} catch (Exception e) {
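
An aside on the wrapRunnable bypass above: ExecutorScalingQueue.offer and ForceQueuePolicy.rejectedExecution recognize the probe by referential equality, so the probe must reach the queue unwrapped; any wrapper is a different object. A trivial illustration (hypothetical demo class, not part of the PR):

public class ProbeIdentityDemo {
    public static void main(String[] args) {
        Runnable probe = () -> {};
        Runnable wrapped = () -> probe.run(); // any wrapper (e.g. context-preserving) is a different object
        System.out.println(wrapped == probe); // false: a wrapped probe would be treated as a regular task
    }
}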
ScalingExecutorBuilder.java
@@ -105,9 +105,14 @@ public ScalingExecutorBuilder(
final EsExecutors.TaskTrackingConfig trackingConfig
) {
super(name, false);
this.coreSetting = Setting.intSetting(settingsKey(prefix, "core"), core, Setting.Property.NodeScope);
this.maxSetting = Setting.intSetting(settingsKey(prefix, "max"), max, Setting.Property.NodeScope);
this.keepAliveSetting = Setting.timeSetting(settingsKey(prefix, "keep_alive"), keepAlive, Setting.Property.NodeScope);
this.coreSetting = Setting.intSetting(settingsKey(prefix, "core"), core, 0, Setting.Property.NodeScope);
this.maxSetting = Setting.intSetting(settingsKey(prefix, "max"), max, 1, Setting.Property.NodeScope);
this.keepAliveSetting = Setting.timeSetting(
settingsKey(prefix, "keep_alive"),
keepAlive,
TimeValue.ZERO,
Setting.Property.NodeScope
);
this.rejectAfterShutdown = rejectAfterShutdown;
this.trackingConfig = trackingConfig;
}
@@ -172,5 +177,4 @@ static class ScalingExecutorSettings extends ExecutorBuilder.ExecutorSettings {
this.keepAlive = keepAlive;
}
}

}
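
For illustration, a hedged sketch of what the new lower bounds buy (hypothetical demo class and setting key; the real builder derives its keys from a prefix): out-of-range values for a scaling pool now fail at settings validation time instead of configuring a broken executor.

import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;

public class ScalingSettingsDemo {
    public static void main(String[] args) {
        // mirrors the bounded max setting: default 10, minimum 1
        Setting<Integer> maxSetting = Setting.intSetting("thread_pool.snapshot.max", 10, 1, Setting.Property.NodeScope);
        Settings invalid = Settings.builder().put("thread_pool.snapshot.max", 0).build();
        maxSetting.get(invalid); // now throws IllegalArgumentException instead of accepting max=0
    }
}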
@@ -100,7 +100,7 @@ public void testAsyncAcquire() throws InterruptedException {
final var completionLatch = new CountDownLatch(1);
final var executorService = EsExecutors.newScaling(
"test",
0,
1,
between(1, 10),
10,
TimeUnit.SECONDS,