@@ -49,7 +49,7 @@ public class NormalizeForStreamProcessor extends AbstractProcessor {
4949 /**
5050 * Mapping of ECS field names to their corresponding OpenTelemetry-compatible counterparts.
5151 */
52- private static final Map <String , String > RENAME_KEYS = Map .ofEntries (
52+ static final Map <String , String > RENAME_KEYS = Map .ofEntries (
5353 entry ("span.id" , "span_id" ),
5454 entry ("message" , "body.text" ),
5555 entry ("log.level" , "severity_text" ),
@@ -69,12 +69,12 @@ public class NormalizeForStreamProcessor extends AbstractProcessor {
6969 Set <String > keepKeys = new HashSet <>(Set .of ("@timestamp" , "attributes" , "resource" ));
7070 Set <String > renamedTopLevelFields = new HashSet <>();
7171 for (String value : RENAME_KEYS .values ()) {
72- // if the renamed field is nested, we only need to know the top level field
73- int dotIndex = value . indexOf ( '.' ) ;
74- if ( dotIndex != - 1 ) {
75- renamedTopLevelFields . add ( value . substring ( 0 , dotIndex ));
76- } else {
77- renamedTopLevelFields .add (value );
72+ // if the renamed field is nested, generate the full list of paths that it could be rooted under
73+ String workingKey = null ;
74+ String [] values = value . split ( " \\ ." );
75+ for ( String part : values ) {
76+ workingKey = workingKey == null ? part : workingKey + "." + part ;
77+ renamedTopLevelFields .add (workingKey );
7878 }
7979 }
8080 keepKeys .addAll (renamedTopLevelFields );
@@ -244,7 +244,29 @@ static boolean isOTelDocument(Map<String, Object> source) {
244244 }
245245
246246 /**
247- * Renames specific ECS keys in the given document to their OpenTelemetry-compatible counterparts, based on the {@code RENAME_KEYS} map.
247+ * Renames specific ECS keys in the given document to their OpenTelemetry-compatible counterparts in a way that is compatible with the
248+ * current access pattern on the IngestDocument.
249+ *
250+ * <p>This method performs the following operations:
251+ * <ul>
252+ * <li>For each key in the {@code RENAME_KEYS} map, it checks if a corresponding field exists in the document.</li>
253+ * <li>If the field exists, it removes it from the document and adds a new field with the corresponding name from the
254+ * {@code RENAME_KEYS} map and the same value.</li>
255+ * <li>If the key is nested (contains dots), it recursively removes empty parent fields after renaming.</li>
256+ * </ul>
257+ *
258+ * @param document the document to process
259+ */
260+ static void renameSpecialKeys (IngestDocument document ) {
261+ switch (document .getCurrentAccessPatternSafe ()) {
262+ case CLASSIC -> renameSpecialKeysClassic (document );
263+ case FLEXIBLE -> renameSpecialKeysFlexible (document );
264+ }
265+ }
266+
267+ /**
268+ * Renames specific ECS keys in the given document to their OpenTelemetry-compatible counterparts using logic compatible with the
269+ * {@link org.elasticsearch.ingest.IngestPipelineFieldAccessPattern#CLASSIC} access pattern and based on the {@code RENAME_KEYS} map.
248270 *
249271 * <p>This method performs the following operations:
250272 * <ul>
@@ -257,7 +279,7 @@ static boolean isOTelDocument(Map<String, Object> source) {
257279 *
258280 * @param document the document to process
259281 */
260- static void renameSpecialKeys (IngestDocument document ) {
282+ static void renameSpecialKeysClassic (IngestDocument document ) {
261283 RENAME_KEYS .forEach ((nonOtelName , otelName ) -> {
262284 boolean fieldExists = false ;
263285 Object value = null ;
@@ -272,7 +294,7 @@ static void renameSpecialKeys(IngestDocument document) {
272294 String parentName = nonOtelName .substring (0 , lastDot );
273295 // parent should never be null and must be a map if we are here
274296 @ SuppressWarnings ("unchecked" )
275- Map <String , Object > parent = ( Map < String , Object >) document .getFieldValue (parentName , Map .class );
297+ Map <String , Object > parent = document .getFieldValue (parentName , Map .class );
276298 if (parent .isEmpty ()) {
277299 document .removeField (parentName );
278300 } else {
@@ -294,6 +316,76 @@ static void renameSpecialKeys(IngestDocument document) {
294316 });
295317 }
296318
319+ /**
320+ * Renames specific ECS keys in the given document to their OpenTelemetry-compatible counterparts using logic compatible with the
321+ * {@link org.elasticsearch.ingest.IngestPipelineFieldAccessPattern#FLEXIBLE} access pattern and based on the {@code RENAME_KEYS} map.
322+ *
323+ * <p>This method performs the following operations:
324+ * <ul>
325+ * <li>For each key in the {@code RENAME_KEYS} map, it checks if a corresponding field exists in the document.</li>
326+ * <li>If the field exists, it removes it from the document and adds a new field with the corresponding name from the
327+ * {@code RENAME_KEYS} map and the same value. If a field's parent objects do not exist, it will progressively build
328+ * each parent object instead of concatenating the field names together.</li>
329+ * <li>If the key is nested (contains dots), it recursively removes empty parent fields after renaming.</li>
330+ * </ul>
331+ *
332+ * @param document the document to process
333+ */
334+ static void renameSpecialKeysFlexible (IngestDocument document ) {
335+ RENAME_KEYS .forEach ((nonOtelName , otelName ) -> {
336+ boolean fieldExists = false ;
337+ Object value = null ;
338+ if (document .hasField (nonOtelName )) {
339+ // Dotted fields are treated the same as normalized fields in flexible mode
340+ fieldExists = true ;
341+ value = document .getFieldValue (nonOtelName , Object .class , true );
342+ document .removeField (nonOtelName );
343+ // recursively remove empty parent fields
344+ int lastDot = nonOtelName .lastIndexOf ('.' );
345+ while (lastDot > 0 ) {
346+ String parentName = nonOtelName .substring (0 , lastDot );
347+ // In flexible mode, dotted field names can be removed. Parent paths may not exist since they might be included
348+ // by the dotted field removal (e.g. For the doc {a:{b.c:1}}, removing a.b.c will not leave an a.b field because
349+ // there is no a.b field to start with.
350+ @ SuppressWarnings ("unchecked" )
351+ Map <String , Object > parent = document .getFieldValue (parentName , Map .class , true );
352+ if (parent != null ) {
353+ if (parent .isEmpty ()) {
354+ document .removeField (parentName );
355+ } else {
356+ break ;
357+ }
358+ }
359+ lastDot = parentName .lastIndexOf ('.' );
360+ }
361+ }
362+ if (fieldExists ) {
363+ // Flexible mode creates dotted field names when parent fields are not present. We expect the rename keys to be
364+ // normalized after processing, so we progressively build each field's parents if it's a dotted field.
365+ Map <String , Object > source = document .getSource ();
366+ String remainingPath = otelName ;
367+ int dot = remainingPath .indexOf ('.' );
368+ while (dot > 0 ) {
369+ // Dotted field, emulate classic mode by building out each parent object
370+ String fieldName = remainingPath .substring (0 , dot );
371+ remainingPath = remainingPath .substring (dot + 1 );
372+ Object existingParent = source .get (fieldName );
373+ if (existingParent instanceof Map ) {
374+ @ SuppressWarnings ("unchecked" )
375+ Map <String , Object > castAssignment = (Map <String , Object >) existingParent ;
376+ source = castAssignment ;
377+ } else {
378+ Map <String , Object > map = new HashMap <>();
379+ source .put (fieldName , map );
380+ source = map ;
381+ }
382+ dot = remainingPath .indexOf ('.' );
383+ }
384+ source .put (remainingPath , value );
385+ }
386+ });
387+ }
388+
297389 private static void moveResourceAttributes (Map <String , Object > attributes , Map <String , Object > resourceAttributes ) {
298390 Set <String > ecsResourceFields = EcsOTelResourceAttributes .LATEST ;
299391 Iterator <Map .Entry <String , Object >> attributeIterator = attributes .entrySet ().iterator ();
0 commit comments