Skip to content

Commit 547d4a4

Browse files
authored
Do not dispatch in IngestService (#130253)
The ingest service dispatches to the provided executor. This work became redundant several years ago when we moved the transport bulk action to be dispatched from the transport threads for the action itself. With this behavior, there is no reason to re-dispatch an ingest action.
1 parent 8f0c3eb commit 547d4a4

File tree

4 files changed

+125
-175
lines changed

4 files changed

+125
-175
lines changed

server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java

Lines changed: 32 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -297,6 +297,7 @@ private void processBulkIndexIngestRequest(
297297
) {
298298
final long ingestStartTimeInNanos = relativeTimeNanos();
299299
final BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(original);
300+
final Thread originalThread = Thread.currentThread();
300301
getIngestService(original).executeBulkRequest(
301302
metadata.id(),
302303
original.numberOfActions(),
@@ -305,50 +306,42 @@ private void processBulkIndexIngestRequest(
305306
(indexName) -> resolveFailureStore(indexName, metadata, threadPool.absoluteTimeInMillis()),
306307
bulkRequestModifier::markItemForFailureStore,
307308
bulkRequestModifier::markItemAsFailed,
308-
(originalThread, exception) -> {
309-
if (exception != null) {
310-
logger.debug("failed to execute pipeline for a bulk request", exception);
311-
listener.onFailure(exception);
309+
listener.delegateFailureAndWrap((l, unused) -> {
310+
long ingestTookInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeNanos() - ingestStartTimeInNanos);
311+
BulkRequest bulkRequest = bulkRequestModifier.getBulkRequest();
312+
ActionListener<BulkResponse> actionListener = bulkRequestModifier.wrapActionListenerIfNeeded(ingestTookInMillis, l);
313+
if (bulkRequest.requests().isEmpty()) {
314+
// at this stage, the transport bulk action can't deal with a bulk request with no requests,
315+
// so we stop and send an empty response back to the client.
316+
// (this will happen if pre-processing all items in the bulk failed)
317+
actionListener.onResponse(new BulkResponse(new BulkItemResponse[0], 0));
312318
} else {
313-
long ingestTookInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeNanos() - ingestStartTimeInNanos);
314-
BulkRequest bulkRequest = bulkRequestModifier.getBulkRequest();
315-
ActionListener<BulkResponse> actionListener = bulkRequestModifier.wrapActionListenerIfNeeded(
316-
ingestTookInMillis,
317-
listener
318-
);
319-
if (bulkRequest.requests().isEmpty()) {
320-
// at this stage, the transport bulk action can't deal with a bulk request with no requests,
321-
// so we stop and send an empty response back to the client.
322-
// (this will happen if pre-processing all items in the bulk failed)
323-
actionListener.onResponse(new BulkResponse(new BulkItemResponse[0], 0));
324-
} else {
325-
ActionRunnable<BulkResponse> runnable = new ActionRunnable<>(actionListener) {
326-
@Override
327-
protected void doRun() throws IOException {
328-
applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, actionListener);
329-
}
319+
ActionRunnable<BulkResponse> runnable = new ActionRunnable<>(actionListener) {
320+
@Override
321+
protected void doRun() throws IOException {
322+
applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, actionListener);
323+
}
330324

331-
@Override
332-
public boolean isForceExecution() {
333-
// If we fork back to a coordination thread we **not** should fail, because tp queue is full.
334-
// (Otherwise the work done during ingest will be lost)
335-
// It is okay to force execution here. Throttling of write requests happens prior to
336-
// ingest when a node receives a bulk request.
337-
return true;
338-
}
339-
};
340-
// If a processor went async and returned a response on a different thread then
341-
// before we continue the bulk request we should fork back on a coordination thread. Otherwise it is fine to perform
342-
// coordination steps on the write thread
343-
if (originalThread == Thread.currentThread()) {
344-
runnable.run();
345-
} else {
346-
executor.execute(runnable);
325+
@Override
326+
public boolean isForceExecution() {
327+
// If we fork back to a coordination thread we **not** should fail, because tp queue is full.
328+
// (Otherwise the work done during ingest will be lost)
329+
// It is okay to force execution here. Throttling of write requests happens prior to
330+
// ingest when a node receives a bulk request.
331+
return true;
347332
}
333+
};
334+
// If a processor went async and returned a response on a different thread then
335+
// before we continue the bulk request we should fork back on a coordination thread. Otherwise it is fine to perform
336+
// coordination steps on the write thread
337+
if (originalThread == Thread.currentThread()) {
338+
runnable.run();
339+
} else {
340+
executor.execute(runnable);
348341
}
349342
}
350-
},
351-
executor
343+
344+
})
352345
);
353346
}
354347

server/src/main/java/org/elasticsearch/ingest/IngestService.java

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,6 @@
9696
import java.util.Set;
9797
import java.util.TreeMap;
9898
import java.util.concurrent.CopyOnWriteArrayList;
99-
import java.util.concurrent.Executor;
10099
import java.util.function.BiConsumer;
101100
import java.util.function.BiFunction;
102101
import java.util.function.Consumer;
@@ -854,10 +853,7 @@ private static IngestPipelinesExecutionResult failWithoutStoringIn(String index,
854853
* @param onFailure A callback executed when a document fails ingestion and does not need to be
855854
* persisted. Accepts the slot in the collection of requests that the document
856855
* occupies, and the exception that the document encountered.
857-
* @param onCompletion A callback executed once all documents have been processed. Accepts the thread
858-
* that ingestion completed on or an exception in the event that the entire operation
859-
* has failed.
860-
* @param executor Which executor the bulk request should be executed on.
856+
* @param listener A callback executed once all documents have been processed.
861857
*/
862858
public void executeBulkRequest(
863859
final ProjectId projectId,
@@ -867,25 +863,23 @@ public void executeBulkRequest(
867863
final Function<String, Boolean> resolveFailureStore,
868864
final TriConsumer<Integer, String, Exception> onStoreFailure,
869865
final TriConsumer<Integer, Exception, IndexDocFailureStoreStatus> onFailure,
870-
final BiConsumer<Thread, Exception> onCompletion,
871-
final Executor executor
866+
final ActionListener<Void> listener
872867
) {
873868
assert numberOfActionRequests > 0 : "numberOfActionRequests must be greater than 0 but was [" + numberOfActionRequests + "]";
874869

875870
// Adapt handler to ensure node features during ingest logic
876871
final Function<String, Boolean> adaptedResolveFailureStore = wrapResolverWithFeatureCheck(resolveFailureStore);
877872

878-
executor.execute(new AbstractRunnable() {
873+
new AbstractRunnable() {
879874

880875
@Override
881876
public void onFailure(Exception e) {
882-
onCompletion.accept(null, e);
877+
listener.onFailure(e);
883878
}
884879

885880
@Override
886881
protected void doRun() {
887-
final Thread originalThread = Thread.currentThread();
888-
try (var refs = new RefCountingRunnable(() -> onCompletion.accept(originalThread, null))) {
882+
try (var refs = new RefCountingRunnable(() -> listener.onResponse(null))) {
889883
int i = 0;
890884
for (DocWriteRequest<?> actionRequest : actionRequests) {
891885
IndexRequest indexRequest = TransportBulkAction.getIndexWriteRequest(actionRequest);
@@ -964,7 +958,7 @@ public void onFailure(Exception e) {
964958
}
965959
}
966960
}
967-
});
961+
}.run();
968962
}
969963

970964
/**

server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java

Lines changed: 25 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -77,15 +77,13 @@
7777
import java.util.concurrent.Future;
7878
import java.util.concurrent.TimeUnit;
7979
import java.util.concurrent.atomic.AtomicBoolean;
80-
import java.util.function.BiConsumer;
8180
import java.util.function.Function;
8281

8382
import static org.hamcrest.Matchers.containsString;
8483
import static org.hamcrest.Matchers.sameInstance;
8584
import static org.mockito.ArgumentMatchers.any;
8685
import static org.mockito.ArgumentMatchers.anyInt;
8786
import static org.mockito.ArgumentMatchers.eq;
88-
import static org.mockito.ArgumentMatchers.same;
8987
import static org.mockito.Mockito.doAnswer;
9088
import static org.mockito.Mockito.mock;
9189
import static org.mockito.Mockito.never;
@@ -127,7 +125,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
127125
@Captor
128126
ArgumentCaptor<TriConsumer<Integer, Exception, IndexDocFailureStoreStatus>> failureHandler;
129127
@Captor
130-
ArgumentCaptor<BiConsumer<Thread, Exception>> completionHandler;
128+
ArgumentCaptor<ActionListener<Void>> listener;
131129
@Captor
132130
ArgumentCaptor<TransportResponseHandler<BulkResponse>> remoteResponseHandler;
133131
@Captor
@@ -425,10 +423,9 @@ public void testIngestLocal() throws Exception {
425423
redirectPredicate.capture(),
426424
redirectHandler.capture(),
427425
failureHandler.capture(),
428-
completionHandler.capture(),
429-
same(writeCoordinationExecutor)
426+
listener.capture()
430427
);
431-
completionHandler.getValue().accept(null, exception);
428+
listener.getValue().onFailure(exception);
432429
assertTrue(failureCalled.get());
433430

434431
// now check success
@@ -441,7 +438,7 @@ public void testIngestLocal() throws Exception {
441438
assertNull(redirectPredicate.getValue().apply(WITH_DEFAULT_PIPELINE)); // no redirects for random existing indices
442439
assertNull(redirectPredicate.getValue().apply("index")); // no redirects for non-existent indices with no templates
443440
redirectHandler.getValue().apply(2, WITH_FAILURE_STORE_ENABLED + "-1", exception); // exception and redirect for request 3 (slot 2)
444-
completionHandler.getValue().accept(DUMMY_WRITE_THREAD, null); // all ingestion completed
441+
listener.getValue().onResponse(null); // all ingestion completed
445442
assertTrue(action.isExecuted);
446443
assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one
447444
verifyNoMoreInteractions(transportService);
@@ -476,15 +473,14 @@ public void testSingleItemBulkActionIngestLocal() throws Exception {
476473
any(),
477474
any(),
478475
failureHandler.capture(),
479-
completionHandler.capture(),
480-
same(writeCoordinationExecutor)
476+
listener.capture()
481477
);
482-
completionHandler.getValue().accept(null, exception);
478+
listener.getValue().onFailure(exception);
483479
assertTrue(failureCalled.get());
484480

485481
// now check success
486482
indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
487-
completionHandler.getValue().accept(DUMMY_WRITE_THREAD, null);
483+
listener.getValue().onResponse(null);
488484
assertTrue(action.isExecuted);
489485
assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one
490486
verifyNoMoreInteractions(transportService);
@@ -525,18 +521,17 @@ public void testIngestSystemLocal() throws Exception {
525521
any(),
526522
any(),
527523
failureHandler.capture(),
528-
completionHandler.capture(),
529-
same(systemWriteCoordinationExecutor)
524+
listener.capture()
530525
);
531-
completionHandler.getValue().accept(null, exception);
526+
listener.getValue().onFailure(exception);
532527
assertTrue(failureCalled.get());
533528

534529
// now check success
535530
Iterator<DocWriteRequest<?>> req = bulkDocsItr.getValue().iterator();
536531
// have an exception for our one index request
537532
failureHandler.getValue().apply(0, exception, IndexDocFailureStoreStatus.NOT_APPLICABLE_OR_UNKNOWN);
538533
indexRequest2.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
539-
completionHandler.getValue().accept(DUMMY_WRITE_THREAD, null);
534+
listener.getValue().onResponse(null);
540535
assertTrue(action.isExecuted);
541536
assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one
542537
verifyNoMoreInteractions(transportService);
@@ -558,7 +553,7 @@ public void testIngestForward() throws Exception {
558553
ActionTestUtils.execute(action, null, bulkRequest, listener);
559554

560555
// should not have executed ingest locally
561-
verify(ingestService, never()).executeBulkRequest(eq(projectId), anyInt(), any(), any(), any(), any(), any(), any(), any());
556+
verify(ingestService, never()).executeBulkRequest(eq(projectId), anyInt(), any(), any(), any(), any(), any(), any());
562557
// but instead should have sent to a remote node with the transport service
563558
ArgumentCaptor<DiscoveryNode> node = ArgumentCaptor.forClass(DiscoveryNode.class);
564559
verify(transportService).sendRequest(node.capture(), eq(TransportBulkAction.NAME), any(), remoteResponseHandler.capture());
@@ -598,7 +593,7 @@ public void testSingleItemBulkActionIngestForward() throws Exception {
598593
ActionTestUtils.execute(singleItemBulkWriteAction, null, indexRequest, listener);
599594

600595
// should not have executed ingest locally
601-
verify(ingestService, never()).executeBulkRequest(eq(projectId), anyInt(), any(), any(), any(), any(), any(), any(), any());
596+
verify(ingestService, never()).executeBulkRequest(eq(projectId), anyInt(), any(), any(), any(), any(), any(), any());
602597
// but instead should have sent to a remote node with the transport service
603598
ArgumentCaptor<DiscoveryNode> node = ArgumentCaptor.forClass(DiscoveryNode.class);
604599
verify(transportService).sendRequest(node.capture(), eq(TransportBulkAction.NAME), any(), remoteResponseHandler.capture());
@@ -686,20 +681,19 @@ private void validatePipelineWithBulkUpsert(@Nullable String indexRequestIndexNa
686681
any(),
687682
any(),
688683
failureHandler.capture(),
689-
completionHandler.capture(),
690-
same(writeCoordinationExecutor)
684+
listener.capture()
691685
);
692686
assertEquals(indexRequest1.getPipeline(), "default_pipeline");
693687
assertEquals(indexRequest2.getPipeline(), "default_pipeline");
694688
assertEquals(indexRequest3.getPipeline(), "default_pipeline");
695-
completionHandler.getValue().accept(null, exception);
689+
listener.getValue().onFailure(exception);
696690
assertTrue(failureCalled.get());
697691

698692
// now check success of the transport bulk action
699693
indexRequest1.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
700694
indexRequest2.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
701695
indexRequest3.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
702-
completionHandler.getValue().accept(DUMMY_WRITE_THREAD, null);
696+
listener.getValue().onResponse(null);
703697
assertTrue(action.isExecuted);
704698
assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one
705699
verifyNoMoreInteractions(transportService);
@@ -737,16 +731,15 @@ public void testDoExecuteCalledTwiceCorrectly() throws Exception {
737731
any(),
738732
any(),
739733
failureHandler.capture(),
740-
completionHandler.capture(),
741-
same(writeCoordinationExecutor)
734+
listener.capture()
742735
);
743-
completionHandler.getValue().accept(null, exception);
736+
listener.getValue().onFailure(exception);
744737
assertFalse(action.indexCreated); // still no index yet, the ingest node failed.
745738
assertTrue(failureCalled.get());
746739

747740
// now check success
748741
indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
749-
completionHandler.getValue().accept(DUMMY_WRITE_THREAD, null);
742+
listener.getValue().onResponse(null);
750743
assertTrue(action.isExecuted);
751744
assertTrue(action.indexCreated); // now the index is created since we skipped the ingest node path.
752745
assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one
@@ -831,8 +824,7 @@ public void testFindDefaultPipelineFromTemplateMatch() {
831824
any(),
832825
any(),
833826
failureHandler.capture(),
834-
completionHandler.capture(),
835-
same(writeCoordinationExecutor)
827+
listener.capture()
836828
);
837829
}
838830

@@ -872,8 +864,7 @@ public void testFindDefaultPipelineFromV2TemplateMatch() {
872864
any(),
873865
any(),
874866
failureHandler.capture(),
875-
completionHandler.capture(),
876-
same(writeCoordinationExecutor)
867+
listener.capture()
877868
);
878869
}
879870

@@ -902,11 +893,10 @@ public void testIngestCallbackExceptionHandled() throws Exception {
902893
any(),
903894
any(),
904895
failureHandler.capture(),
905-
completionHandler.capture(),
906-
same(writeCoordinationExecutor)
896+
listener.capture()
907897
);
908898
indexRequest1.autoGenerateId();
909-
completionHandler.getValue().accept(Thread.currentThread(), null);
899+
listener.getValue().onResponse(null);
910900

911901
// check failure passed through to the listener
912902
assertFalse(action.isExecuted);
@@ -942,16 +932,15 @@ private void validateDefaultPipeline(IndexRequest indexRequest) {
942932
any(),
943933
any(),
944934
failureHandler.capture(),
945-
completionHandler.capture(),
946-
same(writeCoordinationExecutor)
935+
listener.capture()
947936
);
948937
assertEquals(indexRequest.getPipeline(), "default_pipeline");
949-
completionHandler.getValue().accept(null, exception);
938+
listener.getValue().onFailure(exception);
950939
assertTrue(failureCalled.get());
951940

952941
// now check success
953942
indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
954-
completionHandler.getValue().accept(DUMMY_WRITE_THREAD, null);
943+
listener.getValue().onResponse(null);
955944
assertTrue(action.isExecuted);
956945
assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one
957946
verifyNoMoreInteractions(transportService);

0 commit comments

Comments
 (0)