Skip to content

Commit 89fabe3

Browse files
Add a terminate ingest processor (elastic#114157)
This processor simply causes any remaining processors in the pipeline to be skipped. It will normally be executed conditionally using the `if` option. (If this pipeline is being called from another pipeline, the calling pipeline is *not* terminated.) For example, this: ``` POST /_ingest/pipeline/_simulate { "pipeline": { "description": "Appends just 'before' to the steps field if the number field is present, or both 'before' and 'after' if not", "processors": [ { "append": { "field": "steps", "value": "before" } }, { "terminate": { "if": "ctx.error != null" } }, { "append": { "field": "steps", "value": "after" } } ] }, "docs": [ { "_index": "index", "_id": "doc1", "_source": { "name": "okay", "steps": [] } }, { "_index": "index", "_id": "doc2", "_source": { "name": "bad", "error": "oh no", "steps": [] } } ] } ``` returns something like this: ``` { "docs": [ { "doc": { "_index": "index", "_version": "-3", "_id": "doc1", "_source": { "name": "okay", "steps": [ "before", "after" ] }, "_ingest": { "timestamp": "2024-10-04T16:25:20.448881Z" } } }, { "doc": { "_index": "index", "_version": "-3", "_id": "doc2", "_source": { "name": "bad", "error": "oh no", "steps": [ "before" ] }, "_ingest": { "timestamp": "2024-10-04T16:25:20.448932Z" } } } ] } ```
1 parent f1901b0 commit 89fabe3

File tree

9 files changed

+327
-3
lines changed

9 files changed

+327
-3
lines changed

docs/changelog/114157.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 114157
2+
summary: Add a `terminate` ingest processor
3+
area: Ingest Node
4+
type: feature
5+
issues:
6+
- 110218
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
[[terminate-processor]]
2+
=== Terminate processor
3+
4+
++++
5+
<titleabbrev>Terminate</titleabbrev>
6+
++++
7+
8+
Terminates the current ingest pipeline, causing no further processors to be run.
9+
This will normally be executed conditionally, using the `if` option.
10+
11+
If this pipeline is being called from another pipeline, the calling pipeline is *not* terminated.
12+
13+
[[terminate-options]]
14+
.Terminate Options
15+
[options="header"]
16+
|======
17+
| Name | Required | Default | Description
18+
include::common-options.asciidoc[]
19+
|======
20+
21+
[source,js]
22+
--------------------------------------------------
23+
{
24+
"description" : "terminates the current pipeline if the error field is present",
25+
"terminate": {
26+
"if": "ctx.error != null"
27+
}
28+
}
29+
--------------------------------------------------
30+
// NOTCONSOLE

modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
7272
entry(SetProcessor.TYPE, new SetProcessor.Factory(parameters.scriptService)),
7373
entry(SortProcessor.TYPE, new SortProcessor.Factory()),
7474
entry(SplitProcessor.TYPE, new SplitProcessor.Factory()),
75+
entry(TerminateProcessor.TYPE, new TerminateProcessor.Factory()),
7576
entry(TrimProcessor.TYPE, new TrimProcessor.Factory()),
7677
entry(URLDecodeProcessor.TYPE, new URLDecodeProcessor.Factory()),
7778
entry(UppercaseProcessor.TYPE, new UppercaseProcessor.Factory()),
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.ingest.common;
11+
12+
import org.elasticsearch.ingest.AbstractProcessor;
13+
import org.elasticsearch.ingest.IngestDocument;
14+
import org.elasticsearch.ingest.Processor;
15+
16+
import java.util.Map;
17+
18+
/**
19+
* A {@link Processor} which simply prevents subsequent processors in the pipeline from running (without failing, like {@link FailProcessor}
20+
* does). This will normally be run conditionally, using the {@code if} option.
21+
*/
22+
public class TerminateProcessor extends AbstractProcessor {
23+
24+
static final String TYPE = "terminate";
25+
26+
TerminateProcessor(String tag, String description) {
27+
super(tag, description);
28+
}
29+
30+
@Override
31+
public IngestDocument execute(IngestDocument ingestDocument) {
32+
ingestDocument.terminate();
33+
return ingestDocument;
34+
}
35+
36+
@Override
37+
public String getType() {
38+
return TYPE;
39+
}
40+
41+
public static final class Factory implements Processor.Factory {
42+
43+
@Override
44+
public Processor create(
45+
Map<String, Processor.Factory> processorFactories,
46+
String tag,
47+
String description,
48+
Map<String, Object> config
49+
) {
50+
return new TerminateProcessor(tag, description);
51+
}
52+
}
53+
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.ingest.common;
11+
12+
import org.elasticsearch.ingest.CompoundProcessor;
13+
import org.elasticsearch.ingest.IngestDocument;
14+
import org.elasticsearch.ingest.Pipeline;
15+
import org.elasticsearch.ingest.TestTemplateService;
16+
import org.elasticsearch.ingest.ValueSource;
17+
import org.elasticsearch.test.ESTestCase;
18+
19+
import java.util.Map;
20+
21+
import static org.elasticsearch.ingest.RandomDocumentPicks.randomIngestDocument;
22+
import static org.hamcrest.Matchers.is;
23+
import static org.hamcrest.Matchers.nullValue;
24+
25+
public class TerminateProcessorTests extends ESTestCase {
26+
27+
public void testTerminateInPipeline() throws Exception {
28+
Pipeline pipeline = new Pipeline(
29+
"my-pipeline",
30+
null,
31+
null,
32+
null,
33+
new CompoundProcessor(
34+
new SetProcessor(
35+
"before-set",
36+
"Sets before field to true",
37+
new TestTemplateService.MockTemplateScript.Factory("before"),
38+
ValueSource.wrap(true, TestTemplateService.instance()),
39+
null
40+
),
41+
new TerminateProcessor("terminate", "terminates the pipeline"),
42+
new SetProcessor(
43+
"after-set",
44+
"Sets after field to true",
45+
new TestTemplateService.MockTemplateScript.Factory("after"),
46+
ValueSource.wrap(true, TestTemplateService.instance()),
47+
null
48+
)
49+
)
50+
);
51+
IngestDocument input = randomIngestDocument(random(), Map.of("foo", "bar"));
52+
PipelineOutput output = new PipelineOutput();
53+
54+
pipeline.execute(input, output::set);
55+
56+
assertThat(output.exception, nullValue());
57+
// We expect the before-set processor to have run, but not the after-set one:
58+
assertThat(output.document.getSource(), is(Map.of("foo", "bar", "before", true)));
59+
}
60+
61+
private static class PipelineOutput {
62+
IngestDocument document;
63+
Exception exception;
64+
65+
void set(IngestDocument document, Exception exception) {
66+
this.document = document;
67+
this.exception = exception;
68+
}
69+
}
70+
}
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
---
2+
setup:
3+
- do:
4+
ingest.put_pipeline:
5+
id: "test-pipeline"
6+
body: >
7+
{
8+
"description": "Appends just 'before' to the steps field if the number field is less than 50, or both 'before' and 'after' if not",
9+
"processors": [
10+
{
11+
"append": {
12+
"field": "steps",
13+
"value": "before"
14+
}
15+
},
16+
{
17+
"terminate": {
18+
"if": "ctx.number < 50"
19+
}
20+
},
21+
{
22+
"append": {
23+
"field": "steps",
24+
"value": "after"
25+
}
26+
}
27+
]
28+
}
29+
- do:
30+
ingest.put_pipeline:
31+
id: "test-final-pipeline"
32+
body: >
33+
{
34+
"description": "Appends 'final' to the steps field",
35+
"processors": [
36+
{
37+
"append": {
38+
"field": "steps",
39+
"value": "final"
40+
}
41+
}
42+
]
43+
}
44+
- do:
45+
ingest.put_pipeline:
46+
id: "test-outer-pipeline"
47+
body: >
48+
{
49+
"description": "Runs test-pipeline and then append 'outer' to the steps field",
50+
"processors": [
51+
{
52+
"pipeline": {
53+
"name": "test-pipeline"
54+
}
55+
},
56+
{
57+
"append": {
58+
"field": "steps",
59+
"value": "outer"
60+
}
61+
}
62+
]
63+
}
64+
- do:
65+
indices.create:
66+
index: "test-index-with-default-and-final-pipelines"
67+
body:
68+
settings:
69+
index:
70+
default_pipeline: "test-pipeline"
71+
final_pipeline: "test-final-pipeline"
72+
- do:
73+
indices.create:
74+
index: "test-vanilla-index"
75+
76+
---
77+
teardown:
78+
- do:
79+
indices.delete:
80+
index: "test-index-with-default-and-final-pipelines"
81+
ignore_unavailable: true
82+
- do:
83+
indices.delete:
84+
index: "test-vanilla-index"
85+
ignore_unavailable: true
86+
- do:
87+
ingest.delete_pipeline:
88+
id: "test-pipeline"
89+
ignore: 404
90+
- do:
91+
ingest.delete_pipeline:
92+
id: "test-outer-pipeline"
93+
ignore: 404
94+
95+
---
96+
"Test pipeline including conditional terminate pipeline":
97+
98+
- do:
99+
bulk:
100+
refresh: true
101+
body:
102+
- '{ "index": {"_index": "test-index-with-default-and-final-pipelines" } }'
103+
- '{ "comment": "should terminate", "number": 40, "steps": [] }'
104+
- '{ "index": {"_index": "test-index-with-default-and-final-pipelines" } }'
105+
- '{ "comment": "should continue to end", "number": 60, "steps": [] }'
106+
107+
- do:
108+
search:
109+
rest_total_hits_as_int: true
110+
index: "test-index-with-default-and-final-pipelines"
111+
body:
112+
sort: "number"
113+
- match: { hits.total: 2 }
114+
- match: { hits.hits.0._source.number: 40 }
115+
- match: { hits.hits.1._source.number: 60 }
116+
- match: { hits.hits.0._source.steps: ["before", "final"] }
117+
- match: { hits.hits.1._source.steps: ["before", "after", "final"] }
118+
119+
---
120+
"Test pipeline with terminate invoked from an outer pipeline":
121+
122+
- do:
123+
bulk:
124+
refresh: true
125+
pipeline: "test-outer-pipeline"
126+
body:
127+
- '{ "index": {"_index": "test-vanilla-index" } }'
128+
- '{ "comment": "should terminate inner pipeline but not outer", "number": 40, "steps": [] }'
129+
130+
- do:
131+
search:
132+
rest_total_hits_as_int: true
133+
index: "test-vanilla-index"
134+
body:
135+
sort: "number"
136+
- match: { hits.total: 1 }
137+
- match: { hits.hits.0._source.number: 40 }
138+
- match: { hits.hits.0._source.steps: ["before", "outer"] }

server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Ex
148148

149149
void innerExecute(int currentProcessor, IngestDocument ingestDocument, final BiConsumer<IngestDocument, Exception> handler) {
150150
assert currentProcessor <= processorsWithMetrics.size();
151-
if (currentProcessor == processorsWithMetrics.size() || ingestDocument.isReroute()) {
151+
if (currentProcessor == processorsWithMetrics.size() || ingestDocument.isReroute() || ingestDocument.isTerminate()) {
152152
handler.accept(ingestDocument, null);
153153
return;
154154
}
@@ -159,7 +159,8 @@ void innerExecute(int currentProcessor, IngestDocument ingestDocument, final BiC
159159
// iteratively execute any sync processors
160160
while (currentProcessor < processorsWithMetrics.size()
161161
&& processorsWithMetrics.get(currentProcessor).v1().isAsync() == false
162-
&& ingestDocument.isReroute() == false) {
162+
&& ingestDocument.isReroute() == false
163+
&& ingestDocument.isTerminate() == false) {
163164
processorWithMetric = processorsWithMetrics.get(currentProcessor);
164165
processor = processorWithMetric.v1();
165166
metric = processorWithMetric.v2();
@@ -185,7 +186,7 @@ void innerExecute(int currentProcessor, IngestDocument ingestDocument, final BiC
185186
}
186187

187188
assert currentProcessor <= processorsWithMetrics.size();
188-
if (currentProcessor == processorsWithMetrics.size() || ingestDocument.isReroute()) {
189+
if (currentProcessor == processorsWithMetrics.size() || ingestDocument.isReroute() || ingestDocument.isTerminate()) {
189190
handler.accept(ingestDocument, null);
190191
return;
191192
}

server/src/main/java/org/elasticsearch/ingest/IngestDocument.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ public final class IngestDocument {
8282

8383
private boolean doNoSelfReferencesCheck = false;
8484
private boolean reroute = false;
85+
private boolean terminate = false;
8586

8687
public IngestDocument(String index, String id, long version, String routing, VersionType versionType, Map<String, Object> source) {
8788
this.ctxMap = new IngestCtxMap(index, id, version, routing, versionType, ZonedDateTime.now(ZoneOffset.UTC), source);
@@ -935,6 +936,27 @@ void resetReroute() {
935936
reroute = false;
936937
}
937938

939+
/**
940+
* Sets the terminate flag to true, to indicate that no further processors in the current pipeline should be run for this document.
941+
*/
942+
public void terminate() {
943+
terminate = true;
944+
}
945+
946+
/**
947+
* Returns whether the {@link #terminate()} flag was set.
948+
*/
949+
boolean isTerminate() {
950+
return terminate;
951+
}
952+
953+
/**
954+
* Resets the {@link #terminate()} flag.
955+
*/
956+
void resetTerminate() {
957+
terminate = false;
958+
}
959+
938960
public enum Metadata {
939961
INDEX(IndexFieldMapper.NAME),
940962
TYPE("_type"),

server/src/main/java/org/elasticsearch/ingest/Pipeline.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,9 @@ public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Ex
133133
if (e != null) {
134134
metrics.ingestFailed();
135135
}
136+
// Reset the terminate status now that pipeline execution is complete (if this was executed as part of another pipeline, the
137+
// outer pipeline should continue):
138+
ingestDocument.resetTerminate();
136139
handler.accept(result, e);
137140
});
138141
}

0 commit comments

Comments
 (0)