Skip to content

Commit 88d93d4

Browse files
committed
action listener
1 parent 746efd9 commit 88d93d4

File tree

4 files changed

+121
-130
lines changed

4 files changed

+121
-130
lines changed

server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java

Lines changed: 32 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -297,6 +297,7 @@ private void processBulkIndexIngestRequest(
297297
) {
298298
final long ingestStartTimeInNanos = relativeTimeNanos();
299299
final BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(original);
300+
final Thread originalThread = Thread.currentThread();
300301
getIngestService(original).executeBulkRequest(
301302
metadata.id(),
302303
original.numberOfActions(),
@@ -305,49 +306,42 @@ private void processBulkIndexIngestRequest(
305306
(indexName) -> resolveFailureStore(indexName, metadata, threadPool.absoluteTimeInMillis()),
306307
bulkRequestModifier::markItemForFailureStore,
307308
bulkRequestModifier::markItemAsFailed,
308-
(originalThread, exception) -> {
309-
if (exception != null) {
310-
logger.debug("failed to execute pipeline for a bulk request", exception);
311-
listener.onFailure(exception);
309+
listener.delegateFailureAndWrap((l, unused) -> {
310+
long ingestTookInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeNanos() - ingestStartTimeInNanos);
311+
BulkRequest bulkRequest = bulkRequestModifier.getBulkRequest();
312+
ActionListener<BulkResponse> actionListener = bulkRequestModifier.wrapActionListenerIfNeeded(ingestTookInMillis, l);
313+
if (bulkRequest.requests().isEmpty()) {
314+
// at this stage, the transport bulk action can't deal with a bulk request with no requests,
315+
// so we stop and send an empty response back to the client.
316+
// (this will happen if pre-processing all items in the bulk failed)
317+
actionListener.onResponse(new BulkResponse(new BulkItemResponse[0], 0));
312318
} else {
313-
long ingestTookInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeNanos() - ingestStartTimeInNanos);
314-
BulkRequest bulkRequest = bulkRequestModifier.getBulkRequest();
315-
ActionListener<BulkResponse> actionListener = bulkRequestModifier.wrapActionListenerIfNeeded(
316-
ingestTookInMillis,
317-
listener
318-
);
319-
if (bulkRequest.requests().isEmpty()) {
320-
// at this stage, the transport bulk action can't deal with a bulk request with no requests,
321-
// so we stop and send an empty response back to the client.
322-
// (this will happen if pre-processing all items in the bulk failed)
323-
actionListener.onResponse(new BulkResponse(new BulkItemResponse[0], 0));
324-
} else {
325-
ActionRunnable<BulkResponse> runnable = new ActionRunnable<>(actionListener) {
326-
@Override
327-
protected void doRun() throws IOException {
328-
applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, actionListener);
329-
}
319+
ActionRunnable<BulkResponse> runnable = new ActionRunnable<>(actionListener) {
320+
@Override
321+
protected void doRun() throws IOException {
322+
applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, actionListener);
323+
}
330324

331-
@Override
332-
public boolean isForceExecution() {
333-
// If we fork back to a coordination thread we **not** should fail, because tp queue is full.
334-
// (Otherwise the work done during ingest will be lost)
335-
// It is okay to force execution here. Throttling of write requests happens prior to
336-
// ingest when a node receives a bulk request.
337-
return true;
338-
}
339-
};
340-
// If a processor went async and returned a response on a different thread then
341-
// before we continue the bulk request we should fork back on a coordination thread. Otherwise it is fine to perform
342-
// coordination steps on the write thread
343-
if (originalThread == Thread.currentThread()) {
344-
runnable.run();
345-
} else {
346-
executor.execute(runnable);
325+
@Override
326+
public boolean isForceExecution() {
327+
// If we fork back to a coordination thread we **not** should fail, because tp queue is full.
328+
// (Otherwise the work done during ingest will be lost)
329+
// It is okay to force execution here. Throttling of write requests happens prior to
330+
// ingest when a node receives a bulk request.
331+
return true;
347332
}
333+
};
334+
// If a processor went async and returned a response on a different thread then
335+
// before we continue the bulk request we should fork back on a coordination thread. Otherwise it is fine to perform
336+
// coordination steps on the write thread
337+
if (originalThread == Thread.currentThread()) {
338+
runnable.run();
339+
} else {
340+
executor.execute(runnable);
348341
}
349342
}
350-
}
343+
344+
})
351345
);
352346
}
353347

