Skip to content

Commit bc84928

Browse files
jbaiera authored and mridula-s109 committed
Add support for flexible access pattern to NormalizeForStreamProcessor (elastic#134524)
Fixes field querying and writing logic for NormalizeForStreamProcessor so that it can function on both `classic` and `flexible` ingest pipeline access patterns. NormalizeForStreamProcessor was added in elastic#125699 with support for the default ingest node field access logic (now known as `classic` mode). We have since added support for the `flexible` access pattern in ingest pipelines, which allows for querying dotted field names and writing dotted field names when parent path elements are missing. The NormalizeForStreamProcessor was written with the classic access pattern in mind. The processor was designed to look for singular field names and to rely on the classic field writing logic which creates intermediate parent objects when setting a value that is nested in the document. When flexible mode was enabled, the logic did not anticipate dotted field names that could be inconsistently accessible from the source map at certain points in the path notation. Further, the flexible access pattern does not create intermediate parent objects like before. A secondary renaming method was added to take these changes into account.
1 parent 0c545d8 commit bc84928

File tree

6 files changed

+537
-28
lines changed

6 files changed

+537
-28
lines changed

docs/changelog/134524.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 134524
2+
summary: Add support for flexible access pattern to `NormalizeForStreamProcessor`
3+
area: Ingest Node
4+
type: bug
5+
issues: []

modules/ingest-otel/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,6 @@ dependencies {
2424

2525
restResources {
2626
restApi {
27-
include '_common', 'indices', 'index', 'cluster', 'nodes', 'get', 'ingest'
27+
include '_common', 'indices', 'index', 'cluster', 'nodes', 'get', 'ingest', 'capabilities'
2828
}
2929
}

modules/ingest-otel/src/main/java/org/elasticsearch/ingest/otel/NormalizeForStreamProcessor.java

Lines changed: 102 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public class NormalizeForStreamProcessor extends AbstractProcessor {
4949
/**
5050
* Mapping of ECS field names to their corresponding OpenTelemetry-compatible counterparts.
5151
*/
52-
private static final Map<String, String> RENAME_KEYS = Map.ofEntries(
52+
static final Map<String, String> RENAME_KEYS = Map.ofEntries(
5353
entry("span.id", "span_id"),
5454
entry("message", "body.text"),
5555
entry("log.level", "severity_text"),
@@ -69,12 +69,12 @@ public class NormalizeForStreamProcessor extends AbstractProcessor {
6969
Set<String> keepKeys = new HashSet<>(Set.of("@timestamp", "attributes", "resource"));
7070
Set<String> renamedTopLevelFields = new HashSet<>();
7171
for (String value : RENAME_KEYS.values()) {
72-
// if the renamed field is nested, we only need to know the top level field
73-
int dotIndex = value.indexOf('.');
74-
if (dotIndex != -1) {
75-
renamedTopLevelFields.add(value.substring(0, dotIndex));
76-
} else {
77-
renamedTopLevelFields.add(value);
72+
// if the renamed field is nested, generate the full list of paths that it could be rooted under
73+
String workingKey = null;
74+
String[] values = value.split("\\.");
75+
for (String part : values) {
76+
workingKey = workingKey == null ? part : workingKey + "." + part;
77+
renamedTopLevelFields.add(workingKey);
7878
}
7979
}
8080
keepKeys.addAll(renamedTopLevelFields);
@@ -244,7 +244,29 @@ static boolean isOTelDocument(Map<String, Object> source) {
244244
}
245245

246246
/**
247-
* Renames specific ECS keys in the given document to their OpenTelemetry-compatible counterparts, based on the {@code RENAME_KEYS} map.
247+
* Renames specific ECS keys in the given document to their OpenTelemetry-compatible counterparts in a way that is compatible with the
248+
* current access pattern on the IngestDocument.
249+
*
250+
* <p>This method performs the following operations:
251+
* <ul>
252+
* <li>For each key in the {@code RENAME_KEYS} map, it checks if a corresponding field exists in the document.</li>
253+
* <li>If the field exists, it removes it from the document and adds a new field with the corresponding name from the
254+
* {@code RENAME_KEYS} map and the same value.</li>
255+
* <li>If the key is nested (contains dots), it recursively removes empty parent fields after renaming.</li>
256+
* </ul>
257+
*
258+
* @param document the document to process
259+
*/
260+
static void renameSpecialKeys(IngestDocument document) {
261+
switch (document.getCurrentAccessPatternSafe()) {
262+
case CLASSIC -> renameSpecialKeysClassic(document);
263+
case FLEXIBLE -> renameSpecialKeysFlexible(document);
264+
}
265+
}
266+
267+
/**
268+
* Renames specific ECS keys in the given document to their OpenTelemetry-compatible counterparts using logic compatible with the
269+
* {@link org.elasticsearch.ingest.IngestPipelineFieldAccessPattern#CLASSIC} access pattern and based on the {@code RENAME_KEYS} map.
248270
*
249271
* <p>This method performs the following operations:
250272
* <ul>
@@ -257,7 +279,7 @@ static boolean isOTelDocument(Map<String, Object> source) {
257279
*
258280
* @param document the document to process
259281
*/
260-
static void renameSpecialKeys(IngestDocument document) {
282+
static void renameSpecialKeysClassic(IngestDocument document) {
261283
RENAME_KEYS.forEach((nonOtelName, otelName) -> {
262284
boolean fieldExists = false;
263285
Object value = null;
@@ -272,7 +294,7 @@ static void renameSpecialKeys(IngestDocument document) {
272294
String parentName = nonOtelName.substring(0, lastDot);
273295
// parent should never be null and must be a map if we are here
274296
@SuppressWarnings("unchecked")
275-
Map<String, Object> parent = (Map<String, Object>) document.getFieldValue(parentName, Map.class);
297+
Map<String, Object> parent = document.getFieldValue(parentName, Map.class);
276298
if (parent.isEmpty()) {
277299
document.removeField(parentName);
278300
} else {
@@ -294,6 +316,76 @@ static void renameSpecialKeys(IngestDocument document) {
294316
});
295317
}
296318

319+
/**
320+
* Renames specific ECS keys in the given document to their OpenTelemetry-compatible counterparts using logic compatible with the
321+
* {@link org.elasticsearch.ingest.IngestPipelineFieldAccessPattern#FLEXIBLE} access pattern and based on the {@code RENAME_KEYS} map.
322+
*
323+
* <p>This method performs the following operations:
324+
* <ul>
325+
* <li>For each key in the {@code RENAME_KEYS} map, it checks if a corresponding field exists in the document.</li>
326+
* <li>If the field exists, it removes it from the document and adds a new field with the corresponding name from the
327+
* {@code RENAME_KEYS} map and the same value. If a field's parent objects do not exist, it will progressively build
328+
* each parent object instead of concatenating the field names together.</li>
329+
* <li>If the key is nested (contains dots), it recursively removes empty parent fields after renaming.</li>
330+
* </ul>
331+
*
332+
* @param document the document to process
333+
*/
334+
static void renameSpecialKeysFlexible(IngestDocument document) {
335+
RENAME_KEYS.forEach((nonOtelName, otelName) -> {
336+
boolean fieldExists = false;
337+
Object value = null;
338+
if (document.hasField(nonOtelName)) {
339+
// Dotted fields are treated the same as normalized fields in flexible mode
340+
fieldExists = true;
341+
value = document.getFieldValue(nonOtelName, Object.class, true);
342+
document.removeField(nonOtelName);
343+
// recursively remove empty parent fields
344+
int lastDot = nonOtelName.lastIndexOf('.');
345+
while (lastDot > 0) {
346+
String parentName = nonOtelName.substring(0, lastDot);
347+
// In flexible mode, dotted field names can be removed. Parent paths may not exist since they might be included
348+
// by the dotted field removal (e.g. For the doc {a:{b.c:1}}, removing a.b.c will not leave an a.b field because
349+
// there is no a.b field to start with).
350+
@SuppressWarnings("unchecked")
351+
Map<String, Object> parent = document.getFieldValue(parentName, Map.class, true);
352+
if (parent != null) {
353+
if (parent.isEmpty()) {
354+
document.removeField(parentName);
355+
} else {
356+
break;
357+
}
358+
}
359+
lastDot = parentName.lastIndexOf('.');
360+
}
361+
}
362+
if (fieldExists) {
363+
// Flexible mode creates dotted field names when parent fields are not present. We expect the rename keys to be
364+
// normalized after processing, so we progressively build each field's parents if it's a dotted field.
365+
Map<String, Object> source = document.getSource();
366+
String remainingPath = otelName;
367+
int dot = remainingPath.indexOf('.');
368+
while (dot > 0) {
369+
// Dotted field, emulate classic mode by building out each parent object
370+
String fieldName = remainingPath.substring(0, dot);
371+
remainingPath = remainingPath.substring(dot + 1);
372+
Object existingParent = source.get(fieldName);
373+
if (existingParent instanceof Map) {
374+
@SuppressWarnings("unchecked")
375+
Map<String, Object> castAssignment = (Map<String, Object>) existingParent;
376+
source = castAssignment;
377+
} else {
378+
Map<String, Object> map = new HashMap<>();
379+
source.put(fieldName, map);
380+
source = map;
381+
}
382+
dot = remainingPath.indexOf('.');
383+
}
384+
source.put(remainingPath, value);
385+
}
386+
});
387+
}
388+
297389
private static void moveResourceAttributes(Map<String, Object> attributes, Map<String, Object> resourceAttributes) {
298390
Set<String> ecsResourceFields = EcsOTelResourceAttributes.LATEST;
299391
Iterator<Map.Entry<String, Object>> attributeIterator = attributes.entrySet().iterator();

0 commit comments

Comments
 (0)