Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
c01a600
Reproduction by DCT
mosche Jan 23, 2025
ab05173
Use new EsExecutors.newSingleScalingToZero for masterService#updateTa…
mosche Mar 13, 2025
a59bdc7
increase min core size for various test executors from 0 to 1
mosche Mar 13, 2025
090e104
Improve validation in ScalingExecutorBuilder and use newSingleScaling…
mosche Mar 13, 2025
6ce4fb7
Don't allow EsExecutors.newScaling for min=0 / max=1 to not risk star…
mosche Mar 13, 2025
59ccab1
Update docs/changelog/124732.yaml
mosche Mar 13, 2025
5b9be6c
remove newSingleScalingToZero / usage of allowCoreThreadTimeOut
mosche Mar 13, 2025
77a44bb
Update docs/changelog/124732.yaml
mosche Mar 13, 2025
b7ff07d
revert obsolete changes
mosche Mar 13, 2025
7954b6b
revert obsolete changes
mosche Mar 13, 2025
8fe724c
fix rawtype
mosche Mar 13, 2025
3d9e590
reduce noisy test logger to trace
mosche Mar 13, 2025
7607e52
add Java docs
mosche Mar 13, 2025
dd33e6d
fix test timeouts
mosche Mar 13, 2025
30d4a50
Implement worker pool probing to prevent #124667 if max pool size > 1.
mosche Mar 13, 2025
3279628
fix thread pool configuration in testSlicingBehaviourForParallelColle…
mosche Mar 13, 2025
b77ea90
deterministic search pool size in SearchServiceSingleNodeTests
mosche Mar 14, 2025
0bf9c96
Merge branch 'main' into ktlo/esExecutorBug
mosche Mar 14, 2025
5bf3b12
changelog
mosche Mar 14, 2025
1f4fb2d
rename worker probe, add comment
mosche Mar 14, 2025
a831436
more java docs
mosche Mar 14, 2025
bd2eac8
more java docs
mosche Mar 14, 2025
d53d5c3
Merge branch 'main' into ktlo/esExecutorBug
mosche Mar 14, 2025
48364c6
add comment
mosche Mar 14, 2025
0f1f95d
PR comments
mosche Mar 14, 2025
97fefa6
PR comments
mosche Mar 14, 2025
19d1765
Merge branch 'main' into ktlo/esExecutorBug
mosche Mar 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions docs/changelog/124732.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
pr: 124732
summary: Prevent starvation bug if using scaling `EsThreadPoolExecutor` if core pool
size = 0 / max pool size = 1
area: Infra/Core
type: "breaking, bug"
issues: []
breaking:
title: Prevent starvation bug if using scaling `EsThreadPoolExecutor` if core pool
size = 0 / max pool size = 1
area: Infra/Core
details: Please describe the details of this change for the release notes. You can
use asciidoc.
impact: Please describe the impact of this change to users
notable: false
Original file line number Diff line number Diff line change
Expand Up @@ -170,10 +170,8 @@ protected synchronized void doStart() {
}

