Skip to content
Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
080b1a6
Support Fields API in conditional ingest processors
eyalkoren Feb 6, 2025
ffba104
Fixing whitelist
eyalkoren Feb 6, 2025
03a216b
Fix access to dotted field from source in yaml test
eyalkoren Feb 6, 2025
b70d222
Merge remote-tracking branch 'upstream/main' into support-dotted-fiel…
eyalkoren Feb 6, 2025
d7fbae9
Merge remote-tracking branch 'upstream/main' into support-dotted-fiel…
eyalkoren Feb 9, 2025
bfc1635
Separate field access and field modification APIs
eyalkoren Feb 9, 2025
b051a58
Extending yaml test to cover all field access APIs
eyalkoren Feb 10, 2025
e56d88d
Separate WriteField and SourceMapField tests
eyalkoren Feb 10, 2025
3ad7f4c
Add test verifying that iterator() API does not allow removal
eyalkoren Feb 11, 2025
ae12f65
Small test change
eyalkoren Feb 11, 2025
740ec51
Merge remote-tracking branch 'upstream/main' into support-dotted-fiel…
eyalkoren Feb 11, 2025
d25d892
Merge remote-tracking branch 'upstream/main' into support-dotted-fiel…
eyalkoren Feb 11, 2025
ac0d251
Merge remote-tracking branch 'upstream/main' into support-dotted-fiel…
eyalkoren Feb 12, 2025
bce9070
Merge remote-tracking branch 'upstream/main' into support-dotted-fiel…
eyalkoren Mar 6, 2025
b49f029
Merge remote-tracking branch 'upstream/main' into support-dotted-fiel…
eyalkoren Mar 9, 2025
4979002
Merge remote-tracking branch 'upstream/main' into support-dotted-fiel…
eyalkoren Jun 26, 2025
cd9751b
Caching script instance instead of script factory
eyalkoren Jun 26, 2025
e9de9ce
Update docs/changelog/121914.yaml
eyalkoren Jun 26, 2025
2c55cd3
Merge remote-tracking branch 'upstream/main' into support-dotted-fiel…
eyalkoren Jun 26, 2025
31a08d7
Merge remote-tracking branch 'upstream/main' into support-dotted-fiel…
eyalkoren Jul 8, 2025
f55689d
Initial review changes
eyalkoren Jul 8, 2025
8d2fc53
Removing params from execute
eyalkoren Jul 8, 2025
bc67d49
Remove todo
eyalkoren Jul 8, 2025
ce09705
Removing Field's methods from whitelists
eyalkoren Jul 8, 2025
5a60385
Merge remote-tracking branch 'upstream/main' into support-dotted-fiel…
eyalkoren Jul 8, 2025
62bb43d
[CI] Auto commit changes from spotless
Jul 8, 2025
34ddc6e
Merge branch 'main' into support-dotted-fields-in-ingest-processors
eyalkoren Jul 9, 2025
25a66d6
Merge remote-tracking branch 'upstream/main' into support-dotted-fiel…
eyalkoren Jul 13, 2025
90cddcf
Merge remote-tracking branch 'eyalkoren/support-dotted-fields-in-inge…
eyalkoren Jul 13, 2025
bd5affd
Merge branch 'main' into support-dotted-fields-in-ingest-processors
eyalkoren Jul 14, 2025
140ac66
Merge remote-tracking branch 'upstream/main' into support-dotted-fiel…
eyalkoren Jul 15, 2025
45d0254
Not relying on specific error type in yaml test
eyalkoren Jul 15, 2025
45a23e1
Merge branch 'main' into support-dotted-fields-in-ingest-processors
eyalkoren Jul 16, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/121914.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 121914
summary: Support Fields API in conditional ingest processors
area: Infra/Core
type: feature
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,150 @@ teardown:
- match: { _source.bytes_source_field: "1kb" }
- match: { _source.conditional_field: "bar" }
- is_false: _source.bytes_target_field

