Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/134524.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 134524
summary: Add support for flexible access pattern to `NormalizeForStreamProcessor`
area: Ingest Node
type: bug
issues: []
2 changes: 1 addition & 1 deletion modules/ingest-otel/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,6 @@ dependencies {

restResources {
restApi {
include '_common', 'indices', 'index', 'cluster', 'nodes', 'get', 'ingest'
include '_common', 'indices', 'index', 'cluster', 'nodes', 'get', 'ingest', 'capabilities'
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public class NormalizeForStreamProcessor extends AbstractProcessor {
/**
* Mapping of ECS field names to their corresponding OpenTelemetry-compatible counterparts.
*/
private static final Map<String, String> RENAME_KEYS = Map.ofEntries(
static final Map<String, String> RENAME_KEYS = Map.ofEntries(
entry("span.id", "span_id"),
entry("message", "body.text"),
entry("log.level", "severity_text"),
Expand All @@ -69,12 +69,12 @@ public class NormalizeForStreamProcessor extends AbstractProcessor {
Set<String> keepKeys = new HashSet<>(Set.of("@timestamp", "attributes", "resource"));
Set<String> renamedTopLevelFields = new HashSet<>();
for (String value : RENAME_KEYS.values()) {
// if the renamed field is nested, we only need to know the top level field
int dotIndex = value.indexOf('.');
if (dotIndex != -1) {
renamedTopLevelFields.add(value.substring(0, dotIndex));
} else {
renamedTopLevelFields.add(value);
// if the renamed field is nested, generate the full list of paths that it could be rooted under
String workingKey = null;
String[] values = value.split("\\.");
for (String part : values) {
workingKey = workingKey == null ? part : workingKey + "." + part;
renamedTopLevelFields.add(workingKey);
}
}
keepKeys.addAll(renamedTopLevelFields);
Expand Down Expand Up @@ -244,7 +244,29 @@ static boolean isOTelDocument(Map<String, Object> source) {
}

/**
* Renames specific ECS keys in the given document to their OpenTelemetry-compatible counterparts, based on the {@code RENAME_KEYS} map.
* Renames specific ECS keys in the given document to their OpenTelemetry-compatible counterparts in a way that is compatible with the
* current access pattern on the IngestDocument.
*
* <p>This method performs the following operations:
* <ul>
* <li>For each key in the {@code RENAME_KEYS} map, it checks if a corresponding field exists in the document.</li>
* <li>If the field exists, it removes it from the document and adds a new field with the corresponding name from the
* {@code RENAME_KEYS} map and the same value.</li>
* <li>If the key is nested (contains dots), it recursively removes empty parent fields after renaming.</li>
* </ul>
*
* @param document the document to process
*/
static void renameSpecialKeys(IngestDocument document) {
switch (document.getCurrentAccessPatternSafe()) {
case CLASSIC -> renameSpecialKeysClassic(document);
case FLEXIBLE -> renameSpecialKeysFlexible(document);
}
}

/**
* Renames specific ECS keys in the given document to their OpenTelemetry-compatible counterparts using logic compatible with the
* {@link org.elasticsearch.ingest.IngestPipelineFieldAccessPattern#CLASSIC} access pattern and based on the {@code RENAME_KEYS} map.
*
* <p>This method performs the following operations:
* <ul>
Expand All @@ -257,7 +279,7 @@ static boolean isOTelDocument(Map<String, Object> source) {
*
* @param document the document to process
*/
static void renameSpecialKeys(IngestDocument document) {
static void renameSpecialKeysClassic(IngestDocument document) {
RENAME_KEYS.forEach((nonOtelName, otelName) -> {
boolean fieldExists = false;
Object value = null;
Expand All @@ -272,7 +294,7 @@ static void renameSpecialKeys(IngestDocument document) {
String parentName = nonOtelName.substring(0, lastDot);
// parent should never be null and must be a map if we are here
@SuppressWarnings("unchecked")
Map<String, Object> parent = (Map<String, Object>) document.getFieldValue(parentName, Map.class);
Map<String, Object> parent = document.getFieldValue(parentName, Map.class);
if (parent.isEmpty()) {
document.removeField(parentName);
} else {
Expand All @@ -294,6 +316,76 @@ static void renameSpecialKeys(IngestDocument document) {
});
}

/**
* Renames specific ECS keys in the given document to their OpenTelemetry-compatible counterparts using logic compatible with the
* {@link org.elasticsearch.ingest.IngestPipelineFieldAccessPattern#FLEXIBLE} access pattern and based on the {@code RENAME_KEYS} map.
*
* <p>This method performs the following operations:
* <ul>
* <li>For each key in the {@code RENAME_KEYS} map, it checks if a corresponding field exists in the document.</li>
* <li>If the field exists, it removes it from the document and adds a new field with the corresponding name from the
* {@code RENAME_KEYS} map and the same value. If a field's parent objects do not exist, it will progressively build
* each parent object instead of concatenating the field names together.</li>
* <li>If the key is nested (contains dots), it recursively removes empty parent fields after renaming.</li>
* </ul>
*
* @param document the document to process
*/
static void renameSpecialKeysFlexible(IngestDocument document) {
RENAME_KEYS.forEach((nonOtelName, otelName) -> {
boolean fieldExists = false;
Object value = null;
if (document.hasField(nonOtelName)) {
// Dotted fields are treated the same as normalized fields in flexible mode
fieldExists = true;
value = document.getFieldValue(nonOtelName, Object.class, true);
document.removeField(nonOtelName);
// recursively remove empty parent fields
int lastDot = nonOtelName.lastIndexOf('.');
while (lastDot > 0) {
String parentName = nonOtelName.substring(0, lastDot);
// In flexible mode, dotted field names can be removed. Parent paths may not exist since they might be included
// by the dotted field removal (e.g. For the doc {a:{b.c:1}}, removing a.b.c will not leave an a.b field because
// there is no a.b field to start with.
@SuppressWarnings("unchecked")
Map<String, Object> parent = document.getFieldValue(parentName, Map.class, true);
if (parent != null) {
if (parent.isEmpty()) {
document.removeField(parentName);
} else {
break;
}
}
lastDot = parentName.lastIndexOf('.');
}
}
if (fieldExists) {
// Flexible mode creates dotted field names when parent fields are not present. We expect the rename keys to be
// normalized after processing, so we progressively build each field's parents if it's a dotted field.
Map<String, Object> source = document.getSource();
String remainingPath = otelName;
int dot = remainingPath.indexOf('.');
while (dot > 0) {
// Dotted field, emulate classic mode by building out each parent object
String fieldName = remainingPath.substring(0, dot);
remainingPath = remainingPath.substring(dot + 1);
Object existingParent = source.get(fieldName);
if (existingParent instanceof Map) {
@SuppressWarnings("unchecked")
Map<String, Object> castAssignment = (Map<String, Object>) existingParent;
source = castAssignment;
} else {
Map<String, Object> map = new HashMap<>();
source.put(fieldName, map);
source = map;
}
dot = remainingPath.indexOf('.');
}
source.put(remainingPath, value);
}
});
}

private static void moveResourceAttributes(Map<String, Object> attributes, Map<String, Object> resourceAttributes) {
Set<String> ecsResourceFields = EcsOTelResourceAttributes.LATEST;
Iterator<Map.Entry<String, Object>> attributeIterator = attributes.entrySet().iterator();
Expand Down
Loading