Skip to content

Commit 189d339

Browse files
authored
Merge branch 'main' into integrate_delete_unowned_in_reshard
2 parents 426933a + ea2e7b4 commit 189d339

File tree

10 files changed

+115
-50
lines changed

10 files changed

+115
-50
lines changed

docs/reference/elasticsearch/configuration-reference/thread-pool-settings.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,9 @@ $$$search-throttled$$$`search_throttled`
7171
`system_write`
7272
: For write operations on system indices. Thread pool type is `fixed` with a default maximum size of `min(5, (`[`# of allocated processors`](#node.processors)`) / 2)`.
7373

74+
`system_write_coordination`
75+
: For bulk request coordination operations on system indices. Thread pool type is `fixed` with a default maximum size of `min(5, (`[`# of allocated processors`](#node.processors)`) / 2)`.
76+
7477
`system_critical_read`
7578
: For critical read operations on system indices. Thread pool type is `fixed` with a default maximum size of `min(5, (`[`# of allocated processors`](#node.processors)`) / 2)`.
7679

modules/ingest-common/src/internalClusterTest/java/org/elasticsearch/ingest/common/IngestRestartIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -410,7 +410,7 @@ public void testForwardBulkWithSystemWritePoolDisabled() throws Exception {
410410

411411
private void blockSystemWriteThreadPool(CountDownLatch blockingLatch, ThreadPool threadPool) {
412412
assertThat(blockingLatch.getCount(), greaterThan(0L));
413-
final var executor = threadPool.executor(ThreadPool.Names.SYSTEM_WRITE);
413+
final var executor = threadPool.executor(ThreadPool.Names.SYSTEM_WRITE_COORDINATION);
414414
// Add tasks repeatedly until we get an EsRejectedExecutionException which indicates that the threadpool and its queue are full.
415415
expectThrows(EsRejectedExecutionException.class, () -> {
416416
// noinspection InfiniteLoopStatement

server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java

Lines changed: 14 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,7 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction
6969
private final IngestActionForwarder ingestForwarder;
7070
protected final LongSupplier relativeTimeNanosProvider;
7171
protected final Executor coordinationExecutor;
72-
protected final Executor writeExecutor;
73-
protected final Executor systemWriteExecutor;
72+
protected final Executor systemCoordinationExecutor;
7473
private final ActionType<BulkResponse> bulkAction;
7574

7675
public TransportAbstractBulkAction(
@@ -94,8 +93,7 @@ public TransportAbstractBulkAction(
9493
this.systemIndices = systemIndices;
9594
this.projectResolver = projectResolver;
9695
this.coordinationExecutor = threadPool.executor(ThreadPool.Names.WRITE_COORDINATION);
97-
this.writeExecutor = threadPool.executor(ThreadPool.Names.WRITE);
98-
this.systemWriteExecutor = threadPool.executor(ThreadPool.Names.SYSTEM_WRITE);
96+
this.systemCoordinationExecutor = threadPool.executor(ThreadPool.Names.SYSTEM_WRITE_COORDINATION);
9997
this.ingestForwarder = new IngestActionForwarder(transportService);
10098
clusterService.addStateApplier(this.ingestForwarder);
10199
this.relativeTimeNanosProvider = relativeTimeNanosProvider;
@@ -134,14 +132,14 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<Bulk
134132
}
135133
final ActionListener<BulkResponse> releasingListener = ActionListener.runBefore(listener, releasable::close);
136134
// Use coordinationExecutor for dispatching coordination tasks
137-
ensureClusterStateThenForkAndExecute(task, bulkRequest, coordinationExecutor, isOnlySystem, releasingListener);
135+
final Executor executor = isOnlySystem ? systemCoordinationExecutor : coordinationExecutor;
136+
ensureClusterStateThenForkAndExecute(task, bulkRequest, executor, releasingListener);
138137
}
139138

140139
private void ensureClusterStateThenForkAndExecute(
141140
Task task,
142141
BulkRequest bulkRequest,
143142
Executor executor,
144-
boolean isOnlySystem,
145143
ActionListener<BulkResponse> releasingListener
146144
) {
147145
final ClusterState initialState = clusterService.state();
@@ -163,7 +161,7 @@ private void ensureClusterStateThenForkAndExecute(
163161
clusterStateObserver.waitForNextChange(new ClusterStateObserver.Listener() {
164162
@Override
165163
public void onNewClusterState(ClusterState state) {
166-
forkAndExecute(task, bulkRequest, executor, isOnlySystem, releasingListener);
164+
forkAndExecute(task, bulkRequest, executor, releasingListener);
167165
}
168166

169167
@Override
@@ -177,32 +175,21 @@ public void onTimeout(TimeValue timeout) {
177175
}
178176
}, newState -> false == newState.blocks().hasGlobalBlockWithLevel(projectId, ClusterBlockLevel.WRITE));
179177
} else {
180-
forkAndExecute(task, bulkRequest, executor, isOnlySystem, releasingListener);
178+
forkAndExecute(task, bulkRequest, executor, releasingListener);
181179
}
182180
}
183181

184-
private void forkAndExecute(
185-
Task task,
186-
BulkRequest bulkRequest,
187-
Executor executor,
188-
boolean isOnlySystem,
189-
ActionListener<BulkResponse> releasingListener
190-
) {
182+
private void forkAndExecute(Task task, BulkRequest bulkRequest, Executor executor, ActionListener<BulkResponse> releasingListener) {
191183
executor.execute(new ActionRunnable<>(releasingListener) {
192184
@Override
193185
protected void doRun() throws IOException {
194-
applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, isOnlySystem, releasingListener);
186+
applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, releasingListener);
195187
}
196188
});
197189
}
198190

199-
private boolean applyPipelines(
200-
Task task,
201-
BulkRequest bulkRequest,
202-
Executor executor,
203-
boolean isOnlySystem,
204-
ActionListener<BulkResponse> listener
205-
) throws IOException {
191+
private boolean applyPipelines(Task task, BulkRequest bulkRequest, Executor executor, ActionListener<BulkResponse> listener)
192+
throws IOException {
206193
boolean hasIndexRequestsWithPipelines = false;
207194
ClusterState state = clusterService.state();
208195
ProjectId projectId = projectResolver.getProjectId();
@@ -291,7 +278,7 @@ private boolean applyPipelines(
291278
assert arePipelinesResolved : bulkRequest;
292279
}
293280
if (clusterService.localNode().isIngestNode()) {
294-
processBulkIndexIngestRequest(task, bulkRequest, executor, isOnlySystem, project, l);
281+
processBulkIndexIngestRequest(task, bulkRequest, executor, project, l);
295282
} else {
296283
ingestForwarder.forwardIngestRequest(bulkAction, bulkRequest, l);
297284
}
@@ -305,7 +292,6 @@ private void processBulkIndexIngestRequest(
305292
Task task,
306293
BulkRequest original,
307294
Executor executor,
308-
boolean isOnlySystem,
309295
ProjectMetadata metadata,
310296
ActionListener<BulkResponse> listener
311297
) {
@@ -339,7 +325,7 @@ private void processBulkIndexIngestRequest(
339325
ActionRunnable<BulkResponse> runnable = new ActionRunnable<>(actionListener) {
340326
@Override
341327
protected void doRun() throws IOException {
342-
applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, isOnlySystem, actionListener);
328+
applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, actionListener);
343329
}
344330

345331
@Override
@@ -362,8 +348,7 @@ public boolean isForceExecution() {
362348
}
363349
}
364350
},
365-
// Use the appropriate write executor for actual ingest processing
366-
isOnlySystem ? systemWriteExecutor : writeExecutor
351+
executor
367352
);
368353
}
369354

@@ -419,11 +404,10 @@ private void applyPipelinesAndDoInternalExecute(
419404
Task task,
420405
BulkRequest bulkRequest,
421406
Executor executor,
422-
boolean isOnlySystem,
423407
ActionListener<BulkResponse> listener
424408
) throws IOException {
425409
final long relativeStartTimeNanos = relativeTimeNanos();
426-
if (applyPipelines(task, bulkRequest, executor, isOnlySystem, listener) == false) {
410+
if (applyPipelines(task, bulkRequest, executor, listener) == false) {
427411
doInternalExecute(task, bulkRequest, executor, listener, relativeStartTimeNanos);
428412
}
429413
}

server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,17 @@ public Map<String, ExecutorBuilder> getBuilders(Settings settings, int allocated
199199
true
200200
)
201201
);
202+
result.put(
203+
ThreadPool.Names.SYSTEM_WRITE_COORDINATION,
204+
new FixedExecutorBuilder(
205+
settings,
206+
ThreadPool.Names.SYSTEM_WRITE_COORDINATION,
207+
halfProcMaxAt5,
208+
1000,
209+
EsExecutors.TaskTrackingConfig.DO_NOT_TRACK,
210+
true
211+
)
212+
);
202213
result.put(
203214
ThreadPool.Names.SYSTEM_CRITICAL_READ,
204215
new FixedExecutorBuilder(

server/src/main/java/org/elasticsearch/threadpool/ExecutorBuilder.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ protected static String settingsKey(final String prefix, final String key) {
4242
protected static int applyHardSizeLimit(final Settings settings, final String name) {
4343
if (name.equals("bulk")
4444
|| name.equals(ThreadPool.Names.WRITE_COORDINATION)
45+
|| name.equals(ThreadPool.Names.SYSTEM_WRITE_COORDINATION)
4546
|| name.equals(ThreadPool.Names.WRITE)
4647
|| name.equals(ThreadPool.Names.SYSTEM_WRITE)
4748
|| name.equals(ThreadPool.Names.SYSTEM_CRITICAL_WRITE)) {

server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@ public static class Names {
142142
public static final String FETCH_SHARD_STORE = "fetch_shard_store";
143143
public static final String SYSTEM_READ = "system_read";
144144
public static final String SYSTEM_WRITE = "system_write";
145+
public static final String SYSTEM_WRITE_COORDINATION = "system_write_coordination";
145146
public static final String SYSTEM_CRITICAL_READ = "system_critical_read";
146147
public static final String SYSTEM_CRITICAL_WRITE = "system_critical_write";
147148
}
@@ -187,8 +188,8 @@ public static ThreadPoolType fromType(String type) {
187188
entry(Names.CLUSTER_COORDINATION, ThreadPoolType.FIXED),
188189
entry(Names.GET, ThreadPoolType.FIXED),
189190
entry(Names.ANALYZE, ThreadPoolType.FIXED),
190-
entry(Names.WRITE_COORDINATION, ThreadPoolType.FIXED),
191191
entry(Names.WRITE, ThreadPoolType.FIXED),
192+
entry(Names.WRITE_COORDINATION, ThreadPoolType.FIXED),
192193
entry(Names.SEARCH, ThreadPoolType.FIXED),
193194
entry(Names.SEARCH_COORDINATION, ThreadPoolType.FIXED),
194195
entry(Names.AUTO_COMPLETE, ThreadPoolType.FIXED),
@@ -204,6 +205,7 @@ public static ThreadPoolType fromType(String type) {
204205
entry(Names.FETCH_SHARD_STORE, ThreadPoolType.SCALING),
205206
entry(Names.SYSTEM_READ, ThreadPoolType.FIXED),
206207
entry(Names.SYSTEM_WRITE, ThreadPoolType.FIXED),
208+
entry(Names.SYSTEM_WRITE_COORDINATION, ThreadPoolType.FIXED),
207209
entry(Names.SYSTEM_CRITICAL_READ, ThreadPoolType.FIXED),
208210
entry(Names.SYSTEM_CRITICAL_WRITE, ThreadPoolType.FIXED)
209211
);

server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -109,8 +109,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
109109
private FeatureService mockFeatureService;
110110

111111
private static final ExecutorService writeCoordinationExecutor = new NamedDirectExecutorService("write_coordination");
112-
private static final ExecutorService writeExecutor = new NamedDirectExecutorService("write");
113-
private static final ExecutorService systemWriteExecutor = new NamedDirectExecutorService("system_write");
112+
private static final ExecutorService systemWriteCoordinationExecutor = new NamedDirectExecutorService("system_write_coordination");
114113

115114
private final ProjectId projectId = randomProjectIdOrDefault();
116115

@@ -296,8 +295,7 @@ public void setupAction() {
296295
// initialize captors, which must be members to use @Capture because of generics
297296
threadPool = mock(ThreadPool.class);
298297
when(threadPool.executor(eq(ThreadPool.Names.WRITE_COORDINATION))).thenReturn(writeCoordinationExecutor);
299-
when(threadPool.executor(eq(ThreadPool.Names.WRITE))).thenReturn(writeExecutor);
300-
when(threadPool.executor(eq(ThreadPool.Names.SYSTEM_WRITE))).thenReturn(systemWriteExecutor);
298+
when(threadPool.executor(eq(ThreadPool.Names.SYSTEM_WRITE_COORDINATION))).thenReturn(systemWriteCoordinationExecutor);
301299
MockitoAnnotations.openMocks(this);
302300
// setup services that will be called by action
303301
transportService = mock(TransportService.class);
@@ -428,7 +426,7 @@ public void testIngestLocal() throws Exception {
428426
redirectHandler.capture(),
429427
failureHandler.capture(),
430428
completionHandler.capture(),
431-
same(writeExecutor)
429+
same(writeCoordinationExecutor)
432430
);
433431
completionHandler.getValue().accept(null, exception);
434432
assertTrue(failureCalled.get());
@@ -479,7 +477,7 @@ public void testSingleItemBulkActionIngestLocal() throws Exception {
479477
any(),
480478
failureHandler.capture(),
481479
completionHandler.capture(),
482-
same(writeExecutor)
480+
same(writeCoordinationExecutor)
483481
);
484482
completionHandler.getValue().accept(null, exception);
485483
assertTrue(failureCalled.get());
@@ -528,7 +526,7 @@ public void testIngestSystemLocal() throws Exception {
528526
any(),
529527
failureHandler.capture(),
530528
completionHandler.capture(),
531-
same(systemWriteExecutor)
529+
same(systemWriteCoordinationExecutor)
532530
);
533531
completionHandler.getValue().accept(null, exception);
534532
assertTrue(failureCalled.get());
@@ -689,7 +687,7 @@ private void validatePipelineWithBulkUpsert(@Nullable String indexRequestIndexNa
689687
any(),
690688
failureHandler.capture(),
691689
completionHandler.capture(),
692-
same(writeExecutor)
690+
same(writeCoordinationExecutor)
693691
);
694692
assertEquals(indexRequest1.getPipeline(), "default_pipeline");
695693
assertEquals(indexRequest2.getPipeline(), "default_pipeline");
@@ -740,7 +738,7 @@ public void testDoExecuteCalledTwiceCorrectly() throws Exception {
740738
any(),
741739
failureHandler.capture(),
742740
completionHandler.capture(),
743-
same(writeExecutor)
741+
same(writeCoordinationExecutor)
744742
);
745743
completionHandler.getValue().accept(null, exception);
746744
assertFalse(action.indexCreated); // still no index yet, the ingest node failed.
@@ -834,7 +832,7 @@ public void testFindDefaultPipelineFromTemplateMatch() {
834832
any(),
835833
failureHandler.capture(),
836834
completionHandler.capture(),
837-
same(writeExecutor)
835+
same(writeCoordinationExecutor)
838836
);
839837
}
840838

@@ -875,7 +873,7 @@ public void testFindDefaultPipelineFromV2TemplateMatch() {
875873
any(),
876874
failureHandler.capture(),
877875
completionHandler.capture(),
878-
same(writeExecutor)
876+
same(writeCoordinationExecutor)
879877
);
880878
}
881879

@@ -905,7 +903,7 @@ public void testIngestCallbackExceptionHandled() throws Exception {
905903
any(),
906904
failureHandler.capture(),
907905
completionHandler.capture(),
908-
same(writeExecutor)
906+
same(writeCoordinationExecutor)
909907
);
910908
indexRequest1.autoGenerateId();
911909
completionHandler.getValue().accept(Thread.currentThread(), null);
@@ -945,7 +943,7 @@ private void validateDefaultPipeline(IndexRequest indexRequest) {
945943
any(),
946944
failureHandler.capture(),
947945
completionHandler.capture(),
948-
same(writeExecutor)
946+
same(writeCoordinationExecutor)
949947
);
950948
assertEquals(indexRequest.getPipeline(), "default_pipeline");
951949
completionHandler.getValue().accept(null, exception);

server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,6 @@
6161
import org.elasticsearch.index.IndexVersions;
6262
import org.elasticsearch.index.IndexingPressure;
6363
import org.elasticsearch.index.VersionType;
64-
import org.elasticsearch.indices.EmptySystemIndices;
6564
import org.elasticsearch.indices.SystemIndexDescriptorUtils;
6665
import org.elasticsearch.indices.SystemIndices;
6766
import org.elasticsearch.test.ESTestCase;
@@ -126,7 +125,15 @@ class TestTransportBulkAction extends TransportBulkAction {
126125
new ActionFilters(Collections.emptySet()),
127126
new Resolver(),
128127
new IndexingPressure(Settings.EMPTY),
129-
EmptySystemIndices.INSTANCE,
128+
new SystemIndices(
129+
List.of(
130+
new SystemIndices.Feature(
131+
"plugin",
132+
"test feature",
133+
List.of(SystemIndexDescriptorUtils.createUnmanaged(".transport_bulk_tests_system*", ""))
134+
)
135+
)
136+
),
130137
new ProjectResolver() {
131138
@Override
132139
public <E extends Exception> void executeOnProject(ProjectId projectId, CheckedRunnable<E> body) throws E {
@@ -386,7 +393,7 @@ private void blockWriteCoordinationThreadPool(CountDownLatch blockingLatch) {
386393
});
387394
}
388395

389-
public void testDispatchesToWriteCoordinationThreadPoolOnce() throws Exception {
396+
public void testDispatchesToWriteCoordinationThreadPool() throws Exception {
390397
BulkRequest bulkRequest = new BulkRequest().add(new IndexRequest("index").id("id").source(Collections.emptyMap()));
391398
PlainActionFuture<BulkResponse> future = new PlainActionFuture<>();
392399
ThreadPoolStats.Stats stats = threadPool.stats()
@@ -401,8 +408,7 @@ public void testDispatchesToWriteCoordinationThreadPoolOnce() throws Exception {
401408

402409
assertBusy(() -> {
403410
// Will increment twice because it will dispatch on the first coordination attempt. And then dispatch a second time after the
404-
// index
405-
// is created.
411+
// index is created.
406412
assertThat(
407413
threadPool.stats()
408414
.stats()
@@ -416,6 +422,37 @@ public void testDispatchesToWriteCoordinationThreadPoolOnce() throws Exception {
416422
});
417423
}
418424

425+
public void testSystemWriteDispatchesToSystemWriteCoordinationThreadPool() throws Exception {
426+
BulkRequest bulkRequest = new BulkRequest().add(
427+
new IndexRequest(".transport_bulk_tests_system_1").id("id").source(Collections.emptyMap())
428+
);
429+
PlainActionFuture<BulkResponse> future = new PlainActionFuture<>();
430+
ThreadPoolStats.Stats stats = threadPool.stats()
431+
.stats()
432+
.stream()
433+
.filter(s -> s.name().equals(ThreadPool.Names.SYSTEM_WRITE_COORDINATION))
434+
.findAny()
435+
.get();
436+
assertThat(stats.completed(), equalTo(0L));
437+
ActionTestUtils.execute(bulkAction, null, bulkRequest, future);
438+
future.actionGet();
439+
440+
assertBusy(() -> {
441+
// Will increment twice because it will dispatch on the first coordination attempt. And then dispatch a second time after the
442+
// index is created.
443+
assertThat(
444+
threadPool.stats()
445+
.stats()
446+
.stream()
447+
.filter(s -> s.name().equals(ThreadPool.Names.SYSTEM_WRITE_COORDINATION))
448+
.findAny()
449+
.get()
450+
.completed(),
451+
equalTo(2L)
452+
);
453+
});
454+
}
455+
419456
public void testRejectCoordination() {
420457
BulkRequest bulkRequest = new BulkRequest().add(new IndexRequest("index").id("id").source(Collections.emptyMap()));
421458

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorProxyAction.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ protected void doExecute(Task task, SearchRequest request, ActionListener<Search
7777
ThreadPool.Names.WRITE,
7878
ThreadPool.Names.WRITE_COORDINATION,
7979
ThreadPool.Names.SYSTEM_WRITE,
80+
ThreadPool.Names.SYSTEM_WRITE_COORDINATION,
8081
ThreadPool.Names.SEARCH,
8182
ThreadPool.Names.MANAGEMENT
8283
);

0 commit comments

Comments
 (0)