diff --git a/docs/changelog/114157.yaml b/docs/changelog/114157.yaml new file mode 100644 index 0000000000000..22e0fda173e98 --- /dev/null +++ b/docs/changelog/114157.yaml @@ -0,0 +1,6 @@ +pr: 114157 +summary: Add a `terminate` ingest processor +area: Ingest Node +type: feature +issues: + - 110218 diff --git a/docs/reference/ingest/processors/terminate.asciidoc b/docs/reference/ingest/processors/terminate.asciidoc new file mode 100644 index 0000000000000..a2643fbd955f1 --- /dev/null +++ b/docs/reference/ingest/processors/terminate.asciidoc @@ -0,0 +1,30 @@ +[[terminate-processor]] +=== Terminate processor + +++++ +Terminate +++++ + +Terminates the current ingest pipeline, causing no further processors to be run. +This will normally be executed conditionally, using the `if` option. + +If this pipeline is being called from another pipeline, the calling pipeline is *not* terminated. + +[[terminate-options]] +.Terminate Options +[options="header"] +|====== +| Name | Required | Default | Description +include::common-options.asciidoc[] +|====== + +[source,js] +-------------------------------------------------- +{ + "description" : "terminates the current pipeline if the error field is present", + "terminate": { + "if": "ctx.error != null" + } +} +-------------------------------------------------- +// NOTCONSOLE diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java index 6ef636847e2df..d585c6217202f 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java @@ -72,6 +72,7 @@ public Map getProcessors(Processor.Parameters paramet entry(SetProcessor.TYPE, new SetProcessor.Factory(parameters.scriptService)), entry(SortProcessor.TYPE, new SortProcessor.Factory()), entry(SplitProcessor.TYPE, new SplitProcessor.Factory()), + entry(TerminateProcessor.TYPE, new TerminateProcessor.Factory()), entry(TrimProcessor.TYPE, new TrimProcessor.Factory()), entry(URLDecodeProcessor.TYPE, new URLDecodeProcessor.Factory()), entry(UppercaseProcessor.TYPE, new UppercaseProcessor.Factory()), diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/TerminateProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/TerminateProcessor.java new file mode 100644 index 0000000000000..5b6144ba8eab9 --- /dev/null +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/TerminateProcessor.java @@ -0,0 +1,53 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.ingest.common; + +import org.elasticsearch.ingest.AbstractProcessor; +import org.elasticsearch.ingest.IngestDocument; +import org.elasticsearch.ingest.Processor; + +import java.util.Map; + +/** + * A {@link Processor} which simply prevents subsequent processors in the pipeline from running (without failing, like {@link FailProcessor} + * does). This will normally be run conditionally, using the {@code if} option. + */ +public class TerminateProcessor extends AbstractProcessor { + + static final String TYPE = "terminate"; + + TerminateProcessor(String tag, String description) { + super(tag, description); + } + + @Override + public IngestDocument execute(IngestDocument ingestDocument) { + ingestDocument.terminate(); + return ingestDocument; + } + + @Override + public String getType() { + return TYPE; + } + + public static final class Factory implements Processor.Factory { + + @Override + public Processor create( + Map processorFactories, + String tag, + String description, + Map config + ) { + return new TerminateProcessor(tag, description); + } + } +} diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/TerminateProcessorTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/TerminateProcessorTests.java new file mode 100644 index 0000000000000..1888f8366edd3 --- /dev/null +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/TerminateProcessorTests.java @@ -0,0 +1,70 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.ingest.common; + +import org.elasticsearch.ingest.CompoundProcessor; +import org.elasticsearch.ingest.IngestDocument; +import org.elasticsearch.ingest.Pipeline; +import org.elasticsearch.ingest.TestTemplateService; +import org.elasticsearch.ingest.ValueSource; +import org.elasticsearch.test.ESTestCase; + +import java.util.Map; + +import static org.elasticsearch.ingest.RandomDocumentPicks.randomIngestDocument; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; + +public class TerminateProcessorTests extends ESTestCase { + + public void testTerminateInPipeline() throws Exception { + Pipeline pipeline = new Pipeline( + "my-pipeline", + null, + null, + null, + new CompoundProcessor( + new SetProcessor( + "before-set", + "Sets before field to true", + new TestTemplateService.MockTemplateScript.Factory("before"), + ValueSource.wrap(true, TestTemplateService.instance()), + null + ), + new TerminateProcessor("terminate", "terminates the pipeline"), + new SetProcessor( + "after-set", + "Sets after field to true", + new TestTemplateService.MockTemplateScript.Factory("after"), + ValueSource.wrap(true, TestTemplateService.instance()), + null + ) + ) + ); + IngestDocument input = randomIngestDocument(random(), Map.of("foo", "bar")); + PipelineOutput output = new PipelineOutput(); + + pipeline.execute(input, output::set); + + assertThat(output.exception, nullValue()); + // We expect the before-set processor to have run, but not the after-set one: + assertThat(output.document.getSource(), is(Map.of("foo", "bar", "before", true))); + } + + private static class PipelineOutput { + IngestDocument document; + Exception exception; + + void set(IngestDocument document, Exception exception) { + this.document = document; + this.exception = exception; + } + } +} diff --git a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/330_terminate_processor.yml b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/330_terminate_processor.yml new file mode 100644 index 0000000000000..7a46d7bb272d8 --- /dev/null +++ b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/330_terminate_processor.yml @@ -0,0 +1,138 @@ +--- +setup: + - do: + ingest.put_pipeline: + id: "test-pipeline" + body: > + { + "description": "Appends just 'before' to the steps field if the number field is less than 50, or both 'before' and 'after' if not", + "processors": [ + { + "append": { + "field": "steps", + "value": "before" + } + }, + { + "terminate": { + "if": "ctx.number < 50" + } + }, + { + "append": { + "field": "steps", + "value": "after" + } + } + ] + } + - do: + ingest.put_pipeline: + id: "test-final-pipeline" + body: > + { + "description": "Appends 'final' to the steps field", + "processors": [ + { + "append": { + "field": "steps", + "value": "final" + } + } + ] + } + - do: + ingest.put_pipeline: + id: "test-outer-pipeline" + body: > + { + "description": "Runs test-pipeline and then append 'outer' to the steps field", + "processors": [ + { + "pipeline": { + "name": "test-pipeline" + } + }, + { + "append": { + "field": "steps", + "value": "outer" + } + } + ] + } + - do: + indices.create: + index: "test-index-with-default-and-final-pipelines" + body: + settings: + index: + default_pipeline: "test-pipeline" + final_pipeline: "test-final-pipeline" + - do: + indices.create: + index: "test-vanilla-index" + +--- +teardown: + - do: + indices.delete: + index: "test-index-with-default-and-final-pipelines" + ignore_unavailable: true + - do: + indices.delete: + index: "test-vanilla-index" + ignore_unavailable: true + - do: + ingest.delete_pipeline: + id: "test-pipeline" + ignore: 404 + - do: + ingest.delete_pipeline: + id: "test-outer-pipeline" + ignore: 404 + +--- +"Test pipeline including conditional terminate pipeline": + + - do: + bulk: + refresh: true + body: + - '{ "index": {"_index": "test-index-with-default-and-final-pipelines" } }' + - '{ "comment": "should terminate", "number": 40, "steps": [] }' + - '{ "index": {"_index": "test-index-with-default-and-final-pipelines" } }' + - '{ "comment": "should continue to end", "number": 60, "steps": [] }' + + - do: + search: + rest_total_hits_as_int: true + index: "test-index-with-default-and-final-pipelines" + body: + sort: "number" + - match: { hits.total: 2 } + - match: { hits.hits.0._source.number: 40 } + - match: { hits.hits.1._source.number: 60 } + - match: { hits.hits.0._source.steps: ["before", "final"] } + - match: { hits.hits.1._source.steps: ["before", "after", "final"] } + +--- +"Test pipeline with terminate invoked from an outer pipeline": + + - do: + bulk: + refresh: true + pipeline: "test-outer-pipeline" + body: + - '{ "index": {"_index": "test-vanilla-index" } }' + - '{ "comment": "should terminate inner pipeline but not outer", "number": 40, "steps": [] }' + + - do: + search: + rest_total_hits_as_int: true + index: "test-vanilla-index" + body: + sort: "number" + - match: { hits.total: 1 } + - match: { hits.hits.0._source.number: 40 } + - match: { hits.hits.0._source.steps: ["before", "outer"] } diff --git a/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java b/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java index 9620becd49d59..873f334d0a650 100644 --- a/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java @@ -148,7 +148,7 @@ public void execute(IngestDocument ingestDocument, BiConsumer handler) { assert currentProcessor <= processorsWithMetrics.size(); - if (currentProcessor == processorsWithMetrics.size() || ingestDocument.isReroute()) { + if (currentProcessor == processorsWithMetrics.size() || ingestDocument.isReroute() || ingestDocument.isTerminate()) { handler.accept(ingestDocument, null); return; } @@ -159,7 +159,8 @@ void innerExecute(int currentProcessor, IngestDocument ingestDocument, final BiC // iteratively execute any sync processors while (currentProcessor < processorsWithMetrics.size() && processorsWithMetrics.get(currentProcessor).v1().isAsync() == false - && ingestDocument.isReroute() == false) { + && ingestDocument.isReroute() == false + && ingestDocument.isTerminate() == false) { processorWithMetric = processorsWithMetrics.get(currentProcessor); processor = processorWithMetric.v1(); metric = processorWithMetric.v2(); @@ -185,7 +186,7 @@ void innerExecute(int currentProcessor, IngestDocument ingestDocument, final BiC } assert currentProcessor <= processorsWithMetrics.size(); - if (currentProcessor == processorsWithMetrics.size() || ingestDocument.isReroute()) { + if (currentProcessor == processorsWithMetrics.size() || ingestDocument.isReroute() || ingestDocument.isTerminate()) { handler.accept(ingestDocument, null); return; } diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java index 0bc1c0d2932df..280c7684a8553 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java @@ -82,6 +82,7 @@ public final class IngestDocument { private boolean doNoSelfReferencesCheck = false; private boolean reroute = false; + private boolean terminate = false; public IngestDocument(String index, String id, long version, String routing, VersionType versionType, Map source) { this.ctxMap = new IngestCtxMap(index, id, version, routing, versionType, ZonedDateTime.now(ZoneOffset.UTC), source); @@ -935,6 +936,27 @@ void resetReroute() { reroute = false; } + /** + * Sets the terminate flag to true, to indicate that no further processors in the current pipeline should be run for this document. + */ + public void terminate() { + terminate = true; + } + + /** + * Returns whether the {@link #terminate()} flag was set. + */ + boolean isTerminate() { + return terminate; + } + + /** + * Resets the {@link #terminate()} flag. + */ + void resetTerminate() { + terminate = false; + } + public enum Metadata { INDEX(IndexFieldMapper.NAME), TYPE("_type"), diff --git a/server/src/main/java/org/elasticsearch/ingest/Pipeline.java b/server/src/main/java/org/elasticsearch/ingest/Pipeline.java index 6153d45bce779..a8e8fbb5d3217 100644 --- a/server/src/main/java/org/elasticsearch/ingest/Pipeline.java +++ b/server/src/main/java/org/elasticsearch/ingest/Pipeline.java @@ -133,6 +133,9 @@ public void execute(IngestDocument ingestDocument, BiConsumer