From 92c9cc7fe41b00c13a2ac9b0a36d6e86bde6098d Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Sat, 14 Jun 2025 15:13:55 -0600 Subject: [PATCH 01/10] Add thread pool for write coordination This change adds a thread pool for write coordination to ensure that bulk coordination does not get stuck on an overloaded primary node. --- .../bulk/TransportAbstractBulkAction.java | 50 +++++++++++++------ .../DefaultBuiltInExecutorBuilders.java | 10 ++++ .../threadpool/ExecutorBuilder.java | 1 + .../elasticsearch/threadpool/ThreadPool.java | 2 + 4 files changed, 47 insertions(+), 16 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java index 9c8ca93a7744c..60d4a1d4930de 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java @@ -68,6 +68,7 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction private final IngestService ingestService; private final IngestActionForwarder ingestForwarder; protected final LongSupplier relativeTimeNanosProvider; + protected final Executor coordinationExecutor; protected final Executor writeExecutor; protected final Executor systemWriteExecutor; private final ActionType bulkAction; @@ -92,6 +93,7 @@ public TransportAbstractBulkAction( this.indexingPressure = indexingPressure; this.systemIndices = systemIndices; this.projectResolver = projectResolver; + this.coordinationExecutor = threadPool.executor(ThreadPool.Names.WRITE_COORDINATION); this.writeExecutor = threadPool.executor(ThreadPool.Names.WRITE); this.systemWriteExecutor = threadPool.executor(ThreadPool.Names.SYSTEM_WRITE); this.ingestForwarder = new IngestActionForwarder(transportService); @@ -106,8 +108,8 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener 
releasingListener = ActionListener.runBefore(listener, releasable::close); - final Executor executor = isOnlySystem ? systemWriteExecutor : writeExecutor; - ensureClusterStateThenForkAndExecute(task, bulkRequest, executor, releasingListener); + // Use coordinationExecutor for dispatching coordination tasks + ensureClusterStateThenForkAndExecute(task, bulkRequest, coordinationExecutor, isOnlySystem, releasingListener); } private void ensureClusterStateThenForkAndExecute( Task task, BulkRequest bulkRequest, Executor executor, + boolean isOnlySystem, ActionListener releasingListener ) { final ClusterState initialState = clusterService.state(); @@ -159,7 +162,7 @@ private void ensureClusterStateThenForkAndExecute( clusterStateObserver.waitForNextChange(new ClusterStateObserver.Listener() { @Override public void onNewClusterState(ClusterState state) { - forkAndExecute(task, bulkRequest, executor, releasingListener); + forkAndExecute(task, bulkRequest, executor, isOnlySystem, releasingListener); } @Override @@ -173,21 +176,32 @@ public void onTimeout(TimeValue timeout) { } }, newState -> false == newState.blocks().hasGlobalBlockWithLevel(ClusterBlockLevel.WRITE)); } else { - forkAndExecute(task, bulkRequest, executor, releasingListener); + forkAndExecute(task, bulkRequest, executor, isOnlySystem, releasingListener); } } - private void forkAndExecute(Task task, BulkRequest bulkRequest, Executor executor, ActionListener releasingListener) { + private void forkAndExecute( + Task task, + BulkRequest bulkRequest, + Executor executor, + boolean isOnlySystem, + ActionListener releasingListener + ) { executor.execute(new ActionRunnable<>(releasingListener) { @Override protected void doRun() throws IOException { - applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, releasingListener); + applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, isOnlySystem, releasingListener); } }); } - private boolean applyPipelines(Task task, BulkRequest bulkRequest, Executor 
executor, ActionListener listener) - throws IOException { + private boolean applyPipelines( + Task task, + BulkRequest bulkRequest, + Executor executor, + boolean isOnlySystem, + ActionListener listener + ) throws IOException { boolean hasIndexRequestsWithPipelines = false; ClusterState state = clusterService.state(); ProjectId projectId = projectResolver.getProjectId(); @@ -276,7 +290,7 @@ private boolean applyPipelines(Task task, BulkRequest bulkRequest, Executor exec assert arePipelinesResolved : bulkRequest; } if (clusterService.localNode().isIngestNode()) { - processBulkIndexIngestRequest(task, bulkRequest, executor, project, l); + processBulkIndexIngestRequest(task, bulkRequest, executor, isOnlySystem, project, l); } else { ingestForwarder.forwardIngestRequest(bulkAction, bulkRequest, l); } @@ -290,6 +304,7 @@ private void processBulkIndexIngestRequest( Task task, BulkRequest original, Executor executor, + boolean isOnlySystem, ProjectMetadata metadata, ActionListener listener ) { @@ -323,12 +338,12 @@ private void processBulkIndexIngestRequest( ActionRunnable runnable = new ActionRunnable<>(actionListener) { @Override protected void doRun() throws IOException { - applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, actionListener); + applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, isOnlySystem, actionListener); } @Override public boolean isForceExecution() { - // If we fork back to a write thread we **not** should fail, because tp queue is full. + // If we fork back to a coordination thread we should **not** fail, because tp queue is full. // (Otherwise the work done during ingest will be lost) // It is okay to force execution here. Throttling of write requests happens prior to // ingest when a node receives a bulk request. 
@@ -336,7 +351,8 @@ public boolean isForceExecution() { } }; // If a processor went async and returned a response on a different thread then - // before we continue the bulk request we should fork back on a write thread: + // before we continue the bulk request we should fork back on a coordination thread. Otherwise it is fine to perform + // coordination steps on the write thread if (originalThread == Thread.currentThread()) { runnable.run(); } else { @@ -345,7 +361,8 @@ public boolean isForceExecution() { } } }, - executor + // Use the appropriate write executor for actual ingest processing + isOnlySystem ? systemWriteExecutor : writeExecutor ); } @@ -401,10 +418,11 @@ private void applyPipelinesAndDoInternalExecute( Task task, BulkRequest bulkRequest, Executor executor, + boolean isOnlySystem, ActionListener listener ) throws IOException { final long relativeStartTimeNanos = relativeTimeNanos(); - if (applyPipelines(task, bulkRequest, executor, listener) == false) { + if (applyPipelines(task, bulkRequest, executor, isOnlySystem, listener) == false) { doInternalExecute(task, bulkRequest, executor, listener, relativeStartTimeNanos); } } diff --git a/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java b/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java index b8dddc20cc51d..336d978358b9f 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java +++ b/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java @@ -38,6 +38,16 @@ public Map getBuilders(Settings settings, int allocated ThreadPool.Names.GENERIC, new ScalingExecutorBuilder(ThreadPool.Names.GENERIC, 4, genericThreadPoolMax, TimeValue.timeValueSeconds(30), false) ); + result.put( + ThreadPool.Names.WRITE_COORDINATION, + new FixedExecutorBuilder( + settings, + ThreadPool.Names.WRITE_COORDINATION, + allocatedProcessors, + 10000, + EsExecutors.TaskTrackingConfig.DO_NOT_TRACK 
+ ) + ); result.put( ThreadPool.Names.WRITE, new FixedExecutorBuilder( diff --git a/server/src/main/java/org/elasticsearch/threadpool/ExecutorBuilder.java b/server/src/main/java/org/elasticsearch/threadpool/ExecutorBuilder.java index c259feb1c978e..6d438298acffc 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/ExecutorBuilder.java +++ b/server/src/main/java/org/elasticsearch/threadpool/ExecutorBuilder.java @@ -41,6 +41,7 @@ protected static String settingsKey(final String prefix, final String key) { protected static int applyHardSizeLimit(final Settings settings, final String name) { if (name.equals("bulk") + || name.equals(ThreadPool.Names.WRITE_COORDINATION) || name.equals(ThreadPool.Names.WRITE) || name.equals(ThreadPool.Names.SYSTEM_WRITE) || name.equals(ThreadPool.Names.SYSTEM_CRITICAL_WRITE)) { diff --git a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index 225fc5bc2bd03..4bc5f88abd65a 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -113,6 +113,7 @@ public static class Names { public static final String GET = "get"; public static final String ANALYZE = "analyze"; public static final String WRITE = "write"; + public static final String WRITE_COORDINATION = "write_coordination"; public static final String SEARCH = "search"; public static final String SEARCH_COORDINATION = "search_coordination"; public static final String AUTO_COMPLETE = "auto_complete"; @@ -186,6 +187,7 @@ public static ThreadPoolType fromType(String type) { entry(Names.CLUSTER_COORDINATION, ThreadPoolType.FIXED), entry(Names.GET, ThreadPoolType.FIXED), entry(Names.ANALYZE, ThreadPoolType.FIXED), + entry(Names.WRITE_COORDINATION, ThreadPoolType.FIXED), entry(Names.WRITE, ThreadPoolType.FIXED), entry(Names.SEARCH, ThreadPoolType.FIXED), entry(Names.SEARCH_COORDINATION, 
ThreadPoolType.FIXED), From 31ac05738aa61ebac8801f6d5933283bb0b15889 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Sat, 14 Jun 2025 15:50:39 -0600 Subject: [PATCH 02/10] Fix test --- .../action/bulk/TransportBulkActionTests.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java index 4e7e80a0b2ffe..3fb8acca64a95 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java @@ -410,9 +410,9 @@ public void testOnlySystem() throws IOException { assertFalse(TransportBulkAction.isOnlySystem(buildBulkStreamRequest(mixed), indicesLookup, systemIndices)); } - private void blockWriteThreadPool(CountDownLatch blockingLatch) { + private void blockWriteCoordinationThreadPool(CountDownLatch blockingLatch) { assertThat(blockingLatch.getCount(), greaterThan(0L)); - final var executor = threadPool.executor(ThreadPool.Names.WRITE); + final var executor = threadPool.executor(ThreadPool.Names.WRITE_COORDINATION); // Add tasks repeatedly until we get an EsRejectedExecutionException which indicates that the threadpool and its queue are full. expectThrows(EsRejectedExecutionException.class, () -> { // noinspection InfiniteLoopStatement @@ -427,7 +427,7 @@ public void testRejectCoordination() { final var blockingLatch = new CountDownLatch(1); try { - blockWriteThreadPool(blockingLatch); + blockWriteCoordinationThreadPool(blockingLatch); PlainActionFuture future = new PlainActionFuture<>(); ActionTestUtils.execute(bulkAction, null, bulkRequest, future); expectThrows(EsRejectedExecutionException.class, future); @@ -442,7 +442,7 @@ public void testRejectionAfterCreateIndexIsPropagated() { bulkAction.failIndexCreationException = randomBoolean() ? 
new ResourceAlreadyExistsException("index already exists") : null; final var blockingLatch = new CountDownLatch(1); try { - bulkAction.beforeIndexCreation = () -> blockWriteThreadPool(blockingLatch); + bulkAction.beforeIndexCreation = () -> blockWriteCoordinationThreadPool(blockingLatch); PlainActionFuture future = new PlainActionFuture<>(); ActionTestUtils.execute(bulkAction, null, bulkRequest, future); expectThrows(EsRejectedExecutionException.class, future); From b7f5b128b75e398936536f5b3109538237cc704c Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Sat, 14 Jun 2025 21:52:26 -0600 Subject: [PATCH 03/10] Fix --- .../action/bulk/TransportBulkActionIngestTests.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java index f344f2e1006dc..ef24a7944d505 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java @@ -107,6 +107,7 @@ public class TransportBulkActionIngestTests extends ESTestCase { private static final Thread DUMMY_WRITE_THREAD = new Thread(ThreadPool.Names.WRITE); private FeatureService mockFeatureService; + private static final ExecutorService writeCoordinationExecutor = new NamedDirectExecutorService("write_coordination"); private static final ExecutorService writeExecutor = new NamedDirectExecutorService("write"); private static final ExecutorService systemWriteExecutor = new NamedDirectExecutorService("system_write"); @@ -293,6 +294,7 @@ public T invokeAny(Collection> tasks, long timeout, Ti public void setupAction() { // initialize captors, which must be members to use @Capture because of generics threadPool = mock(ThreadPool.class); + when(threadPool.executor(eq(ThreadPool.Names.WRITE_COORDINATION))).thenReturn(writeCoordinationExecutor); 
when(threadPool.executor(eq(ThreadPool.Names.WRITE))).thenReturn(writeExecutor); when(threadPool.executor(eq(ThreadPool.Names.SYSTEM_WRITE))).thenReturn(systemWriteExecutor); MockitoAnnotations.openMocks(this); From ad5d0eda7a5afe0656c4a60dfcc5ea610a8b76fc Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Sat, 14 Jun 2025 22:57:49 -0600 Subject: [PATCH 04/10] Fix --- .../org/elasticsearch/action/bulk/IncrementalBulkIT.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/IncrementalBulkIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/IncrementalBulkIT.java index 0d39d8808ce2c..884a4802a6341 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/IncrementalBulkIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/IncrementalBulkIT.java @@ -532,7 +532,7 @@ public void testShortCircuitShardLevelFailureWithIngestNodeHop() throws Exceptio } private static void blockWritePool(ThreadPool threadPool, CountDownLatch finishLatch) { - final var threadCount = threadPool.info(ThreadPool.Names.WRITE).getMax(); + final var threadCount = threadPool.info(ThreadPool.Names.WRITE_COORDINATION).getMax(); final var startBarrier = new CyclicBarrier(threadCount + 1); final var blockingTask = new AbstractRunnable() { @Override @@ -552,13 +552,13 @@ public boolean isForceExecution() { } }; for (int i = 0; i < threadCount; i++) { - threadPool.executor(ThreadPool.Names.WRITE).execute(blockingTask); + threadPool.executor(ThreadPool.Names.WRITE_COORDINATION).execute(blockingTask); } safeAwait(startBarrier); } private static void fillWriteQueue(ThreadPool threadPool) { - final var queueSize = Math.toIntExact(threadPool.info(ThreadPool.Names.WRITE).getQueueSize().singles()); + final var queueSize = Math.toIntExact(threadPool.info(ThreadPool.Names.WRITE_COORDINATION).getQueueSize().singles()); final var queueFilled = new 
AtomicBoolean(false); final var queueFillingTask = new AbstractRunnable() { @Override @@ -577,7 +577,7 @@ public boolean isForceExecution() { } }; for (int i = 0; i < queueSize; i++) { - threadPool.executor(ThreadPool.Names.WRITE).execute(queueFillingTask); + threadPool.executor(ThreadPool.Names.WRITE_COORDINATION).execute(queueFillingTask); } queueFilled.set(true); } From 6ca7bb9df33cea951ab467e7354d8d9f67f2533d Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Mon, 16 Jun 2025 10:42:07 -0600 Subject: [PATCH 05/10] Change --- .../metrics/NodeIndexingMetricsIT.java | 8 ++--- .../bulk/TransportAbstractBulkAction.java | 17 +++++++--- .../action/bulk/TransportBulkAction.java | 33 +++++++++++-------- .../action/bulk/TransportBulkActionTests.java | 17 ++++++++++ .../concurrent/EsThreadPoolExecutorTests.java | 4 ++- .../threadpool/ThreadPoolTests.java | 1 + 6 files changed, 57 insertions(+), 23 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/monitor/metrics/NodeIndexingMetricsIT.java b/server/src/internalClusterTest/java/org/elasticsearch/monitor/metrics/NodeIndexingMetricsIT.java index 04130d176b9e5..290f299df5a4c 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/monitor/metrics/NodeIndexingMetricsIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/monitor/metrics/NodeIndexingMetricsIT.java @@ -840,7 +840,7 @@ public void testIncrementalBulkHighWatermarkSplitMetrics() throws Exception { add512BRequests(requestsThrottle, index); CountDownLatch finishLatch = new CountDownLatch(1); - blockWritePool(threadPool, finishLatch); + blockWriteCoordinationPool(threadPool, finishLatch); IncrementalBulkService.Handler handlerThrottled = incrementalBulkService.newBulkRequest(); refCounted.incRef(); handlerThrottled.addItems(requestsThrottle, refCounted::decRef, () -> nextPage.set(true)); @@ -919,8 +919,8 @@ private static void add512BRequests(ArrayList> requests, Stri assertThat(total, lessThan(1024L)); } - private static 
void blockWritePool(ThreadPool threadPool, CountDownLatch finishLatch) { - final var threadCount = threadPool.info(ThreadPool.Names.WRITE).getMax(); + private static void blockWriteCoordinationPool(ThreadPool threadPool, CountDownLatch finishLatch) { + final var threadCount = threadPool.info(ThreadPool.Names.WRITE_COORDINATION).getMax(); final var startBarrier = new CyclicBarrier(threadCount + 1); final var blockingTask = new AbstractRunnable() { @Override @@ -940,7 +940,7 @@ public boolean isForceExecution() { } }; for (int i = 0; i < threadCount; i++) { - threadPool.executor(ThreadPool.Names.WRITE).execute(blockingTask); + threadPool.executor(ThreadPool.Names.WRITE_COORDINATION).execute(blockingTask); } safeAwait(startBarrier); } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java index 60d4a1d4930de..06f374f83f2e0 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java @@ -353,11 +353,7 @@ public boolean isForceExecution() { // If a processor went async and returned a response on a different thread then // before we continue the bulk request we should fork back on a coordination thread. Otherwise it is fine to perform // coordination steps on the write thread - if (originalThread == Thread.currentThread()) { - runnable.run(); - } else { - executor.execute(runnable); - } + runOrDispatch(executor, originalThread, runnable); } } }, @@ -366,6 +362,17 @@ public boolean isForceExecution() { ); } + /** + * If the work is still on the original thread run the ActionRunnable synchronously. Otherwise, dispatch to the provided executor. 
+ */ + protected static void runOrDispatch(Executor executor, Thread originalThread, ActionRunnable bulkRunnable) { + if (originalThread == Thread.currentThread()) { + bulkRunnable.run(); + } else { + executor.execute(bulkRunnable); + } + } + /** * Determines if an index name is associated with either an existing data stream or a template * for one that has the failure store enabled. diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index 9a79137361260..dcb25dd864db9 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -385,19 +385,26 @@ protected void createMissingIndicesAndIndexData( Map indicesExceptions = new ConcurrentHashMap<>(); Map dataStreamExceptions = new ConcurrentHashMap<>(); Map failureStoreExceptions = new ConcurrentHashMap<>(); - Runnable executeBulkRunnable = () -> executor.execute(new ActionRunnable<>(listener) { - @Override - protected void doRun() { - failRequestsWhenPrerequisiteActionFailed( - indicesExceptions, - dataStreamExceptions, - failureStoreExceptions, - bulkRequest, - responses - ); - executeBulk(task, bulkRequest, startTimeNanos, listener, executor, responses); - } - }); + + Thread originalThread = Thread.currentThread(); + Runnable executeBulkRunnable = () -> { + ActionRunnable bulkRunnable = new ActionRunnable<>(listener) { + @Override + protected void doRun() { + failRequestsWhenPrerequisiteActionFailed( + indicesExceptions, + dataStreamExceptions, + failureStoreExceptions, + bulkRequest, + responses + ); + executeBulk(task, bulkRequest, startTimeNanos, listener, executor, responses); + } + }; + // If we performed some async action and ended up on a different thread dispatch back to the coordination thread pool. + // Otherwise, just run the action since we are on the same thread. 
+ runOrDispatch(executor, originalThread, bulkRunnable); + }; try (RefCountingRunnable refs = new RefCountingRunnable(executeBulkRunnable)) { createIndices(indicesToAutoCreate, refs, indicesExceptions); rollOverDataStreams(bulkRequest, dataStreamsToBeRolledOver, false, refs, dataStreamExceptions); diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java index 3fb8acca64a95..4376519a7afc2 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java @@ -70,6 +70,7 @@ import org.elasticsearch.test.transport.CapturingTransport; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.threadpool.ThreadPoolStats; import org.elasticsearch.transport.TransportService; import org.junit.After; import org.junit.Before; @@ -422,6 +423,22 @@ private void blockWriteCoordinationThreadPool(CountDownLatch blockingLatch) { }); } + public void testDispatchesToWriteCoordinationThreadPoolOnce() { + BulkRequest bulkRequest = new BulkRequest().add(new IndexRequest("index").id("id").source(Collections.emptyMap())); + PlainActionFuture future = new PlainActionFuture<>(); + ThreadPoolStats.Stats stats = threadPool.stats() + .stats() + .stream() + .filter(s -> s.name().equals(ThreadPool.Names.WRITE_COORDINATION)) + .findAny() + .get(); + assertThat(stats.completed(), equalTo(0L)); + ActionTestUtils.execute(bulkAction, null, bulkRequest, future); + future.actionGet(); + stats = threadPool.stats().stats().stream().filter(s -> s.name().equals(ThreadPool.Names.WRITE_COORDINATION)).findAny().get(); + assertThat(stats.completed(), equalTo(1L)); + } + public void testRejectCoordination() { BulkRequest bulkRequest = new BulkRequest().add(new 
IndexRequest("index").id("id").source(Collections.emptyMap())); diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutorTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutorTests.java index fb71f59551d79..1a31c68377491 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutorTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutorTests.java @@ -32,6 +32,8 @@ public class EsThreadPoolExecutorTests extends ESSingleNodeTestCase { protected Settings nodeSettings() { return Settings.builder() .put("node.name", "es-thread-pool-executor-tests") + .put("thread_pool.write_coordination.size", 1) + .put("thread_pool.write_coordination.queue_size", 0) .put("thread_pool.write.size", 1) .put("thread_pool.write.queue_size", 0) .put("thread_pool.search.size", 1) @@ -41,7 +43,7 @@ protected Settings nodeSettings() { public void testRejectedExecutionExceptionContainsNodeName() { // we test a fixed and an auto-queue executor but not scaling since it does not reject - runThreadPoolExecutorTest(1, ThreadPool.Names.WRITE); + runThreadPoolExecutorTest(1, randomFrom(ThreadPool.Names.WRITE_COORDINATION, ThreadPool.Names.WRITE)); runThreadPoolExecutorTest(2, ThreadPool.Names.SEARCH); } diff --git a/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java b/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java index 3762ea0feaee3..ad86c1159f426 100644 --- a/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java +++ b/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java @@ -573,6 +573,7 @@ public void testThreadCountMetrics() throws Exception { ThreadPool.Names.GENERIC, ThreadPool.Names.ANALYZE, ThreadPool.Names.WRITE, + ThreadPool.Names.WRITE_COORDINATION, ThreadPool.Names.SEARCH ); final ThreadPool.Info threadPoolInfo = threadPool.info(threadPoolName); From 
cb9b06f18daf445c1f3bdb45faeba081b93f2007 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Mon, 16 Jun 2025 13:27:01 -0600 Subject: [PATCH 06/10] Change --- .../bulk/TransportAbstractBulkAction.java | 17 +++------- .../action/bulk/TransportBulkAction.java | 33 ++++++++----------- .../action/bulk/TransportBulkActionTests.java | 4 ++- 3 files changed, 21 insertions(+), 33 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java index 06f374f83f2e0..60d4a1d4930de 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java @@ -353,7 +353,11 @@ public boolean isForceExecution() { // If a processor went async and returned a response on a different thread then // before we continue the bulk request we should fork back on a coordination thread. Otherwise it is fine to perform // coordination steps on the write thread - runOrDispatch(executor, originalThread, runnable); + if (originalThread == Thread.currentThread()) { + runnable.run(); + } else { + executor.execute(runnable); + } } } }, @@ -362,17 +366,6 @@ public boolean isForceExecution() { ); } - /** - * If the work is still on the original thread run the ActionRunnable synchronously. Otherwise, dispatch to the provided executor. - */ - protected static void runOrDispatch(Executor executor, Thread originalThread, ActionRunnable bulkRunnable) { - if (originalThread == Thread.currentThread()) { - bulkRunnable.run(); - } else { - executor.execute(bulkRunnable); - } - } - /** * Determines if an index name is associated with either an existing data stream or a template * for one that has the failure store enabled. 
diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index dcb25dd864db9..9a79137361260 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -385,26 +385,19 @@ protected void createMissingIndicesAndIndexData( Map indicesExceptions = new ConcurrentHashMap<>(); Map dataStreamExceptions = new ConcurrentHashMap<>(); Map failureStoreExceptions = new ConcurrentHashMap<>(); - - Thread originalThread = Thread.currentThread(); - Runnable executeBulkRunnable = () -> { - ActionRunnable bulkRunnable = new ActionRunnable<>(listener) { - @Override - protected void doRun() { - failRequestsWhenPrerequisiteActionFailed( - indicesExceptions, - dataStreamExceptions, - failureStoreExceptions, - bulkRequest, - responses - ); - executeBulk(task, bulkRequest, startTimeNanos, listener, executor, responses); - } - }; - // If we performed some async action and ended up on a different thread dispatch back to the coordination thread pool. - // Otherwise, just run the action since we are on the same thread. 
- runOrDispatch(executor, originalThread, bulkRunnable); - }; + Runnable executeBulkRunnable = () -> executor.execute(new ActionRunnable<>(listener) { + @Override + protected void doRun() { + failRequestsWhenPrerequisiteActionFailed( + indicesExceptions, + dataStreamExceptions, + failureStoreExceptions, + bulkRequest, + responses + ); + executeBulk(task, bulkRequest, startTimeNanos, listener, executor, responses); + } + }); try (RefCountingRunnable refs = new RefCountingRunnable(executeBulkRunnable)) { createIndices(indicesToAutoCreate, refs, indicesExceptions); rollOverDataStreams(bulkRequest, dataStreamsToBeRolledOver, false, refs, dataStreamExceptions); diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java index 4376519a7afc2..1cf8d0dc655b3 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java @@ -436,7 +436,9 @@ public void testDispatchesToWriteCoordinationThreadPoolOnce() { ActionTestUtils.execute(bulkAction, null, bulkRequest, future); future.actionGet(); stats = threadPool.stats().stats().stream().filter(s -> s.name().equals(ThreadPool.Names.WRITE_COORDINATION)).findAny().get(); - assertThat(stats.completed(), equalTo(1L)); + // Will increment twice because it will dispatch on the first coordination attempt. And then dispatch a second time after the index + // is created. 
+ assertThat(stats.completed(), equalTo(2L)); } public void testRejectCoordination() { From e7f336a6f327da514fd305b8a8b44a254eb6b1e6 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Mon, 16 Jun 2025 13:49:44 -0600 Subject: [PATCH 07/10] Docs --- .../configuration-reference/thread-pool-settings.md | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/docs/reference/elasticsearch/configuration-reference/thread-pool-settings.md b/docs/reference/elasticsearch/configuration-reference/thread-pool-settings.md index 7f786e9b8e29a..8314ff41bdc59 100644 --- a/docs/reference/elasticsearch/configuration-reference/thread-pool-settings.md +++ b/docs/reference/elasticsearch/configuration-reference/thread-pool-settings.md @@ -33,7 +33,10 @@ $$$search-throttled$$$`search_throttled` : For analyze requests. Thread pool type is `fixed` with a size of `1`, queue size of `16`. `write` -: For single-document index/delete/update, ingest processors, and bulk requests. Thread pool type is `fixed` with a size of [`# of allocated processors`](#node.processors), queue_size of `10000`. The maximum size for this pool is `1 + `[`# of allocated processors`](#node.processors). +: For write operations and ingest processors. Thread pool type is `fixed` with a size of [`# of allocated processors`](#node.processors), queue_size of `10000`. The maximum size for this pool is `1 + `[`# of allocated processors`](#node.processors). + +`write_coordination` +: For bulk request coordination operations. Thread pool type is `fixed` with a size of [`# of allocated processors`](#node.processors), queue_size of `10000`. The maximum size for this pool is `1 + `[`# of allocated processors`](#node.processors). `snapshot` : For snapshot/restore operations. Thread pool type is `scaling` with a keep-alive of `5m`. On nodes with at least 750MB of heap the maximum size of this pool is `10` by default. 
On nodes with less than 750MB of heap the maximum size of this pool is `min(5, (`[`# of allocated processors`](#node.processors)`) / 2)` by default. From 8697eaec538257a6bd970285b173449113f1b610 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Mon, 16 Jun 2025 22:30:04 -0600 Subject: [PATCH 08/10] Fix --- .../action/bulk/TransportBulkActionTests.java | 22 ++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java index 1cf8d0dc655b3..a834ecc2782dd 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java @@ -423,7 +423,7 @@ private void blockWriteCoordinationThreadPool(CountDownLatch blockingLatch) { }); } - public void testDispatchesToWriteCoordinationThreadPoolOnce() { + public void testDispatchesToWriteCoordinationThreadPoolOnce() throws Exception { BulkRequest bulkRequest = new BulkRequest().add(new IndexRequest("index").id("id").source(Collections.emptyMap())); PlainActionFuture future = new PlainActionFuture<>(); ThreadPoolStats.Stats stats = threadPool.stats() @@ -435,10 +435,22 @@ public void testDispatchesToWriteCoordinationThreadPoolOnce() { assertThat(stats.completed(), equalTo(0L)); ActionTestUtils.execute(bulkAction, null, bulkRequest, future); future.actionGet(); - stats = threadPool.stats().stats().stream().filter(s -> s.name().equals(ThreadPool.Names.WRITE_COORDINATION)).findAny().get(); - // Will increment twice because it will dispatch on the first coordination attempt. And then dispatch a second time after the index - // is created. - assertThat(stats.completed(), equalTo(2L)); + + assertBusy(() -> { + // Will increment twice because it will dispatch on the first coordination attempt. 
And then dispatch a second time after the + // index + // is created. + assertThat( + threadPool.stats() + .stats() + .stream() + .filter(s -> s.name().equals(ThreadPool.Names.WRITE_COORDINATION)) + .findAny() + .get() + .completed(), + equalTo(2L) + ); + }); } public void testRejectCoordination() { From a81788cad23670b36d0d03a079d23ca7c97a18e4 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Tue, 17 Jun 2025 12:58:22 -0600 Subject: [PATCH 09/10] Names --- .../action/bulk/IncrementalBulkIT.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/IncrementalBulkIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/IncrementalBulkIT.java index 884a4802a6341..d4d49bc4aa87a 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/IncrementalBulkIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/IncrementalBulkIT.java @@ -232,7 +232,7 @@ public void testIncrementalBulkHighWatermarkBackOff() throws Exception { add512BRequests(requestsThrottle, index); CountDownLatch finishLatch = new CountDownLatch(1); - blockWritePool(threadPool, finishLatch); + blockWriteCoordinationPool(threadPool, finishLatch); IncrementalBulkService.Handler handlerThrottled = incrementalBulkService.newBulkRequest(); refCounted.incRef(); handlerThrottled.addItems(requestsThrottle, refCounted::decRef, () -> nextPage.set(true)); @@ -295,8 +295,8 @@ public void testGlobalBulkFailure() throws InterruptedException { IncrementalBulkService incrementalBulkService = internalCluster().getInstance(IncrementalBulkService.class, randomNodeName); ThreadPool threadPool = internalCluster().getInstance(ThreadPool.class, randomNodeName); - blockWritePool(threadPool, blockingLatch); - fillWriteQueue(threadPool); + blockWriteCoordinationPool(threadPool, blockingLatch); + fillWriteCoordinationQueue(threadPool); IncrementalBulkService.Handler handler = 
incrementalBulkService.newBulkRequest(); if (randomBoolean()) { @@ -333,7 +333,7 @@ public void testBulkLevelBulkFailureAfterFirstIncrementalRequest() throws Except AtomicBoolean nextRequested = new AtomicBoolean(true); AtomicLong hits = new AtomicLong(0); try { - blockWritePool(threadPool, blockingLatch1); + blockWriteCoordinationPool(threadPool, blockingLatch1); while (nextRequested.get()) { nextRequested.set(false); refCounted.incRef(); @@ -348,8 +348,8 @@ public void testBulkLevelBulkFailureAfterFirstIncrementalRequest() throws Except CountDownLatch blockingLatch2 = new CountDownLatch(1); try { - blockWritePool(threadPool, blockingLatch2); - fillWriteQueue(threadPool); + blockWriteCoordinationPool(threadPool, blockingLatch2); + fillWriteCoordinationQueue(threadPool); handler.lastItems(List.of(indexRequest(index)), refCounted::decRef, future); } finally { @@ -531,7 +531,7 @@ public void testShortCircuitShardLevelFailureWithIngestNodeHop() throws Exceptio } } - private static void blockWritePool(ThreadPool threadPool, CountDownLatch finishLatch) { + private static void blockWriteCoordinationPool(ThreadPool threadPool, CountDownLatch finishLatch) { final var threadCount = threadPool.info(ThreadPool.Names.WRITE_COORDINATION).getMax(); final var startBarrier = new CyclicBarrier(threadCount + 1); final var blockingTask = new AbstractRunnable() { @@ -557,7 +557,7 @@ public boolean isForceExecution() { safeAwait(startBarrier); } - private static void fillWriteQueue(ThreadPool threadPool) { + private static void fillWriteCoordinationQueue(ThreadPool threadPool) { final var queueSize = Math.toIntExact(threadPool.info(ThreadPool.Names.WRITE_COORDINATION).getQueueSize().singles()); final var queueFilled = new AtomicBoolean(false); final var queueFillingTask = new AbstractRunnable() { From f76760d1359cc9d16202892412372c3df7dc34db Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Tue, 17 Jun 2025 21:55:31 -0600 Subject: [PATCH 10/10] Change --- 
.../xpack/enrich/action/EnrichCoordinatorProxyAction.java | 1 + 1 file changed, 1 insertion(+) diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorProxyAction.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorProxyAction.java index 4fb12bb5ca3c7..26e692fbb7c7f 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorProxyAction.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorProxyAction.java @@ -75,6 +75,7 @@ protected void doExecute(Task task, SearchRequest request, ActionListener