diff --git a/docs/changelog/124732.yaml b/docs/changelog/124732.yaml new file mode 100644 index 0000000000000..671c04b478ded --- /dev/null +++ b/docs/changelog/124732.yaml @@ -0,0 +1,6 @@ +pr: 124732 +summary: Prevent rare starvation bug when using scaling `EsThreadPoolExecutor` with empty core pool size. +area: Infra/Core +type: bug +issues: + - 124667 diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java index b10db7d4d1dd3..e1aeea84fd840 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java @@ -96,6 +96,21 @@ public static PrioritizedEsThreadPoolExecutor newSinglePrioritizing( return new PrioritizedEsThreadPoolExecutor(name, 1, 1, 0L, TimeUnit.MILLISECONDS, threadFactory, contextHolder, timer); } + /** + * Creates a scaling {@link EsThreadPoolExecutor} using an unbounded work queue. + *
<p>
+ * The {@link EsThreadPoolExecutor} scales the same way as a regular {@link ThreadPoolExecutor} until the core pool size + * (but at least 1) is reached: each time a task is submitted, a new worker is added regardless of whether an idle worker is available. + *
<p>
+ * Once the core pool size is reached, a {@link ThreadPoolExecutor} will only add a new worker if the work queue rejects + * a task offer. With a regular unbounded queue, task offers are typically never rejected, meaning the worker pool would never + * scale beyond the core pool size. + *
<p>
+ * Scaling {@link EsThreadPoolExecutor}s use a customized unbounded {@link LinkedTransferQueue}, which rejects every task offer unless + * it can be immediately transferred to an available idle worker. If no such worker is available, the executor will add + * a new worker if capacity remains, otherwise the task is rejected and then appended to the work queue via the {@link ForceQueuePolicy} + * rejection handler. + */ public static EsThreadPoolExecutor newScaling( String name, int min, @@ -107,10 +122,12 @@ public static EsThreadPoolExecutor newScaling( ThreadContext contextHolder, TaskTrackingConfig config ) { - ExecutorScalingQueue queue = new ExecutorScalingQueue<>(); - EsThreadPoolExecutor executor; + LinkedTransferQueue queue = newUnboundedScalingLTQueue(min, max); + // Force queued work via ForceQueuePolicy might starve if no worker is available (if core size is empty), + // probing the worker pool prevents this. + boolean probeWorkerPool = min == 0 && queue instanceof ExecutorScalingQueue; if (config.trackExecutionTime()) { - executor = new TaskExecutionTimeTrackingEsThreadPoolExecutor( + return new TaskExecutionTimeTrackingEsThreadPoolExecutor( name, min, max, @@ -119,12 +136,12 @@ public static EsThreadPoolExecutor newScaling( queue, TimedRunnable::new, threadFactory, - new ForceQueuePolicy(rejectAfterShutdown), + new ForceQueuePolicy(rejectAfterShutdown, probeWorkerPool), contextHolder, config ); } else { - executor = new EsThreadPoolExecutor( + return new EsThreadPoolExecutor( name, min, max, @@ -132,14 +149,27 @@ public static EsThreadPoolExecutor newScaling( unit, queue, threadFactory, - new ForceQueuePolicy(rejectAfterShutdown), + new ForceQueuePolicy(rejectAfterShutdown, probeWorkerPool), contextHolder ); } - queue.executor = executor; - return executor; } + /** + * Creates a scaling {@link EsThreadPoolExecutor} using an unbounded work queue. + *
<p>
+ * The {@link EsThreadPoolExecutor} scales the same way as a regular {@link ThreadPoolExecutor} until the core pool size + * (but at least 1) is reached: each time a task is submitted, a new worker is added regardless of whether an idle worker is available. + *
<p>
+ * Once the core pool size is reached, a {@link ThreadPoolExecutor} will only add a new worker if the work queue rejects + * a task offer. With a regular unbounded queue, task offers are typically never rejected, meaning the worker pool would never + * scale beyond the core pool size. + *
<p>
+ * Scaling {@link EsThreadPoolExecutor}s use a customized unbounded {@link LinkedTransferQueue}, which rejects every task offer unless + * it can be immediately transferred to an available idle worker. If no such worker is available, the executor will add + * a new worker if capacity remains, otherwise the task is rejected and then appended to the work queue via the {@link ForceQueuePolicy} + * rejection handler. + */ public static EsThreadPoolExecutor newScaling( String name, int min, @@ -366,32 +396,58 @@ public Thread newThread(Runnable r) { */ private EsExecutors() {} - static class ExecutorScalingQueue extends LinkedTransferQueue { + private static LinkedTransferQueue newUnboundedScalingLTQueue(int corePoolSize, int maxPoolSize) { + if (maxPoolSize == 1 || maxPoolSize == corePoolSize) { + // scaling beyond core pool size (or 1) not required, use a regular unbounded LinkedTransferQueue + return new LinkedTransferQueue<>(); + } + // scaling beyond core pool size with an unbounded queue requires ExecutorScalingQueue + // note, reconfiguration of core / max pool size not supported in EsThreadPoolExecutor + return new ExecutorScalingQueue<>(); + } - ThreadPoolExecutor executor; + /** + * Customized {@link LinkedTransferQueue} to allow a {@link ThreadPoolExecutor} to scale beyond its core pool size despite having an + * unbounded queue. + *
<p>
+ * Note that unbounded work queues are a problem in themselves. For one, they make error-prone customizations necessary so that + * thread pools can scale up adequately. Worse, infinite queues prevent backpressure and impose a high risk of causing OOM errors. + * <a href="https://github.com/elastic/elasticsearch/issues/18613">GitHub #18613</a> captures various long-outstanding but important + * improvements to thread pools. + *
<p>
+ * Once it has reached its core pool size, a {@link ThreadPoolExecutor} will only add more workers if capacity remains and + * the task offer is rejected by the work queue. With a regular unbounded queue, that is typically never the case. + *
<p>
+ * This customized implementation rejects every task offer unless it can be immediately transferred to an available idle worker. + * It relies on the {@link ForceQueuePolicy} rejection handler to append the task to the work queue if no additional worker can be added + * and the task is rejected by the executor. + *
<p>
+ * Note that {@link ForceQueuePolicy} cannot guarantee a worker will be available when appending tasks directly to the queue. + * For that reason, {@link ExecutorScalingQueue} cannot be used with executors that have an empty core pool and a max pool size of 1: + * the only available worker could time out at just about the same time as the task is appended; see + * <a href="https://github.com/elastic/elasticsearch/issues/124667">GitHub #124667</a> for more details. + *
<p>
+ * Note, configuring executors using core = max size in combination with {@code allowCoreThreadTimeOut} could be an alternative to + * {@link ExecutorScalingQueue}. However, the scaling behavior would be very different: Using {@link ExecutorScalingQueue} + * we are able to reuse idle workers if available by means of {@link ExecutorScalingQueue#tryTransfer(Object)}. + * If setting core = max size, the executor will add a new worker for every task submitted until reaching the core/max pool size + * even if there's idle workers available. + */ + static class ExecutorScalingQueue extends LinkedTransferQueue { ExecutorScalingQueue() {} @Override public boolean offer(E e) { - // first try to transfer to a waiting worker thread - if (tryTransfer(e) == false) { - // check if there might be spare capacity in the thread - // pool executor - int left = executor.getMaximumPoolSize() - executor.getCorePoolSize(); - if (left > 0) { - // reject queuing the task to force the thread pool - // executor to add a worker if it can; combined - // with ForceQueuePolicy, this causes the thread - // pool to always scale up to max pool size and we - // only queue when there is no spare capacity - return false; - } else { - return super.offer(e); - } - } else { - return true; + if (e == EsThreadPoolExecutor.WORKER_PROBE) { // referential equality + // this probe ensures a worker is available after force queueing a task via ForceQueuePolicy + return super.offer(e); } + // try to transfer to a waiting worker thread + // otherwise reject queuing the task to force the thread pool executor to add a worker if it can; + // combined with ForceQueuePolicy, this causes the thread pool to always scale up to max pool size + // so that we only queue when there is no spare capacity + return tryTransfer(e); } // Overridden to workaround a JDK bug introduced in JDK 21.0.2 @@ -433,15 +489,24 @@ static class ForceQueuePolicy extends EsRejectedExecutionHandler { */ private final boolean rejectAfterShutdown; + /** + * Flag to indicate if the worker pool needs to be probed after force queuing a task to guarantee a worker is available. + */ + private final boolean probeWorkerPool; + /** * @param rejectAfterShutdown indicates if {@link Runnable} should be rejected once the thread pool is shutting down */ - ForceQueuePolicy(boolean rejectAfterShutdown) { + ForceQueuePolicy(boolean rejectAfterShutdown, boolean probeWorkerPool) { this.rejectAfterShutdown = rejectAfterShutdown; + this.probeWorkerPool = probeWorkerPool; } @Override public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) { + if (task == EsThreadPoolExecutor.WORKER_PROBE) { // referential equality + return; + } if (rejectAfterShutdown) { if (executor.isShutdown()) { reject(executor, task); @@ -458,12 +523,19 @@ public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) { } } - private static void put(ThreadPoolExecutor executor, Runnable task) { + private void put(ThreadPoolExecutor executor, Runnable task) { final BlockingQueue queue = executor.getQueue(); - // force queue policy should only be used with a scaling queue - assert queue instanceof ExecutorScalingQueue; + // force queue policy should only be used with a scaling queue (ExecutorScalingQueue / LinkedTransferQueue) + assert queue instanceof LinkedTransferQueue; try { queue.put(task); + if (probeWorkerPool && task == queue.peek()) { // referential equality + // If the task is at the head of the queue, we can assume the queue was previously empty. 
In this case available workers + // might have timed out in the meanwhile. To prevent the task from starving, we submit a noop probe to the executor. + // Note, this deliberately doesn't check getPoolSize()==0 to avoid potential race conditions, + // as the count in the atomic state (used by workerCountOf) is decremented first. + executor.execute(EsThreadPoolExecutor.WORKER_PROBE); + } } catch (final InterruptedException e) { assert false : "a scaling queue never blocks so a put to it can never be interrupted"; throw new AssertionError(e); diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java index a4d2777a48b63..ad4616692850e 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java @@ -29,6 +29,15 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor { private static final Logger logger = LogManager.getLogger(EsThreadPoolExecutor.class); + // noop probe to prevent starvation of work in the work queue due to ForceQueuePolicy + // https://github.com/elastic/elasticsearch/issues/124667 + // note, this is intentionally not a lambda to avoid this ever be turned into a compile time constant + // matching similar lambdas coming from other places + static final Runnable WORKER_PROBE = new Runnable() { + @Override + public void run() {} + }; + private final ThreadContext contextHolder; /** @@ -66,9 +75,19 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor { this.contextHolder = contextHolder; } + @Override + public void setCorePoolSize(int corePoolSize) { + throw new UnsupportedOperationException("reconfiguration at runtime is not supported"); + } + + @Override + public void setMaximumPoolSize(int maximumPoolSize) { + throw new UnsupportedOperationException("reconfiguration at runtime is not supported"); + } + @Override public void execute(Runnable command) { - final Runnable wrappedRunnable = wrapRunnable(command); + final Runnable wrappedRunnable = command != WORKER_PROBE ? 
wrapRunnable(command) : WORKER_PROBE; try { super.execute(wrappedRunnable); } catch (Exception e) { diff --git a/server/src/main/java/org/elasticsearch/threadpool/ScalingExecutorBuilder.java b/server/src/main/java/org/elasticsearch/threadpool/ScalingExecutorBuilder.java index a31f940cdb2dc..ae45d9e70d21c 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/ScalingExecutorBuilder.java +++ b/server/src/main/java/org/elasticsearch/threadpool/ScalingExecutorBuilder.java @@ -105,9 +105,14 @@ public ScalingExecutorBuilder( final EsExecutors.TaskTrackingConfig trackingConfig ) { super(name); - this.coreSetting = Setting.intSetting(settingsKey(prefix, "core"), core, Setting.Property.NodeScope); - this.maxSetting = Setting.intSetting(settingsKey(prefix, "max"), max, Setting.Property.NodeScope); - this.keepAliveSetting = Setting.timeSetting(settingsKey(prefix, "keep_alive"), keepAlive, Setting.Property.NodeScope); + this.coreSetting = Setting.intSetting(settingsKey(prefix, "core"), core, 0, Setting.Property.NodeScope); + this.maxSetting = Setting.intSetting(settingsKey(prefix, "max"), max, 1, Setting.Property.NodeScope); + this.keepAliveSetting = Setting.timeSetting( + settingsKey(prefix, "keep_alive"), + keepAlive, + TimeValue.ZERO, + Setting.Property.NodeScope + ); this.rejectAfterShutdown = rejectAfterShutdown; this.trackingConfig = trackingConfig; } @@ -172,5 +177,4 @@ static class ScalingExecutorSettings extends ExecutorBuilder.ExecutorSettings { this.keepAlive = keepAlive; } } - } diff --git a/server/src/test/java/org/elasticsearch/action/support/RefCountingRunnableTests.java b/server/src/test/java/org/elasticsearch/action/support/RefCountingRunnableTests.java index 5363722f2f49f..abbbd53dec570 100644 --- a/server/src/test/java/org/elasticsearch/action/support/RefCountingRunnableTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/RefCountingRunnableTests.java @@ -100,7 +100,7 @@ public void testAsyncAcquire() throws InterruptedException { final var completionLatch = new CountDownLatch(1); final var executorService = EsExecutors.newScaling( "test", - 0, + 1, between(1, 10), 10, TimeUnit.SECONDS, diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java index bdfec9dfaa630..430130d665168 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java @@ -748,4 +748,153 @@ public void onRejection(Exception e) { executor.execute(shouldBeRejected); assertTrue(rejected.get()); } + + public void testScalingWithEmptyCore() { + testScalingWithEmptyCore( + EsExecutors.newScaling( + getTestName(), + 0, + 1, + 0, + TimeUnit.MILLISECONDS, + true, + EsExecutors.daemonThreadFactory(getTestName()), + threadContext + ) + ); + } + + public void testScalingWithEmptyCoreAndKeepAlive() { + testScalingWithEmptyCore( + EsExecutors.newScaling( + getTestName(), + 0, + 1, + 1, + TimeUnit.MILLISECONDS, + true, + EsExecutors.daemonThreadFactory(getTestName()), + threadContext + ) + ); + } + + public void testScalingWithEmptyCoreAndLargerMaxSize() { + // TODO currently the reproduction of the starvation bug does not work if max pool size > 1 + // https://github.com/elastic/elasticsearch/issues/124867 + testScalingWithEmptyCore( + EsExecutors.newScaling( + getTestName(), + 0, + between(2, 5), + 0, + TimeUnit.MILLISECONDS, + true, + 
EsExecutors.daemonThreadFactory(getTestName()), + threadContext + ) + ); + } + + public void testScalingWithEmptyCoreAndKeepAliveAndLargerMaxSize() { + // TODO currently the reproduction of the starvation bug does not work if max pool size > 1 + // https://github.com/elastic/elasticsearch/issues/124867 + testScalingWithEmptyCore( + EsExecutors.newScaling( + getTestName(), + 0, + between(2, 5), + 1, + TimeUnit.MILLISECONDS, + true, + EsExecutors.daemonThreadFactory(getTestName()), + threadContext + ) + ); + } + + public void testScalingWithEmptyCoreAndWorkerPoolProbing() { + // https://github.com/elastic/elasticsearch/issues/124667 is difficult to reproduce if max pool size > 1. + // if probing mitigates the bug for max pool size = 1, we're good for larger pool sizes as well. + // the executor is created directly here, newScaling doesn't use ExecutorScalingQueue & probing if max pool size = 1. + testScalingWithEmptyCore( + new EsThreadPoolExecutor( + getTestName(), + 0, + 1, + 0, + TimeUnit.MILLISECONDS, + new EsExecutors.ExecutorScalingQueue<>(), + EsExecutors.daemonThreadFactory(getTestName()), + new EsExecutors.ForceQueuePolicy(true, true), + threadContext + ) + ); + } + + public void testScalingWithEmptyCoreAndKeepAliveAndWorkerPoolProbing() { + // https://github.com/elastic/elasticsearch/issues/124667 is difficult to reproduce if max pool size > 1. + // if probing mitigates the bug for max pool size = 1, we're good for larger pool sizes as well. + // the executor is created directly here, newScaling doesn't use ExecutorScalingQueue & probing if max pool size = 1. + testScalingWithEmptyCore( + new EsThreadPoolExecutor( + getTestName(), + 0, + 1, + 1, + TimeUnit.MILLISECONDS, + new EsExecutors.ExecutorScalingQueue<>(), + EsExecutors.daemonThreadFactory(getTestName()), + new EsExecutors.ForceQueuePolicy(true, true), + threadContext + ) + ); + } + + private void testScalingWithEmptyCore(EsThreadPoolExecutor executor) { + try { + class Task extends AbstractRunnable { + private int remaining; + private final CountDownLatch doneLatch; + + Task(int iterations, CountDownLatch doneLatch) { + this.remaining = iterations; + this.doneLatch = doneLatch; + } + + @Override + public void onFailure(Exception e) { + fail(e); + } + + @Override + protected void doRun() { + if (--remaining == 0) { + doneLatch.countDown(); + } else { + logger.trace("--> remaining [{}]", remaining); + final long keepAliveNanos = executor.getKeepAliveTime(TimeUnit.NANOSECONDS); + new Thread(() -> { + if (keepAliveNanos > 0) { + final var targetNanoTime = System.nanoTime() + keepAliveNanos + between(-10_000, 10_000); + while (System.nanoTime() < targetNanoTime) { + Thread.yield(); + } + } + executor.execute(Task.this); + }).start(); + } + } + } + + for (int i = 0; i < 100; i++) { + logger.trace("--> attempt [{}]", i); + final var doneLatch = new CountDownLatch(1); + executor.execute(new Task(between(1, 100), doneLatch)); + safeAwait(doneLatch); + } + } finally { + ThreadPool.terminate(executor, 1, TimeUnit.SECONDS); + } + } } diff --git a/server/src/test/java/org/elasticsearch/search/SearchServiceSingleNodeTests.java b/server/src/test/java/org/elasticsearch/search/SearchServiceSingleNodeTests.java index 1f2d129f0293c..82aadc5b6f496 100644 --- a/server/src/test/java/org/elasticsearch/search/SearchServiceSingleNodeTests.java +++ b/server/src/test/java/org/elasticsearch/search/SearchServiceSingleNodeTests.java @@ -180,6 +180,8 @@ public class SearchServiceSingleNodeTests extends ESSingleNodeTestCase { + private static final int 
SEARCH_POOL_SIZE = 10; + @Override protected boolean resetNodeAfterTest() { return true; @@ -265,7 +267,10 @@ public void onQueryPhase(SearchContext context, long tookInNanos) { @Override protected Settings nodeSettings() { - return Settings.builder().put("search.default_search_timeout", "5s").build(); + return Settings.builder() + .put("search.default_search_timeout", "5s") + .put("thread_pool.search.size", SEARCH_POOL_SIZE) // customized search pool size, reconfiguring at runtime is unsupported + .build(); } public void testClearOnClose() { @@ -2148,6 +2153,7 @@ public void onFailure(Exception exc) { CountDownLatch latch = new CountDownLatch(1); shardRequest.source().query(new MatchNoneQueryBuilder()); service.executeQueryPhase(shardRequest, task, new ActionListener<>() { + @Override public void onResponse(SearchPhaseResult result) { try { @@ -2747,8 +2753,11 @@ public void testEnableSearchWorkerThreads() throws IOException { public void testSlicingBehaviourForParallelCollection() throws Exception { IndexService indexService = createIndex("index", Settings.EMPTY); ThreadPoolExecutor executor = (ThreadPoolExecutor) indexService.getThreadPool().executor(ThreadPool.Names.SEARCH); - final int configuredMaxPoolSize = 10; - executor.setMaximumPoolSize(configuredMaxPoolSize); // We set this explicitly to be independent of CPU cores. + + // We configure the executor pool size explicitly in nodeSettings to be independent of CPU cores + assert String.valueOf(SEARCH_POOL_SIZE).equals(node().settings().get("thread_pool.search.size")) + : "Unexpected thread_pool.search.size"; + int numDocs = randomIntBetween(50, 100); for (int i = 0; i < numDocs; i++) { prepareIndex("index").setId(String.valueOf(i)).setSource("field", "value").get(); @@ -2781,7 +2790,7 @@ public void testSlicingBehaviourForParallelCollection() throws Exception { final int maxPoolSize = executor.getMaximumPoolSize(); assertEquals( "Sanity check to ensure this isn't the default of 1 when pool size is unset", - configuredMaxPoolSize, + SEARCH_POOL_SIZE, maxPoolSize ); @@ -2811,7 +2820,7 @@ public void testSlicingBehaviourForParallelCollection() throws Exception { final int maxPoolSize = executor.getMaximumPoolSize(); assertEquals( "Sanity check to ensure this isn't the default of 1 when pool size is unset", - configuredMaxPoolSize, + SEARCH_POOL_SIZE, maxPoolSize ); @@ -2902,7 +2911,7 @@ public void testSlicingBehaviourForParallelCollection() throws Exception { final int maxPoolSize = executor.getMaximumPoolSize(); assertEquals( "Sanity check to ensure this isn't the default of 1 when pool size is unset", - configuredMaxPoolSize, + SEARCH_POOL_SIZE, maxPoolSize ); diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESMockAPIBasedRepositoryIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESMockAPIBasedRepositoryIntegTestCase.java index 6ae95b872a75f..f3861e9279ef2 100644 --- a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESMockAPIBasedRepositoryIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESMockAPIBasedRepositoryIntegTestCase.java @@ -91,7 +91,7 @@ public static void startHttpServer() throws Exception { // the EncryptedRepository can require more than one connection open at one time executorService = EsExecutors.newScaling( ESMockAPIBasedRepositoryIntegTestCase.class.getName(), - 0, + 1, 2, 60, TimeUnit.SECONDS,
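
Editor's note: to make the scaling mechanism described in the Javadoc above easier to follow, here is a minimal, self-contained sketch of the pattern. The names ScalingQueueSketch, ForceQueueSketch and ScalingExecutorExample are illustrative only, not the Elasticsearch classes, and shutdown handling, task-time tracking and the worker probe are omitted. The idea: an unbounded LinkedTransferQueue whose offer only succeeds when an idle worker takes the task immediately, paired with a rejection handler that appends the task to the queue once the pool has reached its maximum size.

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Unbounded queue that only accepts an offer if an idle worker is already waiting for
// work; otherwise the offer "fails", prompting the executor to add another worker.
class ScalingQueueSketch<E> extends LinkedTransferQueue<E> {
    @Override
    public boolean offer(E e) {
        return tryTransfer(e); // succeeds only if a consumer (idle worker) is waiting
    }
}

// Once the executor is at max pool size, the rejected task is appended to the
// unbounded queue instead of being dropped ("force queue" behaviour).
class ForceQueueSketch implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
        executor.getQueue().add(task); // the queue is unbounded, add() cannot fail
    }
}

public class ScalingExecutorExample {
    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            1,                        // core pool size (>= 1, so no starvation risk here)
            4,                        // max pool size the pool scales up to
            30, TimeUnit.SECONDS,     // idle workers above the core size time out
            new ScalingQueueSketch<>(),
            new ForceQueueSketch()
        );
        for (int i = 0; i < 10; i++) {
            final int n = i;
            executor.execute(() -> System.out.println("task " + n + " on " + Thread.currentThread().getName()));
        }
        executor.shutdown();
        executor.awaitTermination(10, TimeUnit.SECONDS);
    }
}
```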
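The worker-probe part of the fix can be sketched in isolation as well. The handler below is a hedged approximation (ProbingForceQueueSketch is a made-up name, and the real ForceQueuePolicy additionally rejects tasks after shutdown): with an empty core pool, the last worker can time out at the same moment a task is force-queued, so whenever the queued task ends up at the head of the queue a no-op probe is resubmitted through execute() to guarantee a worker exists.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

// Force-queue rejection handler with a worker-pool probe, for executors whose core
// pool size is 0: after force-queueing a task, every worker may already have timed
// out, so the task could sit in the queue forever. Re-submitting a no-op probe via
// execute() makes the executor create a worker if none is left, which then drains
// the queue.
class ProbingForceQueueSketch implements RejectedExecutionHandler {

    // No-op probe, identified by reference. It is deliberately an anonymous class
    // rather than a lambda so it cannot be folded into a constant shared elsewhere.
    static final Runnable WORKER_PROBE = new Runnable() {
        @Override
        public void run() {}
    };

    @Override
    public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
        if (task == WORKER_PROBE) {
            return; // the probe itself was rejected: a worker already exists, nothing to do
        }
        BlockingQueue<Runnable> queue = executor.getQueue();
        queue.add(task); // force-queue on the unbounded queue
        if (task == queue.peek()) {
            // The queue was most likely empty before this put, so the last worker may
            // have timed out concurrently; the probe ensures that one gets started.
            executor.execute(WORKER_PROBE);
        }
    }
}
```

Paired with a tryTransfer-based queue like the one sketched above and a ThreadPoolExecutor created with a core pool size of 0, this mirrors the probeWorkerPool == true behaviour introduced by this change.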