Skip to content

Commit a6f0f6f

Browse files
authored
Support Fields API in conditional ingest processors (#121914)
1 parent 6ffe27d commit a6f0f6f

File tree

17 files changed

+659
-354
lines changed

17 files changed

+659
-354
lines changed

docs/changelog/121914.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 121914
2+
summary: Support Fields API in conditional ingest processors
3+
area: Infra/Core
4+
type: feature
5+
issues: []

modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/210_conditional_processor.yml

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,3 +74,150 @@ teardown:
7474
- match: { _source.bytes_source_field: "1kb" }
7575
- match: { _source.conditional_field: "bar" }
7676
- is_false: _source.bytes_target_field
77+
78+
---
79+
"Test conditional processor with fields API":
80+
- do:
81+
ingest.put_pipeline:
82+
id: "my_pipeline"
83+
body:
84+
description: "_description"
85+
processors:
86+
- set:
87+
if: "field('get.field').get('') == 'one'"
88+
field: "one"
89+
value: 1
90+
- set:
91+
if: "field('get.field').get('') == 'two'"
92+
field: "missing"
93+
value: "missing"
94+
- set:
95+
if: " /* avoid yaml stash */ $('get.field', 'one') == 'one'"
96+
field: "dollar"
97+
value: true
98+
- set:
99+
if: "field('missing.field').get('fallback') == 'fallback'"
100+
field: "fallback"
101+
value: "fallback"
102+
- set:
103+
if: "field('nested.array.get.with.index.field').get(1, null) == 'two'"
104+
field: "two"
105+
value: 2
106+
- set:
107+
if: "field('getName.field').getName() == 'getName.field'"
108+
field: "three"
109+
value: 3
110+
- set:
111+
if: "field('existing.field').exists()"
112+
field: "four"
113+
value: 4
114+
- set:
115+
if: "!field('empty.field').isEmpty()"
116+
field: "missing"
117+
value: "missing"
118+
- set:
119+
if: "field('size.field').size() == 2"
120+
field: "five"
121+
value: 5
122+
- set:
123+
if: >
124+
def iterator = field('iterator.field').iterator();
125+
def sum = 0;
126+
while (iterator.hasNext()) {
127+
sum += iterator.next();
128+
}
129+
return sum == 6;
130+
field: "six"
131+
value: 6
132+
- set:
133+
if: "field('hasValue.field').hasValue(v -> v == 'two')"
134+
field: "seven"
135+
value: 7
136+
- match: { acknowledged: true }
137+
138+
- do:
139+
index:
140+
index: test
141+
id: "1"
142+
pipeline: "my_pipeline"
143+
body:
144+
get.field: "one"
145+
nested:
146+
array:
147+
get.with.index.field: ["one", "two", "three"]
148+
getName.field: "my_name"
149+
existing.field: "indeed"
150+
empty.field: []
151+
size.field: ["one", "two"]
152+
iterator.field: [1, 2, 3]
153+
hasValue.field: ["one", "two", "three"]
154+
155+
- do:
156+
get:
157+
index: test
158+
id: "1"
159+
- match: { _source.get\.field: "one" }
160+
- match: { _source.one: 1 }
161+
- is_false: _source.missing
162+
- is_true: _source.dollar
163+
- match: { _source.fallback: "fallback" }
164+
- match: { _source.nested.array.get\.with\.index\.field: ["one", "two", "three"] }
165+
- match: { _source.two: 2 }
166+
- match: { _source.three: 3 }
167+
- match: { _source.four: 4 }
168+
- match: { _source.five: 5 }
169+
- match: { _source.six: 6 }
170+
- match: { _source.seven: 7 }
171+
172+
---
173+
"Test fields iterator is unmodifiable":
174+
- do:
175+
ingest.put_pipeline:
176+
id: "my_pipeline"
177+
body:
178+
description: "_description"
179+
processors:
180+
- set:
181+
if: >
182+
def iterator = field('iterator.field').iterator();
183+
def sum = 0;
184+
while (iterator.hasNext()) {
185+
sum += iterator.next();
186+
iterator.remove();
187+
}
188+
return sum == 6;
189+
field: "sum"
190+
value: 6
191+
- match: { acknowledged: true }
192+
193+
- do:
194+
index:
195+
index: test
196+
id: "1"
197+
pipeline: "my_pipeline"
198+
body:
199+
test.field: [1, 2, 3]
200+
- match: { error: null }
201+
202+
- do:
203+
index:
204+
index: test
205+
id: "2"
206+
pipeline: "my_pipeline"
207+
body:
208+
iterator.field: [1, 2, 3]
209+
catch: bad_request
210+
- length: { error.root_cause: 1 }
211+
212+
- do:
213+
get:
214+
index: test
215+
id: "1"
216+
- match: { _source.test\.field: [1, 2, 3] }
217+
- is_false: _source.sum
218+
219+
- do:
220+
get:
221+
index: test
222+
id: "2"
223+
catch: missing

modules/lang-painless/src/main/resources/org/elasticsearch/painless/org.elasticsearch.script.ingest.txt

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,15 +46,12 @@ class org.elasticsearch.script.IngestScript {
4646
}
4747

4848
class org.elasticsearch.script.field.WriteField {
49-
String getName()
5049
boolean exists()
5150
WriteField move(def)
5251
WriteField overwrite(def)
5352
void remove()
5453
WriteField set(def)
5554
WriteField append(def)
56-
boolean isEmpty()
57-
int size()
5855
Iterator iterator()
5956
def get(def)
6057
def get(int, def)
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
#
2+
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
# or more contributor license agreements. Licensed under the "Elastic License
4+
# 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
# Public License v 1"; you may not use this file except in compliance with, at
6+
# your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
# License v3.0 only", or the "Server Side Public License, v 1".
8+
#
9+
10+
# This file contains a whitelist for conditional ingest scripts
11+
12+
class org.elasticsearch.script.IngestConditionalScript {
13+
SourceMapField field(String)
14+
}
15+
16+
class org.elasticsearch.script.field.SourceMapField {
17+
boolean exists()
18+
Iterator iterator()
19+
def get(def)
20+
def get(int, def)
21+
boolean hasValue(Predicate)
22+
}

modules/lang-painless/src/main/resources/org/elasticsearch/painless/org.elasticsearch.script.reindex.txt

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,15 +42,12 @@ class org.elasticsearch.script.ReindexScript {
4242
}
4343

4444
class org.elasticsearch.script.field.WriteField {
45-
String getName()
4645
boolean exists()
4746
WriteField move(def)
4847
WriteField overwrite(def)
4948
void remove()
5049
WriteField set(def)
5150
WriteField append(def)
52-
boolean isEmpty()
53-
int size()
5451
Iterator iterator()
5552
def get(def)
5653
def get(int, def)

modules/lang-painless/src/main/resources/org/elasticsearch/painless/org.elasticsearch.script.update.txt

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,15 +37,12 @@ class org.elasticsearch.script.UpdateScript {
3737
}
3838

3939
class org.elasticsearch.script.field.WriteField {
40-
String getName()
4140
boolean exists()
4241
WriteField move(def)
4342
WriteField overwrite(def)
4443
void remove()
4544
WriteField set(def)
4645
WriteField append(def)
47-
boolean isEmpty()
48-
int size()
4946
Iterator iterator()
5047
def get(def)
5148
def get(int, def)

modules/lang-painless/src/main/resources/org/elasticsearch/painless/org.elasticsearch.script.update_by_query.txt

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,15 +36,12 @@ class org.elasticsearch.script.UpdateByQueryScript {
3636
}
3737

3838
class org.elasticsearch.script.field.WriteField {
39-
String getName()
4039
boolean exists()
4140
WriteField move(def)
4241
WriteField overwrite(def)
4342
void remove()
4443
WriteField set(def)
4544
WriteField append(def)
46-
boolean isEmpty()
47-
int size()
4845
Iterator iterator()
4946
def get(def)
5047
def get(int, def)

server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
import org.elasticsearch.common.logging.DeprecationCategory;
1313
import org.elasticsearch.common.logging.DeprecationLogger;
14+
import org.elasticsearch.script.CtxMapWrapper;
1415
import org.elasticsearch.script.DynamicMap;
1516
import org.elasticsearch.script.IngestConditionalScript;
1617
import org.elasticsearch.script.Script;
@@ -57,6 +58,7 @@ public class ConditionalProcessor extends AbstractProcessor implements WrappingP
5758
private final IngestMetric metric;
5859
private final LongSupplier relativeTimeProvider;
5960
private final IngestConditionalScript precompiledConditionScript;
61+
private final CtxMapWrapper ctxMapWrapper;
6062

6163
ConditionalProcessor(String tag, String description, Script script, ScriptService scriptService, Processor processor) {
6264
this(tag, description, script, scriptService, processor, System::nanoTime);
@@ -76,11 +78,12 @@ public class ConditionalProcessor extends AbstractProcessor implements WrappingP
7678
this.processor = processor;
7779
this.metric = new IngestMetric();
7880
this.relativeTimeProvider = relativeTimeProvider;
81+
this.ctxMapWrapper = new CtxMapWrapper();
7982

8083
try {
8184
final IngestConditionalScript.Factory factory = scriptService.compile(script, IngestConditionalScript.CONTEXT);
8285
if (ScriptType.INLINE.equals(script.getType())) {
83-
precompiledConditionScript = factory.newInstance(script.getParams());
86+
precompiledConditionScript = factory.newInstance(script.getParams(), ctxMapWrapper);
8487
} else {
8588
// stored script, so will have to compile at runtime
8689
precompiledConditionScript = null;
@@ -144,9 +147,14 @@ boolean evaluate(IngestDocument ingestDocument) {
144147
IngestConditionalScript script = precompiledConditionScript;
145148
if (script == null) {
146149
IngestConditionalScript.Factory factory = scriptService.compile(condition, IngestConditionalScript.CONTEXT);
147-
script = factory.newInstance(condition.getParams());
150+
script = factory.newInstance(condition.getParams(), ctxMapWrapper);
151+
}
152+
ctxMapWrapper.setCtxMap(new UnmodifiableIngestData(new DynamicMap(ingestDocument.getSourceAndMetadata(), FUNCTIONS)));
153+
try {
154+
return script.execute();
155+
} finally {
156+
ctxMapWrapper.clearCtxMap();
148157
}
149-
return script.execute(new UnmodifiableIngestData(new DynamicMap(ingestDocument.getSourceAndMetadata(), FUNCTIONS)));
150158
}
151159

152160
public Processor getInnerProcessor() {
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.script;
11+
12+
import java.util.Map;
13+
14+
/**
15+
* A wrapper for a {@link CtxMap} that allows for ad-hoc setting for script execution.
16+
* This allows for precompilation of scripts that can be executed with different contexts.
17+
* The wrapped {@link CtxMap} should be cleared after use to avoid leaks.
18+
*/
19+
public class CtxMapWrapper {
20+
private Map<String, Object> ctxMap;
21+
22+
public Map<String, Object> getCtxMap() {
23+
if (ctxMap == null) {
24+
throw new IllegalStateException("CtxMap is not set");
25+
}
26+
return ctxMap;
27+
}
28+
29+
public void setCtxMap(Map<String, Object> ctxMap) {
30+
this.ctxMap = ctxMap;
31+
}
32+
33+
public void clearCtxMap() {
34+
this.ctxMap = null;
35+
}
36+
}

server/src/main/java/org/elasticsearch/script/IngestConditionalScript.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,12 @@
1515

1616
/**
1717
* A script used by {@link org.elasticsearch.ingest.ConditionalProcessor}.
18+
* To properly expose the {@link SourceMapFieldScript#field(String)} API, make sure to provide a valid {@link CtxMap} before execution
19+
* through the {@link CtxMapWrapper} passed to the constructor and make sure to clear it after use to avoid leaks.
1820
*/
19-
public abstract class IngestConditionalScript {
21+
public abstract class IngestConditionalScript extends SourceMapFieldScript {
2022

21-
public static final String[] PARAMETERS = { "ctx" };
23+
public static final String[] PARAMETERS = {};
2224

2325
/** The context used to compile {@link IngestConditionalScript} factories. */
2426
public static final ScriptContext<Factory> CONTEXT = new ScriptContext<>(
@@ -33,7 +35,8 @@ public abstract class IngestConditionalScript {
3335
/** The generic runtime parameters for the script. */
3436
private final Map<String, Object> params;
3537

36-
public IngestConditionalScript(Map<String, Object> params) {
38+
public IngestConditionalScript(Map<String, Object> params, CtxMapWrapper ctxMapWrapper) {
39+
super(ctxMapWrapper);
3740
this.params = params;
3841
}
3942

@@ -42,9 +45,9 @@ public Map<String, Object> getParams() {
4245
return params;
4346
}
4447

45-
public abstract boolean execute(Map<String, Object> ctx);
48+
public abstract boolean execute();
4649

4750
public interface Factory {
48-
IngestConditionalScript newInstance(Map<String, Object> params);
51+
IngestConditionalScript newInstance(Map<String, Object> params, CtxMapWrapper ctxMapWrapper);
4952
}
5053
}

0 commit comments

Comments
 (0)