Skip to content

Commit 6ca7bb9

Browse files
committed
Change
1 parent 3249513 commit 6ca7bb9

File tree

6 files changed

+57
-23
lines changed

6 files changed

+57
-23
lines changed

server/src/internalClusterTest/java/org/elasticsearch/monitor/metrics/NodeIndexingMetricsIT.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -840,7 +840,7 @@ public void testIncrementalBulkHighWatermarkSplitMetrics() throws Exception {
840840
add512BRequests(requestsThrottle, index);
841841

842842
CountDownLatch finishLatch = new CountDownLatch(1);
843-
blockWritePool(threadPool, finishLatch);
843+
blockWriteCoordinationPool(threadPool, finishLatch);
844844
IncrementalBulkService.Handler handlerThrottled = incrementalBulkService.newBulkRequest();
845845
refCounted.incRef();
846846
handlerThrottled.addItems(requestsThrottle, refCounted::decRef, () -> nextPage.set(true));
@@ -919,8 +919,8 @@ private static void add512BRequests(ArrayList<DocWriteRequest<?>> requests, Stri
919919
assertThat(total, lessThan(1024L));
920920
}
921921

922-
private static void blockWritePool(ThreadPool threadPool, CountDownLatch finishLatch) {
923-
final var threadCount = threadPool.info(ThreadPool.Names.WRITE).getMax();
922+
private static void blockWriteCoordinationPool(ThreadPool threadPool, CountDownLatch finishLatch) {
923+
final var threadCount = threadPool.info(ThreadPool.Names.WRITE_COORDINATION).getMax();
924924
final var startBarrier = new CyclicBarrier(threadCount + 1);
925925
final var blockingTask = new AbstractRunnable() {
926926
@Override
@@ -940,7 +940,7 @@ public boolean isForceExecution() {
940940
}
941941
};
942942
for (int i = 0; i < threadCount; i++) {
943-
threadPool.executor(ThreadPool.Names.WRITE).execute(blockingTask);
943+
threadPool.executor(ThreadPool.Names.WRITE_COORDINATION).execute(blockingTask);
944944
}
945945
safeAwait(startBarrier);
946946
}

server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -353,11 +353,7 @@ public boolean isForceExecution() {
353353
// If a processor went async and returned a response on a different thread then
354354
// before we continue the bulk request we should fork back on a coordination thread. Otherwise it is fine to perform
355355
// coordination steps on the write thread
356-
if (originalThread == Thread.currentThread()) {
357-
runnable.run();
358-
} else {
359-
executor.execute(runnable);
360-
}
356+
runOrDispatch(executor, originalThread, runnable);
361357
}
362358
}
363359
},
@@ -366,6 +362,17 @@ public boolean isForceExecution() {
366362
);
367363
}
368364

