Skip to content

Commit 53dae7a

Browse files
authored
Dispatch ingest work to coordination thread pool (#129820)
The vast majority of ingest pipelines are light CPU operations. We don't want these to be put behind IO work on the write executor. Instead, execute these on the coordination pool.
1 parent bae6e3c commit 53dae7a

File tree

8 files changed

+84
-50
lines changed

8 files changed

+84
-50
lines changed

modules/ingest-common/src/internalClusterTest/java/org/elasticsearch/ingest/common/IngestRestartIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -410,7 +410,7 @@ public void testForwardBulkWithSystemWritePoolDisabled() throws Exception {
410410

411411
private void blockSystemWriteThreadPool(CountDownLatch blockingLatch, ThreadPool threadPool) {
412412
assertThat(blockingLatch.getCount(), greaterThan(0L));
413-
final var executor = threadPool.executor(ThreadPool.Names.SYSTEM_WRITE);
413+
final var executor = threadPool.executor(ThreadPool.Names.SYSTEM_WRITE_COORDINATION);
414414
// Add tasks repeatedly until we get an EsRejectedExecutionException which indicates that the threadpool and its queue are full.
415415
expectThrows(EsRejectedExecutionException.class, () -> {
416416
// noinspection InfiniteLoopStatement

server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java

Lines changed: 14 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,7 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction
6969
private final IngestActionForwarder ingestForwarder;
7070
protected final LongSupplier relativeTimeNanosProvider;
7171
protected final Executor coordinationExecutor;
72-
protected final Executor writeExecutor;
73-
protected final Executor systemWriteExecutor;
72+
protected final Executor systemCoordinationExecutor;
7473
private final ActionType<BulkResponse> bulkAction;
7574

7675
public TransportAbstractBulkAction(
@@ -94,8 +93,7 @@ public TransportAbstractBulkAction(
9493
this.systemIndices = systemIndices;
9594
this.projectResolver = projectResolver;
9695
this.coordinationExecutor = threadPool.executor(ThreadPool.Names.WRITE_COORDINATION);
97-
this.writeExecutor = threadPool.executor(ThreadPool.Names.WRITE);
98-
this.systemWriteExecutor = threadPool.executor(ThreadPool.Names.SYSTEM_WRITE);
96+
this.systemCoordinationExecutor = threadPool.executor(ThreadPool.Names.SYSTEM_WRITE_COORDINATION);
9997
this.ingestForwarder = new IngestActionForwarder(transportService);
10098
clusterService.addStateApplier(this.ingestForwarder);
10199
this.relativeTimeNanosProvider = relativeTimeNanosProvider;
@@ -134,14 +132,14 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<Bulk
134132
}
135133
final ActionListener<BulkResponse> releasingListener = ActionListener.runBefore(listener, releasable::close);
136134
// Use coordinationExecutor for dispatching coordination tasks
137-
ensureClusterStateThenForkAndExecute(task, bulkRequest, coordinationExecutor, isOnlySystem, releasingListener);
135+
final Executor executor = isOnlySystem ? systemCoordinationExecutor : coordinationExecutor;
136+
ensureClusterStateThenForkAndExecute(task, bulkRequest, executor, releasingListener);
138137
}
139138

140139
private void ensureClusterStateThenForkAndExecute(
141140
Task task,
142141
BulkRequest bulkRequest,
143142
Executor executor,
144-
boolean isOnlySystem,
145143
ActionListener<BulkResponse> releasingListener
146144
) {
147145
final ClusterState initialState = clusterService.state();
@@ -163,7 +161,7 @@ private void ensureClusterStateThenForkAndExecute(
163161
clusterStateObserver.waitForNextChange(new ClusterStateObserver.Listener() {
164162
@Override
165163
public void onNewClusterState(ClusterState state) {
166-
forkAndExecute(task, bulkRequest, executor, isOnlySystem, releasingListener);
164+
forkAndExecute(task, bulkRequest, executor, releasingListener);
167165
}
168166

169167
@Override
@@ -177,32 +175,21 @@ public void onTimeout(TimeValue timeout) {
177175
}
178176
}, newState -> false == newState.blocks().hasGlobalBlockWithLevel(projectId, ClusterBlockLevel.WRITE));
179177
} else {
180-
forkAndExecute(task, bulkRequest, executor, isOnlySystem, releasingListener);
178+
forkAndExecute(task, bulkRequest, executor, releasingListener);
181179
}
182180
}
183181

184-
private void forkAndExecute(
185-
Task task,
186-
BulkRequest bulkRequest,
187-
Executor executor,
188-
boolean isOnlySystem,
189-
ActionListener<BulkResponse> releasingListener
190-
) {
182+
private void forkAndExecute(Task task, BulkRequest bulkRequest, Executor executor, ActionListener<BulkResponse> releasingListener) {
191183
executor.execute(new ActionRunnable<>(releasingListener) {
192184
@Override
193185
protected void doRun() throws IOException {
194-
applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, isOnlySystem, releasingListener);
186+
applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, releasingListener);
195187
}
196188
});
197189
}
198190

199-
private boolean applyPipelines(
200-
Task task,
201-
BulkRequest bulkRequest,
202-
Executor executor,
203-
boolean isOnlySystem,
204-
ActionListener<BulkResponse> listener
205-
) throws IOException {
191+
private boolean applyPipelines(Task task, BulkRequest bulkRequest, Executor executor, ActionListener<BulkResponse> listener)
192+
throws IOException {
206193
boolean hasIndexRequestsWithPipelines = false;
207194
ClusterState state = clusterService.state();
208195
ProjectId projectId = projectResolver.getProjectId();
@@ -291,7 +278,7 @@ private boolean applyPipelines(
291278
assert arePipelinesResolved : bulkRequest;
292279
}
293280
if (clusterService.localNode().isIngestNode()) {
294-
processBulkIndexIngestRequest(task, bulkRequest, executor, isOnlySystem, project, l);
281+
processBulkIndexIngestRequest(task, bulkRequest, executor, project, l);
295282
} else {
296283
ingestForwarder.forwardIngestRequest(bulkAction, bulkRequest, l);
297284
}
@@ -305,7 +292,6 @@ private void processBulkIndexIngestRequest(
305292
Task task,
306293
BulkRequest original,
307294
Executor executor,
308-
boolean isOnlySystem,
309295
ProjectMetadata metadata,
310296
ActionListener<BulkResponse> listener
311297
) {
@@ -339,7 +325,7 @@ private void processBulkIndexIngestRequest(
339325
ActionRunnable<BulkResponse> runnable = new ActionRunnable<>(actionListener) {
340326
@Override
341327
protected void doRun() throws IOException {
342-
applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, isOnlySystem, actionListener);
328+
applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, actionListener);
343329
}
344330

345331
@Override
@@ -362,8 +348,7 @@ public boolean isForceExecution() {
362348
}
363349
}
364350
},
365-
// Use the appropriate write executor for actual ingest processing
366-
isOnlySystem ? systemWriteExecutor : writeExecutor
351+
executor
367352
);
368353
}
369354

@@ -419,11 +404,10 @@ private void applyPipelinesAndDoInternalExecute(
419404
Task task,
420405
BulkRequest bulkRequest,
421406
Executor executor,
422-
boolean isOnlySystem,
423407
ActionListener<BulkResponse> listener
424408
) throws IOException {
425409
final long relativeStartTimeNanos = relativeTimeNanos();
426-
if (applyPipelines(task, bulkRequest, executor, isOnlySystem, listener) == false) {
410+
if (applyPipelines(task, bulkRequest, executor, listener) == false) {
427411
doInternalExecute(task, bulkRequest, executor, listener, relativeStartTimeNanos);
428412
}
429413
}

server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,17 @@ public Map<String, ExecutorBuilder> getBuilders(Settings settings, int allocated
198198
true
199199
)
200200
);
201+
result.put(
202+
ThreadPool.Names.SYSTEM_WRITE_COORDINATION,
203+
new FixedExecutorBuilder(
204+
settings,
205+
ThreadPool.Names.SYSTEM_WRITE_COORDINATION,
206+
halfProcMaxAt5,
207+
1000,
208+
new EsExecutors.TaskTrackingConfig(true, indexAutoscalingEWMA),
209+
true
210+
)
211+
);
201212
result.put(
202213
ThreadPool.Names.SYSTEM_CRITICAL_READ,
203214
new FixedExecutorBuilder(

server/src/main/java/org/elasticsearch/threadpool/ExecutorBuilder.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ protected static String settingsKey(final String prefix, final String key) {
4242
protected static int applyHardSizeLimit(final Settings settings, final String name) {
4343
if (name.equals("bulk")
4444
|| name.equals(ThreadPool.Names.WRITE_COORDINATION)
45+
|| name.equals(ThreadPool.Names.SYSTEM_WRITE_COORDINATION)
4546
|| name.equals(ThreadPool.Names.WRITE)
4647
|| name.equals(ThreadPool.Names.SYSTEM_WRITE)
4748
|| name.equals(ThreadPool.Names.SYSTEM_CRITICAL_WRITE)) {

server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@ public static class Names {
142142
public static final String FETCH_SHARD_STORE = "fetch_shard_store";
143143
public static final String SYSTEM_READ = "system_read";
144144
public static final String SYSTEM_WRITE = "system_write";
145+
public static final String SYSTEM_WRITE_COORDINATION = "system_write_coordination";
145146
public static final String SYSTEM_CRITICAL_READ = "system_critical_read";
146147
public static final String SYSTEM_CRITICAL_WRITE = "system_critical_write";
147148
}
@@ -187,8 +188,8 @@ public static ThreadPoolType fromType(String type) {
187188
entry(Names.CLUSTER_COORDINATION, ThreadPoolType.FIXED),
188189
entry(Names.GET, ThreadPoolType.FIXED),
189190
entry(Names.ANALYZE, ThreadPoolType.FIXED),
190-
entry(Names.WRITE_COORDINATION, ThreadPoolType.FIXED),
191191
entry(Names.WRITE, ThreadPoolType.FIXED),
192+
entry(Names.WRITE_COORDINATION, ThreadPoolType.FIXED),
192193
entry(Names.SEARCH, ThreadPoolType.FIXED),
193194
entry(Names.SEARCH_COORDINATION, ThreadPoolType.FIXED),
194195
entry(Names.AUTO_COMPLETE, ThreadPoolType.FIXED),
@@ -204,6 +205,7 @@ public static ThreadPoolType fromType(String type) {
204205
entry(Names.FETCH_SHARD_STORE, ThreadPoolType.SCALING),
205206
entry(Names.SYSTEM_READ, ThreadPoolType.FIXED),
206207
entry(Names.SYSTEM_WRITE, ThreadPoolType.FIXED),
208+
entry(Names.SYSTEM_WRITE_COORDINATION, ThreadPoolType.FIXED),
207209
entry(Names.SYSTEM_CRITICAL_READ, ThreadPoolType.FIXED),
208210
entry(Names.SYSTEM_CRITICAL_WRITE, ThreadPoolType.FIXED)
209211
);

server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -108,8 +108,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
108108
private FeatureService mockFeatureService;
109109

110110
private static final ExecutorService writeCoordinationExecutor = new NamedDirectExecutorService("write_coordination");
111-
private static final ExecutorService writeExecutor = new NamedDirectExecutorService("write");
112-
private static final ExecutorService systemWriteExecutor = new NamedDirectExecutorService("system_write");
111+
private static final ExecutorService systemWriteCoordinationExecutor = new NamedDirectExecutorService("system_write_coordination");
113112

114113
private final ProjectId projectId = randomProjectIdOrDefault();
115114

@@ -295,8 +294,7 @@ public void setupAction() {
295294
// initialize captors, which must be members to use @Capture because of generics
296295
threadPool = mock(ThreadPool.class);
297296
when(threadPool.executor(eq(ThreadPool.Names.WRITE_COORDINATION))).thenReturn(writeCoordinationExecutor);
298-
when(threadPool.executor(eq(ThreadPool.Names.WRITE))).thenReturn(writeExecutor);
299-
when(threadPool.executor(eq(ThreadPool.Names.SYSTEM_WRITE))).thenReturn(systemWriteExecutor);
297+
when(threadPool.executor(eq(ThreadPool.Names.SYSTEM_WRITE_COORDINATION))).thenReturn(systemWriteCoordinationExecutor);
300298
MockitoAnnotations.openMocks(this);
301299
// setup services that will be called by action
302300
transportService = mock(TransportService.class);
@@ -424,7 +422,7 @@ public void testIngestLocal() throws Exception {
424422
redirectHandler.capture(),
425423
failureHandler.capture(),
426424
completionHandler.capture(),
427-
same(writeExecutor)
425+
same(writeCoordinationExecutor)
428426
);
429427
completionHandler.getValue().accept(null, exception);
430428
assertTrue(failureCalled.get());
@@ -475,7 +473,7 @@ public void testSingleItemBulkActionIngestLocal() throws Exception {
475473
any(),
476474
failureHandler.capture(),
477475
completionHandler.capture(),
478-
same(writeExecutor)
476+
same(writeCoordinationExecutor)
479477
);
480478
completionHandler.getValue().accept(null, exception);
481479
assertTrue(failureCalled.get());
@@ -524,7 +522,7 @@ public void testIngestSystemLocal() throws Exception {
524522
any(),
525523
failureHandler.capture(),
526524
completionHandler.capture(),
527-
same(systemWriteExecutor)
525+
same(systemWriteCoordinationExecutor)
528526
);
529527
completionHandler.getValue().accept(null, exception);
530528
assertTrue(failureCalled.get());
@@ -685,7 +683,7 @@ private void validatePipelineWithBulkUpsert(@Nullable String indexRequestIndexNa
685683
any(),
686684
failureHandler.capture(),
687685
completionHandler.capture(),
688-
same(writeExecutor)
686+
same(writeCoordinationExecutor)
689687
);
690688
assertEquals(indexRequest1.getPipeline(), "default_pipeline");
691689
assertEquals(indexRequest2.getPipeline(), "default_pipeline");
@@ -736,7 +734,7 @@ public void testDoExecuteCalledTwiceCorrectly() throws Exception {
736734
any(),
737735
failureHandler.capture(),
738736
completionHandler.capture(),
739-
same(writeExecutor)
737+
same(writeCoordinationExecutor)
740738
);
741739
completionHandler.getValue().accept(null, exception);
742740
assertFalse(action.indexCreated); // still no index yet, the ingest node failed.
@@ -830,7 +828,7 @@ public void testFindDefaultPipelineFromTemplateMatch() {
830828
any(),
831829
failureHandler.capture(),
832830
completionHandler.capture(),
833-
same(writeExecutor)
831+
same(writeCoordinationExecutor)
834832
);
835833
}
836834

@@ -871,7 +869,7 @@ public void testFindDefaultPipelineFromV2TemplateMatch() {
871869
any(),
872870
failureHandler.capture(),
873871
completionHandler.capture(),
874-
same(writeExecutor)
872+
same(writeCoordinationExecutor)
875873
);
876874
}
877875

@@ -901,7 +899,7 @@ public void testIngestCallbackExceptionHandled() throws Exception {
901899
any(),
902900
failureHandler.capture(),
903901
completionHandler.capture(),
904-
same(writeExecutor)
902+
same(writeCoordinationExecutor)
905903
);
906904
indexRequest1.autoGenerateId();
907905
completionHandler.getValue().accept(Thread.currentThread(), null);
@@ -941,7 +939,7 @@ private void validateDefaultPipeline(IndexRequest indexRequest) {
941939
any(),
942940
failureHandler.capture(),
943941
completionHandler.capture(),
944-
same(writeExecutor)
942+
same(writeCoordinationExecutor)
945943
);
946944
assertEquals(indexRequest.getPipeline(), "default_pipeline");
947945
completionHandler.getValue().accept(null, exception);

server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,6 @@
6161
import org.elasticsearch.index.IndexVersions;
6262
import org.elasticsearch.index.IndexingPressure;
6363
import org.elasticsearch.index.VersionType;
64-
import org.elasticsearch.indices.EmptySystemIndices;
6564
import org.elasticsearch.indices.SystemIndexDescriptorUtils;
6665
import org.elasticsearch.indices.SystemIndices;
6766
import org.elasticsearch.test.ESTestCase;
@@ -126,7 +125,15 @@ class TestTransportBulkAction extends TransportBulkAction {
126125
new ActionFilters(Collections.emptySet()),
127126
new Resolver(),
128127
new IndexingPressure(Settings.EMPTY),
129-
EmptySystemIndices.INSTANCE,
128+
new SystemIndices(
129+
List.of(
130+
new SystemIndices.Feature(
131+
"plugin",
132+
"test feature",
133+
List.of(SystemIndexDescriptorUtils.createUnmanaged(".transport_bulk_tests_system*", ""))
134+
)
135+
)
136+
),
130137
new ProjectResolver() {
131138
@Override
132139
public <E extends Exception> void executeOnProject(ProjectId projectId, CheckedRunnable<E> body) throws E {
@@ -386,7 +393,7 @@ private void blockWriteCoordinationThreadPool(CountDownLatch blockingLatch) {
386393
});
387394
}
388395

389-
public void testDispatchesToWriteCoordinationThreadPoolOnce() throws Exception {
396+
public void testDispatchesToWriteCoordinationThreadPool() throws Exception {
390397
BulkRequest bulkRequest = new BulkRequest().add(new IndexRequest("index").id("id").source(Collections.emptyMap()));
391398
PlainActionFuture<BulkResponse> future = new PlainActionFuture<>();
392399
ThreadPoolStats.Stats stats = threadPool.stats()
@@ -401,8 +408,7 @@ public void testDispatchesToWriteCoordinationThreadPoolOnce() throws Exception {
401408

402409
assertBusy(() -> {
403410
// Will increment twice because it will dispatch on the first coordination attempt. And then dispatch a second time after the
404-
// index
405-
// is created.
411+
// index is created.
406412
assertThat(
407413
threadPool.stats()
408414
.stats()
@@ -416,6 +422,37 @@ public void testDispatchesToWriteCoordinationThreadPoolOnce() throws Exception {
416422
});
417423
}
418424

425+
public void testSystemWriteDispatchesToSystemWriteCoordinationThreadPool() throws Exception {
426+
BulkRequest bulkRequest = new BulkRequest().add(
427+
new IndexRequest(".transport_bulk_tests_system_1").id("id").source(Collections.emptyMap())
428+
);
429+
PlainActionFuture<BulkResponse> future = new PlainActionFuture<>();
430+
ThreadPoolStats.Stats stats = threadPool.stats()
431+
.stats()
432+
.stream()
433+
.filter(s -> s.name().equals(ThreadPool.Names.SYSTEM_WRITE_COORDINATION))
434+
.findAny()
435+
.get();
436+
assertThat(stats.completed(), equalTo(0L));
437+
ActionTestUtils.execute(bulkAction, null, bulkRequest, future);
438+
future.actionGet();
439+
440+
assertBusy(() -> {
441+
// Will increment twice because it will dispatch on the first coordination attempt. And then dispatch a second time after the
442+
// index is created.
443+
assertThat(
444+
threadPool.stats()
445+
.stats()
446+
.stream()
447+
.filter(s -> s.name().equals(ThreadPool.Names.SYSTEM_WRITE_COORDINATION))
448+
.findAny()
449+
.get()
450+
.completed(),
451+
equalTo(2L)
452+
);
453+
});
454+
}
455+
419456
public void testRejectCoordination() {
420457
BulkRequest bulkRequest = new BulkRequest().add(new IndexRequest("index").id("id").source(Collections.emptyMap()));
421458

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorProxyAction.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ protected void doExecute(Task task, SearchRequest request, ActionListener<Search
7777
ThreadPool.Names.WRITE,
7878
ThreadPool.Names.WRITE_COORDINATION,
7979
ThreadPool.Names.SYSTEM_WRITE,
80+
ThreadPool.Names.SYSTEM_WRITE_COORDINATION,
8081
ThreadPool.Names.SEARCH,
8182
ThreadPool.Names.MANAGEMENT
8283
);

0 commit comments

Comments
 (0)