---
"Test conditional processor with fields API":
- do:
ingest.put_pipeline:
id: "my_pipeline"
body:
description: "_description"
processors:
- set:
if: "field('get.field').get('') == 'one'"
field: "one"
value: 1
- set:
if: "field('get.field').get('') == 'two'"
field: "missing"
value: "missing"
- set:
if: " /* avoid yaml stash */ $('get.field', 'one') == 'one'"
field: "dollar"
value: true
- set:
if: "field('missing.field').get('fallback') == 'fallback'"
field: "fallback"
value: "fallback"
- set:
if: "field('nested.array.get.with.index.field').get(1, null) == 'two'"
field: "two"
value: 2
- set:
if: "field('getName.field').getName() == 'getName.field'"
field: "three"
value: 3
- set:
if: "field('existing.field').exists()"
field: "four"
value: 4
- set:
if: "!field('empty.field').isEmpty()"
field: "missing"
value: "missing"
- set:
if: "field('size.field').size() == 2"
field: "five"
value: 5
- set:
if: >
def iterator = field('iterator.field').iterator();
def sum = 0;
while (iterator.hasNext()) {
sum += iterator.next();
}
return sum == 6;
field: "six"
value: 6
- set:
if: "field('hasValue.field').hasValue(v -> v == 'two')"
field: "seven"
value: 7
- match: { acknowledged: true }

- do:
index:
index: test
id: "1"
pipeline: "my_pipeline"
body:
get.field: "one"
nested:
array:
get.with.index.field: ["one", "two", "three"]
getName.field: "my_name"
existing.field: "indeed"
empty.field: []
size.field: ["one", "two"]
iterator.field: [1, 2, 3]
hasValue.field: ["one", "two", "three"]

- do:
get:
index: test
id: "1"
- match: { _source.get\.field: "one" }
- match: { _source.one: 1 }
- is_false: _source.missing
- is_true: _source.dollar
- match: { _source.fallback: "fallback" }
- match: { _source.nested.array.get\.with\.index\.field: ["one", "two", "three"] }
- match: { _source.two: 2 }
- match: { _source.three: 3 }
- match: { _source.four: 4 }
- match: { _source.five: 5 }
- match: { _source.six: 6 }
- match: { _source.seven: 7 }

---
"Test fields iterator is unmodifiable":
- do:
ingest.put_pipeline:
id: "my_pipeline"
body:
description: "_description"
processors:
- set:
if: >
def iterator = field('iterator.field').iterator();
def sum = 0;
while (iterator.hasNext()) {
sum += iterator.next();
iterator.remove();
}
return sum == 6;
field: "sum"
value: 6
- match: { acknowledged: true }

- do:
index:
index: test
id: "1"
pipeline: "my_pipeline"
body:
test.field: [1, 2, 3]

- do:
index:
index: test
id: "2"
pipeline: "my_pipeline"
body:
iterator.field: [1, 2, 3]
catch: bad_request
- match: { error.root_cause.0.type: "script_exception" }
- match: { error.root_cause.0.reason: "runtime error" }

- do:
get:
index: test
id: "1"
- match: { _source.test\.field: [1, 2, 3] }
- is_false: _source.sum

- do:
get:
index: test
id: "2"
catch: missing
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,12 @@ class org.elasticsearch.script.IngestScript {
}

