From c01a600909e8abd6b5916b3091e1db9e81750881 Mon Sep 17 00:00:00 2001 From: Moritz Mack Date: Thu, 23 Jan 2025 19:05:23 +0100 Subject: [PATCH 01/24] Reproduction by DCT --- .../common/util/concurrent/EsExecutors.java | 3 + .../util/concurrent/EsExecutorsTests.java | 108 ++++++++++++++++++ 2 files changed, 111 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java index b45b348376fdf..9addc854d9b55 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java @@ -486,6 +486,9 @@ private static void put(ThreadPoolExecutor executor, Runnable task) { // force queue policy should only be used with a scaling queue assert queue instanceof ExecutorScalingQueue; try { + // If core size is 0, we risk adding the task onto the queue despite the only remaining worker timing out + // before the task can be worked on. + // Why not use allowCoreThreadTimeOut with core/max size 1 instead? 
queue.put(task); } catch (final InterruptedException e) { assert false : "a scaling queue never blocks so a put to it can never be interrupted"; diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java index 2867c9e007937..2d77edaaa53a2 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java @@ -754,4 +754,112 @@ public void onRejection(Exception e) { executor.execute(shouldBeRejected); assertTrue(rejected.get()); } + + public void testScalingWithEmptyCore() { + try ( + var executor = EsExecutors.newScaling( + getTestName(), + 0, + 1, + 0, + TimeUnit.SECONDS, + true, + EsExecutors.daemonThreadFactory(getTestName()), + threadContext + ) + ) { + class Task extends AbstractRunnable { + private int remaining; + private final CountDownLatch doneLatch; + + Task(int iterations, CountDownLatch doneLatch) { + this.remaining = iterations; + this.doneLatch = doneLatch; + } + + @Override + public void onFailure(Exception e) { + fail(e); + } + + @Override + protected void doRun() { + if (--remaining == 0) { + doneLatch.countDown(); + } else { + logger.info("--> remaining [{}]", remaining); + new Thread(() -> executor.execute(Task.this)).start(); + } + } + } + + for (int i = 0; i < 100; i++) { + logger.info("--> attempt [{}]", i); + final var doneLatch = new CountDownLatch(1); + executor.execute(new Task(between(1, 1000), doneLatch)); + safeAwait(doneLatch); + } + } + } + + public void testScalingWithEmptyCoreAndKeepAlive() { + try ( + var executor = EsExecutors.newScaling( + getTestName(), + 0, + 1, + 1, + TimeUnit.MILLISECONDS, + true, + EsExecutors.daemonThreadFactory(getTestName()), + threadContext + ) + ) { + class Task extends AbstractRunnable { + private int remaining; + private final CountDownLatch doneLatch; + + 
Task(int iterations, CountDownLatch doneLatch) { + this.remaining = iterations; + this.doneLatch = doneLatch; + } + + @Override + public void onFailure(Exception e) { + fail(e); + } + + @Override + protected void doRun() { + if (--remaining == 0) { + doneLatch.countDown(); + } else { + new Thread(() -> { + final var targetNanoTime = System.nanoTime() + 1_000_000 + between(-10_000, 10_000); + while (System.nanoTime() < targetNanoTime) { + Thread.yield(); + } + executor.execute(Task.this); + }).start(); + } + } + } + + for (int i = 0; i < 100; i++) { + logger.info("--> attempt [{}]", i); + final var doneLatch = new CountDownLatch(1); + executor.execute(new Task(between(1, 1000), doneLatch)); + boolean success = false; + try { + safeAwait(doneLatch); + success = true; + } finally { + if (success == false) { + logger.info("fail"); + } + } + } + } + } + } From ab051739a727c6a2bbf763f5b4fbb3e95a453f4e Mon Sep 17 00:00:00 2001 From: Moritz Mack Date: Thu, 13 Mar 2025 09:38:11 +0100 Subject: [PATCH 02/24] Use new EsExecutors.newSingleScalingToZero for masterService#updateTask and TimestampFieldMapperService#updateTask --- .../cluster/service/MasterService.java | 4 +- .../common/util/concurrent/EsExecutors.java | 98 +++++++++++++------ .../util/concurrent/EsThreadPoolExecutor.java | 10 ++ .../indices/TimestampFieldMapperService.java | 4 +- .../util/concurrent/EsExecutorsTests.java | 53 +--------- .../search/SearchServiceSingleNodeTests.java | 7 +- 6 files changed, 85 insertions(+), 91 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java b/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java index d9b03ac044a8c..2a4dd8c82f82a 100644 --- a/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java +++ b/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java @@ -170,10 +170,8 @@ protected synchronized void doStart() { } protected ExecutorService createThreadPoolExecutor() { - 
return EsExecutors.newScaling( + return EsExecutors.newSingleScalingToZero( nodeName + "/" + MASTER_UPDATE_THREAD_NAME, - 0, - 1, 60, TimeUnit.SECONDS, true, diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java index 9addc854d9b55..5331ebd31f617 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java @@ -107,10 +107,9 @@ public static EsThreadPoolExecutor newScaling( ThreadContext contextHolder, TaskTrackingConfig config ) { - ExecutorScalingQueue queue = new ExecutorScalingQueue<>(); - EsThreadPoolExecutor executor; + LinkedTransferQueue queue = newUnboundedScalingLTQueue(min, max); if (config.trackExecutionTime()) { - executor = new TaskExecutionTimeTrackingEsThreadPoolExecutor( + return new TaskExecutionTimeTrackingEsThreadPoolExecutor( name, min, max, @@ -124,7 +123,7 @@ public static EsThreadPoolExecutor newScaling( config ); } else { - executor = new EsThreadPoolExecutor( + return new EsThreadPoolExecutor( name, min, max, @@ -136,8 +135,6 @@ public static EsThreadPoolExecutor newScaling( contextHolder ); } - queue.executor = executor; - return executor; } public static EsThreadPoolExecutor newScaling( @@ -163,6 +160,57 @@ public static EsThreadPoolExecutor newScaling( ); } + /** + * A single threaded executor that can safely scale down to 0 threads when idle. 
+ * @throws IllegalArgumentException if keepAliveTime is 0 + */ + public static EsThreadPoolExecutor newSingleScalingToZero( + String name, + long keepAliveTime, + TimeUnit unit, + boolean rejectAfterShutdown, + ThreadFactory threadFactory, + ThreadContext contextHolder, + TaskTrackingConfig config + ) { + EsThreadPoolExecutor executor = newScaling( + name, + 1, + 1, + keepAliveTime, + unit, + rejectAfterShutdown, + threadFactory, + contextHolder, + config + ); + executor.allowCoreThreadTimeOut(true); + return executor; + } + + /** + * A single threaded executor that can safely scale down to 0 threads when idle. + * @throws IllegalArgumentException if keepAliveTime is 0 + */ + public static EsThreadPoolExecutor newSingleScalingToZero( + String name, + long keepAliveTime, + TimeUnit unit, + boolean rejectAfterShutdown, + ThreadFactory threadFactory, + ThreadContext contextHolder + ) { + return newSingleScalingToZero( + name, + keepAliveTime, + unit, + rejectAfterShutdown, + threadFactory, + contextHolder, + TaskTrackingConfig.DO_NOT_TRACK + ); + } + public static EsThreadPoolExecutor newFixed( String name, int size, @@ -389,32 +437,23 @@ public boolean isSystem() { */ private EsExecutors() {} - static class ExecutorScalingQueue extends LinkedTransferQueue { + private static LinkedTransferQueue newUnboundedScalingLTQueue(int corePoolSize, int maxPoolSize) { + // scaling beyond core pool size using an unbounded queue requires ExecutorScalingQueue + // note, reconfiguration of core / max pool size not supported in EsThreadPoolExecutor + return maxPoolSize > corePoolSize ? 
new ExecutorScalingQueue<>() : new LinkedTransferQueue<>(); + } - ThreadPoolExecutor executor; + static class ExecutorScalingQueue extends LinkedTransferQueue { ExecutorScalingQueue() {} @Override public boolean offer(E e) { - // first try to transfer to a waiting worker thread - if (tryTransfer(e) == false) { - // check if there might be spare capacity in the thread - // pool executor - int left = executor.getMaximumPoolSize() - executor.getCorePoolSize(); - if (left > 0) { - // reject queuing the task to force the thread pool - // executor to add a worker if it can; combined - // with ForceQueuePolicy, this causes the thread - // pool to always scale up to max pool size and we - // only queue when there is no spare capacity - return false; - } else { - return super.offer(e); - } - } else { - return true; - } + // try to transfer to a waiting worker thread + // otherwise reject queuing the task to force the thread pool executor to add a worker if it can; + // combined with ForceQueuePolicy, this causes the thread pool to always scale up to max pool size + // so that we only queue when there is no spare capacity + return tryTransfer(e); } // Overridden to workaround a JDK bug introduced in JDK 21.0.2 @@ -483,12 +522,9 @@ public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) { private static void put(ThreadPoolExecutor executor, Runnable task) { final BlockingQueue queue = executor.getQueue(); - // force queue policy should only be used with a scaling queue - assert queue instanceof ExecutorScalingQueue; + // force queue policy should only be used with a scaling queue (ExecutorScalingQueue / LinkedTransferQueue) + assert queue instanceof LinkedTransferQueue; try { - // If core size is 0, we risk adding the task onto the queue despite the only remaining worker timing out - // before the task can be worked on. - // Why not use allowCoreThreadTimeOut with core/max size 1 instead? 
queue.put(task); } catch (final InterruptedException e) { assert false : "a scaling queue never blocks so a put to it can never be interrupted"; diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java index a4d2777a48b63..4303d03db0a13 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java @@ -66,6 +66,16 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor { this.contextHolder = contextHolder; } + @Override + public void setCorePoolSize(int corePoolSize) { + throw new UnsupportedOperationException("reconfiguration at runtime is not supported"); + } + + @Override + public void setMaximumPoolSize(int maximumPoolSize) { + throw new UnsupportedOperationException("reconfiguration at runtime is not supported"); + } + @Override public void execute(Runnable command) { final Runnable wrappedRunnable = wrapRunnable(command); diff --git a/server/src/main/java/org/elasticsearch/indices/TimestampFieldMapperService.java b/server/src/main/java/org/elasticsearch/indices/TimestampFieldMapperService.java index 46c51bd69db29..68c092521df2e 100644 --- a/server/src/main/java/org/elasticsearch/indices/TimestampFieldMapperService.java +++ b/server/src/main/java/org/elasticsearch/indices/TimestampFieldMapperService.java @@ -69,11 +69,9 @@ public TimestampFieldMapperService(Settings settings, ThreadPool threadPool, Ind final String nodeName = Objects.requireNonNull(Node.NODE_NAME_SETTING.get(settings)); final String threadName = "TimestampFieldMapperService#updateTask"; - executor = EsExecutors.newScaling( + executor = EsExecutors.newSingleScalingToZero( nodeName + "/" + threadName, - 0, 1, - 0, TimeUnit.MILLISECONDS, true, daemonThreadFactory(nodeName, threadName), diff --git 
a/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java index 2d77edaaa53a2..0344da0cd754a 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java @@ -755,59 +755,10 @@ public void onRejection(Exception e) { assertTrue(rejected.get()); } - public void testScalingWithEmptyCore() { + public void testSingleScalingToZero() { try ( - var executor = EsExecutors.newScaling( + var executor = EsExecutors.newSingleScalingToZero( getTestName(), - 0, - 1, - 0, - TimeUnit.SECONDS, - true, - EsExecutors.daemonThreadFactory(getTestName()), - threadContext - ) - ) { - class Task extends AbstractRunnable { - private int remaining; - private final CountDownLatch doneLatch; - - Task(int iterations, CountDownLatch doneLatch) { - this.remaining = iterations; - this.doneLatch = doneLatch; - } - - @Override - public void onFailure(Exception e) { - fail(e); - } - - @Override - protected void doRun() { - if (--remaining == 0) { - doneLatch.countDown(); - } else { - logger.info("--> remaining [{}]", remaining); - new Thread(() -> executor.execute(Task.this)).start(); - } - } - } - - for (int i = 0; i < 100; i++) { - logger.info("--> attempt [{}]", i); - final var doneLatch = new CountDownLatch(1); - executor.execute(new Task(between(1, 1000), doneLatch)); - safeAwait(doneLatch); - } - } - } - - public void testScalingWithEmptyCoreAndKeepAlive() { - try ( - var executor = EsExecutors.newScaling( - getTestName(), - 0, - 1, 1, TimeUnit.MILLISECONDS, true, diff --git a/server/src/test/java/org/elasticsearch/search/SearchServiceSingleNodeTests.java b/server/src/test/java/org/elasticsearch/search/SearchServiceSingleNodeTests.java index 9c3dc7ca8ab9d..f47490fe109e0 100644 --- 
a/server/src/test/java/org/elasticsearch/search/SearchServiceSingleNodeTests.java +++ b/server/src/test/java/org/elasticsearch/search/SearchServiceSingleNodeTests.java @@ -2744,10 +2744,11 @@ public void testEnableSearchWorkerThreads() throws IOException { * parallel collection. */ public void testSlicingBehaviourForParallelCollection() throws Exception { - IndexService indexService = createIndex("index", Settings.EMPTY); - ThreadPoolExecutor executor = (ThreadPoolExecutor) indexService.getThreadPool().executor(ThreadPool.Names.SEARCH); + // We set the executor pool size explicitly to be independent of CPU cores. final int configuredMaxPoolSize = 10; - executor.setMaximumPoolSize(configuredMaxPoolSize); // We set this explicitly to be independent of CPU cores. + IndexService indexService = createIndex("index", Settings.builder().put("thread_pool.search.size", configuredMaxPoolSize).build()); + ThreadPoolExecutor executor = (ThreadPoolExecutor) indexService.getThreadPool().executor(ThreadPool.Names.SEARCH); + int numDocs = randomIntBetween(50, 100); for (int i = 0; i < numDocs; i++) { prepareIndex("index").setId(String.valueOf(i)).setSource("field", "value").get(); From a59bdc78c171cceb9ed7aba121f9ed73ddf9bd0e Mon Sep 17 00:00:00 2001 From: Moritz Mack Date: Thu, 13 Mar 2025 10:07:37 +0100 Subject: [PATCH 03/24] increase min core size for various test executors from 0 to 1 --- .../elasticsearch/action/support/RefCountingRunnableTests.java | 2 +- .../blobstore/ESMockAPIBasedRepositoryIntegTestCase.java | 2 +- .../org/elasticsearch/test/transport/MockTransportService.java | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/action/support/RefCountingRunnableTests.java b/server/src/test/java/org/elasticsearch/action/support/RefCountingRunnableTests.java index 5363722f2f49f..abbbd53dec570 100644 --- a/server/src/test/java/org/elasticsearch/action/support/RefCountingRunnableTests.java +++ 
b/server/src/test/java/org/elasticsearch/action/support/RefCountingRunnableTests.java @@ -100,7 +100,7 @@ public void testAsyncAcquire() throws InterruptedException { final var completionLatch = new CountDownLatch(1); final var executorService = EsExecutors.newScaling( "test", - 0, + 1, between(1, 10), 10, TimeUnit.SECONDS, diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESMockAPIBasedRepositoryIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESMockAPIBasedRepositoryIntegTestCase.java index 21de4872c7b2c..372f61698ed8a 100644 --- a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESMockAPIBasedRepositoryIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESMockAPIBasedRepositoryIntegTestCase.java @@ -91,7 +91,7 @@ public static void startHttpServer() throws Exception { // the EncryptedRepository can require more than one connection open at one time executorService = EsExecutors.newScaling( ESMockAPIBasedRepositoryIntegTestCase.class.getName(), - 0, + 1, 2, 60, TimeUnit.SECONDS, diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java index 0ecd9bcd86dfc..df77d2b939dfe 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java @@ -314,7 +314,7 @@ private MockTransportService( this.original = transport.getDelegate(); this.testExecutor = EsExecutors.newScaling( "mock-transport", - 0, + 1, 4, 30, TimeUnit.SECONDS, From 090e1045bbd952c9d6f1cb2d3f2d60ec7f838100 Mon Sep 17 00:00:00 2001 From: Moritz Mack Date: Thu, 13 Mar 2025 11:38:37 +0100 Subject: [PATCH 04/24] Improve validation in ScalingExecutorBuilder and use newSingleScalingToZero if core = 0 / max = 1 --- 
.../threadpool/ScalingExecutorBuilder.java | 94 ++++++++++++++++--- .../UpdateThreadPoolSettingsTests.java | 35 +++++++ 2 files changed, 115 insertions(+), 14 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/threadpool/ScalingExecutorBuilder.java b/server/src/main/java/org/elasticsearch/threadpool/ScalingExecutorBuilder.java index 1017d41a77444..64c53b8216b43 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/ScalingExecutorBuilder.java +++ b/server/src/main/java/org/elasticsearch/threadpool/ScalingExecutorBuilder.java @@ -17,8 +17,10 @@ import org.elasticsearch.node.Node; import java.util.Arrays; +import java.util.Iterator; import java.util.List; import java.util.Locale; +import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; @@ -105,9 +107,14 @@ public ScalingExecutorBuilder( final EsExecutors.TaskTrackingConfig trackingConfig ) { super(name, false); - this.coreSetting = Setting.intSetting(settingsKey(prefix, "core"), core, Setting.Property.NodeScope); - this.maxSetting = Setting.intSetting(settingsKey(prefix, "max"), max, Setting.Property.NodeScope); - this.keepAliveSetting = Setting.timeSetting(settingsKey(prefix, "keep_alive"), keepAlive, Setting.Property.NodeScope); + this.coreSetting = Setting.intSetting(settingsKey(prefix, "core"), core, 0, Setting.Property.NodeScope); + this.maxSetting = Setting.intSetting(settingsKey(prefix, "max"), max, 1, Setting.Property.NodeScope); + this.keepAliveSetting = Setting.timeSetting( + settingsKey(prefix, "keep_alive"), + keepAlive, + new KeepAliveTimeValidator(coreSetting, maxSetting), + Setting.Property.NodeScope + ); this.rejectAfterShutdown = rejectAfterShutdown; this.trackingConfig = trackingConfig; } @@ -133,17 +140,30 @@ ThreadPool.ExecutorHolder build(final ScalingExecutorSettings settings, final Th final ThreadPool.Info info = new ThreadPool.Info(name(), ThreadPool.ThreadPoolType.SCALING, core, 
max, keepAlive, null); final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings.nodeName, name()); ExecutorService executor; - executor = EsExecutors.newScaling( - settings.nodeName + "/" + name(), - core, - max, - keepAlive.millis(), - TimeUnit.MILLISECONDS, - rejectAfterShutdown, - threadFactory, - threadContext, - trackingConfig - ); + if (core == 0 && max == 1) { + // special case for single threaded pool scaling safely down to zero + executor = EsExecutors.newSingleScalingToZero( + settings.nodeName + "/" + name(), + keepAlive.millis(), + TimeUnit.MILLISECONDS, + rejectAfterShutdown, + threadFactory, + threadContext, + trackingConfig + ); + } else { + executor = EsExecutors.newScaling( + settings.nodeName + "/" + name(), + core, + max, + keepAlive.millis(), + TimeUnit.MILLISECONDS, + rejectAfterShutdown, + threadFactory, + threadContext, + trackingConfig + ); + } return new ThreadPool.ExecutorHolder(executor, info); } @@ -173,4 +193,50 @@ static class ScalingExecutorSettings extends ExecutorBuilder.ExecutorSettings { } } + private static class KeepAliveTimeValidator implements Setting.Validator { + private final Setting coreSetting; + private final Setting maxSetting; + + KeepAliveTimeValidator(Setting coreSetting, Setting maxSetting) { + this.coreSetting = coreSetting; + this.maxSetting = maxSetting; + } + + @Override + public void validate(TimeValue value) { + if (value.duration() < 0) { + throw new IllegalArgumentException("Invalid negative value for [" + prefix() + "keep_alive]."); + } + } + + @Override + public void validate(TimeValue value, Map, Object> settings) { + if (value.duration() > 0) { + return; + } + if (settings.get(coreSetting).equals(0) && settings.get(maxSetting).equals(1)) { + throw new IllegalArgumentException( + "[" + + prefix() + + "keep_alive] must be greater than 0 when [" + + coreSetting.getKey() + + "] is [0] and [" + + coreSetting.getKey() + + "] is [1]." 
+ ); + } + + } + + private String prefix() { + String core = coreSetting.getKey(); + return core.substring(0, core.length() - 4); + } + + @Override + public Iterator> settings() { + return List.>of(coreSetting, maxSetting).iterator(); + } + } + } diff --git a/server/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java b/server/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java index 855a74101dabd..c95f5eb110594 100644 --- a/server/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java +++ b/server/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java @@ -133,6 +133,41 @@ public void testScalingExecutorType() throws InterruptedException { } } + public void testScalingExecutorTypeKeepAliveValidator() { + IllegalArgumentException illegalArgument = expectThrows(IllegalArgumentException.class, () -> { + ThreadPool tp = null; + try { + tp = new ThreadPool( + Settings.builder() + .put("node.name", "testScalingExecutorTypeKeepAliveValidator") + .put("thread_pool." + Names.GENERIC + ".core", 0) + .put("thread_pool." + Names.GENERIC + ".max", 1) + .put("thread_pool." + Names.GENERIC + ".keep_alive", "0s") + .build(), + MeterRegistry.NOOP, + new DefaultBuiltInExecutorBuilders() + ); + } finally { + terminateThreadPoolIfNeeded(tp); + } + }); + + assertThat( + illegalArgument, + hasToString( + containsString( + "[thread_pool." + + Names.GENERIC + + ".keep_alive] must be greater than 0 when [thread_pool." + + Names.GENERIC + + ".core] is [0] and [thread_pool." 
+ + Names.GENERIC + + ".core] is [1]" + ) + ) + ); + } + public void testShutdownNowInterrupts() throws Exception { String threadPoolName = randomThreadPool(ThreadPool.ThreadPoolType.FIXED); ThreadPool threadPool = null; From 6ce4fb7da17b5890ab6f946e690a79b4bc709b60 Mon Sep 17 00:00:00 2001 From: Moritz Mack Date: Thu, 13 Mar 2025 11:42:58 +0100 Subject: [PATCH 05/24] Don't allow EsExecutors.newScaling for min=0 / max=1 to not risk starving workers --- .../org/elasticsearch/common/util/concurrent/EsExecutors.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java index 5331ebd31f617..f418b6748db9b 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java @@ -107,6 +107,10 @@ public static EsThreadPoolExecutor newScaling( ThreadContext contextHolder, TaskTrackingConfig config ) { + if (min == 0 && max == 1) { + // forbidden due to https://github.com/elastic/elasticsearch/issues/124667 + throw new IllegalArgumentException("Unsupported configuration, use EsExecutors.newSingleScalingToZero instead"); + } LinkedTransferQueue queue = newUnboundedScalingLTQueue(min, max); if (config.trackExecutionTime()) { return new TaskExecutionTimeTrackingEsThreadPoolExecutor( From 59ccab16d41ae2d422db2086ab0fa34be1447abd Mon Sep 17 00:00:00 2001 From: Moritz Mack Date: Thu, 13 Mar 2025 12:19:31 +0100 Subject: [PATCH 06/24] Update docs/changelog/124732.yaml --- docs/changelog/124732.yaml | 14 ++++++++++++++ 1 file changed, 14 insertions(+) create mode 100644 docs/changelog/124732.yaml diff --git a/docs/changelog/124732.yaml b/docs/changelog/124732.yaml new file mode 100644 index 0000000000000..7628c20da774b --- /dev/null +++ b/docs/changelog/124732.yaml @@ -0,0 +1,14 @@ +pr: 124732 +summary: Prevent 
starvation bug if using scaling `EsThreadPoolExecutor` if core pool + size = 0 / max pool size = 1 +area: Infra/Core +type: "breaking, bug" +issues: [] +breaking: + title: Prevent starvation bug if using scaling `EsThreadPoolExecutor` if core pool + size = 0 / max pool size = 1 + area: Infra/Core + details: Please describe the details of this change for the release notes. You can + use asciidoc. + impact: Please describe the impact of this change to users + notable: false From 5b9be6c2f56a72aebec3977bdc03c3d7ebc839b8 Mon Sep 17 00:00:00 2001 From: Moritz Mack Date: Thu, 13 Mar 2025 14:19:17 +0100 Subject: [PATCH 07/24] remove newSingleScalingToZero / usage of allowCoreThreadTimeOut --- .../cluster/service/MasterService.java | 12 ++- .../common/util/concurrent/EsExecutors.java | 63 ++------------ .../indices/TimestampFieldMapperService.java | 13 ++- .../threadpool/ScalingExecutorBuilder.java | 86 +++---------------- .../util/concurrent/EsExecutorsTests.java | 35 ++++++-- .../UpdateThreadPoolSettingsTests.java | 35 -------- 6 files changed, 63 insertions(+), 181 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java b/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java index 2a4dd8c82f82a..d5c9a2b4e97c1 100644 --- a/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java +++ b/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java @@ -66,6 +66,7 @@ import java.util.Objects; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -170,13 +171,18 @@ protected synchronized void doStart() { } protected ExecutorService createThreadPoolExecutor() { - return EsExecutors.newSingleScalingToZero( + ThreadFactory threadFactory = daemonThreadFactory(nodeName, 
MASTER_UPDATE_THREAD_NAME); + ThreadContext contextHolder = threadPool.getThreadContext(); + return EsExecutors.newScaling( nodeName + "/" + MASTER_UPDATE_THREAD_NAME, + 0, + 1, 60, TimeUnit.SECONDS, true, - daemonThreadFactory(nodeName, MASTER_UPDATE_THREAD_NAME), - threadPool.getThreadContext() + threadFactory, + contextHolder, + EsExecutors.TaskTrackingConfig.DO_NOT_TRACK ); } diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java index f418b6748db9b..f62c0df5e395f 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java @@ -107,10 +107,6 @@ public static EsThreadPoolExecutor newScaling( ThreadContext contextHolder, TaskTrackingConfig config ) { - if (min == 0 && max == 1) { - // forbidden due to https://github.com/elastic/elasticsearch/issues/124667 - throw new IllegalArgumentException("Unsupported configuration, use EsExecutors.newSingleScalingToZero instead"); - } LinkedTransferQueue queue = newUnboundedScalingLTQueue(min, max); if (config.trackExecutionTime()) { return new TaskExecutionTimeTrackingEsThreadPoolExecutor( @@ -164,57 +160,6 @@ public static EsThreadPoolExecutor newScaling( ); } - /** - * A single threaded executor that can safely scale down to 0 threads when idle. 
- * @throws IllegalArgumentException if keepAliveTime is 0 - */ - public static EsThreadPoolExecutor newSingleScalingToZero( - String name, - long keepAliveTime, - TimeUnit unit, - boolean rejectAfterShutdown, - ThreadFactory threadFactory, - ThreadContext contextHolder, - TaskTrackingConfig config - ) { - EsThreadPoolExecutor executor = newScaling( - name, - 1, - 1, - keepAliveTime, - unit, - rejectAfterShutdown, - threadFactory, - contextHolder, - config - ); - executor.allowCoreThreadTimeOut(true); - return executor; - } - - /** - * A single threaded executor that can safely scale down to 0 threads when idle. - * @throws IllegalArgumentException if keepAliveTime is 0 - */ - public static EsThreadPoolExecutor newSingleScalingToZero( - String name, - long keepAliveTime, - TimeUnit unit, - boolean rejectAfterShutdown, - ThreadFactory threadFactory, - ThreadContext contextHolder - ) { - return newSingleScalingToZero( - name, - keepAliveTime, - unit, - rejectAfterShutdown, - threadFactory, - contextHolder, - TaskTrackingConfig.DO_NOT_TRACK - ); - } - public static EsThreadPoolExecutor newFixed( String name, int size, @@ -442,9 +387,13 @@ public boolean isSystem() { private EsExecutors() {} private static LinkedTransferQueue newUnboundedScalingLTQueue(int corePoolSize, int maxPoolSize) { - // scaling beyond core pool size using an unbounded queue requires ExecutorScalingQueue + if (maxPoolSize == 1 || maxPoolSize == corePoolSize) { + // scaling beyond core pool size (or 1) not required, use a regular unbounded LinkedTransferQueue + return new LinkedTransferQueue<>(); + } + // scaling beyond core pool size with an unbounded queue requires ExecutorScalingQueue // note, reconfiguration of core / max pool size not supported in EsThreadPoolExecutor - return maxPoolSize > corePoolSize ? 
new ExecutorScalingQueue<>() : new LinkedTransferQueue<>(); + return new ExecutorScalingQueue(); } static class ExecutorScalingQueue extends LinkedTransferQueue { diff --git a/server/src/main/java/org/elasticsearch/indices/TimestampFieldMapperService.java b/server/src/main/java/org/elasticsearch/indices/TimestampFieldMapperService.java index 68c092521df2e..c3603422f0f8a 100644 --- a/server/src/main/java/org/elasticsearch/indices/TimestampFieldMapperService.java +++ b/server/src/main/java/org/elasticsearch/indices/TimestampFieldMapperService.java @@ -23,6 +23,7 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.util.concurrent.UncategorizedExecutionException; import org.elasticsearch.core.Nullable; import org.elasticsearch.index.Index; @@ -38,6 +39,7 @@ import java.util.Objects; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory; @@ -69,13 +71,18 @@ public TimestampFieldMapperService(Settings settings, ThreadPool threadPool, Ind final String nodeName = Objects.requireNonNull(Node.NODE_NAME_SETTING.get(settings)); final String threadName = "TimestampFieldMapperService#updateTask"; - executor = EsExecutors.newSingleScalingToZero( + ThreadFactory threadFactory = daemonThreadFactory(nodeName, threadName); + ThreadContext contextHolder = threadPool.getThreadContext(); + executor = EsExecutors.newScaling( nodeName + "/" + threadName, + 0, + 1, 1, TimeUnit.MILLISECONDS, true, - daemonThreadFactory(nodeName, threadName), - threadPool.getThreadContext() + threadFactory, + contextHolder, + 
EsExecutors.TaskTrackingConfig.DO_NOT_TRACK ); } diff --git a/server/src/main/java/org/elasticsearch/threadpool/ScalingExecutorBuilder.java b/server/src/main/java/org/elasticsearch/threadpool/ScalingExecutorBuilder.java index 64c53b8216b43..0fb2f1e471d0b 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/ScalingExecutorBuilder.java +++ b/server/src/main/java/org/elasticsearch/threadpool/ScalingExecutorBuilder.java @@ -17,10 +17,8 @@ import org.elasticsearch.node.Node; import java.util.Arrays; -import java.util.Iterator; import java.util.List; import java.util.Locale; -import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; @@ -112,7 +110,7 @@ public ScalingExecutorBuilder( this.keepAliveSetting = Setting.timeSetting( settingsKey(prefix, "keep_alive"), keepAlive, - new KeepAliveTimeValidator(coreSetting, maxSetting), + TimeValue.ZERO, Setting.Property.NodeScope ); this.rejectAfterShutdown = rejectAfterShutdown; @@ -140,30 +138,17 @@ ThreadPool.ExecutorHolder build(final ScalingExecutorSettings settings, final Th final ThreadPool.Info info = new ThreadPool.Info(name(), ThreadPool.ThreadPoolType.SCALING, core, max, keepAlive, null); final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings.nodeName, name()); ExecutorService executor; - if (core == 0 && max == 1) { - // special case for single threaded pool scaling safely down to zero - executor = EsExecutors.newSingleScalingToZero( - settings.nodeName + "/" + name(), - keepAlive.millis(), - TimeUnit.MILLISECONDS, - rejectAfterShutdown, - threadFactory, - threadContext, - trackingConfig - ); - } else { - executor = EsExecutors.newScaling( - settings.nodeName + "/" + name(), - core, - max, - keepAlive.millis(), - TimeUnit.MILLISECONDS, - rejectAfterShutdown, - threadFactory, - threadContext, - trackingConfig - ); - } + executor = EsExecutors.newScaling( + settings.nodeName + "/" + name(), + core, + 
max, + keepAlive.millis(), + TimeUnit.MILLISECONDS, + rejectAfterShutdown, + threadFactory, + threadContext, + trackingConfig + ); return new ThreadPool.ExecutorHolder(executor, info); } @@ -192,51 +177,4 @@ static class ScalingExecutorSettings extends ExecutorBuilder.ExecutorSettings { this.keepAlive = keepAlive; } } - - private static class KeepAliveTimeValidator implements Setting.Validator { - private final Setting coreSetting; - private final Setting maxSetting; - - KeepAliveTimeValidator(Setting coreSetting, Setting maxSetting) { - this.coreSetting = coreSetting; - this.maxSetting = maxSetting; - } - - @Override - public void validate(TimeValue value) { - if (value.duration() < 0) { - throw new IllegalArgumentException("Invalid negative value for [" + prefix() + "keep_alive]."); - } - } - - @Override - public void validate(TimeValue value, Map, Object> settings) { - if (value.duration() > 0) { - return; - } - if (settings.get(coreSetting).equals(0) && settings.get(maxSetting).equals(1)) { - throw new IllegalArgumentException( - "[" - + prefix() - + "keep_alive] must be greater than 0 when [" - + coreSetting.getKey() - + "] is [0] and [" - + coreSetting.getKey() - + "] is [1]." 
- ); - } - - } - - private String prefix() { - String core = coreSetting.getKey(); - return core.substring(0, core.length() - 4); - } - - @Override - public Iterator> settings() { - return List.>of(coreSetting, maxSetting).iterator(); - } - } - } diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java index 0344da0cd754a..ca02127616aed 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java @@ -755,17 +755,31 @@ public void onRejection(Exception e) { assertTrue(rejected.get()); } - public void testSingleScalingToZero() { + public void testScalingWithEmptyCore() { + testScalingWithEmptyCore(0); + } + + public void testScalingWithEmptyCoreAndKeepAlive() { + testScalingWithEmptyCore(1); + } + + private void testScalingWithEmptyCore(long keepAliveMillis) { + String name = getTestName(); + ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(getTestName()); try ( - var executor = EsExecutors.newSingleScalingToZero( - getTestName(), + var executor = EsExecutors.newScaling( + name, + 0, 1, + keepAliveMillis, TimeUnit.MILLISECONDS, true, - EsExecutors.daemonThreadFactory(getTestName()), - threadContext + threadFactory, + threadContext, + DO_NOT_TRACK ) ) { + class Task extends AbstractRunnable { private int remaining; private final CountDownLatch doneLatch; @@ -785,10 +799,14 @@ protected void doRun() { if (--remaining == 0) { doneLatch.countDown(); } else { + logger.info("--> remaining [{}]", remaining); + final long keepAliveNanos = executor.getKeepAliveTime(TimeUnit.NANOSECONDS); new Thread(() -> { - final var targetNanoTime = System.nanoTime() + 1_000_000 + between(-10_000, 10_000); - while (System.nanoTime() < targetNanoTime) { - Thread.yield(); + if (keepAliveNanos > 0) { + final var targetNanoTime = 
System.nanoTime() + keepAliveNanos + between(-10_000, 10_000); + while (System.nanoTime() < targetNanoTime) { + Thread.yield(); + } } executor.execute(Task.this); }).start(); @@ -812,5 +830,4 @@ protected void doRun() { } } } - } diff --git a/server/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java b/server/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java index c95f5eb110594..855a74101dabd 100644 --- a/server/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java +++ b/server/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java @@ -133,41 +133,6 @@ public void testScalingExecutorType() throws InterruptedException { } } - public void testScalingExecutorTypeKeepAliveValidator() { - IllegalArgumentException illegalArgument = expectThrows(IllegalArgumentException.class, () -> { - ThreadPool tp = null; - try { - tp = new ThreadPool( - Settings.builder() - .put("node.name", "testScalingExecutorTypeKeepAliveValidator") - .put("thread_pool." + Names.GENERIC + ".core", 0) - .put("thread_pool." + Names.GENERIC + ".max", 1) - .put("thread_pool." + Names.GENERIC + ".keep_alive", "0s") - .build(), - MeterRegistry.NOOP, - new DefaultBuiltInExecutorBuilders() - ); - } finally { - terminateThreadPoolIfNeeded(tp); - } - }); - - assertThat( - illegalArgument, - hasToString( - containsString( - "[thread_pool." - + Names.GENERIC - + ".keep_alive] must be greater than 0 when [thread_pool." - + Names.GENERIC - + ".core] is [0] and [thread_pool." 
- + Names.GENERIC - + ".core] is [1]" - ) - ) - ); - } - public void testShutdownNowInterrupts() throws Exception { String threadPoolName = randomThreadPool(ThreadPool.ThreadPoolType.FIXED); ThreadPool threadPool = null; From 77a44bbfb39d6483850617019fadef67b5974b88 Mon Sep 17 00:00:00 2001 From: Moritz Mack Date: Thu, 13 Mar 2025 14:19:52 +0100 Subject: [PATCH 08/24] Update docs/changelog/124732.yaml --- docs/changelog/124732.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/changelog/124732.yaml b/docs/changelog/124732.yaml index 7628c20da774b..e00fc1b2387b0 100644 --- a/docs/changelog/124732.yaml +++ b/docs/changelog/124732.yaml @@ -2,7 +2,7 @@ pr: 124732 summary: Prevent starvation bug if using scaling `EsThreadPoolExecutor` if core pool size = 0 / max pool size = 1 area: Infra/Core -type: "breaking, bug" +type: bug issues: [] breaking: title: Prevent starvation bug if using scaling `EsThreadPoolExecutor` if core pool From b7ff07d6943153be6aa0bc9e9bc8703f53d41803 Mon Sep 17 00:00:00 2001 From: Moritz Mack Date: Thu, 13 Mar 2025 14:25:34 +0100 Subject: [PATCH 09/24] revert obsolete changes --- .../elasticsearch/cluster/service/MasterService.java | 7 ++----- .../indices/TimestampFieldMapperService.java | 10 +++------- 2 files changed, 5 insertions(+), 12 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java b/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java index d5c9a2b4e97c1..eef7157607173 100644 --- a/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java +++ b/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java @@ -66,7 +66,6 @@ import java.util.Objects; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; 
@@ -171,8 +170,6 @@ protected synchronized void doStart() { } protected ExecutorService createThreadPoolExecutor() { - ThreadFactory threadFactory = daemonThreadFactory(nodeName, MASTER_UPDATE_THREAD_NAME); - ThreadContext contextHolder = threadPool.getThreadContext(); return EsExecutors.newScaling( nodeName + "/" + MASTER_UPDATE_THREAD_NAME, 0, @@ -180,8 +177,8 @@ protected ExecutorService createThreadPoolExecutor() { 60, TimeUnit.SECONDS, true, - threadFactory, - contextHolder, + daemonThreadFactory(nodeName, MASTER_UPDATE_THREAD_NAME), + threadPool.getThreadContext(), EsExecutors.TaskTrackingConfig.DO_NOT_TRACK ); } diff --git a/server/src/main/java/org/elasticsearch/indices/TimestampFieldMapperService.java b/server/src/main/java/org/elasticsearch/indices/TimestampFieldMapperService.java index c3603422f0f8a..2e19637b7203a 100644 --- a/server/src/main/java/org/elasticsearch/indices/TimestampFieldMapperService.java +++ b/server/src/main/java/org/elasticsearch/indices/TimestampFieldMapperService.java @@ -23,7 +23,6 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.EsExecutors; -import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.util.concurrent.UncategorizedExecutionException; import org.elasticsearch.core.Nullable; import org.elasticsearch.index.Index; @@ -39,7 +38,6 @@ import java.util.Objects; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory; @@ -71,17 +69,15 @@ public TimestampFieldMapperService(Settings settings, ThreadPool threadPool, Ind final String nodeName = Objects.requireNonNull(Node.NODE_NAME_SETTING.get(settings)); final String threadName = 
"TimestampFieldMapperService#updateTask"; - ThreadFactory threadFactory = daemonThreadFactory(nodeName, threadName); - ThreadContext contextHolder = threadPool.getThreadContext(); executor = EsExecutors.newScaling( nodeName + "/" + threadName, 0, 1, - 1, + 0, TimeUnit.MILLISECONDS, true, - threadFactory, - contextHolder, + daemonThreadFactory(nodeName, threadName), + threadPool.getThreadContext(), EsExecutors.TaskTrackingConfig.DO_NOT_TRACK ); } From 7954b6b523a7cbc33fa6b233aca02b9e320b8286 Mon Sep 17 00:00:00 2001 From: Moritz Mack Date: Thu, 13 Mar 2025 14:26:32 +0100 Subject: [PATCH 10/24] revert obsolete changes --- .../java/org/elasticsearch/cluster/service/MasterService.java | 3 +-- .../org/elasticsearch/indices/TimestampFieldMapperService.java | 3 +-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java b/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java index eef7157607173..d9b03ac044a8c 100644 --- a/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java +++ b/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java @@ -178,8 +178,7 @@ protected ExecutorService createThreadPoolExecutor() { TimeUnit.SECONDS, true, daemonThreadFactory(nodeName, MASTER_UPDATE_THREAD_NAME), - threadPool.getThreadContext(), - EsExecutors.TaskTrackingConfig.DO_NOT_TRACK + threadPool.getThreadContext() ); } diff --git a/server/src/main/java/org/elasticsearch/indices/TimestampFieldMapperService.java b/server/src/main/java/org/elasticsearch/indices/TimestampFieldMapperService.java index 2e19637b7203a..46c51bd69db29 100644 --- a/server/src/main/java/org/elasticsearch/indices/TimestampFieldMapperService.java +++ b/server/src/main/java/org/elasticsearch/indices/TimestampFieldMapperService.java @@ -77,8 +77,7 @@ public TimestampFieldMapperService(Settings settings, ThreadPool threadPool, Ind TimeUnit.MILLISECONDS, true, 
daemonThreadFactory(nodeName, threadName), - threadPool.getThreadContext(), - EsExecutors.TaskTrackingConfig.DO_NOT_TRACK + threadPool.getThreadContext() ); } From 8fe724c5bcffa03f335d4fe4c770a760c579f4a1 Mon Sep 17 00:00:00 2001 From: Moritz Mack Date: Thu, 13 Mar 2025 14:36:34 +0100 Subject: [PATCH 11/24] fix rawtype --- .../org/elasticsearch/common/util/concurrent/EsExecutors.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java index f62c0df5e395f..7736f730fc8b0 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java @@ -393,7 +393,7 @@ private static LinkedTransferQueue newUnboundedScalingLTQueue(int corePoo } // scaling beyond core pool size with an unbounded queue requires ExecutorScalingQueue // note, reconfiguration of core / max pool size not supported in EsThreadPoolExecutor - return new ExecutorScalingQueue(); + return new ExecutorScalingQueue<>(); } static class ExecutorScalingQueue extends LinkedTransferQueue { From 3d9e590d677a79fe1353e61700f5f8e37d0d0cf0 Mon Sep 17 00:00:00 2001 From: Moritz Mack Date: Thu, 13 Mar 2025 15:08:30 +0100 Subject: [PATCH 12/24] reduce noisy test logger to trace --- .../elasticsearch/common/util/concurrent/EsExecutorsTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java index ca02127616aed..ecec72ceab9c9 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java @@ -799,7 +799,7 @@ protected void doRun() { if 
(--remaining == 0) { doneLatch.countDown(); } else { - logger.info("--> remaining [{}]", remaining); + logger.trace("--> remaining [{}]", remaining); final long keepAliveNanos = executor.getKeepAliveTime(TimeUnit.NANOSECONDS); new Thread(() -> { if (keepAliveNanos > 0) { From 7607e52bdebb7d430ca9f109c7a5fefd26d2885b Mon Sep 17 00:00:00 2001 From: Moritz Mack Date: Thu, 13 Mar 2025 15:50:20 +0100 Subject: [PATCH 13/24] add Java docs --- .../common/util/concurrent/EsExecutors.java | 46 +++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java index 7736f730fc8b0..633412a954ccf 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java @@ -96,6 +96,21 @@ public static PrioritizedEsThreadPoolExecutor newSinglePrioritizing( return new PrioritizedEsThreadPoolExecutor(name, 1, 1, 0L, TimeUnit.MILLISECONDS, threadFactory, contextHolder, timer); } + /** + * Creates a scaling {@link EsThreadPoolExecutor} using an unbounded work queue. + *

+ * The {@link EsThreadPoolExecutor} scales the same way as a regular {@link ThreadPoolExecutor} until the core pool size + * (and at least 1) is reached: each time a task is submitted a new worker is added. + *

+ * Once having reached the core pool size, a {@link ThreadPoolExecutor} will only add a new worker if the work queue rejects + * a task offer. Typically, using a regular unbounded queue, task offers won't ever be rejected, meaning the worker pool would never + * scale beyond the core pool size. + *

+ * Scaling {@link EsThreadPoolExecutor}s use a customized unbounded {@link LinkedTransferQueue}, which rejects every task offer unless + * it can be immediately transferred to an available idle worker. If no such worker is available, the executor will add + * a new worker if capacity remains, otherwise the task is rejected and then appended to the work queue via the {@link ForceQueuePolicy} + * rejection handler. + */ public static EsThreadPoolExecutor newScaling( String name, int min, @@ -137,6 +152,21 @@ public static EsThreadPoolExecutor newScaling( } } + /** + * Creates a scaling {@link EsThreadPoolExecutor} using an unbounded work queue. + *

+ * The {@link EsThreadPoolExecutor} scales the same way as a regular {@link ThreadPoolExecutor} until the core pool size + * (and at least 1) is reached: each time a task is submitted a new worker is added. + *

+ * Once having reached the core pool size, a {@link ThreadPoolExecutor} will only add a new worker if the work queue rejects + * a task offer. Typically, using a regular unbounded queue, task offers won't ever be rejected, meaning the worker pool would never + * scale beyond the core pool size. + *

+ * Scaling {@link EsThreadPoolExecutor}s use a customized unbounded {@link LinkedTransferQueue}, which rejects every task offer unless + * it can be immediately transferred to an available idle worker. If no such worker is available, the executor will add + * a new worker if capacity remains, otherwise the task is rejected and then appended to the work queue via the {@link ForceQueuePolicy} + * rejection handler. + */ public static EsThreadPoolExecutor newScaling( String name, int min, @@ -396,6 +426,22 @@ private static LinkedTransferQueue newUnboundedScalingLTQueue(int corePoo return new ExecutorScalingQueue<>(); } + /** + * Customized {@link LinkedTransferQueue} to allow a {@link ThreadPoolExecutor} to scale beyond its core pool size despite having an + * unbounded queue. + *

+ * Once having reached its core pool size, a {@link ThreadPoolExecutor} will only add more workers if capacity remains and + * the task offer is rejected by the work queue. Typically that's never the case using a regular unbounded queue. + *

+ * This customized implementation rejects every task offer unless it can be immediately transferred to an available idle worker. + * It relies on {@link ForceQueuePolicy} rejection handler to append the task to the work queue if no additional worker can be added + * and the task is rejected by the executor. + *

+ * Note, {@link ForceQueuePolicy} cannot guarantee there will be available workers when appending tasks directly to the queue. + * For that reason {@link ExecutorScalingQueue} cannot be used with executors with empty core and max pool size of 1: + * the only available worker could time out just about at the same time as the task is appended, see + * this Github issue for more details. + */ static class ExecutorScalingQueue extends LinkedTransferQueue { ExecutorScalingQueue() {} From dd33e6ddbccd340e4ead1c3f574ba0dadb3d78bc Mon Sep 17 00:00:00 2001 From: Moritz Mack Date: Thu, 13 Mar 2025 17:15:04 +0100 Subject: [PATCH 14/24] fix test timeouts --- .../util/concurrent/EsExecutorsTests.java | 36 +++++++++---------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java index ecec72ceab9c9..74237c9d26251 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java @@ -14,6 +14,7 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.Processors; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.node.Node; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; @@ -765,21 +766,18 @@ public void testScalingWithEmptyCoreAndKeepAlive() { private void testScalingWithEmptyCore(long keepAliveMillis) { String name = getTestName(); - ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(getTestName()); - try ( - var executor = EsExecutors.newScaling( - name, - 0, - 1, - keepAliveMillis, - TimeUnit.MILLISECONDS, - true, - threadFactory, - threadContext, - DO_NOT_TRACK - ) - ) { - + var executor = EsExecutors.newScaling( + name, + 0, + 1, 
+ keepAliveMillis, + TimeUnit.MILLISECONDS, + true, + EsExecutors.daemonThreadFactory(getTestName()), + threadContext, + DO_NOT_TRACK + ); + try { class Task extends AbstractRunnable { private int remaining; private final CountDownLatch doneLatch; @@ -814,13 +812,13 @@ protected void doRun() { } } - for (int i = 0; i < 100; i++) { + for (int i = 0; i < 20; i++) { logger.info("--> attempt [{}]", i); final var doneLatch = new CountDownLatch(1); - executor.execute(new Task(between(1, 1000), doneLatch)); + executor.execute(new Task(between(1, 500), doneLatch)); boolean success = false; try { - safeAwait(doneLatch); + safeAwait(doneLatch, TimeValue.ONE_MINUTE); success = true; } finally { if (success == false) { @@ -828,6 +826,8 @@ protected void doRun() { } } } + } finally { + ThreadPool.terminate(executor, 1, TimeUnit.SECONDS); } } } From 30d4a50e25e2c3fb6a14d71c058f34b052a06bf1 Mon Sep 17 00:00:00 2001 From: Moritz Mack Date: Thu, 13 Mar 2025 18:23:07 +0100 Subject: [PATCH 15/24] Implement worker pool probing to prevent #124667 if max pool size > 1. --- .../common/util/concurrent/EsExecutors.java | 36 +++++++-- .../util/concurrent/EsThreadPoolExecutor.java | 5 +- .../util/concurrent/EsExecutorsTests.java | 76 +++++++++++++++---- 3 files changed, 97 insertions(+), 20 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java index 633412a954ccf..490b15c5a60dd 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java @@ -123,6 +123,9 @@ public static EsThreadPoolExecutor newScaling( TaskTrackingConfig config ) { LinkedTransferQueue queue = newUnboundedScalingLTQueue(min, max); + // Force queued work via ForceQueuePolicy might starve if no worker is available (if core size is empty), + // probing the worker pool prevents this. 
+ boolean probeWorkerPool = min == 0 && queue instanceof ExecutorScalingQueue; if (config.trackExecutionTime()) { return new TaskExecutionTimeTrackingEsThreadPoolExecutor( name, @@ -133,7 +136,7 @@ public static EsThreadPoolExecutor newScaling( queue, TimedRunnable::new, threadFactory, - new ForceQueuePolicy(rejectAfterShutdown), + new ForceQueuePolicy(rejectAfterShutdown, probeWorkerPool), contextHolder, config ); @@ -146,7 +149,7 @@ public static EsThreadPoolExecutor newScaling( unit, queue, threadFactory, - new ForceQueuePolicy(rejectAfterShutdown), + new ForceQueuePolicy(rejectAfterShutdown, probeWorkerPool), contextHolder ); } @@ -430,6 +433,11 @@ private static LinkedTransferQueue newUnboundedScalingLTQueue(int corePoo * Customized {@link LinkedTransferQueue} to allow a {@link ThreadPoolExecutor} to scale beyond its core pool size despite having an * unbounded queue. *

+ * Note, usage of unbounded work queues is a problem by itself. For one, it makes error-prone customizations necessary so that + * thread pools can scale up adequately. But worse, infinite queues prevent backpressure and impose a high risk of causing OOM errors. + * Github #18613 captures various long-outstanding, but important + * improvements to thread pools. + *

* Once having reached its core pool size, a {@link ThreadPoolExecutor} will only add more workers if capacity remains and * the task offer is rejected by the work queue. Typically that's never the case using a regular unbounded queue. *

@@ -440,7 +448,7 @@ private static LinkedTransferQueue newUnboundedScalingLTQueue(int corePoo * Note, {@link ForceQueuePolicy} cannot guarantee there will be available workers when appending tasks directly to the queue. * For that reason {@link ExecutorScalingQueue} cannot be used with executors with empty core and max pool size of 1: * the only available worker could time out just about at the same time as the task is appended, see - * this Github issue for more details. + * Github #124667 for more details. */ static class ExecutorScalingQueue extends LinkedTransferQueue { @@ -448,6 +456,10 @@ static class ExecutorScalingQueue extends LinkedTransferQueue { @Override public boolean offer(E e) { + if (e == EsThreadPoolExecutor.NOOP_PROBE) { // referential equality + // this probe ensures a worker is available after force queueing a task via ForceQueuePolicy + return super.offer(e); + } // try to transfer to a waiting worker thread // otherwise reject queuing the task to force the thread pool executor to add a worker if it can; // combined with ForceQueuePolicy, this causes the thread pool to always scale up to max pool size @@ -494,15 +506,24 @@ static class ForceQueuePolicy extends EsRejectedExecutionHandler { */ private final boolean rejectAfterShutdown; + /** + * Flag to indicate if the worker pool needs to be probed after force queuing a task to guarantee a worker is available. 
+ */ + private final boolean probeWorkerPool; + /** * @param rejectAfterShutdown indicates if {@link Runnable} should be rejected once the thread pool is shutting down */ - ForceQueuePolicy(boolean rejectAfterShutdown) { + ForceQueuePolicy(boolean rejectAfterShutdown, boolean probeWorkerPool) { this.rejectAfterShutdown = rejectAfterShutdown; + this.probeWorkerPool = probeWorkerPool; } @Override public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) { + if (task == EsThreadPoolExecutor.NOOP_PROBE) { // referential equality + return; + } if (rejectAfterShutdown) { if (executor.isShutdown()) { reject(executor, task); @@ -519,12 +540,17 @@ public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) { } } - private static void put(ThreadPoolExecutor executor, Runnable task) { + private void put(ThreadPoolExecutor executor, Runnable task) { final BlockingQueue queue = executor.getQueue(); // force queue policy should only be used with a scaling queue (ExecutorScalingQueue / LinkedTransferQueue) assert queue instanceof LinkedTransferQueue; try { queue.put(task); + if (probeWorkerPool && task == queue.peek()) { // referential equality + // If the task is at the head of the queue, we can assume the queue was previously empty. In this case available workers + // might have timed out in the meanwhile. To prevent the task from starving, we submit a noop probe to the executor. 
+ executor.execute(EsThreadPoolExecutor.NOOP_PROBE); + } } catch (final InterruptedException e) { assert false : "a scaling queue never blocks so a put to it can never be interrupted"; throw new AssertionError(e); } } diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java index 4303d03db0a13..d670eaeab8a6e 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java @@ -29,6 +29,9 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor { private static final Logger logger = LogManager.getLogger(EsThreadPoolExecutor.class); + // no-op probe submitted by ForceQueuePolicy after force-queuing a task to verify a worker is available + static final Runnable NOOP_PROBE = () -> {}; + private final ThreadContext contextHolder; /** @@ -78,7 +81,7 @@ public void setMaximumPoolSize(int maximumPoolSize) { @Override public void execute(Runnable command) { - final Runnable wrappedRunnable = wrapRunnable(command); + final Runnable wrappedRunnable = command != NOOP_PROBE ?
wrapRunnable(command) : NOOP_PROBE; try { super.execute(wrappedRunnable); } catch (Exception e) { diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java index 74237c9d26251..edbec41172208 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java @@ -757,26 +757,74 @@ public void onRejection(Exception e) { } public void testScalingWithEmptyCore() { - testScalingWithEmptyCore(0); + testScalingWithEmptyCore( + EsExecutors.newScaling( + getTestName(), + 0, + 1, + 0, + TimeUnit.MILLISECONDS, + true, + EsExecutors.daemonThreadFactory(getTestName()), + threadContext + ) + ); } public void testScalingWithEmptyCoreAndKeepAlive() { - testScalingWithEmptyCore(1); + testScalingWithEmptyCore( + EsExecutors.newScaling( + getTestName(), + 0, + 1, + 1, + TimeUnit.MILLISECONDS, + true, + EsExecutors.daemonThreadFactory(getTestName()), + threadContext + ) + ); } - private void testScalingWithEmptyCore(long keepAliveMillis) { - String name = getTestName(); - var executor = EsExecutors.newScaling( - name, - 0, - 1, - keepAliveMillis, - TimeUnit.MILLISECONDS, - true, - EsExecutors.daemonThreadFactory(getTestName()), - threadContext, - DO_NOT_TRACK + public void testScalingWithEmptyCoreAndWorkerPoolProbing() { + // https://github.com/elastic/elasticsearch/issues/124667 is difficult to reproduce if max pool size > 1. + // if probing mitigates the bug for max pool size = 1, we're good for larger pool sizes as well. + // the executor is created directly here, newScaling doesn't use ExecutorScalingQueue & probing if max pool size = 1. 
+ testScalingWithEmptyCore( + new EsThreadPoolExecutor( + getTestName(), + 0, + 1, + 0, + TimeUnit.MILLISECONDS, + new EsExecutors.ExecutorScalingQueue<>(), + EsExecutors.daemonThreadFactory(getTestName()), + new EsExecutors.ForceQueuePolicy(true, true), + threadContext + ) + ); + } + + public void testScalingWithEmptyCoreAndKeepAliveAndWorkerPoolProbing() { + // https://github.com/elastic/elasticsearch/issues/124667 is difficult to reproduce if max pool size > 1. + // if probing mitigates the bug for max pool size = 1, we're good for larger pool sizes as well. + // the executor is created directly here, newScaling doesn't use ExecutorScalingQueue & probing if max pool size = 1. + testScalingWithEmptyCore( + new EsThreadPoolExecutor( + getTestName(), + 0, + 1, + 1, + TimeUnit.MILLISECONDS, + new EsExecutors.ExecutorScalingQueue<>(), + EsExecutors.daemonThreadFactory(getTestName()), + new EsExecutors.ForceQueuePolicy(true, true), + threadContext + ) ); + } + + private void testScalingWithEmptyCore(EsThreadPoolExecutor executor) { try { class Task extends AbstractRunnable { private int remaining; From 327962811f9c5fd11ff39ef7bcfe0bfacf6861cb Mon Sep 17 00:00:00 2001 From: Moritz Mack Date: Thu, 13 Mar 2025 20:11:31 +0100 Subject: [PATCH 16/24] fix thread pool configuration in testSlicingBehaviourForParallelCollection --- .../search/SearchServiceSingleNodeTests.java | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/search/SearchServiceSingleNodeTests.java b/server/src/test/java/org/elasticsearch/search/SearchServiceSingleNodeTests.java index f47490fe109e0..9da83350d7edf 100644 --- a/server/src/test/java/org/elasticsearch/search/SearchServiceSingleNodeTests.java +++ b/server/src/test/java/org/elasticsearch/search/SearchServiceSingleNodeTests.java @@ -265,7 +265,11 @@ public void onQueryPhase(SearchContext context, long tookInNanos) { @Override protected Settings nodeSettings() { - return 
Settings.builder().put("search.default_search_timeout", "5s").build(); + Settings.Builder builder = Settings.builder().put("search.default_search_timeout", "5s"); + if (getTestName().equals("testSlicingBehaviourForParallelCollection")) { + builder.put("thread_pool.search.size", 10); // customize search pool size, reconfiguring at runtime is unsupported + } + return builder.build(); } public void testClearOnClose() { @@ -2744,11 +2748,14 @@ public void testEnableSearchWorkerThreads() throws IOException { * parallel collection. */ public void testSlicingBehaviourForParallelCollection() throws Exception { - // We set the executor pool size explicitly to be independent of CPU cores. - final int configuredMaxPoolSize = 10; - IndexService indexService = createIndex("index", Settings.builder().put("thread_pool.search.size", configuredMaxPoolSize).build()); + IndexService indexService = createIndex("index", Settings.EMPTY); ThreadPoolExecutor executor = (ThreadPoolExecutor) indexService.getThreadPool().executor(ThreadPool.Names.SEARCH); + // We configure the executor pool size explicitly in nodeSettings to be independent of CPU cores. 
+ final int configuredMaxPoolSize = 10; + assert String.valueOf(configuredMaxPoolSize).equals(nodeSettings().get("thread_pool.search.size")) + : "Unexpected thread_pool.search.size"; + int numDocs = randomIntBetween(50, 100); for (int i = 0; i < numDocs; i++) { prepareIndex("index").setId(String.valueOf(i)).setSource("field", "value").get(); From b77ea90b7eef0f79117ed1fe49e0dfe4691b47eb Mon Sep 17 00:00:00 2001 From: Moritz Mack Date: Fri, 14 Mar 2025 08:41:40 +0100 Subject: [PATCH 17/24] deterministic search pool size in SearchServiceSingleNodeTests --- .../search/SearchServiceSingleNodeTests.java | 23 ++++++++++--------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/search/SearchServiceSingleNodeTests.java b/server/src/test/java/org/elasticsearch/search/SearchServiceSingleNodeTests.java index 9da83350d7edf..61e6a96c1df3d 100644 --- a/server/src/test/java/org/elasticsearch/search/SearchServiceSingleNodeTests.java +++ b/server/src/test/java/org/elasticsearch/search/SearchServiceSingleNodeTests.java @@ -180,6 +180,8 @@ public class SearchServiceSingleNodeTests extends ESSingleNodeTestCase { + private static final int SEARCH_POOL_SIZE = 10; + @Override protected boolean resetNodeAfterTest() { return true; @@ -265,11 +267,10 @@ public void onQueryPhase(SearchContext context, long tookInNanos) { @Override protected Settings nodeSettings() { - Settings.Builder builder = Settings.builder().put("search.default_search_timeout", "5s"); - if (getTestName().equals("testSlicingBehaviourForParallelCollection")) { - builder.put("thread_pool.search.size", 10); // customize search pool size, reconfiguring at runtime is unsupported - } - return builder.build(); + return Settings.builder() + .put("search.default_search_timeout", "5s") + .put("thread_pool.search.size", SEARCH_POOL_SIZE) // customized search pool size, reconfiguring at runtime is unsupported + .build(); } public void testClearOnClose() { @@ -2150,6 +2151,7 @@ 
public void onFailure(Exception exc) { CountDownLatch latch = new CountDownLatch(1); shardRequest.source().query(new MatchNoneQueryBuilder()); service.executeQueryPhase(shardRequest, task, new ActionListener<>() { + @Override public void onResponse(SearchPhaseResult result) { try { @@ -2751,9 +2753,8 @@ public void testSlicingBehaviourForParallelCollection() throws Exception { IndexService indexService = createIndex("index", Settings.EMPTY); ThreadPoolExecutor executor = (ThreadPoolExecutor) indexService.getThreadPool().executor(ThreadPool.Names.SEARCH); - // We configure the executor pool size explicitly in nodeSettings to be independent of CPU cores. - final int configuredMaxPoolSize = 10; - assert String.valueOf(configuredMaxPoolSize).equals(nodeSettings().get("thread_pool.search.size")) + // We configure the executor pool size explicitly in nodeSettings to be independent of CPU cores + assert String.valueOf(SEARCH_POOL_SIZE).equals(node().settings().get("thread_pool.search.size")) : "Unexpected thread_pool.search.size"; int numDocs = randomIntBetween(50, 100); @@ -2788,7 +2789,7 @@ public void testSlicingBehaviourForParallelCollection() throws Exception { final int maxPoolSize = executor.getMaximumPoolSize(); assertEquals( "Sanity check to ensure this isn't the default of 1 when pool size is unset", - configuredMaxPoolSize, + SEARCH_POOL_SIZE, maxPoolSize ); @@ -2818,7 +2819,7 @@ public void testSlicingBehaviourForParallelCollection() throws Exception { final int maxPoolSize = executor.getMaximumPoolSize(); assertEquals( "Sanity check to ensure this isn't the default of 1 when pool size is unset", - configuredMaxPoolSize, + SEARCH_POOL_SIZE, maxPoolSize ); @@ -2909,7 +2910,7 @@ public void testSlicingBehaviourForParallelCollection() throws Exception { final int maxPoolSize = executor.getMaximumPoolSize(); assertEquals( "Sanity check to ensure this isn't the default of 1 when pool size is unset", - configuredMaxPoolSize, + SEARCH_POOL_SIZE, maxPoolSize ); From 
5bf3b126e4002b43f83f21b77fdb35e2fed686d0 Mon Sep 17 00:00:00 2001 From: Moritz Mack Date: Fri, 14 Mar 2025 08:47:22 +0100 Subject: [PATCH 18/24] changelog --- docs/changelog/124732.yaml | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/docs/changelog/124732.yaml b/docs/changelog/124732.yaml index e00fc1b2387b0..671c04b478ded 100644 --- a/docs/changelog/124732.yaml +++ b/docs/changelog/124732.yaml @@ -1,14 +1,6 @@ pr: 124732 -summary: Prevent starvation bug if using scaling `EsThreadPoolExecutor` if core pool - size = 0 / max pool size = 1 +summary: Prevent rare starvation bug when using scaling `EsThreadPoolExecutor` with empty core pool size. area: Infra/Core type: bug -issues: [] -breaking: - title: Prevent starvation bug if using scaling `EsThreadPoolExecutor` if core pool - size = 0 / max pool size = 1 - area: Infra/Core - details: Please describe the details of this change for the release notes. You can - use asciidoc. - impact: Please describe the impact of this change to users - notable: false +issues: + - 124667 From 1f4fb2d94a505c6c424ecaaff7f3fcc76825f7ba Mon Sep 17 00:00:00 2001 From: Moritz Mack Date: Fri, 14 Mar 2025 08:58:32 +0100 Subject: [PATCH 19/24] rename worker probe, add comment --- .../elasticsearch/common/util/concurrent/EsExecutors.java | 6 +++--- .../common/util/concurrent/EsThreadPoolExecutor.java | 7 ++++--- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java index 490b15c5a60dd..9f9e022c88375 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java @@ -456,7 +456,7 @@ static class ExecutorScalingQueue extends LinkedTransferQueue { @Override public boolean offer(E e) { - if (e == EsThreadPoolExecutor.NOOP_PROBE) { // 
referential equality + if (e == EsThreadPoolExecutor.WORKER_PROBE) { // referential equality // this probe ensures a worker is available after force queueing a task via ForceQueuePolicy return super.offer(e); } @@ -521,7 +521,7 @@ static class ForceQueuePolicy extends EsRejectedExecutionHandler { @Override public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) { - if (task == EsThreadPoolExecutor.NOOP_PROBE) { // referential equality + if (task == EsThreadPoolExecutor.WORKER_PROBE) { // referential equality return; } if (rejectAfterShutdown) { @@ -549,7 +549,7 @@ private void put(ThreadPoolExecutor executor, Runnable task) { if (probeWorkerPool && task == queue.peek()) { // referential equality // If the task is at the head of the queue, we can assume the queue was previously empty. In this case available workers // might have timed out in the meanwhile. To prevent the task from starving, we submit a noop probe to the executor. - executor.execute(EsThreadPoolExecutor.NOOP_PROBE); + executor.execute(EsThreadPoolExecutor.WORKER_PROBE); } } catch (final InterruptedException e) { assert false : "a scaling queue never blocks so a put to it can never be interrupted"; diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java index d670eaeab8a6e..a941a47cdc548 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java @@ -29,8 +29,9 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor { private static final Logger logger = LogManager.getLogger(EsThreadPoolExecutor.class); - // probe to - static final Runnable NOOP_PROBE = () -> {}; + // noop probe to prevent starvation of work in the work queue due to ForceQueuePolicy + // https://github.com/elastic/elasticsearch/issues/124667 + 
static final Runnable WORKER_PROBE = () -> {}; private final ThreadContext contextHolder; @@ -81,7 +82,7 @@ public void setMaximumPoolSize(int maximumPoolSize) { @Override public void execute(Runnable command) { - final Runnable wrappedRunnable = command != NOOP_PROBE ? wrapRunnable(command) : NOOP_PROBE; + final Runnable wrappedRunnable = command != WORKER_PROBE ? wrapRunnable(command) : WORKER_PROBE; try { super.execute(wrappedRunnable); } catch (Exception e) { From a831436bd387d2f7ad15ac3b5b3d341b2985addb Mon Sep 17 00:00:00 2001 From: Moritz Mack Date: Fri, 14 Mar 2025 09:19:32 +0100 Subject: [PATCH 20/24] more java docs --- .../elasticsearch/common/util/concurrent/EsExecutors.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java index 9f9e022c88375..fa9e66955cde5 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java @@ -159,7 +159,7 @@ public static EsThreadPoolExecutor newScaling( * Creates a scaling {@link EsThreadPoolExecutor} using an unbounded work queue. *

 * The {@link EsThreadPoolExecutor} scales the same way as a regular {@link ThreadPoolExecutor} until the core pool size
- * (and at least 1) is reached: each time a task is submitted a new worker is added.
+ * (and at least 1) is reached: each time a task is submitted a new worker is added regardless of whether an idle worker is available.
 *

* Once having reached the core pool size, a {@link ThreadPoolExecutor} will only add a new worker if the work queue rejects * a task offer. Typically, using a regular unbounded queue, task offers won't ever be rejected, meaning the worker pool would never @@ -449,6 +449,12 @@ private static LinkedTransferQueue newUnboundedScalingLTQueue(int corePoo * For that reason {@link ExecutorScalingQueue} cannot be used with executors with empty core and max pool size of 1: * the only available worker could time out just about at the same time as the task is appended, see * Github #124667 for more details. + *

+ * Note, configuring executors using core = max size in combination with {@code allowCoreThreadTimeOut} could be an alternative to
+ * {@link ExecutorScalingQueue}. However, the scaling behavior would be very different: Using {@link ExecutorScalingQueue}
+ * we are able to reuse idle workers if available by means of {@link ExecutorScalingQueue#tryTransfer(Object)}.
+ * If setting core = max size, the executor will add a new worker for every task submitted until reaching the core/max pool size
+ * even if there are idle workers available.
  */
 static class ExecutorScalingQueue extends LinkedTransferQueue {

From bd2eac81a7ea9c059084ae2301e42cfc46fbc72a Mon Sep 17 00:00:00 2001
From: Moritz Mack
Date: Fri, 14 Mar 2025 09:20:45 +0100
Subject: [PATCH 21/24] more java docs

---
 .../org/elasticsearch/common/util/concurrent/EsExecutors.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java
index fa9e66955cde5..3cee599fa1b21 100644
--- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java
+++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java
@@ -100,7 +100,7 @@ public static PrioritizedEsThreadPoolExecutor newSinglePrioritizing(
 * Creates a scaling {@link EsThreadPoolExecutor} using an unbounded work queue.
 *

 * The {@link EsThreadPoolExecutor} scales the same way as a regular {@link ThreadPoolExecutor} until the core pool size
- * (and at least 1) is reached: each time a task is submitted a new worker is added.
+ * (and at least 1) is reached: each time a task is submitted a new worker is added regardless of whether an idle worker is available.
 *

* Once having reached the core pool size, a {@link ThreadPoolExecutor} will only add a new worker if the work queue rejects * a task offer. Typically, using a regular unbounded queue, task offers won't ever be rejected, meaning the worker pool would never From 48364c6c9e131a47186f003dce46bccc49688ed1 Mon Sep 17 00:00:00 2001 From: Moritz Mack Date: Fri, 14 Mar 2025 11:27:57 +0100 Subject: [PATCH 22/24] add comment --- .../org/elasticsearch/common/util/concurrent/EsExecutors.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java index 3cee599fa1b21..f0c7308c5c8db 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java @@ -555,6 +555,8 @@ private void put(ThreadPoolExecutor executor, Runnable task) { if (probeWorkerPool && task == queue.peek()) { // referential equality // If the task is at the head of the queue, we can assume the queue was previously empty. In this case available workers // might have timed out in the meanwhile. To prevent the task from starving, we submit a noop probe to the executor. + // Note, this deliberately doesn't check getPoolSize()==0 to avoid potential race conditions, + // as the count in the atomic state (used by workerCountOf) is decremented first. 
executor.execute(EsThreadPoolExecutor.WORKER_PROBE); } } catch (final InterruptedException e) { From 0f1f95ddc002c1217972ad5913ad1a95ba2d1dc6 Mon Sep 17 00:00:00 2001 From: Moritz Mack Date: Fri, 14 Mar 2025 11:57:21 +0100 Subject: [PATCH 23/24] PR comments --- .../common/util/concurrent/EsThreadPoolExecutor.java | 5 ++++- .../common/util/concurrent/EsExecutorsTests.java | 12 ++---------- 2 files changed, 6 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java index a941a47cdc548..8c6f3f6eca648 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java @@ -31,7 +31,10 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor { // noop probe to prevent starvation of work in the work queue due to ForceQueuePolicy // https://github.com/elastic/elasticsearch/issues/124667 - static final Runnable WORKER_PROBE = () -> {}; + static final Runnable WORKER_PROBE = new Runnable() { + @Override + public void run() {} + }; private final ThreadContext contextHolder; diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java index edbec41172208..703b9092b1e3d 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java @@ -861,18 +861,10 @@ protected void doRun() { } for (int i = 0; i < 20; i++) { - logger.info("--> attempt [{}]", i); + logger.trace("--> attempt [{}]", i); final var doneLatch = new CountDownLatch(1); executor.execute(new Task(between(1, 500), doneLatch)); - boolean success = false; - try { - safeAwait(doneLatch, 
TimeValue.ONE_MINUTE); - success = true; - } finally { - if (success == false) { - logger.info("fail"); - } - } + safeAwait(doneLatch, TimeValue.ONE_MINUTE); } } finally { ThreadPool.terminate(executor, 1, TimeUnit.SECONDS); From 97fefa6f569b9c2edbebe9b7c0cbf7384ef3a0b9 Mon Sep 17 00:00:00 2001 From: Moritz Mack Date: Fri, 14 Mar 2025 12:19:11 +0100 Subject: [PATCH 24/24] PR comments --- .../util/concurrent/EsThreadPoolExecutor.java | 2 ++ .../util/concurrent/EsExecutorsTests.java | 34 +++++++++++++++++++ 2 files changed, 36 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java index 8c6f3f6eca648..ad4616692850e 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java @@ -31,6 +31,8 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor { // noop probe to prevent starvation of work in the work queue due to ForceQueuePolicy // https://github.com/elastic/elasticsearch/issues/124667 + // note, this is intentionally not a lambda to avoid this ever be turned into a compile time constant + // matching similar lambdas coming from other places static final Runnable WORKER_PROBE = new Runnable() { @Override public void run() {} diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java index 703b9092b1e3d..e87c0d00c15cd 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java @@ -786,6 +786,40 @@ public void testScalingWithEmptyCoreAndKeepAlive() { ); } + public void testScalingWithEmptyCoreAndLargerMaxSize() { + // TODO currently 
the reproduction of the starvation bug does not work if max pool size > 1 + // https://github.com/elastic/elasticsearch/issues/124867 + testScalingWithEmptyCore( + EsExecutors.newScaling( + getTestName(), + 0, + between(2, 5), + 0, + TimeUnit.MILLISECONDS, + true, + EsExecutors.daemonThreadFactory(getTestName()), + threadContext + ) + ); + } + + public void testScalingWithEmptyCoreAndKeepAliveAndLargerMaxSize() { + // TODO currently the reproduction of the starvation bug does not work if max pool size > 1 + // https://github.com/elastic/elasticsearch/issues/124867 + testScalingWithEmptyCore( + EsExecutors.newScaling( + getTestName(), + 0, + between(2, 5), + 1, + TimeUnit.MILLISECONDS, + true, + EsExecutors.daemonThreadFactory(getTestName()), + threadContext + ) + ); + } + public void testScalingWithEmptyCoreAndWorkerPoolProbing() { // https://github.com/elastic/elasticsearch/issues/124667 is difficult to reproduce if max pool size > 1. // if probing mitigates the bug for max pool size = 1, we're good for larger pool sizes as well.