Skip to content

Commit 746efd9

Browse files
committed
Do not dispatch in IngestService
The ingest service dispatches to the provided executor. This work became redundant several years ago when we moved the transport bulk action to be dispatched from the transport threads for the action itself. With this behavior, there is no reason to re-dispatch an ingest action.
1 parent 5dcded2 commit 746efd9

File tree

4 files changed

+41
-81
lines changed

4 files changed

+41
-81
lines changed

server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -347,8 +347,7 @@ public boolean isForceExecution() {
347347
}
348348
}
349349
}
350-
},
351-
executor
350+
}
352351
);
353352
}
354353

server/src/main/java/org/elasticsearch/ingest/IngestService.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,6 @@
9595
import java.util.Set;
9696
import java.util.TreeMap;
9797
import java.util.concurrent.CopyOnWriteArrayList;
98-
import java.util.concurrent.Executor;
9998
import java.util.function.BiConsumer;
10099
import java.util.function.BiFunction;
101100
import java.util.function.Consumer;
@@ -826,7 +825,6 @@ private static IngestPipelinesExecutionResult failWithoutStoringIn(String index,
826825
* @param onCompletion A callback executed once all documents have been processed. Accepts the thread
827826
* that ingestion completed on or an exception in the event that the entire operation
828827
* has failed.
829-
* @param executor Which executor the bulk request should be executed on.
830828
*/
831829
public void executeBulkRequest(
832830
final ProjectId projectId,
@@ -836,15 +834,14 @@ public void executeBulkRequest(
836834
final Function<String, Boolean> resolveFailureStore,
837835
final TriConsumer<Integer, String, Exception> onStoreFailure,
838836
final TriConsumer<Integer, Exception, IndexDocFailureStoreStatus> onFailure,
839-
final BiConsumer<Thread, Exception> onCompletion,
840-
final Executor executor
837+
final BiConsumer<Thread, Exception> onCompletion
841838
) {
842839
assert numberOfActionRequests > 0 : "numberOfActionRequests must be greater than 0 but was [" + numberOfActionRequests + "]";
843840

844841
// Adapt handler to ensure node features during ingest logic
845842
final Function<String, Boolean> adaptedResolveFailureStore = wrapResolverWithFeatureCheck(resolveFailureStore);
846843

847-
executor.execute(new AbstractRunnable() {
844+
new AbstractRunnable() {
848845

849846
@Override
850847
public void onFailure(Exception e) {
@@ -933,7 +930,7 @@ public void onFailure(Exception e) {
933930
}
934931
}
935932
}
936-
});
933+
}.run();
937934
}
938935

939936
/**

server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java

Lines changed: 11 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,6 @@
8585
import static org.mockito.ArgumentMatchers.any;
8686
import static org.mockito.ArgumentMatchers.anyInt;
8787
import static org.mockito.ArgumentMatchers.eq;
88-
import static org.mockito.ArgumentMatchers.same;
8988
import static org.mockito.Mockito.doAnswer;
9089
import static org.mockito.Mockito.mock;
9190
import static org.mockito.Mockito.never;
@@ -425,8 +424,7 @@ public void testIngestLocal() throws Exception {
425424
redirectPredicate.capture(),
426425
redirectHandler.capture(),
427426
failureHandler.capture(),
428-
completionHandler.capture(),
429-
same(writeCoordinationExecutor)
427+
completionHandler.capture()
430428
);
431429
completionHandler.getValue().accept(null, exception);
432430
assertTrue(failureCalled.get());
@@ -476,8 +474,7 @@ public void testSingleItemBulkActionIngestLocal() throws Exception {
476474
any(),
477475
any(),
478476
failureHandler.capture(),
479-
completionHandler.capture(),
480-
same(writeCoordinationExecutor)
477+
completionHandler.capture()
481478
);
482479
completionHandler.getValue().accept(null, exception);
483480
assertTrue(failureCalled.get());
@@ -525,8 +522,7 @@ public void testIngestSystemLocal() throws Exception {
525522
any(),
526523
any(),
527524
failureHandler.capture(),
528-
completionHandler.capture(),
529-
same(systemWriteCoordinationExecutor)
525+
completionHandler.capture()
530526
);
531527
completionHandler.getValue().accept(null, exception);
532528
assertTrue(failureCalled.get());
@@ -558,7 +554,7 @@ public void testIngestForward() throws Exception {
558554
ActionTestUtils.execute(action, null, bulkRequest, listener);
559555

560556
// should not have executed ingest locally
561-
verify(ingestService, never()).executeBulkRequest(eq(projectId), anyInt(), any(), any(), any(), any(), any(), any(), any());
557+
verify(ingestService, never()).executeBulkRequest(eq(projectId), anyInt(), any(), any(), any(), any(), any(), any());
562558
// but instead should have sent to a remote node with the transport service
563559
ArgumentCaptor<DiscoveryNode> node = ArgumentCaptor.forClass(DiscoveryNode.class);
564560
verify(transportService).sendRequest(node.capture(), eq(TransportBulkAction.NAME), any(), remoteResponseHandler.capture());
@@ -598,7 +594,7 @@ public void testSingleItemBulkActionIngestForward() throws Exception {
598594
ActionTestUtils.execute(singleItemBulkWriteAction, null, indexRequest, listener);
599595

600596
// should not have executed ingest locally
601-
verify(ingestService, never()).executeBulkRequest(eq(projectId), anyInt(), any(), any(), any(), any(), any(), any(), any());
597+
verify(ingestService, never()).executeBulkRequest(eq(projectId), anyInt(), any(), any(), any(), any(), any(), any());
602598
// but instead should have sent to a remote node with the transport service
603599
ArgumentCaptor<DiscoveryNode> node = ArgumentCaptor.forClass(DiscoveryNode.class);
604600
verify(transportService).sendRequest(node.capture(), eq(TransportBulkAction.NAME), any(), remoteResponseHandler.capture());
@@ -686,8 +682,7 @@ private void validatePipelineWithBulkUpsert(@Nullable String indexRequestIndexNa
686682
any(),
687683
any(),
688684
failureHandler.capture(),
689-
completionHandler.capture(),
690-
same(writeCoordinationExecutor)
685+
completionHandler.capture()
691686
);
692687
assertEquals(indexRequest1.getPipeline(), "default_pipeline");
693688
assertEquals(indexRequest2.getPipeline(), "default_pipeline");
@@ -737,8 +732,7 @@ public void testDoExecuteCalledTwiceCorrectly() throws Exception {
737732
any(),
738733
any(),
739734
failureHandler.capture(),
740-
completionHandler.capture(),
741-
same(writeCoordinationExecutor)
735+
completionHandler.capture()
742736
);
743737
completionHandler.getValue().accept(null, exception);
744738
assertFalse(action.indexCreated); // still no index yet, the ingest node failed.
@@ -831,8 +825,7 @@ public void testFindDefaultPipelineFromTemplateMatch() {
831825
any(),
832826
any(),
833827
failureHandler.capture(),
834-
completionHandler.capture(),
835-
same(writeCoordinationExecutor)
828+
completionHandler.capture()
836829
);
837830
}
838831

@@ -872,8 +865,7 @@ public void testFindDefaultPipelineFromV2TemplateMatch() {
872865
any(),
873866
any(),
874867
failureHandler.capture(),
875-
completionHandler.capture(),
876-
same(writeCoordinationExecutor)
868+
completionHandler.capture()
877869
);
878870
}
879871

@@ -902,8 +894,7 @@ public void testIngestCallbackExceptionHandled() throws Exception {
902894
any(),
903895
any(),
904896
failureHandler.capture(),
905-
completionHandler.capture(),
906-
same(writeCoordinationExecutor)
897+
completionHandler.capture()
907898
);
908899
indexRequest1.autoGenerateId();
909900
completionHandler.getValue().accept(Thread.currentThread(), null);
@@ -942,8 +933,7 @@ private void validateDefaultPipeline(IndexRequest indexRequest) {
942933
any(),
943934
any(),
944935
failureHandler.capture(),
945-
completionHandler.capture(),
946-
same(writeCoordinationExecutor)
936+
completionHandler.capture()
947937
);
948938
assertEquals(indexRequest.getPipeline(), "default_pipeline");
949939
completionHandler.getValue().accept(null, exception);

0 commit comments

Comments
 (0)