diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java index c7fd31e529c19..96fbb2c5d6649 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java @@ -297,6 +297,7 @@ private void processBulkIndexIngestRequest( ) { final long ingestStartTimeInNanos = relativeTimeNanos(); final BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(original); + final Thread originalThread = Thread.currentThread(); getIngestService(original).executeBulkRequest( metadata.id(), original.numberOfActions(), @@ -305,50 +306,42 @@ private void processBulkIndexIngestRequest( (indexName) -> resolveFailureStore(indexName, metadata, threadPool.absoluteTimeInMillis()), bulkRequestModifier::markItemForFailureStore, bulkRequestModifier::markItemAsFailed, - (originalThread, exception) -> { - if (exception != null) { - logger.debug("failed to execute pipeline for a bulk request", exception); - listener.onFailure(exception); + listener.delegateFailureAndWrap((l, unused) -> { + long ingestTookInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeNanos() - ingestStartTimeInNanos); + BulkRequest bulkRequest = bulkRequestModifier.getBulkRequest(); + ActionListener actionListener = bulkRequestModifier.wrapActionListenerIfNeeded(ingestTookInMillis, l); + if (bulkRequest.requests().isEmpty()) { + // at this stage, the transport bulk action can't deal with a bulk request with no requests, + // so we stop and send an empty response back to the client. + // (this will happen if pre-processing all items in the bulk failed) + actionListener.onResponse(new BulkResponse(new BulkItemResponse[0], 0)); } else { - long ingestTookInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeNanos() - ingestStartTimeInNanos); - BulkRequest bulkRequest = bulkRequestModifier.getBulkRequest(); - ActionListener actionListener = bulkRequestModifier.wrapActionListenerIfNeeded( - ingestTookInMillis, - listener - ); - if (bulkRequest.requests().isEmpty()) { - // at this stage, the transport bulk action can't deal with a bulk request with no requests, - // so we stop and send an empty response back to the client. - // (this will happen if pre-processing all items in the bulk failed) - actionListener.onResponse(new BulkResponse(new BulkItemResponse[0], 0)); - } else { - ActionRunnable runnable = new ActionRunnable<>(actionListener) { - @Override - protected void doRun() throws IOException { - applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, actionListener); - } + ActionRunnable runnable = new ActionRunnable<>(actionListener) { + @Override + protected void doRun() throws IOException { + applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, actionListener); + } - @Override - public boolean isForceExecution() { - // If we fork back to a coordination thread we **not** should fail, because tp queue is full. - // (Otherwise the work done during ingest will be lost) - // It is okay to force execution here. Throttling of write requests happens prior to - // ingest when a node receives a bulk request. - return true; - } - }; - // If a processor went async and returned a response on a different thread then - // before we continue the bulk request we should fork back on a coordination thread. Otherwise it is fine to perform - // coordination steps on the write thread - if (originalThread == Thread.currentThread()) { - runnable.run(); - } else { - executor.execute(runnable); + @Override + public boolean isForceExecution() { + // If we fork back to a coordination thread we **not** should fail, because tp queue is full. + // (Otherwise the work done during ingest will be lost) + // It is okay to force execution here. Throttling of write requests happens prior to + // ingest when a node receives a bulk request. + return true; } + }; + // If a processor went async and returned a response on a different thread then + // before we continue the bulk request we should fork back on a coordination thread. Otherwise it is fine to perform + // coordination steps on the write thread + if (originalThread == Thread.currentThread()) { + runnable.run(); + } else { + executor.execute(runnable); } } - }, - executor + + }) ); } diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 59cd9c11c9b64..97bd5f4adbba8 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -95,7 +95,6 @@ import java.util.Set; import java.util.TreeMap; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.Executor; import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.function.Consumer; @@ -823,10 +822,7 @@ private static IngestPipelinesExecutionResult failWithoutStoringIn(String index, * @param onFailure A callback executed when a document fails ingestion and does not need to be * persisted. Accepts the slot in the collection of requests that the document * occupies, and the exception that the document encountered. - * @param onCompletion A callback executed once all documents have been processed. Accepts the thread - * that ingestion completed on or an exception in the event that the entire operation - * has failed. - * @param executor Which executor the bulk request should be executed on. + * @param listener A callback executed once all documents have been processed. */ public void executeBulkRequest( final ProjectId projectId, @@ -836,25 +832,23 @@ public void executeBulkRequest( final Function resolveFailureStore, final TriConsumer onStoreFailure, final TriConsumer onFailure, - final BiConsumer onCompletion, - final Executor executor + final ActionListener listener ) { assert numberOfActionRequests > 0 : "numberOfActionRequests must be greater than 0 but was [" + numberOfActionRequests + "]"; // Adapt handler to ensure node features during ingest logic final Function adaptedResolveFailureStore = wrapResolverWithFeatureCheck(resolveFailureStore); - executor.execute(new AbstractRunnable() { + new AbstractRunnable() { @Override public void onFailure(Exception e) { - onCompletion.accept(null, e); + listener.onFailure(e); } @Override protected void doRun() { - final Thread originalThread = Thread.currentThread(); - try (var refs = new RefCountingRunnable(() -> onCompletion.accept(originalThread, null))) { + try (var refs = new RefCountingRunnable(() -> listener.onResponse(null))) { int i = 0; for (DocWriteRequest actionRequest : actionRequests) { IndexRequest indexRequest = TransportBulkAction.getIndexWriteRequest(actionRequest); @@ -933,7 +927,7 @@ public void onFailure(Exception e) { } } } - }); + }.run(); } /** diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java index a8634d0a7dac5..a54cd08c3738a 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java @@ -77,7 +77,6 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.BiConsumer; import java.util.function.Function; import static org.hamcrest.Matchers.containsString; @@ -85,7 +84,6 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.ArgumentMatchers.same; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -127,7 +125,7 @@ public class TransportBulkActionIngestTests extends ESTestCase { @Captor ArgumentCaptor> failureHandler; @Captor - ArgumentCaptor> completionHandler; + ArgumentCaptor> listener; @Captor ArgumentCaptor> remoteResponseHandler; @Captor @@ -425,10 +423,9 @@ public void testIngestLocal() throws Exception { redirectPredicate.capture(), redirectHandler.capture(), failureHandler.capture(), - completionHandler.capture(), - same(writeCoordinationExecutor) + listener.capture() ); - completionHandler.getValue().accept(null, exception); + listener.getValue().onFailure(exception); assertTrue(failureCalled.get()); // now check success @@ -441,7 +438,7 @@ public void testIngestLocal() throws Exception { assertNull(redirectPredicate.getValue().apply(WITH_DEFAULT_PIPELINE)); // no redirects for random existing indices assertNull(redirectPredicate.getValue().apply("index")); // no redirects for non-existent indices with no templates redirectHandler.getValue().apply(2, WITH_FAILURE_STORE_ENABLED + "-1", exception); // exception and redirect for request 3 (slot 2) - completionHandler.getValue().accept(DUMMY_WRITE_THREAD, null); // all ingestion completed + listener.getValue().onResponse(null); // all ingestion completed assertTrue(action.isExecuted); assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one verifyNoMoreInteractions(transportService); @@ -476,15 +473,14 @@ public void testSingleItemBulkActionIngestLocal() throws Exception { any(), any(), failureHandler.capture(), - completionHandler.capture(), - same(writeCoordinationExecutor) + listener.capture() ); - completionHandler.getValue().accept(null, exception); + listener.getValue().onFailure(exception); assertTrue(failureCalled.get()); // now check success indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing - completionHandler.getValue().accept(DUMMY_WRITE_THREAD, null); + listener.getValue().onResponse(null); assertTrue(action.isExecuted); assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one verifyNoMoreInteractions(transportService); @@ -525,10 +521,9 @@ public void testIngestSystemLocal() throws Exception { any(), any(), failureHandler.capture(), - completionHandler.capture(), - same(systemWriteCoordinationExecutor) + listener.capture() ); - completionHandler.getValue().accept(null, exception); + listener.getValue().onFailure(exception); assertTrue(failureCalled.get()); // now check success @@ -536,7 +531,7 @@ public void testIngestSystemLocal() throws Exception { // have an exception for our one index request failureHandler.getValue().apply(0, exception, IndexDocFailureStoreStatus.NOT_APPLICABLE_OR_UNKNOWN); indexRequest2.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing - completionHandler.getValue().accept(DUMMY_WRITE_THREAD, null); + listener.getValue().onResponse(null); assertTrue(action.isExecuted); assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one verifyNoMoreInteractions(transportService); @@ -558,7 +553,7 @@ public void testIngestForward() throws Exception { ActionTestUtils.execute(action, null, bulkRequest, listener); // should not have executed ingest locally - verify(ingestService, never()).executeBulkRequest(eq(projectId), anyInt(), any(), any(), any(), any(), any(), any(), any()); + verify(ingestService, never()).executeBulkRequest(eq(projectId), anyInt(), any(), any(), any(), any(), any(), any()); // but instead should have sent to a remote node with the transport service ArgumentCaptor node = ArgumentCaptor.forClass(DiscoveryNode.class); verify(transportService).sendRequest(node.capture(), eq(TransportBulkAction.NAME), any(), remoteResponseHandler.capture()); @@ -598,7 +593,7 @@ public void testSingleItemBulkActionIngestForward() throws Exception { ActionTestUtils.execute(singleItemBulkWriteAction, null, indexRequest, listener); // should not have executed ingest locally - verify(ingestService, never()).executeBulkRequest(eq(projectId), anyInt(), any(), any(), any(), any(), any(), any(), any()); + verify(ingestService, never()).executeBulkRequest(eq(projectId), anyInt(), any(), any(), any(), any(), any(), any()); // but instead should have sent to a remote node with the transport service ArgumentCaptor node = ArgumentCaptor.forClass(DiscoveryNode.class); verify(transportService).sendRequest(node.capture(), eq(TransportBulkAction.NAME), any(), remoteResponseHandler.capture()); @@ -686,20 +681,19 @@ private void validatePipelineWithBulkUpsert(@Nullable String indexRequestIndexNa any(), any(), failureHandler.capture(), - completionHandler.capture(), - same(writeCoordinationExecutor) + listener.capture() ); assertEquals(indexRequest1.getPipeline(), "default_pipeline"); assertEquals(indexRequest2.getPipeline(), "default_pipeline"); assertEquals(indexRequest3.getPipeline(), "default_pipeline"); - completionHandler.getValue().accept(null, exception); + listener.getValue().onFailure(exception); assertTrue(failureCalled.get()); // now check success of the transport bulk action indexRequest1.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing indexRequest2.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing indexRequest3.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing - completionHandler.getValue().accept(DUMMY_WRITE_THREAD, null); + listener.getValue().onResponse(null); assertTrue(action.isExecuted); assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one verifyNoMoreInteractions(transportService); @@ -737,16 +731,15 @@ public void testDoExecuteCalledTwiceCorrectly() throws Exception { any(), any(), failureHandler.capture(), - completionHandler.capture(), - same(writeCoordinationExecutor) + listener.capture() ); - completionHandler.getValue().accept(null, exception); + listener.getValue().onFailure(exception); assertFalse(action.indexCreated); // still no index yet, the ingest node failed. assertTrue(failureCalled.get()); // now check success indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing - completionHandler.getValue().accept(DUMMY_WRITE_THREAD, null); + listener.getValue().onResponse(null); assertTrue(action.isExecuted); assertTrue(action.indexCreated); // now the index is created since we skipped the ingest node path. assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one @@ -831,8 +824,7 @@ public void testFindDefaultPipelineFromTemplateMatch() { any(), any(), failureHandler.capture(), - completionHandler.capture(), - same(writeCoordinationExecutor) + listener.capture() ); } @@ -872,8 +864,7 @@ public void testFindDefaultPipelineFromV2TemplateMatch() { any(), any(), failureHandler.capture(), - completionHandler.capture(), - same(writeCoordinationExecutor) + listener.capture() ); } @@ -902,11 +893,10 @@ public void testIngestCallbackExceptionHandled() throws Exception { any(), any(), failureHandler.capture(), - completionHandler.capture(), - same(writeCoordinationExecutor) + listener.capture() ); indexRequest1.autoGenerateId(); - completionHandler.getValue().accept(Thread.currentThread(), null); + listener.getValue().onResponse(null); // check failure passed through to the listener assertFalse(action.isExecuted); @@ -942,16 +932,15 @@ private void validateDefaultPipeline(IndexRequest indexRequest) { any(), any(), failureHandler.capture(), - completionHandler.capture(), - same(writeCoordinationExecutor) + listener.capture() ); assertEquals(indexRequest.getPipeline(), "default_pipeline"); - completionHandler.getValue().accept(null, exception); + listener.getValue().onFailure(exception); assertTrue(failureCalled.get()); // now check success indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing - completionHandler.getValue().accept(DUMMY_WRITE_THREAD, null); + listener.getValue().onResponse(null); assertTrue(action.isExecuted); assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one verifyNoMoreInteractions(transportService); diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index 6350b882d86c6..15ff956c598dc 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -242,7 +242,7 @@ public boolean clusterHasFeature(ClusterState state, NodeFeature feature) { }; @SuppressWarnings("unchecked") - final BiConsumer completionHandler = mock(BiConsumer.class); + final ActionListener listener = mock(ActionListener.class); ingestService.executeBulkRequest( randomProjectIdOrDefault(), @@ -252,12 +252,11 @@ public boolean clusterHasFeature(ClusterState state, NodeFeature feature) { (s) -> noRedirect, (slot, targetIndex, e) -> fail("Should not be redirecting failures"), failureHandler, - completionHandler, - EsExecutors.DIRECT_EXECUTOR_SERVICE + listener ); assertTrue(failure.get()); - verify(completionHandler, times(1)).accept(Thread.currentThread(), null); + verify(listener, times(1)).onResponse(null); } public void testUpdatePipelines() { @@ -1226,7 +1225,7 @@ public String getType() { }; @SuppressWarnings("unchecked") - final BiConsumer completionHandler = mock(BiConsumer.class); + final ActionListener listener = mock(ActionListener.class); ingestService.executeBulkRequest( projectId, @@ -1236,12 +1235,11 @@ public String getType() { (s) -> noRedirect, (slot, targetIndex, e) -> fail("Should not be redirecting failures"), failureHandler, - completionHandler, - EsExecutors.DIRECT_EXECUTOR_SERVICE + listener ); assertTrue(failure.get()); - verify(completionHandler, times(1)).accept(Thread.currentThread(), null); + verify(listener, times(1)).onResponse(null); } public void testExecuteBulkPipelineDoesNotExist() { @@ -1272,7 +1270,7 @@ public void testExecuteBulkPipelineDoesNotExist() { @SuppressWarnings("unchecked") TriConsumer failureHandler = mock(TriConsumer.class); @SuppressWarnings("unchecked") - final BiConsumer completionHandler = mock(BiConsumer.class); + final ActionListener listener = mock(ActionListener.class); Boolean noRedirect = randomBoolean() ? false : null; IndexDocFailureStoreStatus fsStatus = noRedirect == null @@ -1287,15 +1285,14 @@ public void testExecuteBulkPipelineDoesNotExist() { (s) -> noRedirect, (slot, targetIndex, e) -> fail("Should not be redirecting failures"), failureHandler, - completionHandler, - EsExecutors.DIRECT_EXECUTOR_SERVICE + listener ); verify(failureHandler, times(1)).apply( argThat(item -> item == 2), argThat(iae -> "pipeline with id [does_not_exist] does not exist".equals(iae.getMessage())), argThat(fsStatus::equals) ); - verify(completionHandler, times(1)).accept(Thread.currentThread(), null); + verify(listener, times(1)).onResponse(null); } public void testExecuteSuccess() { @@ -1317,7 +1314,7 @@ public void testExecuteSuccess() { @SuppressWarnings("unchecked") final TriConsumer failureHandler = mock(TriConsumer.class); @SuppressWarnings("unchecked") - final BiConsumer completionHandler = mock(BiConsumer.class); + final ActionListener listener = mock(ActionListener.class); ingestService.executeBulkRequest( projectId, 1, @@ -1326,11 +1323,10 @@ public void testExecuteSuccess() { (s) -> false, (slot, targetIndex, e) -> fail("Should not be redirecting failures"), failureHandler, - completionHandler, - EsExecutors.DIRECT_EXECUTOR_SERVICE + listener ); verifyNoInteractions(failureHandler); - verify(completionHandler, times(1)).accept(Thread.currentThread(), null); + verify(listener, times(1)).onResponse(null); } public void testDynamicTemplates() throws Exception { @@ -1361,7 +1357,7 @@ public void testDynamicTemplates() throws Exception { final TriConsumer failureHandler = (v, e, s) -> { throw new AssertionError("must never fail", e); }; - final BiConsumer completionHandler = (t, e) -> latch.countDown(); + final ActionListener listener = ActionListener.running(latch::countDown); ingestService.executeBulkRequest( projectId, 1, @@ -1370,8 +1366,7 @@ public void testDynamicTemplates() throws Exception { (s) -> false, (slot, targetIndex, e) -> fail("Should not be redirecting failures"), failureHandler, - completionHandler, - EsExecutors.DIRECT_EXECUTOR_SERVICE + listener ); latch.await(); assertThat(indexRequest.getDynamicTemplates(), equalTo(Map.of("foo", "bar", "foo.bar", "baz"))); @@ -1395,7 +1390,7 @@ public void testExecuteEmptyPipeline() throws Exception { @SuppressWarnings("unchecked") final TriConsumer failureHandler = mock(TriConsumer.class); @SuppressWarnings("unchecked") - final BiConsumer completionHandler = mock(BiConsumer.class); + final ActionListener listener = mock(ActionListener.class); ingestService.executeBulkRequest( projectId, 1, @@ -1404,11 +1399,10 @@ public void testExecuteEmptyPipeline() throws Exception { (s) -> false, (slot, targetIndex, e) -> fail("Should not be redirecting failures"), failureHandler, - completionHandler, - EsExecutors.DIRECT_EXECUTOR_SERVICE + listener ); verifyNoInteractions(failureHandler); - verify(completionHandler, times(1)).accept(Thread.currentThread(), null); + verify(listener, times(1)).onResponse(null); } public void testExecutePropagateAllMetadataUpdates() throws Exception { @@ -1458,7 +1452,7 @@ public void testExecutePropagateAllMetadataUpdates() throws Exception { @SuppressWarnings("unchecked") final TriConsumer failureHandler = mock(TriConsumer.class); @SuppressWarnings("unchecked") - final BiConsumer completionHandler = mock(BiConsumer.class); + final ActionListener listener = mock(ActionListener.class); ingestService.executeBulkRequest( projectId, 1, @@ -1467,12 +1461,11 @@ public void testExecutePropagateAllMetadataUpdates() throws Exception { (s) -> false, (slot, targetIndex, e) -> fail("Should not be redirecting failures"), failureHandler, - completionHandler, - EsExecutors.DIRECT_EXECUTOR_SERVICE + listener ); verify(processor).execute(any(), any()); verifyNoInteractions(failureHandler); - verify(completionHandler, times(1)).accept(Thread.currentThread(), null); + verify(listener, times(1)).onResponse(null); assertThat(indexRequest.index(), equalTo("update_index")); assertThat(indexRequest.id(), equalTo("update_id")); assertThat(indexRequest.routing(), equalTo("update_routing")); @@ -1512,7 +1505,7 @@ public void testExecuteFailure() throws Exception { @SuppressWarnings("unchecked") final TriConsumer failureHandler = mock(TriConsumer.class); @SuppressWarnings("unchecked") - final BiConsumer completionHandler = mock(BiConsumer.class); + final ActionListener listener = mock(ActionListener.class); Boolean noRedirect = randomBoolean() ? false : null; IndexDocFailureStoreStatus fsStatus = noRedirect == null @@ -1526,12 +1519,11 @@ public void testExecuteFailure() throws Exception { (s) -> noRedirect, (slot, targetIndex, e) -> fail("Should not be redirecting failures"), failureHandler, - completionHandler, - EsExecutors.DIRECT_EXECUTOR_SERVICE + listener ); verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), Map.of()), any()); verify(failureHandler, times(1)).apply(eq(0), any(RuntimeException.class), eq(fsStatus)); - verify(completionHandler, times(1)).accept(Thread.currentThread(), null); + verify(listener, times(1)).onResponse(null); } public void testExecuteSuccessWithOnFailure() throws Exception { @@ -1577,7 +1569,7 @@ public void testExecuteSuccessWithOnFailure() throws Exception { @SuppressWarnings("unchecked") final TriConsumer failureHandler = mock(TriConsumer.class); @SuppressWarnings("unchecked") - final BiConsumer completionHandler = mock(BiConsumer.class); + final ActionListener listener = mock(ActionListener.class); ingestService.executeBulkRequest( projectId, 1, @@ -1586,11 +1578,10 @@ public void testExecuteSuccessWithOnFailure() throws Exception { (s) -> false, (slot, targetIndex, e) -> fail("Should not be redirecting failures"), failureHandler, - completionHandler, - EsExecutors.DIRECT_EXECUTOR_SERVICE + listener ); verifyNoInteractions(failureHandler); - verify(completionHandler, times(1)).accept(Thread.currentThread(), null); + verify(listener, times(1)).onResponse(null); } public void testExecuteFailureWithNestedOnFailure() throws Exception { @@ -1631,7 +1622,7 @@ public void testExecuteFailureWithNestedOnFailure() throws Exception { @SuppressWarnings("unchecked") final TriConsumer failureHandler = mock(TriConsumer.class); @SuppressWarnings("unchecked") - final BiConsumer completionHandler = mock(BiConsumer.class); + final ActionListener listener = mock(ActionListener.class); Boolean noRedirect = randomBoolean() ? false : null; IndexDocFailureStoreStatus fsStatus = noRedirect == null @@ -1646,12 +1637,11 @@ public void testExecuteFailureWithNestedOnFailure() throws Exception { (s) -> noRedirect, (slot, targetIndex, e) -> fail("Should not be redirecting failures"), failureHandler, - completionHandler, - EsExecutors.DIRECT_EXECUTOR_SERVICE + listener ); verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), Map.of()), any()); verify(failureHandler, times(1)).apply(eq(0), any(RuntimeException.class), eq(fsStatus)); - verify(completionHandler, times(1)).accept(Thread.currentThread(), null); + verify(listener, times(1)).onResponse(null); } public void testBulkRequestExecutionWithFailures() throws Exception { @@ -1700,7 +1690,7 @@ public void testBulkRequestExecutionWithFailures() throws Exception { @SuppressWarnings("unchecked") TriConsumer requestItemErrorHandler = mock(TriConsumer.class); @SuppressWarnings("unchecked") - final BiConsumer completionHandler = mock(BiConsumer.class); + final ActionListener listener = mock(ActionListener.class); Boolean noRedirect = randomBoolean() ? false : null; IndexDocFailureStoreStatus fsStatus = noRedirect == null @@ -1715,12 +1705,11 @@ public void testBulkRequestExecutionWithFailures() throws Exception { (s) -> noRedirect, (slot, targetIndex, e) -> fail("Should not be redirecting failures"), requestItemErrorHandler, - completionHandler, - EsExecutors.DIRECT_EXECUTOR_SERVICE + listener ); verify(requestItemErrorHandler, times(numIndexRequests)).apply(anyInt(), argThat(e -> e.getCause().equals(error)), eq(fsStatus)); - verify(completionHandler, times(1)).accept(Thread.currentThread(), null); + verify(listener, times(1)).onResponse(null); } public void testExecuteFailureRedirection() throws Exception { @@ -1756,7 +1745,7 @@ public void testExecuteFailureRedirection() throws Exception { @SuppressWarnings("unchecked") final TriConsumer failureHandler = mock(TriConsumer.class); @SuppressWarnings("unchecked") - final BiConsumer completionHandler = mock(BiConsumer.class); + final ActionListener listener = mock(ActionListener.class); ingestService.executeBulkRequest( projectId, 1, @@ -1765,13 +1754,12 @@ public void testExecuteFailureRedirection() throws Exception { redirectCheck, redirectHandler, failureHandler, - completionHandler, - EsExecutors.DIRECT_EXECUTOR_SERVICE + listener ); verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), Map.of()), any()); verify(redirectHandler, times(1)).apply(eq(0), eq(indexRequest.index()), any(RuntimeException.class)); verifyNoInteractions(failureHandler); - verify(completionHandler, times(1)).accept(Thread.currentThread(), null); + verify(listener, times(1)).onResponse(null); } public void testFailureRedirectionWithoutNodeFeatureEnabled() throws Exception { @@ -1808,7 +1796,7 @@ public void testFailureRedirectionWithoutNodeFeatureEnabled() throws Exception { @SuppressWarnings("unchecked") final TriConsumer failureHandler = mock(TriConsumer.class); @SuppressWarnings("unchecked") - final BiConsumer completionHandler = mock(BiConsumer.class); + final ActionListener listener = mock(ActionListener.class); ingestService.executeBulkRequest( projectId, 1, @@ -1817,8 +1805,7 @@ public void testFailureRedirectionWithoutNodeFeatureEnabled() throws Exception { redirectCheck, redirectHandler, failureHandler, - completionHandler, - EsExecutors.DIRECT_EXECUTOR_SERVICE + listener ); verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), Map.of()), any()); verifyNoInteractions(redirectHandler); @@ -1827,7 +1814,7 @@ public void testFailureRedirectionWithoutNodeFeatureEnabled() throws Exception { any(RuntimeException.class), eq(IndexDocFailureStoreStatus.NOT_APPLICABLE_OR_UNKNOWN) ); - verify(completionHandler, times(1)).accept(Thread.currentThread(), null); + verify(listener, times(1)).onResponse(null); } public void testExecuteFailureStatusOnFailureWithoutRedirection() throws Exception { @@ -1860,7 +1847,7 @@ public void testExecuteFailureStatusOnFailureWithoutRedirection() throws Excepti @SuppressWarnings("unchecked") final TriConsumer failureHandler = mock(TriConsumer.class); @SuppressWarnings("unchecked") - final BiConsumer completionHandler = mock(BiConsumer.class); + final ActionListener listener = mock(ActionListener.class); ingestService.executeBulkRequest( DEFAULT_PROJECT_ID, 1, @@ -1869,13 +1856,12 @@ public void testExecuteFailureStatusOnFailureWithoutRedirection() throws Excepti redirectCheck, redirectHandler, failureHandler, - completionHandler, - EsExecutors.DIRECT_EXECUTOR_SERVICE + listener ); verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), Map.of()), any()); verifyNoInteractions(redirectHandler); verify(failureHandler, times(1)).apply(eq(0), any(RuntimeException.class), eq(IndexDocFailureStoreStatus.NOT_ENABLED)); - verify(completionHandler, times(1)).accept(Thread.currentThread(), null); + verify(listener, times(1)).onResponse(null); } public void testExecuteFailureRedirectionWithNestedOnFailure() throws Exception { @@ -1919,7 +1905,7 @@ public void testExecuteFailureRedirectionWithNestedOnFailure() throws Exception @SuppressWarnings("unchecked") final TriConsumer failureHandler = mock(TriConsumer.class); @SuppressWarnings("unchecked") - final BiConsumer completionHandler = mock(BiConsumer.class); + final ActionListener listener = mock(ActionListener.class); ingestService.executeBulkRequest( projectId, 1, @@ -1928,13 +1914,12 @@ public void testExecuteFailureRedirectionWithNestedOnFailure() throws Exception redirectCheck, redirectHandler, failureHandler, - completionHandler, - EsExecutors.DIRECT_EXECUTOR_SERVICE + listener ); verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), Map.of()), any()); verify(redirectHandler, times(1)).apply(eq(0), eq(indexRequest.index()), any(RuntimeException.class)); verifyNoInteractions(failureHandler); - verify(completionHandler, times(1)).accept(Thread.currentThread(), null); + verify(listener, times(1)).onResponse(null); } public void testBulkRequestExecutionWithRedirectedFailures() throws Exception { @@ -1985,7 +1970,7 @@ public void testBulkRequestExecutionWithRedirectedFailures() throws Exception { @SuppressWarnings("unchecked") TriConsumer requestItemErrorHandler = mock(TriConsumer.class); @SuppressWarnings("unchecked") - final BiConsumer completionHandler = mock(BiConsumer.class); + final ActionListener listener = mock(ActionListener.class); ingestService.executeBulkRequest( projectId, numRequest, @@ -1994,13 +1979,12 @@ public void testBulkRequestExecutionWithRedirectedFailures() throws Exception { (s) -> true, requestItemRedirectHandler, requestItemErrorHandler, - completionHandler, - EsExecutors.DIRECT_EXECUTOR_SERVICE + listener ); verify(requestItemRedirectHandler, times(numIndexRequests)).apply(anyInt(), anyString(), argThat(e -> e.getCause().equals(error))); verifyNoInteractions(requestItemErrorHandler); - verify(completionHandler, times(1)).accept(Thread.currentThread(), null); + verify(listener, times(1)).onResponse(null); } public void testBulkRequestExecution() throws Exception { @@ -2050,7 +2034,7 @@ public void testBulkRequestExecution() throws Exception { @SuppressWarnings("unchecked") TriConsumer requestItemErrorHandler = mock(TriConsumer.class); @SuppressWarnings("unchecked") - final BiConsumer completionHandler = mock(BiConsumer.class); + final ActionListener listener = mock(ActionListener.class); ingestService.executeBulkRequest( projectId, numRequest, @@ -2059,12 +2043,11 @@ public void testBulkRequestExecution() throws Exception { (s) -> false, (slot, targetIndex, e) -> fail("Should not be redirecting failures"), requestItemErrorHandler, - completionHandler, - EsExecutors.DIRECT_EXECUTOR_SERVICE + listener ); verifyNoInteractions(requestItemErrorHandler); - verify(completionHandler, times(1)).accept(Thread.currentThread(), null); + verify(listener, times(1)).onResponse(null); for (int i = 0; i < bulkRequest.requests().size(); i++) { DocWriteRequest docWriteRequest = bulkRequest.requests().get(i); IndexRequest indexRequest = TransportBulkAction.getIndexWriteRequest(docWriteRequest); @@ -2177,8 +2160,7 @@ public String execute() { (s) -> false, (slot, targetIndex, e) -> fail("Should not be redirecting failures"), (integer, e, status) -> {}, - (thread, e) -> {}, - EsExecutors.DIRECT_EXECUTOR_SERVICE + ActionListener.noop() ); { @@ -2249,7 +2231,7 @@ public void testStats() throws Exception { @SuppressWarnings("unchecked") final TriConsumer failureHandler = mock(TriConsumer.class); @SuppressWarnings("unchecked") - final BiConsumer completionHandler = mock(BiConsumer.class); + final ActionListener listener = mock(ActionListener.class); final IndexRequest indexRequest = new IndexRequest("_index"); indexRequest.setPipeline("_id1").setFinalPipeline("_none"); @@ -2263,8 +2245,7 @@ public void testStats() throws Exception { (s) -> false, (slot, targetIndex, e) -> fail("Should not be redirecting failures"), failureHandler, - completionHandler, - EsExecutors.DIRECT_EXECUTOR_SERVICE + listener ); final IngestStats afterFirstRequestStats = ingestService.stats(); var endSize1 = indexRequest.ramBytesUsed(); @@ -2292,8 +2273,7 @@ public void testStats() throws Exception { (s) -> false, (slot, targetIndex, e) -> fail("Should not be redirecting failures"), failureHandler, - completionHandler, - EsExecutors.DIRECT_EXECUTOR_SERVICE + listener ); final IngestStats afterSecondRequestStats = ingestService.stats(); var endSize2 = indexRequest.ramBytesUsed(); @@ -2322,8 +2302,7 @@ public void testStats() throws Exception { (s) -> false, (slot, targetIndex, e) -> fail("Should not be redirecting failures"), failureHandler, - completionHandler, - EsExecutors.DIRECT_EXECUTOR_SERVICE + listener ); final IngestStats afterThirdRequestStats = ingestService.stats(); endSize1 += indexRequest.ramBytesUsed(); @@ -2357,8 +2336,7 @@ public void testStats() throws Exception { (s) -> false, (slot, targetIndex, e) -> fail("Should not be redirecting failures"), failureHandler, - completionHandler, - EsExecutors.DIRECT_EXECUTOR_SERVICE + listener ); final IngestStats afterForthRequestStats = ingestService.stats(); endSize1 += indexRequest.ramBytesUsed(); @@ -2388,8 +2366,7 @@ public void testStats() throws Exception { (s) -> false, (slot, targetIndex, e) -> fail("Should not be redirecting failures"), failureHandler, - completionHandler, - EsExecutors.DIRECT_EXECUTOR_SERVICE + listener ); final IngestStats afterFifthRequestStats = ingestService.stats(); assertThat(afterFifthRequestStats.pipelineStats().size(), equalTo(3)); @@ -2475,7 +2452,7 @@ public String getDescription() { @SuppressWarnings("unchecked") final TriConsumer failureHandler = mock(TriConsumer.class); @SuppressWarnings("unchecked") - final BiConsumer completionHandler = mock(BiConsumer.class); + final ActionListener listener = mock(ActionListener.class); @SuppressWarnings("unchecked") final IntConsumer dropHandler = mock(IntConsumer.class); ingestService.executeBulkRequest( @@ -2486,11 +2463,10 @@ public String getDescription() { (s) -> false, (slot, targetIndex, e) -> fail("Should not be redirecting failures"), failureHandler, - completionHandler, - EsExecutors.DIRECT_EXECUTOR_SERVICE + listener ); verifyNoInteractions(failureHandler); - verify(completionHandler, times(1)).accept(Thread.currentThread(), null); + verify(listener, times(1)).onResponse(null); verify(dropHandler, times(1)).accept(1); } @@ -2582,8 +2558,7 @@ public void testCBORParsing() throws Exception { (s) -> false, (slot, targetIndex, e) -> fail("Should not be redirecting failures"), (integer, e, status) -> {}, - (thread, e) -> {}, - EsExecutors.DIRECT_EXECUTOR_SERVICE + ActionListener.noop() ); } @@ -2658,8 +2633,7 @@ public void testSetsRawTimestamp() { (s) -> false, (slot, targetIndex, e) -> fail("Should not be redirecting failures"), (integer, e, status) -> {}, - (thread, e) -> {}, - EsExecutors.DIRECT_EXECUTOR_SERVICE + ActionListener.noop() ); assertThat(indexRequest1.getRawTimestamp(), nullValue());