Skip to content

Commit 9ac6576

Browse files
authored
Add thread pool for write coordination (#129450)
This change adds a thread pool for write coordination to ensure that bulk coordination does not get stuck on an overloaded primary node.
1 parent f48a3c3 commit 9ac6576

File tree

12 files changed

+109
-38
lines changed

12 files changed

+109
-38
lines changed

docs/reference/elasticsearch/configuration-reference/thread-pool-settings.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,10 @@ $$$search-throttled$$$`search_throttled`
3333
: For analyze requests. Thread pool type is `fixed` with a size of `1`, queue size of `16`.
3434

3535
`write`
36-
: For single-document index/delete/update, ingest processors, and bulk requests. Thread pool type is `fixed` with a size of [`# of allocated processors`](#node.processors), queue_size of `10000`. The maximum size for this pool is `1 + `[`# of allocated processors`](#node.processors).
36+
: For write operations and ingest processors. Thread pool type is `fixed` with a size of [`# of allocated processors`](#node.processors), queue_size of `10000`. The maximum size for this pool is `1 + `[`# of allocated processors`](#node.processors).
37+
38+
`write_coordination`
39+
: For bulk request coordination operations. Thread pool type is `fixed` with a size of [`# of allocated processors`](#node.processors), queue_size of `10000`. The maximum size for this pool is `1 + `[`# of allocated processors`](#node.processors).
3740

3841
`snapshot`
3942
: For snapshot/restore operations. Thread pool type is `scaling` with a keep-alive of `5m`. On nodes with at least 750MB of heap the maximum size of this pool is `10` by default. On nodes with less than 750MB of heap the maximum size of this pool is `min(5, (`[`# of allocated processors`](#node.processors)`) / 2)` by default.

server/src/internalClusterTest/java/org/elasticsearch/action/bulk/IncrementalBulkIT.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ public void testIncrementalBulkHighWatermarkBackOff() throws Exception {
232232
add512BRequests(requestsThrottle, index);
233233

234234
CountDownLatch finishLatch = new CountDownLatch(1);
235-
blockWritePool(threadPool, finishLatch);
235+
blockWriteCoordinationPool(threadPool, finishLatch);
236236
IncrementalBulkService.Handler handlerThrottled = incrementalBulkService.newBulkRequest();
237237
refCounted.incRef();
238238
handlerThrottled.addItems(requestsThrottle, refCounted::decRef, () -> nextPage.set(true));
@@ -295,8 +295,8 @@ public void testGlobalBulkFailure() throws InterruptedException {
295295
IncrementalBulkService incrementalBulkService = internalCluster().getInstance(IncrementalBulkService.class, randomNodeName);
296296
ThreadPool threadPool = internalCluster().getInstance(ThreadPool.class, randomNodeName);
297297

298-
blockWritePool(threadPool, blockingLatch);
299-
fillWriteQueue(threadPool);
298+
blockWriteCoordinationPool(threadPool, blockingLatch);
299+
fillWriteCoordinationQueue(threadPool);
300300

301301
IncrementalBulkService.Handler handler = incrementalBulkService.newBulkRequest();
302302
if (randomBoolean()) {
@@ -333,7 +333,7 @@ public void testBulkLevelBulkFailureAfterFirstIncrementalRequest() throws Except
333333
AtomicBoolean nextRequested = new AtomicBoolean(true);
334334
AtomicLong hits = new AtomicLong(0);
335335
try {
336-
blockWritePool(threadPool, blockingLatch1);
336+
blockWriteCoordinationPool(threadPool, blockingLatch1);
337337
while (nextRequested.get()) {
338338
nextRequested.set(false);
339339
refCounted.incRef();
@@ -348,8 +348,8 @@ public void testBulkLevelBulkFailureAfterFirstIncrementalRequest() throws Except
348348
CountDownLatch blockingLatch2 = new CountDownLatch(1);
349349

350350
try {
351-
blockWritePool(threadPool, blockingLatch2);
352-
fillWriteQueue(threadPool);
351+
blockWriteCoordinationPool(threadPool, blockingLatch2);
352+
fillWriteCoordinationQueue(threadPool);
353353

354354
handler.lastItems(List.of(indexRequest(index)), refCounted::decRef, future);
355355
} finally {
@@ -531,8 +531,8 @@ public void testShortCircuitShardLevelFailureWithIngestNodeHop() throws Exceptio
531531
}
532532
}
533533

534-
private static void blockWritePool(ThreadPool threadPool, CountDownLatch finishLatch) {
535-
final var threadCount = threadPool.info(ThreadPool.Names.WRITE).getMax();
534+
private static void blockWriteCoordinationPool(ThreadPool threadPool, CountDownLatch finishLatch) {
535+
final var threadCount = threadPool.info(ThreadPool.Names.WRITE_COORDINATION).getMax();
536536
final var startBarrier = new CyclicBarrier(threadCount + 1);
537537
final var blockingTask = new AbstractRunnable() {
538538
@Override
@@ -552,13 +552,13 @@ public boolean isForceExecution() {
552552
}
553553
};
554554
for (int i = 0; i < threadCount; i++) {
555-
threadPool.executor(ThreadPool.Names.WRITE).execute(blockingTask);
555+
threadPool.executor(ThreadPool.Names.WRITE_COORDINATION).execute(blockingTask);
556556
}
557557
safeAwait(startBarrier);
558558
}
559559

560-
private static void fillWriteQueue(ThreadPool threadPool) {
561-
final var queueSize = Math.toIntExact(threadPool.info(ThreadPool.Names.WRITE).getQueueSize().singles());
560+
private static void fillWriteCoordinationQueue(ThreadPool threadPool) {
561+
final var queueSize = Math.toIntExact(threadPool.info(ThreadPool.Names.WRITE_COORDINATION).getQueueSize().singles());
562562
final var queueFilled = new AtomicBoolean(false);
563563
final var queueFillingTask = new AbstractRunnable() {
564564
@Override
@@ -577,7 +577,7 @@ public boolean isForceExecution() {
577577
}
578578
};
579579
for (int i = 0; i < queueSize; i++) {
580-
threadPool.executor(ThreadPool.Names.WRITE).execute(queueFillingTask);
580+
threadPool.executor(ThreadPool.Names.WRITE_COORDINATION).execute(queueFillingTask);
581581
}
582582
queueFilled.set(true);
583583
}

server/src/internalClusterTest/java/org/elasticsearch/monitor/metrics/NodeIndexingMetricsIT.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -840,7 +840,7 @@ public void testIncrementalBulkHighWatermarkSplitMetrics() throws Exception {
840840
add512BRequests(requestsThrottle, index);
841841

842842
CountDownLatch finishLatch = new CountDownLatch(1);
843-
blockWritePool(threadPool, finishLatch);
843+
blockWriteCoordinationPool(threadPool, finishLatch);
844844
IncrementalBulkService.Handler handlerThrottled = incrementalBulkService.newBulkRequest();
845845
refCounted.incRef();
846846
handlerThrottled.addItems(requestsThrottle, refCounted::decRef, () -> nextPage.set(true));
@@ -919,8 +919,8 @@ private static void add512BRequests(ArrayList<DocWriteRequest<?>> requests, Stri
919919
assertThat(total, lessThan(1024L));
920920
}
921921

922-
private static void blockWritePool(ThreadPool threadPool, CountDownLatch finishLatch) {
923-
final var threadCount = threadPool.info(ThreadPool.Names.WRITE).getMax();
922+
private static void blockWriteCoordinationPool(ThreadPool threadPool, CountDownLatch finishLatch) {
923+
final var threadCount = threadPool.info(ThreadPool.Names.WRITE_COORDINATION).getMax();
924924
final var startBarrier = new CyclicBarrier(threadCount + 1);
925925
final var blockingTask = new AbstractRunnable() {
926926
@Override
@@ -940,7 +940,7 @@ public boolean isForceExecution() {
940940
}
941941
};
942942
for (int i = 0; i < threadCount; i++) {
943-
threadPool.executor(ThreadPool.Names.WRITE).execute(blockingTask);
943+
threadPool.executor(ThreadPool.Names.WRITE_COORDINATION).execute(blockingTask);
944944
}
945945
safeAwait(startBarrier);
946946
}

server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java

Lines changed: 34 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction
6868
private final IngestService ingestService;
6969
private final IngestActionForwarder ingestForwarder;
7070
protected final LongSupplier relativeTimeNanosProvider;
71+
protected final Executor coordinationExecutor;
7172
protected final Executor writeExecutor;
7273
protected final Executor systemWriteExecutor;
7374
private final ActionType<BulkResponse> bulkAction;
@@ -92,6 +93,7 @@ public TransportAbstractBulkAction(
9293
this.indexingPressure = indexingPressure;
9394
this.systemIndices = systemIndices;
9495
this.projectResolver = projectResolver;
96+
this.coordinationExecutor = threadPool.executor(ThreadPool.Names.WRITE_COORDINATION);
9597
this.writeExecutor = threadPool.executor(ThreadPool.Names.WRITE);
9698
this.systemWriteExecutor = threadPool.executor(ThreadPool.Names.SYSTEM_WRITE);
9799
this.ingestForwarder = new IngestActionForwarder(transportService);
@@ -106,8 +108,8 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<Bulk
106108
* This is called on the Transport thread so we can check the indexing
107109
* memory pressure *quickly* but we don't want to keep the transport
108110
* thread busy. Then, as soon as we have the indexing pressure in we fork
109-
* to one of the write thread pools. We do this because juggling the
110-
* bulk request can get expensive for a few reasons:
111+
* to the write coordination thread pool for coordination tasks. We do this because
112+
* juggling the bulk request can get expensive for a few reasons:
111113
* 1. Figuring out which shard should receive a bulk request might require
112114
* parsing the _source.
113115
* 2. When dispatching the sub-requests to shards we may have to compress
@@ -131,14 +133,15 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<Bulk
131133
releasable = indexingPressure.markCoordinatingOperationStarted(indexingOps, indexingBytes, isOnlySystem);
132134
}
133135
final ActionListener<BulkResponse> releasingListener = ActionListener.runBefore(listener, releasable::close);
134-
final Executor executor = isOnlySystem ? systemWriteExecutor : writeExecutor;
135-
ensureClusterStateThenForkAndExecute(task, bulkRequest, executor, releasingListener);
136+
// Use coordinationExecutor for dispatching coordination tasks
137+
ensureClusterStateThenForkAndExecute(task, bulkRequest, coordinationExecutor, isOnlySystem, releasingListener);
136138
}
137139

138140
private void ensureClusterStateThenForkAndExecute(
139141
Task task,
140142
BulkRequest bulkRequest,
141143
Executor executor,
144+
boolean isOnlySystem,
142145
ActionListener<BulkResponse> releasingListener
143146
) {
144147
final ClusterState initialState = clusterService.state();
@@ -160,7 +163,7 @@ private void ensureClusterStateThenForkAndExecute(
160163
clusterStateObserver.waitForNextChange(new ClusterStateObserver.Listener() {
161164
@Override
162165
public void onNewClusterState(ClusterState state) {
163-
forkAndExecute(task, bulkRequest, executor, releasingListener);
166+
forkAndExecute(task, bulkRequest, executor, isOnlySystem, releasingListener);
164167
}
165168

166169
@Override
@@ -174,21 +177,32 @@ public void onTimeout(TimeValue timeout) {
174177
}
175178
}, newState -> false == newState.blocks().hasGlobalBlockWithLevel(projectId, ClusterBlockLevel.WRITE));
176179
} else {
177-
forkAndExecute(task, bulkRequest, executor, releasingListener);
180+
forkAndExecute(task, bulkRequest, executor, isOnlySystem, releasingListener);
178181
}
179182
}
180183

181-
private void forkAndExecute(Task task, BulkRequest bulkRequest, Executor executor, ActionListener<BulkResponse> releasingListener) {
184+
private void forkAndExecute(
185+
Task task,
186+
BulkRequest bulkRequest,
187+
Executor executor,
188+
boolean isOnlySystem,
189+
ActionListener<BulkResponse> releasingListener
190+
) {
182191
executor.execute(new ActionRunnable<>(releasingListener) {
183192
@Override
184193
protected void doRun() throws IOException {
185-
applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, releasingListener);
194+
applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, isOnlySystem, releasingListener);
186195
}
187196
});
188197
}
189198

190-
private boolean applyPipelines(Task task, BulkRequest bulkRequest, Executor executor, ActionListener<BulkResponse> listener)
191-
throws IOException {
199+
private boolean applyPipelines(
200+
Task task,
201+
BulkRequest bulkRequest,
202+
Executor executor,
203+
boolean isOnlySystem,
204+
ActionListener<BulkResponse> listener
205+
) throws IOException {
192206
boolean hasIndexRequestsWithPipelines = false;
193207
ClusterState state = clusterService.state();
194208
ProjectId projectId = projectResolver.getProjectId();
@@ -277,7 +291,7 @@ private boolean applyPipelines(Task task, BulkRequest bulkRequest, Executor exec
277291
assert arePipelinesResolved : bulkRequest;
278292
}
279293
if (clusterService.localNode().isIngestNode()) {
280-
processBulkIndexIngestRequest(task, bulkRequest, executor, project, l);
294+
processBulkIndexIngestRequest(task, bulkRequest, executor, isOnlySystem, project, l);
281295
} else {
282296
ingestForwarder.forwardIngestRequest(bulkAction, bulkRequest, l);
283297
}
@@ -291,6 +305,7 @@ private void processBulkIndexIngestRequest(
291305
Task task,
292306
BulkRequest original,
293307
Executor executor,
308+
boolean isOnlySystem,
294309
ProjectMetadata metadata,
295310
ActionListener<BulkResponse> listener
296311
) {
@@ -324,20 +339,21 @@ private void processBulkIndexIngestRequest(
324339
ActionRunnable<BulkResponse> runnable = new ActionRunnable<>(actionListener) {
325340
@Override
326341
protected void doRun() throws IOException {
327-
applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, actionListener);
342+
applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, isOnlySystem, actionListener);
328343
}
329344

330345
@Override
331346
public boolean isForceExecution() {
332-
// If we fork back to a write thread we **not** should fail, because tp queue is full.
347+
// If we fork back to a coordination thread we should **not** fail because the thread pool queue is full.
333348
// (Otherwise the work done during ingest will be lost)
334349
// It is okay to force execution here. Throttling of write requests happens prior to
335350
// ingest when a node receives a bulk request.
336351
return true;
337352
}
338353
};
339354
// If a processor went async and returned a response on a different thread then
340-
// before we continue the bulk request we should fork back on a write thread:
355+
// before we continue the bulk request we should fork back on a coordination thread. Otherwise it is fine to perform
356+
// coordination steps on the write thread
341357
if (originalThread == Thread.currentThread()) {
342358
runnable.run();
343359
} else {
@@ -346,7 +362,8 @@ public boolean isForceExecution() {
346362
}
347363
}
348364
},
349-
executor
365+
// Use the appropriate write executor for actual ingest processing
366+
isOnlySystem ? systemWriteExecutor : writeExecutor
350367
);
351368
}
352369

@@ -402,10 +419,11 @@ private void applyPipelinesAndDoInternalExecute(
402419
Task task,
403420
BulkRequest bulkRequest,
404421
Executor executor,
422+
boolean isOnlySystem,
405423
ActionListener<BulkResponse> listener
406424
) throws IOException {
407425
final long relativeStartTimeNanos = relativeTimeNanos();
408-
if (applyPipelines(task, bulkRequest, executor, listener) == false) {
426+
if (applyPipelines(task, bulkRequest, executor, isOnlySystem, listener) == false) {
409427
doInternalExecute(task, bulkRequest, executor, listener, relativeStartTimeNanos);
410428
}
411429
}

server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,16 @@ public Map<String, ExecutorBuilder> getBuilders(Settings settings, int allocated
3838
ThreadPool.Names.GENERIC,
3939
new ScalingExecutorBuilder(ThreadPool.Names.GENERIC, 4, genericThreadPoolMax, TimeValue.timeValueSeconds(30), false)
4040
);
41+
result.put(
42+
ThreadPool.Names.WRITE_COORDINATION,
43+
new FixedExecutorBuilder(
44+
settings,
45+
ThreadPool.Names.WRITE_COORDINATION,
46+
allocatedProcessors,
47+
10000,
48+
EsExecutors.TaskTrackingConfig.DO_NOT_TRACK
49+
)
50+
);
4151
result.put(
4252
ThreadPool.Names.WRITE,
4353
new FixedExecutorBuilder(

server/src/main/java/org/elasticsearch/threadpool/ExecutorBuilder.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ protected static String settingsKey(final String prefix, final String key) {
4141

4242
protected static int applyHardSizeLimit(final Settings settings, final String name) {
4343
if (name.equals("bulk")
44+
|| name.equals(ThreadPool.Names.WRITE_COORDINATION)
4445
|| name.equals(ThreadPool.Names.WRITE)
4546
|| name.equals(ThreadPool.Names.SYSTEM_WRITE)
4647
|| name.equals(ThreadPool.Names.SYSTEM_CRITICAL_WRITE)) {

server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ public static class Names {
113113
public static final String GET = "get";
114114
public static final String ANALYZE = "analyze";
115115
public static final String WRITE = "write";
116+
public static final String WRITE_COORDINATION = "write_coordination";
116117
public static final String SEARCH = "search";
117118
public static final String SEARCH_COORDINATION = "search_coordination";
118119
public static final String AUTO_COMPLETE = "auto_complete";
@@ -186,6 +187,7 @@ public static ThreadPoolType fromType(String type) {
186187
entry(Names.CLUSTER_COORDINATION, ThreadPoolType.FIXED),
187188
entry(Names.GET, ThreadPoolType.FIXED),
188189
entry(Names.ANALYZE, ThreadPoolType.FIXED),
190+
entry(Names.WRITE_COORDINATION, ThreadPoolType.FIXED),
189191
entry(Names.WRITE, ThreadPoolType.FIXED),
190192
entry(Names.SEARCH, ThreadPoolType.FIXED),
191193
entry(Names.SEARCH_COORDINATION, ThreadPoolType.FIXED),

server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
107107
private static final Thread DUMMY_WRITE_THREAD = new Thread(ThreadPool.Names.WRITE);
108108
private FeatureService mockFeatureService;
109109

110+
private static final ExecutorService writeCoordinationExecutor = new NamedDirectExecutorService("write_coordination");
110111
private static final ExecutorService writeExecutor = new NamedDirectExecutorService("write");
111112
private static final ExecutorService systemWriteExecutor = new NamedDirectExecutorService("system_write");
112113

@@ -293,6 +294,7 @@ public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, Ti
293294
public void setupAction() {
294295
// initialize captors, which must be members to use @Capture because of generics
295296
threadPool = mock(ThreadPool.class);
297+
when(threadPool.executor(eq(ThreadPool.Names.WRITE_COORDINATION))).thenReturn(writeCoordinationExecutor);
296298
when(threadPool.executor(eq(ThreadPool.Names.WRITE))).thenReturn(writeExecutor);
297299
when(threadPool.executor(eq(ThreadPool.Names.SYSTEM_WRITE))).thenReturn(systemWriteExecutor);
298300
MockitoAnnotations.openMocks(this);

0 commit comments

Comments
 (0)