Skip to content
Merged
83 changes: 66 additions & 17 deletions server/src/main/java/org/elasticsearch/ingest/IngestDocument.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -201,7 +202,7 @@ public <T> T getFieldValue(String path, Class<T> clazz) {
* or if the field that is found at the provided path is not of the expected type.
*/
public <T> T getFieldValue(String path, Class<T> clazz, boolean ignoreMissing) {
final FieldPath fieldPath = FieldPath.of(path);
final FieldPath fieldPath = FieldPath.of(path, getCurrentAccessPatternSafe());
Object context = fieldPath.initialContext(this);
ResolveResult result = resolve(fieldPath.pathElements, fieldPath.pathElements.length, path, context, getCurrentAccessPatternSafe());
if (result.wasSuccessful) {
Expand Down Expand Up @@ -270,7 +271,7 @@ public boolean hasField(String path) {
* @throws IllegalArgumentException if the path is null, empty or invalid.
*/
public boolean hasField(String path, boolean failOutOfRange) {
final FieldPath fieldPath = FieldPath.of(path);
final FieldPath fieldPath = FieldPath.of(path, getCurrentAccessPatternSafe());
Object context = fieldPath.initialContext(this);
int leafKeyIndex = fieldPath.pathElements.length - 1;
int lastContainerIndex = fieldPath.pathElements.length - 2;
Expand Down Expand Up @@ -424,7 +425,7 @@ public void removeField(String path) {
* @throws IllegalArgumentException if the path is null, empty, or invalid; or if the field doesn't exist (and ignoreMissing is false).
*/
public void removeField(String path, boolean ignoreMissing) {
final FieldPath fieldPath = FieldPath.of(path);
final FieldPath fieldPath = FieldPath.of(path, getCurrentAccessPatternSafe());
Object context = fieldPath.initialContext(this);
String leafKey = fieldPath.pathElements[fieldPath.pathElements.length - 1];
ResolveResult result = resolve(
Expand Down Expand Up @@ -734,7 +735,7 @@ public void setFieldValue(String path, Object value, boolean ignoreEmptyValue) {
}

private void setFieldValue(String path, Object value, boolean append, boolean allowDuplicates) {
final FieldPath fieldPath = FieldPath.of(path);
final FieldPath fieldPath = FieldPath.of(path, getCurrentAccessPatternSafe());
Object context = fieldPath.initialContext(this);
int leafKeyIndex = fieldPath.pathElements.length - 1;
int lastContainerIndex = fieldPath.pathElements.length - 2;
Expand Down Expand Up @@ -1155,18 +1156,18 @@ List<String> getPipelineStack() {
}

/**
* @return The access pattern for any currently executing pipelines, or null if no pipelines are in progress for this doc
* @return The access pattern for any currently executing pipelines, or empty if no pipelines are in progress for this doc
*/
public IngestPipelineFieldAccessPattern getCurrentAccessPattern() {
return accessPatternStack.peek();
public Optional<IngestPipelineFieldAccessPattern> getCurrentAccessPattern() {
return Optional.ofNullable(accessPatternStack.peek());
}

/**
* @return The access pattern for any currently executing pipelines, or {@link IngestPipelineFieldAccessPattern#CLASSIC} if no
* pipelines are in progress for this doc for the sake of backwards compatibility
*/
private IngestPipelineFieldAccessPattern getCurrentAccessPatternSafe() {
return Objects.requireNonNullElse(getCurrentAccessPattern(), IngestPipelineFieldAccessPattern.CLASSIC);
return getCurrentAccessPattern().orElse(IngestPipelineFieldAccessPattern.CLASSIC);
}

/**
Expand Down Expand Up @@ -1290,8 +1291,15 @@ public String getFieldName() {

private static final class FieldPath {

/**
* A compound cache key for tracking previously parsed field paths
* @param path The field path as given by the caller
* @param accessPattern The access pattern used to parse the field path
*/
private record CacheKey(String path, IngestPipelineFieldAccessPattern accessPattern) {}

private static final int MAX_SIZE = 512;
private static final Map<String, FieldPath> CACHE = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();
private static final Map<CacheKey, FieldPath> CACHE = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();

// constructing a new FieldPath requires that we parse a String (e.g. "foo.bar.baz") into an array
// of path elements (e.g. ["foo", "bar", "baz"]). Calling String#split results in the allocation
Expand All @@ -1300,27 +1308,28 @@ private static final class FieldPath {
// do some processing ourselves on the path and path elements to validate and prepare them.
// the above CACHE and the below 'FieldPath.of' method allow us to almost always avoid this work.

static FieldPath of(String path) {
static FieldPath of(String path, IngestPipelineFieldAccessPattern accessPattern) {
if (Strings.isEmpty(path)) {
throw new IllegalArgumentException("path cannot be null nor empty");
}
FieldPath res = CACHE.get(path);
CacheKey cacheKey = new CacheKey(path, accessPattern);
FieldPath res = CACHE.get(cacheKey);
if (res != null) {
return res;
}
res = new FieldPath(path);
res = new FieldPath(path, accessPattern);
if (CACHE.size() > MAX_SIZE) {
CACHE.clear();
}
CACHE.put(path, res);
CACHE.put(cacheKey, res);
return res;
}

private final String[] pathElements;
private final boolean useIngestContext;

// you shouldn't call this directly, use the FieldPath.of method above instead!
private FieldPath(String path) {
private FieldPath(String path, IngestPipelineFieldAccessPattern accessPattern) {
String newPath;
if (path.startsWith(INGEST_KEY_PREFIX)) {
useIngestContext = true;
Expand All @@ -1333,10 +1342,50 @@ private FieldPath(String path) {
newPath = path;
}
}
this.pathElements = newPath.split("\\.");
if (pathElements.length == 1 && pathElements[0].isEmpty()) {
throw new IllegalArgumentException("path [" + path + "] is not valid");
String[] pathParts = newPath.split("\\.");
this.pathElements = processPathParts(path, pathParts, accessPattern);
}

private static String[] processPathParts(String fullPath, String[] pathParts, IngestPipelineFieldAccessPattern accessPattern) {
if (pathParts.length == 1 && pathParts[0].isEmpty()) {
throw new IllegalArgumentException("path [" + fullPath + "] is not valid");
}
return switch (accessPattern) {
case CLASSIC -> validateClassicFields(fullPath, pathParts);
case FLEXIBLE -> parseFlexibleFields(fullPath, pathParts);
};
}

/**
* Parses path syntax that is specific to the {@link IngestPipelineFieldAccessPattern#CLASSIC} ingest doc access pattern. Supports
* syntax like context aware array access.
* @param fullPath The un-split path to use for error messages
* @param pathParts The tokenized field path to parse
* @return An array of Strings
*/
private static String[] validateClassicFields(String fullPath, String[] pathParts) {
for (String pathPart : pathParts) {
if (pathPart.isEmpty()) {
throw new IllegalArgumentException("path [" + fullPath + "] is not valid");
}
}
return pathParts;
}

/**
* Parses path syntax that is specific to the {@link IngestPipelineFieldAccessPattern#FLEXIBLE} ingest doc access pattern. Supports
* syntax like square bracket array access, which is the only way to index arrays in flexible mode.
* @param fullPath The un-split path to use for error messages
* @param pathParts The tokenized field path to parse
* @return An array of Strings
*/
private static String[] parseFlexibleFields(String fullPath, String[] pathParts) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it meaningful that this is named parseFlexibleFields, when the analogous method for classic mode is validateClassicFields? They're both just validating, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I heavily adapted this code from #133790 with the hope that I can rebase it on top of these changes with minimal heartburn. I think this is just the method name from that PR.

for (String pathPart : pathParts) {
if (pathPart.isEmpty() || pathPart.indexOf('[') >= 0 || pathPart.indexOf(']') >= 0) {
throw new IllegalArgumentException("path [" + fullPath + "] is not valid");
}
}
return pathParts;
}

public Object initialContext(IngestDocument document) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,90 @@ private void doWithRandomAccessPattern(Consumer<IngestDocument> action) throws E
doWithAccessPattern(randomFrom(IngestPipelineFieldAccessPattern.values()), action);
}

private void assertPathValid(IngestDocument doc, String path) {
// The fields being checked do not exist, so they all return false when running hasField
assertFalse(doc.hasField(path));
}

private void assertPathInvalid(IngestDocument doc, String path, String errorMessage) {
IllegalArgumentException expected = expectThrows(IllegalArgumentException.class, () -> doc.hasField(path));
assertThat(expected.getMessage(), equalTo(errorMessage));
}

public void testPathParsingLogic() throws Exception {
// Force a blank document for this test
document = new IngestDocument("index", "id", 1, null, null, new HashMap<>());

doWithRandomAccessPattern((doc) -> {
assertPathInvalid(doc, null, "path cannot be null nor empty");
assertPathInvalid(doc, "", "path cannot be null nor empty");
assertPathValid(doc, "a");
assertPathValid(doc, "ab");
assertPathValid(doc, "abc");
assertPathValid(doc, "a.b");
assertPathValid(doc, "a.b.c");
// Trailing empty strings are trimmed by field path parsing logic
assertPathValid(doc, "a.");
assertPathValid(doc, "a..");
assertPathValid(doc, "a...");
// Empty field names are not allowed in the beginning or middle of the path though
assertPathInvalid(doc, ".a.b", "path [.a.b] is not valid");
assertPathInvalid(doc, "a..b", "path [a..b] is not valid");
});

doWithAccessPattern(CLASSIC, (doc) -> {
// Classic allows number fields because they are treated as either field names or array indices depending on context
assertPathValid(doc, "a.0");
// Classic allows square brackets because it is not part of it's syntax
assertPathValid(doc, "a[0]");
assertPathValid(doc, "a[]");
assertPathValid(doc, "a][");
assertPathValid(doc, "[");
assertPathValid(doc, "a[");
assertPathValid(doc, "[a");
assertPathValid(doc, "]");
assertPathValid(doc, "a]");
assertPathValid(doc, "]a");
assertPathValid(doc, "[]");
assertPathValid(doc, "][");
assertPathValid(doc, "[a]");
assertPathValid(doc, "]a[");
assertPathValid(doc, "[]a");
assertPathValid(doc, "][a");
});

doWithAccessPattern(FLEXIBLE, (doc) -> {
// Flexible has specific handling of square brackets
assertPathInvalid(doc, "a[0]", "path [a[0]] is not valid");
assertPathInvalid(doc, "a[]", "path [a[]] is not valid");
assertPathInvalid(doc, "a][", "path [a][] is not valid");
assertPathInvalid(doc, "[", "path [[] is not valid");
assertPathInvalid(doc, "a[", "path [a[] is not valid");
assertPathInvalid(doc, "[a", "path [[a] is not valid");
assertPathInvalid(doc, "]", "path []] is not valid");
assertPathInvalid(doc, "a]", "path [a]] is not valid");
assertPathInvalid(doc, "]a", "path []a] is not valid");
assertPathInvalid(doc, "[]", "path [[]] is not valid");
assertPathInvalid(doc, "][", "path [][] is not valid");
assertPathInvalid(doc, "[a]", "path [[a]] is not valid");
assertPathInvalid(doc, "]a[", "path []a[] is not valid");
assertPathInvalid(doc, "[]a", "path [[]a] is not valid");
assertPathInvalid(doc, "][a", "path [][a] is not valid");

assertPathInvalid(doc, "a[0].b", "path [a[0].b] is not valid");
assertPathInvalid(doc, "a[0].b[1]", "path [a[0].b[1]] is not valid");
assertPathInvalid(doc, "a[0].b[1].c", "path [a[0].b[1].c] is not valid");
assertPathInvalid(doc, "a[0].b[1].c[2]", "path [a[0].b[1].c[2]] is not valid");
assertPathInvalid(doc, "a[0][1].c[2]", "path [a[0][1].c[2]] is not valid");
assertPathInvalid(doc, "a[0].b[1][2]", "path [a[0].b[1][2]] is not valid");
assertPathInvalid(doc, "a[0][1][2]", "path [a[0][1][2]] is not valid");

assertPathInvalid(doc, "a[0][", "path [a[0][] is not valid");
assertPathInvalid(doc, "a[0]]", "path [a[0]]] is not valid");
assertPathInvalid(doc, "a[0]blahblah", "path [a[0]blahblah] is not valid");
});
}

public void testSimpleGetFieldValue() throws Exception {
doWithRandomAccessPattern((doc) -> {
assertThat(doc.getFieldValue("foo", String.class), equalTo("bar"));
Expand Down Expand Up @@ -2034,7 +2118,7 @@ public void testNestedAccessPatternPropagation() {

// At the end of the test, there should be neither pipeline ids nor access patterns left in the stack.
assertThat(document.getPipelineStack(), is(empty()));
assertThat(document.getCurrentAccessPattern(), is(nullValue()));
assertThat(document.getCurrentAccessPattern().isEmpty(), is(true));
}

/**
Expand Down Expand Up @@ -2082,7 +2166,8 @@ void doTestNestedAccessPatternPropagation(int level, int maxCallDepth, IngestDoc

// Assert expected state
assertThat(document.getPipelineStack().getFirst(), is(expectedPipelineId));
assertThat(document.getCurrentAccessPattern(), is(expectedAccessPattern));
assertThat(document.getCurrentAccessPattern().isPresent(), is(true));
assertThat(document.getCurrentAccessPattern().get(), is(expectedAccessPattern));

// Randomly recurse: We recurse only one time per level to avoid hogging test time, but we randomize which
// pipeline to recurse on, eventually requiring a recursion on the last pipeline run if one hasn't happened yet.
Expand All @@ -2099,11 +2184,11 @@ void doTestNestedAccessPatternPropagation(int level, int maxCallDepth, IngestDoc
assertThat(document.getPipelineStack().size(), is(equalTo(level)));
if (level == 0) {
// Top level means access pattern should be empty
assertThat(document.getCurrentAccessPattern(), is(nullValue()));
assertThat(document.getCurrentAccessPattern().isEmpty(), is(true));
} else {
// If we're nested below the top level we should still have an access
// pattern on the document for the pipeline above us
assertThat(document.getCurrentAccessPattern(), is(not(nullValue())));
assertThat(document.getCurrentAccessPattern().isPresent(), is(true));
}
}
logger.debug("LEVEL {}/{}: COMPLETE", level, maxCallDepth);
Expand Down