diff --git a/docs/changelog/94000.yaml b/docs/changelog/94000.yaml new file mode 100644 index 0000000000000..debbf2fd205c7 --- /dev/null +++ b/docs/changelog/94000.yaml @@ -0,0 +1,6 @@ +pr: 94000 +summary: Introduce redirect method on IngestDocument +area: Ingest Node +type: enhancement +issues: + - 83653 diff --git a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/230_change_target_index.yml b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/230_change_target_index.yml index 6a6e8f071024b..d9154174379bd 100644 --- a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/230_change_target_index.yml +++ b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/230_change_target_index.yml @@ -87,11 +87,7 @@ teardown: index: foo id: "1" - match: { _source.a: true } -# The next is commented out because there's a bug where the final_pipeline is executed twice under certain circumstances -# (See issue https://github.com/elastic/elasticsearch/issues/83653). -# TODO: Uncomment after the issue is fixed, and remove the repeated value test of the current behavior -#- match: { _source.accumulator: [ "non-repeated-value" ] } -- match: { _source.accumulator: [ "non-repeated-value", "non-repeated-value" ] } +- match: { _source.accumulator: [ "non-repeated-value" ] } # only the foo index - do: @@ -150,11 +146,7 @@ teardown: index: foo id: "1" - match: { _source.a: true } -# The next is commented out because there's a bug where the final_pipeline is executed twice under certain circumstances -# (See issue https://github.com/elastic/elasticsearch/issues/83653). -# TODO: Uncomment after the issue is fixed, and remove the repeated value test of the current behavior -#- match: { _source.accumulator: [ "non-repeated-value" ] } -- match: { _source.accumulator: [ "non-repeated-value", "non-repeated-value" ] } +- match: { _source.accumulator: [ "non-repeated-value" ] } # only the foo index - do: diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/FinalPipelineIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/FinalPipelineIT.java index a2afa9a6285ae..003d8bcf9fafe 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/FinalPipelineIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/FinalPipelineIT.java @@ -28,6 +28,7 @@ import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.ingest.AbstractProcessor; +import org.elasticsearch.ingest.ConfigurationUtils; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.PipelineConfiguration; import org.elasticsearch.ingest.Processor; @@ -48,6 +49,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.function.BiConsumer; import java.util.function.Supplier; @@ -97,6 +99,26 @@ public void testFinalPipelineCantChangeDestination() { ); } + public void testFinalPipelineCantRerouteDestination() { + final Settings settings = Settings.builder().put(IndexSettings.FINAL_PIPELINE.getKey(), "final_pipeline").build(); + createIndex("index", settings); + + final BytesReference finalPipelineBody = new BytesArray(""" + {"processors": [{"reroute": {}}]}"""); + client().admin().cluster().putPipeline(new PutPipelineRequest("final_pipeline", finalPipelineBody, XContentType.JSON)).actionGet(); + + final IllegalStateException e = expectThrows( + IllegalStateException.class, + () -> client().prepareIndex("index").setId("1").setSource(Map.of("field", "value")).get() + ); + assertThat( + e, + hasToString( + endsWith("final pipeline [final_pipeline] can't change the target index (from [index] to [target]) for document [1]") + ) + ); + } + public void testFinalPipelineOfOldDestinationIsNotInvoked() { Settings settings = Settings.builder() .put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default_pipeline") @@ -187,6 +209,73 @@ public void testDefaultPipelineOfNewDestinationIsNotInvoked() { assertFalse(target.getHits().getAt(0).getSourceAsMap().containsKey("final")); } + public void testDefaultPipelineOfRerouteDestinationIsInvoked() { + Settings settings = Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default_pipeline").build(); + createIndex("index", settings); + + settings = Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "target_default_pipeline").build(); + createIndex("target", settings); + + BytesReference defaultPipelineBody = new BytesArray(""" + {"processors": [{"reroute": {}}]}"""); + client().admin() + .cluster() + .putPipeline(new PutPipelineRequest("default_pipeline", defaultPipelineBody, XContentType.JSON)) + .actionGet(); + + BytesReference targetPipeline = new BytesArray(""" + {"processors": [{"final": {}}]}"""); + client().admin() + .cluster() + .putPipeline(new PutPipelineRequest("target_default_pipeline", targetPipeline, XContentType.JSON)) + .actionGet(); + + IndexResponse indexResponse = client().prepareIndex("index") + .setId("1") + .setSource(Map.of("field", "value")) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .get(); + assertEquals(RestStatus.CREATED, indexResponse.status()); + SearchResponse target = client().prepareSearch("target").get(); + assertEquals(1, target.getHits().getTotalHits().value); + assertTrue(target.getHits().getAt(0).getSourceAsMap().containsKey("final")); + } + + public void testAvoidIndexingLoop() { + Settings settings = Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default_pipeline").build(); + createIndex("index", settings); + + settings = Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "target_default_pipeline").build(); + createIndex("target", settings); + + BytesReference defaultPipelineBody = new BytesArray(""" + {"processors": [{"reroute": {"dest": "target"}}]}"""); + client().admin() + .cluster() + .putPipeline(new PutPipelineRequest("default_pipeline", defaultPipelineBody, XContentType.JSON)) + .actionGet(); + + BytesReference targetPipeline = new BytesArray(""" + {"processors": [{"reroute": {"dest": "index"}}]}"""); + client().admin() + .cluster() + .putPipeline(new PutPipelineRequest("target_default_pipeline", targetPipeline, XContentType.JSON)) + .actionGet(); + + IllegalStateException exception = expectThrows( + IllegalStateException.class, + () -> client().prepareIndex("index") + .setId("1") + .setSource(Map.of("dest", "index")) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .get() + ); + assertThat( + exception.getMessage(), + equalTo("index cycle detected while processing pipeline [target_default_pipeline] for document [1]: [index, target, index]") + ); + } + public void testFinalPipeline() { final Settings settings = Settings.builder().put(IndexSettings.FINAL_PIPELINE.getKey(), "final_pipeline").build(); createIndex("index", settings); @@ -393,6 +482,26 @@ public String getType() { return "changing_dest"; } + }, + "reroute", + (processorFactories, tag, description, config) -> { + final String dest = Objects.requireNonNullElse( + ConfigurationUtils.readOptionalStringProperty(description, tag, config, "dest"), + "target" + ); + return new AbstractProcessor(tag, description) { + @Override + public IngestDocument execute(final IngestDocument ingestDocument) throws Exception { + ingestDocument.reroute(dest); + return ingestDocument; + } + + @Override + public String getType() { + return "reroute"; + } + + }; } ); } diff --git a/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java b/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java index 2910ab11e8c94..3aa9a68bd3d3a 100644 --- a/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java @@ -141,7 +141,7 @@ public void execute(IngestDocument ingestDocument, BiConsumer handler) { assert currentProcessor <= processorsWithMetrics.size(); - if (currentProcessor == processorsWithMetrics.size()) { + if (currentProcessor == processorsWithMetrics.size() || ingestDocument.isReroute()) { handler.accept(ingestDocument, null); return; } @@ -150,7 +150,9 @@ void innerExecute(int currentProcessor, IngestDocument ingestDocument, final BiC Processor processor; IngestMetric metric; // iteratively execute any sync processors - while (currentProcessor < processorsWithMetrics.size() && processorsWithMetrics.get(currentProcessor).v1().isAsync() == false) { + while (currentProcessor < processorsWithMetrics.size() + && processorsWithMetrics.get(currentProcessor).v1().isAsync() == false + && ingestDocument.isReroute() == false) { processorWithMetric = processorsWithMetrics.get(currentProcessor); processor = processorWithMetric.v1(); metric = processorWithMetric.v2(); @@ -176,7 +178,7 @@ void innerExecute(int currentProcessor, IngestDocument ingestDocument, final BiC } assert currentProcessor <= processorsWithMetrics.size(); - if (currentProcessor == processorsWithMetrics.size()) { + if (currentProcessor == processorsWithMetrics.size() || ingestDocument.isReroute()) { handler.accept(ingestDocument, null); return; } diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java index f471926087ae5..07f7856323fb7 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java @@ -62,8 +62,8 @@ public final class IngestDocument { // Contains all pipelines that have been executed for this document private final Set executedPipelines = new LinkedHashSet<>(); - private boolean doNoSelfReferencesCheck = false; + private boolean reroute = false; public IngestDocument(String index, String id, long version, String routing, VersionType versionType, Map source) { this.ctxMap = new IngestCtxMap(index, id, version, routing, versionType, ZonedDateTime.now(ZoneOffset.UTC), source); @@ -80,6 +80,7 @@ public IngestDocument(IngestDocument other) { new IngestCtxMap(deepCopyMap(other.ctxMap.getSource()), other.ctxMap.getMetadata().clone()), deepCopyMap(other.ingestMetadata) ); + this.reroute = other.reroute; } /** @@ -903,6 +904,29 @@ public String toString() { return "IngestDocument{" + " sourceAndMetadata=" + ctxMap + ", ingestMetadata=" + ingestMetadata + '}'; } + public void reroute(String destIndex) { + getMetadata().setIndex(destIndex); + reroute = true; + } + + /** + * The document is redirected to another target. + * This implies that we'll skip the current pipeline and invoke the default pipeline of the new target + * + * @return whether the document is redirected to another target + */ + boolean isReroute() { + return reroute; + } + + /** + * Set the {@link #reroute} flag to false so that subsequent calls to {@link #isReroute()} will return false until/unless + * {@link #reroute(String)} is called. + */ + void resetReroute() { + reroute = false; + } + public enum Metadata { INDEX(IndexFieldMapper.NAME), TYPE("_type"), diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 10d276dfefee8..534cafaeaa27d 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -47,6 +47,7 @@ import org.elasticsearch.common.util.CollectionUtils; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Releasable; import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.Tuple; @@ -68,6 +69,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedHashSet; import java.util.LinkedList; import java.util.List; import java.util.Locale; @@ -441,6 +443,10 @@ public static boolean isNoOpPipelineUpdate(ClusterState state, PutPipelineReques * Returns the pipeline by the specified id */ public Pipeline getPipeline(String id) { + if (id == null) { + return null; + } + PipelineHolder holder = pipelines.get(id); if (holder != null) { return holder.pipeline; @@ -646,21 +652,8 @@ protected void doRun() { continue; } - final String pipelineId = indexRequest.getPipeline(); - indexRequest.setPipeline(NOOP_PIPELINE_NAME); - final String finalPipelineId = indexRequest.getFinalPipeline(); - indexRequest.setFinalPipeline(NOOP_PIPELINE_NAME); - boolean hasFinalPipeline = true; - final List pipelines; - if (IngestService.NOOP_PIPELINE_NAME.equals(pipelineId) == false - && IngestService.NOOP_PIPELINE_NAME.equals(finalPipelineId) == false) { - pipelines = List.of(pipelineId, finalPipelineId); - } else if (IngestService.NOOP_PIPELINE_NAME.equals(pipelineId) == false) { - pipelines = List.of(pipelineId); - hasFinalPipeline = false; - } else if (IngestService.NOOP_PIPELINE_NAME.equals(finalPipelineId) == false) { - pipelines = List.of(finalPipelineId); - } else { + PipelineIterator pipelines = getAndResetPipelines(indexRequest); + if (pipelines.hasNext() == false) { i++; continue; } @@ -695,8 +688,9 @@ public void onFailure(Exception e) { }); IngestDocument ingestDocument = newIngestDocument(indexRequest); - executePipelines(pipelines.iterator(), hasFinalPipeline, indexRequest, ingestDocument, documentListener); - + LinkedHashSet indexRecursionDetection = new LinkedHashSet<>(); + indexRecursionDetection.add(indexRequest.index()); + executePipelines(pipelines, indexRequest, ingestDocument, documentListener, indexRecursionDetection); i++; } } @@ -704,21 +698,99 @@ public void onFailure(Exception e) { }); } + /** + * Returns the pipelines of the request, and updates the request so that it no longer references + * any pipelines (both the default and final pipeline are set to the noop pipeline). + */ + private PipelineIterator getAndResetPipelines(IndexRequest indexRequest) { + final String pipelineId = indexRequest.getPipeline(); + indexRequest.setPipeline(NOOP_PIPELINE_NAME); + final String finalPipelineId = indexRequest.getFinalPipeline(); + indexRequest.setFinalPipeline(NOOP_PIPELINE_NAME); + return new PipelineIterator(pipelineId, finalPipelineId); + } + + /** + * A triple for tracking the non-null id of a pipeline, the pipeline itself, and whether the pipeline is a final pipeline. + * + * @param id the non-null id of the pipeline + * @param pipeline a possibly-null reference to the pipeline for the given pipeline id + * @param isFinal true if the pipeline is a final pipeline + */ + private record PipelineSlot(String id, @Nullable Pipeline pipeline, boolean isFinal) { + public PipelineSlot { + Objects.requireNonNull(id); + } + } + + private class PipelineIterator implements Iterator { + + private final String defaultPipeline; + private final String finalPipeline; + private final Iterator pipelineSlotIterator; + + private PipelineIterator(String defaultPipeline, String finalPipeline) { + this.defaultPipeline = NOOP_PIPELINE_NAME.equals(defaultPipeline) ? null : defaultPipeline; + this.finalPipeline = NOOP_PIPELINE_NAME.equals(finalPipeline) ? null : finalPipeline; + this.pipelineSlotIterator = iterator(); + } + + public PipelineIterator withoutDefaultPipeline() { + return new PipelineIterator(null, finalPipeline); + } + + private Iterator iterator() { + PipelineSlot defaultPipelineSlot = null, finalPipelineSlot = null; + if (defaultPipeline != null) { + defaultPipelineSlot = new PipelineSlot(defaultPipeline, getPipeline(defaultPipeline), false); + } + if (finalPipeline != null) { + finalPipelineSlot = new PipelineSlot(finalPipeline, getPipeline(finalPipeline), true); + } + + if (defaultPipeline != null && finalPipeline != null) { + return List.of(defaultPipelineSlot, finalPipelineSlot).iterator(); + } else if (finalPipeline != null) { + return List.of(finalPipelineSlot).iterator(); + } else if (defaultPipeline != null) { + return List.of(defaultPipelineSlot).iterator(); + } else { + return Collections.emptyIterator(); + } + } + + @Override + public boolean hasNext() { + return pipelineSlotIterator.hasNext(); + } + + @Override + public PipelineSlot next() { + return pipelineSlotIterator.next(); + } + } + private void executePipelines( - final Iterator pipelineIds, - final boolean hasFinalPipeline, + final PipelineIterator pipelines, final IndexRequest indexRequest, final IngestDocument ingestDocument, - final ActionListener listener + final ActionListener listener, + final Set indexRecursionDetection ) { - assert pipelineIds.hasNext(); - final String pipelineId = pipelineIds.next(); + assert pipelines.hasNext(); + PipelineSlot slot = pipelines.next(); + final String pipelineId = slot.id(); + final Pipeline pipeline = slot.pipeline(); + final boolean isFinalPipeline = slot.isFinal(); + + // reset the reroute flag, at the start of a new pipeline execution this document hasn't been rerouted yet + ingestDocument.resetReroute(); + try { - final PipelineHolder holder = pipelines.get(pipelineId); - if (holder == null) { + if (pipeline == null) { throw new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist"); } - final Pipeline pipeline = holder.pipeline; + final String originalIndex = indexRequest.indices()[0]; executePipeline(ingestDocument, pipeline, (keep, e) -> { assert keep != null; @@ -767,12 +839,12 @@ private void executePipelines( return; // document failed! } - Iterator newPipelineIds = pipelineIds; - boolean newHasFinalPipeline = hasFinalPipeline; + PipelineIterator newPipelines = pipelines; final String newIndex = indexRequest.indices()[0]; if (Objects.equals(originalIndex, newIndex) == false) { - if (hasFinalPipeline && pipelineIds.hasNext() == false) { + // final pipelines cannot change the target index (either directly or by way of a reroute) + if (isFinalPipeline) { listener.onFailure( new IllegalStateException( format( @@ -785,20 +857,40 @@ private void executePipelines( ) ); return; // document failed! - } else { - indexRequest.isPipelineResolved(false); - resolvePipelinesAndUpdateIndexRequest(null, indexRequest, state.metadata()); - if (IngestService.NOOP_PIPELINE_NAME.equals(indexRequest.getFinalPipeline()) == false) { - newPipelineIds = Collections.singleton(indexRequest.getFinalPipeline()).iterator(); - newHasFinalPipeline = true; - } else { - newPipelineIds = Collections.emptyIterator(); - } + } + + // check for cycles in the visited indices + if (indexRecursionDetection.add(newIndex) == false) { + List indexRoute = new ArrayList<>(indexRecursionDetection); + indexRoute.add(newIndex); + listener.onFailure( + new IllegalStateException( + format( + "index cycle detected while processing pipeline [%s] for document [%s]: %s", + pipelineId, + indexRequest.id(), + indexRoute + ) + ) + ); + return; // document failed! + } + + // clear the current pipeline, then re-resolve the pipelines for this request + indexRequest.setPipeline(null); + indexRequest.isPipelineResolved(false); + resolvePipelinesAndUpdateIndexRequest(null, indexRequest, state.metadata()); + newPipelines = getAndResetPipelines(indexRequest); + + // for backwards compatibility, when a pipeline changes the target index for a document without using the reroute + // mechanism, do not invoke the default pipeline of the new target index + if (ingestDocument.isReroute() == false) { + newPipelines = newPipelines.withoutDefaultPipeline(); } } - if (newPipelineIds.hasNext()) { - executePipelines(newPipelineIds, newHasFinalPipeline, indexRequest, ingestDocument, listener); + if (newPipelines.hasNext()) { + executePipelines(newPipelines, indexRequest, ingestDocument, listener, indexRecursionDetection); } else { // update the index request's source and (potentially) cache the timestamp for TSDB updateIndexRequestSource(indexRequest, ingestDocument); diff --git a/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java index 4bc581594d8a4..327649a9819ba 100644 --- a/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java @@ -532,6 +532,85 @@ public void testMultipleProcessorsDoNotIgnoreFailures() { } } + public void testSkipPipeline() { + TestProcessor processor1 = new TestProcessor(doc -> doc.reroute("foo")); + TestProcessor processor2 = new TestProcessor(new RuntimeException("this processor was expected to be skipped")); + LongSupplier relativeTimeProvider = mock(LongSupplier.class); + when(relativeTimeProvider.getAsLong()).thenReturn(0L); + CompoundProcessor compoundProcessor = new CompoundProcessor( + false, + List.of(processor1, processor2), + List.of(), + relativeTimeProvider + ); + executeCompound(compoundProcessor, ingestDocument, (result, e) -> {}); + assertThat(processor1.getInvokedCounter(), equalTo(1)); + assertStats(0, compoundProcessor, 0, 1, 0, 0); + assertThat(processor2.getInvokedCounter(), equalTo(0)); + assertStats(1, compoundProcessor, 0, 0, 0, 0); + } + + public void testSkipAsyncProcessor() { + TestProcessor processor1 = new TestProcessor(doc -> doc.reroute("foo")) { + @Override + public boolean isAsync() { + return true; + } + }; + TestProcessor processor2 = new TestProcessor(new RuntimeException("this processor was expected to be skipped")); + LongSupplier relativeTimeProvider = mock(LongSupplier.class); + when(relativeTimeProvider.getAsLong()).thenReturn(0L); + CompoundProcessor compoundProcessor = new CompoundProcessor( + false, + List.of(processor1, processor2), + List.of(), + relativeTimeProvider + ); + executeCompound(compoundProcessor, ingestDocument, (result, e) -> {}); + assertThat(processor1.getInvokedCounter(), equalTo(1)); + assertStats(0, compoundProcessor, 0, 1, 0, 0); + assertThat(processor2.getInvokedCounter(), equalTo(0)); + assertStats(1, compoundProcessor, 0, 0, 0, 0); + } + + public void testSkipProcessorIgnoreFailure() { + TestProcessor processor1 = new TestProcessor(doc -> { + doc.reroute("foo"); + throw new RuntimeException("simulate processor failure after calling skipCurrentPipeline()"); + }); + TestProcessor processor2 = new TestProcessor(doc -> {}); + LongSupplier relativeTimeProvider = mock(LongSupplier.class); + when(relativeTimeProvider.getAsLong()).thenReturn(0L); + CompoundProcessor compoundProcessor = new CompoundProcessor(true, List.of(processor1, processor2), List.of(), relativeTimeProvider); + executeCompound(compoundProcessor, ingestDocument, (result, e) -> {}); + assertThat(processor1.getInvokedCounter(), equalTo(1)); + assertStats(0, compoundProcessor, 0, 1, 1, 0); + assertThat(processor2.getInvokedCounter(), equalTo(0)); + assertStats(1, compoundProcessor, 0, 0, 0, 0); + } + + public void testDontSkipFailureProcessor() { + TestProcessor processor = new TestProcessor(doc -> { + doc.reroute("foo"); + throw new RuntimeException("simulate processor failure after calling skipCurrentPipeline()"); + }); + TestProcessor failureProcessor1 = new TestProcessor(doc -> {}); + TestProcessor failureProcessor2 = new TestProcessor(doc -> {}); + LongSupplier relativeTimeProvider = mock(LongSupplier.class); + when(relativeTimeProvider.getAsLong()).thenReturn(0L); + CompoundProcessor compoundProcessor = new CompoundProcessor( + false, + List.of(processor), + List.of(failureProcessor1, failureProcessor2), + relativeTimeProvider + ); + executeCompound(compoundProcessor, ingestDocument, (result, e) -> {}); + assertThat(processor.getInvokedCounter(), equalTo(1)); + assertStats(0, compoundProcessor, 0, 1, 1, 0); + assertThat(failureProcessor1.getInvokedCounter(), equalTo(1)); + assertThat(failureProcessor2.getInvokedCounter(), equalTo(1)); + } + private TestProcessor getTestProcessor(String tag, boolean isAsync, boolean shouldThrowException) { return new TestProcessor( tag,