365+
/**
366+
* If the work is still on the original thread run the ActionRunnable synchronously. Otherwise, dispatch to the provided executor.
367+
*/
368+
protected static void runOrDispatch(Executor executor, Thread originalThread, ActionRunnable<BulkResponse> bulkRunnable) {
369+
if (originalThread == Thread.currentThread()) {
370+
bulkRunnable.run();
371+
} else {
372+
executor.execute(bulkRunnable);
373+
}
374+
}
375+
369376
/**
370377
* Determines if an index name is associated with either an existing data stream or a template
371378
* for one that has the failure store enabled.

server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -385,19 +385,26 @@ protected void createMissingIndicesAndIndexData(
385385
Map<String, Exception> indicesExceptions = new ConcurrentHashMap<>();
386386
Map<String, Exception> dataStreamExceptions = new ConcurrentHashMap<>();
387387
Map<String, Exception> failureStoreExceptions = new ConcurrentHashMap<>();
388-
Runnable executeBulkRunnable = () -> executor.execute(new ActionRunnable<>(listener) {
389-
@Override
390-
protected void doRun() {
391-
failRequestsWhenPrerequisiteActionFailed(
392-
indicesExceptions,
393-
dataStreamExceptions,
394-
failureStoreExceptions,
395-
bulkRequest,
396-
responses
397-
);
398-
executeBulk(task, bulkRequest, startTimeNanos, listener, executor, responses);
399-
}
400-
});
388+
389+
Thread originalThread = Thread.currentThread();
390+
Runnable executeBulkRunnable = () -> {
391+
ActionRunnable<BulkResponse> bulkRunnable = new ActionRunnable<>(listener) {
392+
@Override
393+
protected void doRun() {
394+
failRequestsWhenPrerequisiteActionFailed(
395+
indicesExceptions,
396+
dataStreamExceptions,
397+
failureStoreExceptions,
398+
bulkRequest,
399+
responses
400+
);
401+
executeBulk(task, bulkRequest, startTimeNanos, listener, executor, responses);
402+
}
403+
};
404+
// If we performed some async action and ended up on a different thread dispatch back to the coordination thread pool.
405+
// Otherwise, just run the action since we are on the same thread.
406+
runOrDispatch(executor, originalThread, bulkRunnable);
407+
};
401408
try (RefCountingRunnable refs = new RefCountingRunnable(executeBulkRunnable)) {
402409
createIndices(indicesToAutoCreate, refs, indicesExceptions);
403410
rollOverDataStreams(bulkRequest, dataStreamsToBeRolledOver, false, refs, dataStreamExceptions);

server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@
7070
import org.elasticsearch.test.transport.CapturingTransport;
7171
import org.elasticsearch.threadpool.TestThreadPool;
7272
import org.elasticsearch.threadpool.ThreadPool;
73+
import org.elasticsearch.threadpool.ThreadPoolStats;
7374
import org.elasticsearch.transport.TransportService;
7475
import org.junit.After;
7576
import org.junit.Before;
@@ -422,6 +423,22 @@ private void blockWriteCoordinationThreadPool(CountDownLatch blockingLatch) {
422423
});
423424
}
424425

426+
public void testDispatchesToWriteCoordinationThreadPoolOnce() {
427+
BulkRequest bulkRequest = new BulkRequest().add(new IndexRequest("index").id("id").source(Collections.emptyMap()));
428+
PlainActionFuture<BulkResponse> future = new PlainActionFuture<>();
429+
ThreadPoolStats.Stats stats = threadPool.stats()
430+
.stats()
431+
.stream()
432+
.filter(s -> s.name().equals(ThreadPool.Names.WRITE_COORDINATION))
433+
.findAny()
434+
.get();
435+
assertThat(stats.completed(), equalTo(0L));
436+
ActionTestUtils.execute(bulkAction, null, bulkRequest, future);
437+
future.actionGet();
438+
stats = threadPool.stats().stats().stream().filter(s -> s.name().equals(ThreadPool.Names.WRITE_COORDINATION)).findAny().get();
439+
assertThat(stats.completed(), equalTo(1L));
440+
}
441+
425442
public void testRejectCoordination() {
426443
BulkRequest bulkRequest = new BulkRequest().add(new IndexRequest("index").id("id").source(Collections.emptyMap()));
427444

server/src/test/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutorTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ public class EsThreadPoolExecutorTests extends ESSingleNodeTestCase {
3232
protected Settings nodeSettings() {
3333
return Settings.builder()
3434
.put("node.name", "es-thread-pool-executor-tests")
35+
.put("thread_pool.write_coordination.size", 1)
36+
.put("thread_pool.write_coordination.queue_size", 0)
3537
.put("thread_pool.write.size", 1)
3638
.put("thread_pool.write.queue_size", 0)
3739
.put("thread_pool.search.size", 1)
@@ -41,7 +43,7 @@ protected Settings nodeSettings() {
4143

4244
public void testRejectedExecutionExceptionContainsNodeName() {
4345
// we test a fixed and an auto-queue executor but not scaling since it does not reject
44-
runThreadPoolExecutorTest(1, ThreadPool.Names.WRITE);
46+
runThreadPoolExecutorTest(1, randomFrom(ThreadPool.Names.WRITE_COORDINATION, ThreadPool.Names.WRITE));
4547
runThreadPoolExecutorTest(2, ThreadPool.Names.SEARCH);
4648

4749
}

server/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -573,6 +573,7 @@ public void testThreadCountMetrics() throws Exception {
573573
ThreadPool.Names.GENERIC,
574574
ThreadPool.Names.ANALYZE,
575575
ThreadPool.Names.WRITE,
576+
ThreadPool.Names.WRITE_COORDINATION,
576577
ThreadPool.Names.SEARCH
577578
);
578579
final ThreadPool.Info threadPoolInfo = threadPool.info(threadPoolName);

0 commit comments

Comments
 (0)