protected ExecutorService createThreadPoolExecutor() {
return EsExecutors.newScaling(
return EsExecutors.newSingleScalingToZero(
nodeName + "/" + MASTER_UPDATE_THREAD_NAME,
0,
1,
60,
TimeUnit.SECONDS,
true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,13 @@ public static EsThreadPoolExecutor newScaling(
ThreadContext contextHolder,
TaskTrackingConfig config
) {
ExecutorScalingQueue<Runnable> queue = new ExecutorScalingQueue<>();
EsThreadPoolExecutor executor;
if (min == 0 && max == 1) {
// forbidden due to https://github.com/elastic/elasticsearch/issues/124667
throw new IllegalArgumentException("Unsupported configuration, use EsExecutors.newSingleScalingToZero instead");
}
LinkedTransferQueue<Runnable> queue = newUnboundedScalingLTQueue(min, max);
if (config.trackExecutionTime()) {
executor = new TaskExecutionTimeTrackingEsThreadPoolExecutor(
return new TaskExecutionTimeTrackingEsThreadPoolExecutor(
name,
min,
max,
Expand All @@ -124,7 +127,7 @@ public static EsThreadPoolExecutor newScaling(
config
);
} else {
executor = new EsThreadPoolExecutor(
return new EsThreadPoolExecutor(
name,
min,
max,
Expand All @@ -136,8 +139,6 @@ public static EsThreadPoolExecutor newScaling(
contextHolder
);
}
queue.executor = executor;
return executor;
}

public static EsThreadPoolExecutor newScaling(
Expand All @@ -163,6 +164,57 @@ public static EsThreadPoolExecutor newScaling(
);
}

/**
* A single threaded executor that can safely scale down to 0 threads when idle.
* @throws IllegalArgumentException if keepAliveTime is 0
*/
public static EsThreadPoolExecutor newSingleScalingToZero(
String name,
long keepAliveTime,
TimeUnit unit,
boolean rejectAfterShutdown,
ThreadFactory threadFactory,
ThreadContext contextHolder,
TaskTrackingConfig config
) {
EsThreadPoolExecutor executor = newScaling(
name,
1,
1,
keepAliveTime,
unit,
rejectAfterShutdown,
threadFactory,
contextHolder,
config
);
executor.allowCoreThreadTimeOut(true);
return executor;
}

/**
* A single threaded executor that can safely scale down to 0 threads when idle.
* @throws IllegalArgumentException if keepAliveTime is 0
*/
public static EsThreadPoolExecutor newSingleScalingToZero(
String name,
long keepAliveTime,
TimeUnit unit,
boolean rejectAfterShutdown,
ThreadFactory threadFactory,
ThreadContext contextHolder
) {
return newSingleScalingToZero(
name,
keepAliveTime,
unit,
rejectAfterShutdown,
threadFactory,
contextHolder,
TaskTrackingConfig.DO_NOT_TRACK
);
}

public static EsThreadPoolExecutor newFixed(
String name,
int size,
Expand Down Expand Up @@ -389,32 +441,23 @@ public boolean isSystem() {
*/
private EsExecutors() {}

static class ExecutorScalingQueue<E> extends LinkedTransferQueue<E> {
private static <E> LinkedTransferQueue<E> newUnboundedScalingLTQueue(int corePoolSize, int maxPoolSize) {
// scaling beyond core pool size using an unbounded queue requires ExecutorScalingQueue
// note, reconfiguration of core / max pool size not supported in EsThreadPoolExecutor
return maxPoolSize > corePoolSize ? new ExecutorScalingQueue<>() : new LinkedTransferQueue<>();
}

ThreadPoolExecutor executor;
static class ExecutorScalingQueue<E> extends LinkedTransferQueue<E> {

ExecutorScalingQueue() {}

@Override
public boolean offer(E e) {
// first try to transfer to a waiting worker thread
if (tryTransfer(e) == false) {
// check if there might be spare capacity in the thread
// pool executor
int left = executor.getMaximumPoolSize() - executor.getCorePoolSize();
if (left > 0) {
// reject queuing the task to force the thread pool
// executor to add a worker if it can; combined
// with ForceQueuePolicy, this causes the thread
// pool to always scale up to max pool size and we
// only queue when there is no spare capacity
return false;
} else {
return super.offer(e);
}
} else {
return true;
}
// try to transfer to a waiting worker thread
// otherwise reject queuing the task to force the thread pool executor to add a worker if it can;
// combined with ForceQueuePolicy, this causes the thread pool to always scale up to max pool size
// so that we only queue when there is no spare capacity
return tryTransfer(e);
}

// Overridden to workaround a JDK bug introduced in JDK 21.0.2
Expand Down Expand Up @@ -483,8 +526,8 @@ public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {

private static void put(ThreadPoolExecutor executor, Runnable task) {
final BlockingQueue<Runnable> queue = executor.getQueue();
// force queue policy should only be used with a scaling queue
assert queue instanceof ExecutorScalingQueue;
// force queue policy should only be used with a scaling queue (ExecutorScalingQueue / LinkedTransferQueue)
assert queue instanceof LinkedTransferQueue;
try {
queue.put(task);
} catch (final InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,16 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor {
this.contextHolder = contextHolder;
}

@Override
public void setCorePoolSize(int corePoolSize) {
throw new UnsupportedOperationException("reconfiguration at runtime is not supported");
}

@Override
public void setMaximumPoolSize(int maximumPoolSize) {
throw new UnsupportedOperationException("reconfiguration at runtime is not supported");
}

@Override
public void execute(Runnable command) {
final Runnable wrappedRunnable = wrapRunnable(command);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,9 @@ public TimestampFieldMapperService(Settings settings, ThreadPool threadPool, Ind

final String nodeName = Objects.requireNonNull(Node.NODE_NAME_SETTING.get(settings));
final String threadName = "TimestampFieldMapperService#updateTask";
executor = EsExecutors.newScaling(
executor = EsExecutors.newSingleScalingToZero(
nodeName + "/" + threadName,
0,
1,
0,
TimeUnit.MILLISECONDS,
true,
daemonThreadFactory(nodeName, threadName),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
import org.elasticsearch.node.Node;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -105,9 +107,14 @@ public ScalingExecutorBuilder(
final EsExecutors.TaskTrackingConfig trackingConfig
) {
super(name, false);
this.coreSetting = Setting.intSetting(settingsKey(prefix, "core"), core, Setting.Property.NodeScope);
this.maxSetting = Setting.intSetting(settingsKey(prefix, "max"), max, Setting.Property.NodeScope);
this.keepAliveSetting = Setting.timeSetting(settingsKey(prefix, "keep_alive"), keepAlive, Setting.Property.NodeScope);
this.coreSetting = Setting.intSetting(settingsKey(prefix, "core"), core, 0, Setting.Property.NodeScope);
this.maxSetting = Setting.intSetting(settingsKey(prefix, "max"), max, 1, Setting.Property.NodeScope);
this.keepAliveSetting = Setting.timeSetting(
settingsKey(prefix, "keep_alive"),
keepAlive,
new KeepAliveTimeValidator(coreSetting, maxSetting),
Setting.Property.NodeScope
);
this.rejectAfterShutdown = rejectAfterShutdown;
this.trackingConfig = trackingConfig;
}
Expand All @@ -133,17 +140,30 @@ ThreadPool.ExecutorHolder build(final ScalingExecutorSettings settings, final Th
final ThreadPool.Info info = new ThreadPool.Info(name(), ThreadPool.ThreadPoolType.SCALING, core, max, keepAlive, null);
final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings.nodeName, name());
ExecutorService executor;
executor = EsExecutors.newScaling(
settings.nodeName + "/" + name(),
core,
max,
keepAlive.millis(),
TimeUnit.MILLISECONDS,
rejectAfterShutdown,
threadFactory,
threadContext,
trackingConfig
);
if (core == 0 && max == 1) {
// special case for single threaded pool scaling safely down to zero
executor = EsExecutors.newSingleScalingToZero(
settings.nodeName + "/" + name(),
keepAlive.millis(),
TimeUnit.MILLISECONDS,
rejectAfterShutdown,
threadFactory,
threadContext,
trackingConfig
);
} else {
executor = EsExecutors.newScaling(
settings.nodeName + "/" + name(),
core,
max,
keepAlive.millis(),
TimeUnit.MILLISECONDS,
rejectAfterShutdown,
threadFactory,
threadContext,
trackingConfig
);
}
return new ThreadPool.ExecutorHolder(executor, info);
}

Expand Down Expand Up @@ -173,4 +193,50 @@ static class ScalingExecutorSettings extends ExecutorBuilder.ExecutorSettings {
}
}

private static class KeepAliveTimeValidator implements Setting.Validator<TimeValue> {
private final Setting<Integer> coreSetting;
private final Setting<Integer> maxSetting;

KeepAliveTimeValidator(Setting<Integer> coreSetting, Setting<Integer> maxSetting) {
this.coreSetting = coreSetting;
this.maxSetting = maxSetting;
}

@Override
public void validate(TimeValue value) {
if (value.duration() < 0) {
throw new IllegalArgumentException("Invalid negative value for [" + prefix() + "keep_alive].");
}
}

@Override
public void validate(TimeValue value, Map<Setting<?>, Object> settings) {
if (value.duration() > 0) {
return;
}
if (settings.get(coreSetting).equals(0) && settings.get(maxSetting).equals(1)) {
throw new IllegalArgumentException(
"["
+ prefix()
+ "keep_alive] must be greater than 0 when ["
+ coreSetting.getKey()
+ "] is [0] and ["
+ coreSetting.getKey()
+ "] is [1]."
);
}

}

private String prefix() {
String core = coreSetting.getKey();
return core.substring(0, core.length() - 4);
}

@Override
public Iterator<Setting<?>> settings() {
return List.<Setting<?>>of(coreSetting, maxSetting).iterator();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public void testAsyncAcquire() throws InterruptedException {
final var completionLatch = new CountDownLatch(1);
final var executorService = EsExecutors.newScaling(
"test",
0,
1,
between(1, 10),
10,
TimeUnit.SECONDS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -754,4 +754,63 @@ public void onRejection(Exception e) {
executor.execute(shouldBeRejected);
assertTrue(rejected.get());
}

public void testSingleScalingToZero() {
try (
var executor = EsExecutors.newSingleScalingToZero(
getTestName(),
1,
TimeUnit.MILLISECONDS,
true,
EsExecutors.daemonThreadFactory(getTestName()),
threadContext
)
) {
class Task extends AbstractRunnable {
private int remaining;
private final CountDownLatch doneLatch;

Task(int iterations, CountDownLatch doneLatch) {
this.remaining = iterations;
this.doneLatch = doneLatch;
}

@Override
public void onFailure(Exception e) {
fail(e);
}

@Override
protected void doRun() {
if (--remaining == 0) {
doneLatch.countDown();
} else {
new Thread(() -> {
final var targetNanoTime = System.nanoTime() + 1_000_000 + between(-10_000, 10_000);
while (System.nanoTime() < targetNanoTime) {
Thread.yield();
}
executor.execute(Task.this);
}).start();
}
}
}

for (int i = 0; i < 100; i++) {
logger.info("--> attempt [{}]", i);
final var doneLatch = new CountDownLatch(1);
executor.execute(new Task(between(1, 1000), doneLatch));
boolean success = false;
try {
safeAwait(doneLatch);
success = true;
} finally {
if (success == false) {
logger.info("fail");
}
}
}
}
}

}
Loading