diff --git a/docs/changelog/134524.yaml b/docs/changelog/134524.yaml new file mode 100644 index 0000000000000..2dac72415c15b --- /dev/null +++ b/docs/changelog/134524.yaml @@ -0,0 +1,5 @@ +pr: 134524 +summary: Add support for flexible access pattern to `NormalizeForStreamProcessor` +area: Ingest Node +type: bug +issues: [] diff --git a/modules/ingest-otel/build.gradle b/modules/ingest-otel/build.gradle index e2c4d53ca68bd..38c1631fe749d 100644 --- a/modules/ingest-otel/build.gradle +++ b/modules/ingest-otel/build.gradle @@ -24,6 +24,6 @@ dependencies { restResources { restApi { - include '_common', 'indices', 'index', 'cluster', 'nodes', 'get', 'ingest' + include '_common', 'indices', 'index', 'cluster', 'nodes', 'get', 'ingest', 'capabilities' } } diff --git a/modules/ingest-otel/src/main/java/org/elasticsearch/ingest/otel/NormalizeForStreamProcessor.java b/modules/ingest-otel/src/main/java/org/elasticsearch/ingest/otel/NormalizeForStreamProcessor.java index bde8420c8021c..7dc33ded87dfe 100644 --- a/modules/ingest-otel/src/main/java/org/elasticsearch/ingest/otel/NormalizeForStreamProcessor.java +++ b/modules/ingest-otel/src/main/java/org/elasticsearch/ingest/otel/NormalizeForStreamProcessor.java @@ -49,7 +49,7 @@ public class NormalizeForStreamProcessor extends AbstractProcessor { /** * Mapping of ECS field names to their corresponding OpenTelemetry-compatible counterparts. 
*/ - private static final Map RENAME_KEYS = Map.ofEntries( + static final Map RENAME_KEYS = Map.ofEntries( entry("span.id", "span_id"), entry("message", "body.text"), entry("log.level", "severity_text"), @@ -69,12 +69,12 @@ public class NormalizeForStreamProcessor extends AbstractProcessor { Set keepKeys = new HashSet<>(Set.of("@timestamp", "attributes", "resource")); Set renamedTopLevelFields = new HashSet<>(); for (String value : RENAME_KEYS.values()) { - // if the renamed field is nested, we only need to know the top level field - int dotIndex = value.indexOf('.'); - if (dotIndex != -1) { - renamedTopLevelFields.add(value.substring(0, dotIndex)); - } else { - renamedTopLevelFields.add(value); + // if the renamed field is nested, generate the full list of paths that it could be rooted under + String workingKey = null; + String[] values = value.split("\\."); + for (String part : values) { + workingKey = workingKey == null ? part : workingKey + "." + part; + renamedTopLevelFields.add(workingKey); } } keepKeys.addAll(renamedTopLevelFields); @@ -244,7 +244,29 @@ static boolean isOTelDocument(Map source) { } /** - * Renames specific ECS keys in the given document to their OpenTelemetry-compatible counterparts, based on the {@code RENAME_KEYS} map. + * Renames specific ECS keys in the given document to their OpenTelemetry-compatible counterparts in a way that is compatible with the + * current access pattern on the IngestDocument. + * + *

This method performs the following operations: + *

    + *
  • For each key in the {@code RENAME_KEYS} map, it checks if a corresponding field exists in the document.
  • + *
  • If the field exists, it removes it from the document and adds a new field with the corresponding name from the + * {@code RENAME_KEYS} map and the same value.
  • + *
  • If the key is nested (contains dots), it recursively removes empty parent fields after renaming.
  • + *
+ * + * @param document the document to process + */ + static void renameSpecialKeys(IngestDocument document) { + switch (document.getCurrentAccessPatternSafe()) { + case CLASSIC -> renameSpecialKeysClassic(document); + case FLEXIBLE -> renameSpecialKeysFlexible(document); + } + } + + /** + * Renames specific ECS keys in the given document to their OpenTelemetry-compatible counterparts using logic compatible with the + * {@link org.elasticsearch.ingest.IngestPipelineFieldAccessPattern#CLASSIC} access pattern and based on the {@code RENAME_KEYS} map. * *

This method performs the following operations: *

    @@ -257,7 +279,7 @@ static boolean isOTelDocument(Map source) { * * @param document the document to process */ - static void renameSpecialKeys(IngestDocument document) { + static void renameSpecialKeysClassic(IngestDocument document) { RENAME_KEYS.forEach((nonOtelName, otelName) -> { boolean fieldExists = false; Object value = null; @@ -272,7 +294,7 @@ static void renameSpecialKeys(IngestDocument document) { String parentName = nonOtelName.substring(0, lastDot); // parent should never be null and must be a map if we are here @SuppressWarnings("unchecked") - Map parent = (Map) document.getFieldValue(parentName, Map.class); + Map parent = document.getFieldValue(parentName, Map.class); if (parent.isEmpty()) { document.removeField(parentName); } else { @@ -294,6 +316,76 @@ static void renameSpecialKeys(IngestDocument document) { }); } + /** + * Renames specific ECS keys in the given document to their OpenTelemetry-compatible counterparts using logic compatible with the + * {@link org.elasticsearch.ingest.IngestPipelineFieldAccessPattern#FLEXIBLE} access pattern and based on the {@code RENAME_KEYS} map. + * + *

    This method performs the following operations: + *

      + *
    • For each key in the {@code RENAME_KEYS} map, it checks if a corresponding field exists in the document.
    • + *
    • If the field exists, it removes it from the document and adds a new field with the corresponding name from the + * {@code RENAME_KEYS} map and the same value. If a field's parent objects do not exist, it will progressively build + * each parent object instead of concatenating the field names together.
    • + *
    • If the key is nested (contains dots), it recursively removes empty parent fields after renaming.
    • + *
    + * + * @param document the document to process + */ + static void renameSpecialKeysFlexible(IngestDocument document) { + RENAME_KEYS.forEach((nonOtelName, otelName) -> { + boolean fieldExists = false; + Object value = null; + if (document.hasField(nonOtelName)) { + // Dotted fields are treated the same as normalized fields in flexible mode + fieldExists = true; + value = document.getFieldValue(nonOtelName, Object.class, true); + document.removeField(nonOtelName); + // recursively remove empty parent fields + int lastDot = nonOtelName.lastIndexOf('.'); + while (lastDot > 0) { + String parentName = nonOtelName.substring(0, lastDot); + // In flexible mode, dotted field names can be removed. Parent paths may not exist since they might be included + // by the dotted field removal (e.g. For the doc {a:{b.c:1}}, removing a.b.c will not leave an a.b field because + // there is no a.b field to start with). + @SuppressWarnings("unchecked") + Map parent = document.getFieldValue(parentName, Map.class, true); + if (parent != null) { + if (parent.isEmpty()) { + document.removeField(parentName); + } else { + break; + } + } + lastDot = parentName.lastIndexOf('.'); + } + } + if (fieldExists) { + // Flexible mode creates dotted field names when parent fields are not present. We expect the rename keys to be + // normalized after processing, so we progressively build each field's parents if it's a dotted field. 
+ Map source = document.getSource(); + String remainingPath = otelName; + int dot = remainingPath.indexOf('.'); + while (dot > 0) { + // Dotted field, emulate classic mode by building out each parent object + String fieldName = remainingPath.substring(0, dot); + remainingPath = remainingPath.substring(dot + 1); + Object existingParent = source.get(fieldName); + if (existingParent instanceof Map) { + @SuppressWarnings("unchecked") + Map castAssignment = (Map) existingParent; + source = castAssignment; + } else { + Map map = new HashMap<>(); + source.put(fieldName, map); + source = map; + } + dot = remainingPath.indexOf('.'); + } + source.put(remainingPath, value); + } + }); + } + private static void moveResourceAttributes(Map attributes, Map resourceAttributes) { Set ecsResourceFields = EcsOTelResourceAttributes.LATEST; Iterator> attributeIterator = attributes.entrySet().iterator(); diff --git a/modules/ingest-otel/src/test/java/org/elasticsearch/ingest/otel/NormalizeForStreamProcessorTests.java b/modules/ingest-otel/src/test/java/org/elasticsearch/ingest/otel/NormalizeForStreamProcessorTests.java index d1a504f8b0ee1..3c51f67b00400 100644 --- a/modules/ingest-otel/src/test/java/org/elasticsearch/ingest/otel/NormalizeForStreamProcessorTests.java +++ b/modules/ingest-otel/src/test/java/org/elasticsearch/ingest/otel/NormalizeForStreamProcessorTests.java @@ -10,7 +10,13 @@ package org.elasticsearch.ingest.otel; import org.elasticsearch.common.Strings; +import org.elasticsearch.ingest.CompoundProcessor; import org.elasticsearch.ingest.IngestDocument; +import org.elasticsearch.ingest.IngestPipelineFieldAccessPattern; +import org.elasticsearch.ingest.IngestProcessorException; +import org.elasticsearch.ingest.Pipeline; +import org.elasticsearch.ingest.Processor; +import org.elasticsearch.ingest.TestProcessor; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentFactory; @@ -20,13 +26,42 @@ 
import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import static java.util.Map.entry; +import static org.hamcrest.CoreMatchers.is; public class NormalizeForStreamProcessorTests extends ESTestCase { private final NormalizeForStreamProcessor processor = new NormalizeForStreamProcessor("test", "test processor"); + /** + * The processor uses a static map of field paths to use for transforming from one format to another. These field paths + * must be able to work on both classic and flexible access patterns, which means they cannot use any syntax that is exclusive + * to one or the other, nor should they use features that work differently between the access patterns. + */ + public void testRenameKeysHaveUniversalSyntax() { + NormalizeForStreamProcessor.RENAME_KEYS.forEach((key, value) -> { + var keyParts = key.split("\\."); + for (String keyPart : keyParts) { + assertThat("Cannot use close bracket in rename keys", keyPart.contains("]"), is(false)); + assertThat("Cannot use open bracket in rename keys", keyPart.contains("["), is(false)); + expectThrows(NumberFormatException.class, "Cannot use numeric field name in rename keys", () -> Integer.parseInt(keyPart)); + } + var valueParts = value.split("\\."); + for (String valuePart : valueParts) { + assertThat("Cannot use close bracket in rename keys", valuePart.contains("]"), is(false)); + assertThat("Cannot use open bracket in rename keys", valuePart.contains("["), is(false)); + expectThrows( + NumberFormatException.class, + "Cannot use numeric field name in rename keys", + () -> Integer.parseInt(valuePart) + ); + } + }); + } + public void testIsOTelDocument_validMinimalOTelDocument() { Map source = new HashMap<>(); source.put("resource", new HashMap<>()); @@ -126,7 +161,7 @@ public void testExecute_validOTelDocument() { ); IngestDocument document = new IngestDocument("index", "id", 1, null, null, source); Map 
shallowCopy = new HashMap<>(source); - processor.execute(document); + runWithRandomAccessPattern(document); // verify that top level keys are not moved when processing a valid OTel document assertEquals(shallowCopy, document.getSource()); } @@ -137,7 +172,7 @@ public void testExecute_nonOTelDocument() { source.put("key2", "value2"); IngestDocument document = new IngestDocument("index", "id", 1, null, null, source); - processor.execute(document); + runWithRandomAccessPattern(document); Map result = document.getSource(); assertTrue(result.containsKey("attributes")); @@ -163,7 +198,7 @@ public void testExecute_nonOTelDocument_withExistingAttributes() { source.put("key1", "value1"); IngestDocument document = new IngestDocument("index", "id", 1, null, null, source); - processor.execute(document); + runWithRandomAccessPattern(document); Map result = document.getSource(); assertTrue(result.containsKey("attributes")); @@ -188,7 +223,7 @@ public void testExecute_nonOTelDocument_withExistingResource() { source.put("key1", "value1"); IngestDocument document = new IngestDocument("index", "id", 1, null, null, source); - processor.execute(document); + runWithRandomAccessPattern(document); Map result = document.getSource(); assertTrue(result.containsKey("attributes")); @@ -218,7 +253,7 @@ public void testRenameSpecialKeys_nestedForm() { source.put("trace", trace); IngestDocument document = new IngestDocument("index", "id", 1, null, null, source); - NormalizeForStreamProcessor.renameSpecialKeys(document); + doWithRandomAccessPattern(document, NormalizeForStreamProcessor::renameSpecialKeys); Map result = document.getSource(); assertEquals("spanIdValue", result.get("span_id")); @@ -237,7 +272,7 @@ public void testRenameSpecialKeys_topLevelDottedField() { source.put("message", "this is a message"); IngestDocument document = new IngestDocument("index", "id", 1, null, null, source); - NormalizeForStreamProcessor.renameSpecialKeys(document); + doWithRandomAccessPattern(document, 
NormalizeForStreamProcessor::renameSpecialKeys); Map result = document.getSource(); assertEquals("spanIdValue", result.get("span_id")); @@ -260,7 +295,7 @@ public void testRenameSpecialKeys_mixedForm() { source.put("span.id", "topLevelSpanIdValue"); IngestDocument document = new IngestDocument("index", "id", 1, null, null, source); - NormalizeForStreamProcessor.renameSpecialKeys(document); + doWithRandomAccessPattern(document, NormalizeForStreamProcessor::renameSpecialKeys); Map result = document.getSource(); // nested form should take precedence @@ -279,7 +314,7 @@ public void testExecute_moveFlatAttributes() { source.putAll(expectedAttributes); IngestDocument document = new IngestDocument("index", "id", 1, null, null, source); - processor.execute(document); + runWithRandomAccessPattern(document); assertTrue(source.containsKey("resource")); Map resource = get(source, "resource"); @@ -308,7 +343,7 @@ public void testExecute_moveNestedAttributes() { Map expectedAttributes = Map.of("agent.non-resource", "value", "service.non-resource", "value", "foo", "bar"); expectedAttributes.forEach(document::setFieldValue); - processor.execute(document); + runWithRandomAccessPattern(document); Map source = document.getSource(); @@ -340,7 +375,7 @@ public void testKeepNullValues() { source.put("agent.name", null); IngestDocument document = new IngestDocument("index", "id", 1, null, null, source); - processor.execute(document); + runWithRandomAccessPattern(document); assertFalse(source.containsKey("span")); assertTrue(source.containsKey("span_id")); @@ -377,7 +412,7 @@ public void testExecute_deepFlattening() { IngestDocument document = new IngestDocument("index", "id", 1, null, null, source); - processor.execute(document); + runWithRandomAccessPattern(document); Map result = document.getSource(); @@ -424,7 +459,7 @@ public void testExecute_arraysNotFlattened() { IngestDocument document = new IngestDocument("index", "id", 1, null, null, source); - processor.execute(document); + 
runWithRandomAccessPattern(document); Map result = document.getSource(); @@ -505,7 +540,7 @@ public void testExecute_ecsJsonMessageNormalization() throws IOException { source.put("message", representJsonAsString(message)); IngestDocument document = new IngestDocument("index", "id", 1, null, null, source); - processor.execute(document); + runWithRandomAccessPattern(document); Map result = document.getSource(); @@ -595,7 +630,7 @@ public void testExecute_nonEcsJsonMessageNormalization() throws IOException { source.put("message", representJsonAsString(message)); IngestDocument document = new IngestDocument("index", "id", 1, null, null, source); - processor.execute(document); + runWithRandomAccessPattern(document); Map result = document.getSource(); @@ -612,7 +647,7 @@ public void testOtherPrimitiveMessage() { source.put("message", 42); IngestDocument document = new IngestDocument("index", "id", 1, null, null, source); - processor.execute(document); + runWithRandomAccessPattern(document); Map result = document.getSource(); assertEquals(42, ((Map) result.get("body")).get("text")); @@ -627,7 +662,7 @@ public void testObjectMessage() { source.put("message", message); IngestDocument document = new IngestDocument("index", "id", 1, null, null, source); - processor.execute(document); + runWithRandomAccessPattern(document); Map result = document.getSource(); assertEquals(message, ((Map) result.get("body")).get("text")); @@ -646,4 +681,58 @@ private static String representJsonAsString(Map json) throws IOE private static T get(Map context, String key) { return (T) context.get(key); } + + private void runWithRandomAccessPattern(IngestDocument document) { + runWithAccessPattern(randomFrom(IngestPipelineFieldAccessPattern.values()), document); + } + + private void runWithAccessPattern(IngestPipelineFieldAccessPattern accessPattern, IngestDocument document) { + runProcessorWithAccessPattern(accessPattern, document, processor); + } + + private void 
doWithRandomAccessPattern(IngestDocument document, Consumer action) { + doWithAccessPattern(randomFrom(IngestPipelineFieldAccessPattern.values()), document, action); + } + + private void doWithAccessPattern( + IngestPipelineFieldAccessPattern accessPattern, + IngestDocument document, + Consumer action + ) { + runProcessorWithAccessPattern(accessPattern, document, new TestProcessor(action)); + } + + private void runProcessorWithAccessPattern( + IngestPipelineFieldAccessPattern accessPattern, + IngestDocument document, + Processor processor + ) { + AtomicReference exceptionAtomicReference = new AtomicReference<>(null); + document.executePipeline( + new Pipeline( + randomAlphanumericOfLength(10), + null, + null, + null, + new CompoundProcessor(processor), + accessPattern, + null, + null, + null + ), + (ignored, ex) -> { + if (ex != null) { + if (ex instanceof IngestProcessorException ingestProcessorException) { + exceptionAtomicReference.set((Exception) ingestProcessorException.getCause()); + } else { + exceptionAtomicReference.set(ex); + } + } + } + ); + Exception exception = exceptionAtomicReference.get(); + if (exception != null) { + fail(exception); + } + } } diff --git a/modules/ingest-otel/src/yamlRestTest/resources/rest-api-spec/test/ingest-otel/30_normalize_flexible_access.yml b/modules/ingest-otel/src/yamlRestTest/resources/rest-api-spec/test/ingest-otel/30_normalize_flexible_access.yml new file mode 100644 index 0000000000000..9d5ace44f29bb --- /dev/null +++ b/modules/ingest-otel/src/yamlRestTest/resources/rest-api-spec/test/ingest-otel/30_normalize_flexible_access.yml @@ -0,0 +1,323 @@ +--- +setup: + - requires: + reason: "Flexible access pattern was added in 9.2+" + test_runner_features: [ capabilities ] + capabilities: + - method: PUT + path: /_ingest/pipeline/{id} + capabilities: [ 'field_access_pattern.flexible' ] + + - do: + ingest.put_pipeline: + id: "normalize_for_stream_pipeline" + body: + field_access_pattern: flexible + processors: + - 
normalize_for_stream: {} + +--- +teardown: + - do: + ingest.delete_pipeline: + id: "normalize_for_stream_pipeline" + ignore: 404 + +--- +"Test normalized document": + - do: + index: + index: normalize_for_stream_test + id: "nested_and_flat_attributes" + pipeline: "normalize_for_stream_pipeline" + body: { + "@timestamp": "2024-01-01T00:00:00.000Z", + "message": "test", + "log": { + "level": "info", + "logger": "nginx" + } + } + + - do: + get: + index: normalize_for_stream_test + id: "nested_and_flat_attributes" + - match: { _source.attributes.log\.logger: "nginx" } + - match: { _source.body.text: "test" } + - match: { _source.resource.attributes: {} } + - match: { _source.severity_text: "info" } + - match: { _source.log: null } + - match: { _source.message: null } + +--- +"Test json message with dotted fields": + - do: + index: + index: normalize_for_stream_test + id: "nested_and_flat_attributes" + pipeline: "normalize_for_stream_pipeline" + body: { + "@timestamp": "2024-01-01T00:00:00.000Z", + "message": '{"@timestamp": "2024-01-01T00:00:00.000Z","log.level": "info","log.logger": "nginx","message": "test"}' + } + + - do: + get: + index: normalize_for_stream_test + id: "nested_and_flat_attributes" + - match: { _source.attributes.log\.logger: "nginx" } + - match: { _source.body.text: "test" } + - match: { _source.resource.attributes: {} } + - match: { _source.severity_text: "info" } + - match: { _source.log: null } + - match: { _source.message: null } + +--- +"Test attributes namespacing": + - do: + index: + index: normalize_for_stream_test + id: "nested_and_flat_attributes" + pipeline: "normalize_for_stream_pipeline" + body: { + "agent.name": "agentNameValue", + "agent": { + "type": "agentTypeValue", + "deep": { + "nested": "nestedValue", + "scalar-array": [ + "arrayValue1", + "arrayValue2" + ], + "object-array": [ + { + "key1": "value1" + }, + { + "key2": "value2" + } + ] + }, + "scalar-array": [ + "arrayValue1", + "arrayValue2" + ] + }, + "cloud.region": 
"cloudRegionValue", + "cloud": { + "service": { + "name": [ + "nameArrayValue1", + "nameArrayValue2" + ] + }, + "account.id": [ + { + "key1": "value1" + }, + { + "key2": "value2" + } + ], + }, + "host.name": "hostNameValue", + "host": { + "type": "hostTypeValue" + }, + "service.name": "serviceNameValue", + "service": { + "type": "serviceTypeValue", + } + } + + - do: + get: + index: normalize_for_stream_test + id: "nested_and_flat_attributes" + - match: { _source.resource.attributes.agent\.name: "agentNameValue" } + - match: { _source.resource.attributes.agent\.type: "agentTypeValue" } + - match: { _source.resource.attributes.cloud\.region: "cloudRegionValue" } + - match: { _source.resource.attributes.cloud\.service\.name: ["nameArrayValue1", "nameArrayValue2"] } + - match: { _source.resource.attributes.cloud\.service\.name.0: "nameArrayValue1" } + - match: { _source.resource.attributes.cloud\.service\.name.1: "nameArrayValue2" } + - match: { _source.resource.attributes.cloud\.account\.id: [{"key1" : "value1"}, {"key2" : "value2"}] } + - match: { _source.resource.attributes.cloud\.account\.id.0.key1: "value1" } + - match: { _source.resource.attributes.cloud\.account\.id.1.key2: "value2" } + - match: { _source.resource.attributes.host\.name: "hostNameValue" } + - match: { _source.resource.attributes.host\.type: "hostTypeValue" } + - match: { _source.resource.attributes.service\.name: "serviceNameValue" } + - match: { _source.attributes.agent\.scalar-array.0: "arrayValue1" } + - match: { _source.attributes.agent\.scalar-array.1: "arrayValue2" } + - match: { _source.attributes.agent\.deep\.nested: "nestedValue" } + - match: { _source.attributes.agent\.deep\.scalar-array.0: "arrayValue1" } + - match: { _source.attributes.agent\.deep\.scalar-array.1: "arrayValue2" } + - match: { _source.attributes.agent\.deep\.object-array.0.key1: "value1" } + - match: { _source.attributes.agent\.deep\.object-array.1.key2: "value2" } + - match: { _source.attributes.service\.type: 
"serviceTypeValue" } + - match: { _source.agent\.name: null } + - match: { _source.agent: null } + - match: { _source.agent.type: null } + - match: { _source.cloud\.region: null } + - match: { _source.cloud: null } + - match: { _source.host\.name: null } + - match: { _source.host: null } + - match: { _source.service\.name: null } + - match: { _source.service: null } + +--- +"Test rename special keys": + - do: + index: + index: normalize_for_stream_test + id: "rename_special_keys" + pipeline: "normalize_for_stream_pipeline" + body: { + "span": { + "id": "nestedSpanIdValue" + }, + "span.id": "topLevelSpanIdValue", + "log.level": "topLevelLogLevelValue", + "trace": { + "id": "traceIdValue" + }, + "trace.id": "topLevelTraceIdValue", + "message": "this is a message" + } + + - do: + get: + index: normalize_for_stream_test + id: "rename_special_keys" + - match: { _source.span_id: "nestedSpanIdValue" } + - match: { _source.severity_text: "topLevelLogLevelValue" } + - match: { _source.trace_id: "traceIdValue" } + - match: { _source.body.text: "this is a message" } + - match: { _source.span: null } + - match: { _source.span\.id: null } + - match: { _source.log\.level: null } + - match: { _source.trace: null } + - match: { _source.trace\.id: null } + - match: { _source.message: null } + +--- +"Test valid OTel document": + - do: + index: + index: normalize_for_stream_test + id: "valid_otel_document" + pipeline: "normalize_for_stream_pipeline" + body: { + "resource": { + "attributes": { + "foo": "bar" + } + }, + "scope": { + "foo": "bar" + }, + "attributes": { + "foo": "bar" + }, + "body": { + "text": "a string", + "structured": {} + }, + "span_id": "spanIdValue", + "trace_id": "traceIdValue", + "severity_text": "severityTextValue", + "foo": "bar" + } + + - do: + get: + index: normalize_for_stream_test + id: "valid_otel_document" + - match: { _source.resource.attributes.foo: "bar" } + - match: { _source.scope.foo: "bar" } + - match: { _source.attributes.foo: "bar" } + - match: 
{ _source.body.text: "a string" } + - match: { _source.body.structured: {} } + - match: { _source.span_id: "spanIdValue" } + - match: { _source.trace_id: "traceIdValue" } + - match: { _source.severity_text: "severityTextValue" } + - match: { _source.foo: "bar" } + +--- +"Test dots in a valid OTel document": + - do: + index: + index: normalize_for_stream_test + id: "valid_otel_document" + pipeline: "normalize_for_stream_pipeline" + body: { + "resource": { + "attributes": { + "foo": "bar" + } + }, + "scope": { + "foo": "bar" + }, + "attributes": { + "foo": "bar" + }, + "body.text": "a string", + "body.structured": {}, + "span_id": "spanIdValue", + "trace_id": "traceIdValue", + "severity_text": "severityTextValue", + "foo": "bar" + } + + - do: + get: + index: normalize_for_stream_test + id: "valid_otel_document" + - match: { _source.resource.attributes.foo: "bar" } + - match: { _source.scope.foo: "bar" } + - match: { _source.attributes.foo: "bar" } + - match: { _source.body\.text: "a string" } + - match: { _source.body\.structured: {} } + - match: { _source.span_id: "spanIdValue" } + - match: { _source.trace_id: "traceIdValue" } + - match: { _source.severity_text: "severityTextValue" } + - match: { _source.foo: "bar" } + +--- +"Test invalid body field": + - do: + index: + index: normalize_for_stream_test + id: "invalid_body_field" + pipeline: "normalize_for_stream_pipeline" + body: { + "resource": {}, + "scope": { + "foo": "bar" + }, + "body": { + "text": 123, + "structured": { + "foo": "bar" + } + }, + "span_id": "spanIdValue", + "trace_id": "traceIdValue", + "severity_text": "severityTextValue", + "foo": "bar" + } + + - do: + get: + index: normalize_for_stream_test + id: "invalid_body_field" + - match: { _source.attributes.body\.text: 123 } + - match: { _source.attributes.body\.structured\.foo: "bar" } + - match: { _source.attributes.scope\.foo: "bar" } + - match: { _source.attributes.span_id: "spanIdValue" } + - match: { _source.attributes.trace_id: "traceIdValue" 
} + - match: { _source.attributes.severity_text: "severityTextValue" } + - match: { _source.attributes.foo: "bar" } + - match: { _source.body: null } + - match: { _source.scope: null } diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java index b97968de8da14..f1ff163721004 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java @@ -1166,7 +1166,7 @@ public Optional getCurrentAccessPattern() { * @return The access pattern for any currently executing pipelines, or {@link IngestPipelineFieldAccessPattern#CLASSIC} if no * pipelines are in progress for this doc for the sake of backwards compatibility */ - private IngestPipelineFieldAccessPattern getCurrentAccessPatternSafe() { + public IngestPipelineFieldAccessPattern getCurrentAccessPatternSafe() { return getCurrentAccessPattern().orElse(IngestPipelineFieldAccessPattern.CLASSIC); }