server/src/main/java/org/elasticsearch/ingest/IngestService.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -822,9 +822,7 @@ private static IngestPipelinesExecutionResult failWithoutStoringIn(String index,
822822
* @param onFailure A callback executed when a document fails ingestion and does not need to be
823823
* persisted. Accepts the slot in the collection of requests that the document
824824
* occupies, and the exception that the document encountered.
825-
* @param onCompletion A callback executed once all documents have been processed. Accepts the thread
826-
* that ingestion completed on or an exception in the event that the entire operation
827-
* has failed.
825+
* @param listener A callback executed once all documents have been processed.
828826
*/
829827
public void executeBulkRequest(
830828
final ProjectId projectId,
@@ -834,7 +832,7 @@ public void executeBulkRequest(
834832
final Function<String, Boolean> resolveFailureStore,
835833
final TriConsumer<Integer, String, Exception> onStoreFailure,
836834
final TriConsumer<Integer, Exception, IndexDocFailureStoreStatus> onFailure,
837-
final BiConsumer<Thread, Exception> onCompletion
835+
final ActionListener<Void> listener
838836
) {
839837
assert numberOfActionRequests > 0 : "numberOfActionRequests must be greater than 0 but was [" + numberOfActionRequests + "]";
840838

@@ -845,13 +843,13 @@ public void executeBulkRequest(
845843

846844
@Override
847845
public void onFailure(Exception e) {
848-
onCompletion.accept(null, e);
846+
listener.onFailure(e);
849847
}
850848

851849
@Override
852850
protected void doRun() {
853851
final Thread originalThread = Thread.currentThread();
854-
try (var refs = new RefCountingRunnable(() -> onCompletion.accept(originalThread, null))) {
852+
try (var refs = new RefCountingRunnable(() -> listener.onResponse(null))) {
855853
int i = 0;
856854
for (DocWriteRequest<?> actionRequest : actionRequests) {
857855
IndexRequest indexRequest = TransportBulkAction.getIndexWriteRequest(actionRequest);

server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java

Lines changed: 23 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,6 @@
7777
import java.util.concurrent.Future;
7878
import java.util.concurrent.TimeUnit;
7979
import java.util.concurrent.atomic.AtomicBoolean;
80-
import java.util.function.BiConsumer;
8180
import java.util.function.Function;
8281

8382
import static org.hamcrest.Matchers.containsString;
@@ -126,7 +125,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
126125
@Captor
127126
ArgumentCaptor<TriConsumer<Integer, Exception, IndexDocFailureStoreStatus>> failureHandler;
128127
@Captor
129-
ArgumentCaptor<BiConsumer<Thread, Exception>> completionHandler;
128+
ArgumentCaptor<ActionListener<Void>> listener;
130129
@Captor
131130
ArgumentCaptor<TransportResponseHandler<BulkResponse>> remoteResponseHandler;
132131
@Captor
@@ -424,9 +423,9 @@ public void testIngestLocal() throws Exception {
424423
redirectPredicate.capture(),
425424
redirectHandler.capture(),
426425
failureHandler.capture(),
427-
completionHandler.capture()
426+
listener.capture()
428427
);
429-
completionHandler.getValue().accept(null, exception);
428+
listener.getValue().onFailure(exception);
430429
assertTrue(failureCalled.get());
431430

432431
// now check success
@@ -439,7 +438,7 @@ public void testIngestLocal() throws Exception {
439438
assertNull(redirectPredicate.getValue().apply(WITH_DEFAULT_PIPELINE)); // no redirects for random existing indices
440439
assertNull(redirectPredicate.getValue().apply("index")); // no redirects for non-existent indices with no templates
441440
redirectHandler.getValue().apply(2, WITH_FAILURE_STORE_ENABLED + "-1", exception); // exception and redirect for request 3 (slot 2)
442-
completionHandler.getValue().accept(DUMMY_WRITE_THREAD, null); // all ingestion completed
441+
listener.getValue().onResponse(null); // all ingestion completed
443442
assertTrue(action.isExecuted);
444443
assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one
445444
verifyNoMoreInteractions(transportService);
@@ -474,14 +473,14 @@ public void testSingleItemBulkActionIngestLocal() throws Exception {
474473
any(),
475474
any(),
476475
failureHandler.capture(),
477-
completionHandler.capture()
476+
listener.capture()
478477
);
479-
completionHandler.getValue().accept(null, exception);
478+
listener.getValue().onFailure(exception);
480479
assertTrue(failureCalled.get());
481480

482481
// now check success
483482
indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
484-
completionHandler.getValue().accept(DUMMY_WRITE_THREAD, null);
483+
listener.getValue().onResponse(null);
485484
assertTrue(action.isExecuted);
486485
assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one
487486
verifyNoMoreInteractions(transportService);
@@ -522,17 +521,17 @@ public void testIngestSystemLocal() throws Exception {
522521
any(),
523522
any(),
524523
failureHandler.capture(),
525-
completionHandler.capture()
524+
listener.capture()
526525
);
527-
completionHandler.getValue().accept(null, exception);
526+
listener.getValue().onFailure(exception);
528527
assertTrue(failureCalled.get());
529528

530529
// now check success
531530
Iterator<DocWriteRequest<?>> req = bulkDocsItr.getValue().iterator();
532531
// have an exception for our one index request
533532
failureHandler.getValue().apply(0, exception, IndexDocFailureStoreStatus.NOT_APPLICABLE_OR_UNKNOWN);
534533
indexRequest2.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
535-
completionHandler.getValue().accept(DUMMY_WRITE_THREAD, null);
534+
listener.getValue().onResponse(null);
536535
assertTrue(action.isExecuted);
537536
assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one
538537
verifyNoMoreInteractions(transportService);
@@ -682,19 +681,19 @@ private void validatePipelineWithBulkUpsert(@Nullable String indexRequestIndexNa
682681
any(),
683682
any(),
684683
failureHandler.capture(),
685-
completionHandler.capture()
684+
listener.capture()
686685
);
687686
assertEquals(indexRequest1.getPipeline(), "default_pipeline");
688687
assertEquals(indexRequest2.getPipeline(), "default_pipeline");
689688
assertEquals(indexRequest3.getPipeline(), "default_pipeline");
690-
completionHandler.getValue().accept(null, exception);
689+
listener.getValue().onFailure(exception);
691690
assertTrue(failureCalled.get());
692691

693692
// now check success of the transport bulk action
694693
indexRequest1.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
695694
indexRequest2.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
696695
indexRequest3.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
697-
completionHandler.getValue().accept(DUMMY_WRITE_THREAD, null);
696+
listener.getValue().onResponse(null);
698697
assertTrue(action.isExecuted);
699698
assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one
700699
verifyNoMoreInteractions(transportService);
@@ -732,15 +731,15 @@ public void testDoExecuteCalledTwiceCorrectly() throws Exception {
732731
any(),
733732
any(),
734733
failureHandler.capture(),
735-
completionHandler.capture()
734+
listener.capture()
736735
);
737-
completionHandler.getValue().accept(null, exception);
736+
listener.getValue().onFailure(exception);
738737
assertFalse(action.indexCreated); // still no index yet, the ingest node failed.
739738
assertTrue(failureCalled.get());
740739

741740
// now check success
742741
indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
743-
completionHandler.getValue().accept(DUMMY_WRITE_THREAD, null);
742+
listener.getValue().onResponse(null);
744743
assertTrue(action.isExecuted);
745744
assertTrue(action.indexCreated); // now the index is created since we skipped the ingest node path.
746745
assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one
@@ -825,7 +824,7 @@ public void testFindDefaultPipelineFromTemplateMatch() {
825824
any(),
826825
any(),
827826
failureHandler.capture(),
828-
completionHandler.capture()
827+
listener.capture()
829828
);
830829
}
831830

@@ -865,7 +864,7 @@ public void testFindDefaultPipelineFromV2TemplateMatch() {
865864
any(),
866865
any(),
867866
failureHandler.capture(),
868-
completionHandler.capture()
867+
listener.capture()
869868
);
870869
}
871870

@@ -894,10 +893,10 @@ public void testIngestCallbackExceptionHandled() throws Exception {
894893
any(),
895894
any(),
896895
failureHandler.capture(),
897-
completionHandler.capture()
896+
listener.capture()
898897
);
899898
indexRequest1.autoGenerateId();
900-
completionHandler.getValue().accept(Thread.currentThread(), null);
899+
listener.getValue().onResponse(null);
901900

902901
// check failure passed through to the listener
903902
assertFalse(action.isExecuted);
@@ -933,15 +932,15 @@ private void validateDefaultPipeline(IndexRequest indexRequest) {
933932
any(),
934933
any(),
935934
failureHandler.capture(),
936-
completionHandler.capture()
935+
listener.capture()
937936
);
938937
assertEquals(indexRequest.getPipeline(), "default_pipeline");
939-
completionHandler.getValue().accept(null, exception);
938+
listener.getValue().onFailure(exception);
940939
assertTrue(failureCalled.get());
941940

942941
// now check success
943942
indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
944-
completionHandler.getValue().accept(DUMMY_WRITE_THREAD, null);
943+
listener.getValue().onResponse(null);
945944
assertTrue(action.isExecuted);
946945
assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one
947946
verifyNoMoreInteractions(transportService);

0 commit comments

Comments
 (0)