class org.elasticsearch.script.field.WriteField {
String getName()
boolean exists()
WriteField move(def)
WriteField overwrite(def)
void remove()
WriteField set(def)
WriteField append(def)
boolean isEmpty()
int size()
Iterator iterator()
def get(def)
def get(int, def)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the "Elastic License
# 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
# Public License v 1"; you may not use this file except in compliance with, at
# your election, the "Elastic License 2.0", the "GNU Affero General Public
# License v3.0 only", or the "Server Side Public License, v 1".
#

# This file contains a whitelist for conditional ingest scripts

class org.elasticsearch.script.IngestConditionalScript {
SourceMapField field(String)
}

class org.elasticsearch.script.field.SourceMapField {
boolean exists()
Iterator iterator()
def get(def)
def get(int, def)
boolean hasValue(Predicate)
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,12 @@ class org.elasticsearch.script.ReindexScript {
}

class org.elasticsearch.script.field.WriteField {
String getName()
boolean exists()
WriteField move(def)
WriteField overwrite(def)
void remove()
WriteField set(def)
WriteField append(def)
boolean isEmpty()
int size()
Iterator iterator()
def get(def)
def get(int, def)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,12 @@ class org.elasticsearch.script.UpdateScript {
}

class org.elasticsearch.script.field.WriteField {
String getName()
boolean exists()
WriteField move(def)
WriteField overwrite(def)
void remove()
WriteField set(def)
WriteField append(def)
boolean isEmpty()
int size()
Iterator iterator()
def get(def)
def get(int, def)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,12 @@ class org.elasticsearch.script.UpdateByQueryScript {
}

class org.elasticsearch.script.field.WriteField {
String getName()
boolean exists()
WriteField move(def)
WriteField overwrite(def)
void remove()
WriteField set(def)
WriteField append(def)
boolean isEmpty()
int size()
Iterator iterator()
def get(def)
def get(int, def)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import org.elasticsearch.common.logging.DeprecationCategory;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.script.CtxMapWrapper;
import org.elasticsearch.script.DynamicMap;
import org.elasticsearch.script.IngestConditionalScript;
import org.elasticsearch.script.Script;
Expand Down Expand Up @@ -57,6 +58,7 @@ public class ConditionalProcessor extends AbstractProcessor implements WrappingP
private final IngestMetric metric;
private final LongSupplier relativeTimeProvider;
private final IngestConditionalScript precompiledConditionScript;
private final CtxMapWrapper ctxMapWrapper;

ConditionalProcessor(String tag, String description, Script script, ScriptService scriptService, Processor processor) {
this(tag, description, script, scriptService, processor, System::nanoTime);
Expand All @@ -76,11 +78,12 @@ public class ConditionalProcessor extends AbstractProcessor implements WrappingP
this.processor = processor;
this.metric = new IngestMetric();
this.relativeTimeProvider = relativeTimeProvider;
this.ctxMapWrapper = new CtxMapWrapper();

try {
final IngestConditionalScript.Factory factory = scriptService.compile(script, IngestConditionalScript.CONTEXT);
if (ScriptType.INLINE.equals(script.getType())) {
precompiledConditionScript = factory.newInstance(script.getParams());
precompiledConditionScript = factory.newInstance(script.getParams(), ctxMapWrapper);
} else {
// stored script, so will have to compile at runtime
precompiledConditionScript = null;
Expand Down Expand Up @@ -144,9 +147,14 @@ boolean evaluate(IngestDocument ingestDocument) {
IngestConditionalScript script = precompiledConditionScript;
if (script == null) {
IngestConditionalScript.Factory factory = scriptService.compile(condition, IngestConditionalScript.CONTEXT);
script = factory.newInstance(condition.getParams());
script = factory.newInstance(condition.getParams(), ctxMapWrapper);
}
ctxMapWrapper.setCtxMap(new UnmodifiableIngestData(new DynamicMap(ingestDocument.getSourceAndMetadata(), FUNCTIONS)));
try {
return script.execute();
} finally {
ctxMapWrapper.clearCtxMap();
}
return script.execute(new UnmodifiableIngestData(new DynamicMap(ingestDocument.getSourceAndMetadata(), FUNCTIONS)));
}

public Processor getInnerProcessor() {
Expand Down
36 changes: 36 additions & 0 deletions server/src/main/java/org/elasticsearch/script/CtxMapWrapper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.script;

import java.util.Map;

/**
* A wrapper for a {@link CtxMap} that allows for ad-hoc setting for script execution.
* This allows for precompilation of scripts that can be executed with different contexts.
* The wrapped {@link CtxMap} should be cleared after use to avoid leaks.
*/
public class CtxMapWrapper {
private Map<String, Object> ctxMap;

public Map<String, Object> getCtxMap() {
if (ctxMap == null) {
throw new IllegalStateException("CtxMap is not set");
}
return ctxMap;
}

public void setCtxMap(Map<String, Object> ctxMap) {
this.ctxMap = ctxMap;
}

public void clearCtxMap() {
this.ctxMap = null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@

/**
* A script used by {@link org.elasticsearch.ingest.ConditionalProcessor}.
* To properly expose the {@link SourceMapFieldScript#field(String)} API, make sure to provide a valid {@link CtxMap} before execution
* through the {@link CtxMapWrapper} passed to the constructor and make sure to clear it after use to avoid leaks.
*/
public abstract class IngestConditionalScript {
public abstract class IngestConditionalScript extends SourceMapFieldScript {

public static final String[] PARAMETERS = { "ctx" };
public static final String[] PARAMETERS = {};

/** The context used to compile {@link IngestConditionalScript} factories. */
public static final ScriptContext<Factory> CONTEXT = new ScriptContext<>(
Expand All @@ -33,7 +35,8 @@ public abstract class IngestConditionalScript {
/** The generic runtime parameters for the script. */
private final Map<String, Object> params;

public IngestConditionalScript(Map<String, Object> params) {
public IngestConditionalScript(Map<String, Object> params, CtxMapWrapper ctxMapWrapper) {
super(ctxMapWrapper);
this.params = params;
}

Expand All @@ -42,9 +45,9 @@ public Map<String, Object> getParams() {
return params;
}

public abstract boolean execute(Map<String, Object> ctx);
public abstract boolean execute();

public interface Factory {
IngestConditionalScript newInstance(Map<String, Object> params);
IngestConditionalScript newInstance(Map<String, Object> params, CtxMapWrapper ctxMapWrapper);
}
}
Loading
Loading