Adding NormalizeForStreamProcessor
#125699
Merged
Changes from 10 commits
66 commits
30b0ed3 Adding EcsNamespacingProcessor (eyalkoren)
b96cc0c Adding module-info (eyalkoren)
469df61 Merge remote-tracking branch 'upstream/main' into ECS-namespacing-pro… (eyalkoren)
2bd819f Exposing and testing the processor (eyalkoren)
1c2a670 Add test and some algorithm fixes (eyalkoren)
904c19c Making scope non-mandatory (eyalkoren)
bd75b06 Minimize dependencies (eyalkoren)
50f3c4d Extending REST tests (eyalkoren)
cb0dcee Merge remote-tracking branch 'upstream/main' into ECS-namespacing-pro… (eyalkoren)
f68cf93 Update docs/changelog/125699.yaml (eyalkoren)
9dbe94b Merge remote-tracking branch 'upstream/main' into ECS-namespacing-pro… (eyalkoren)
79bf683 instanceOf with pattern matching (eyalkoren)
dbc4d4a instanceOf with pattern matching (eyalkoren)
d160fe6 revert constants usage (eyalkoren)
dfe33fc Merge remote-tracking branch 'eyalkoren/ECS-namespacing-processor' in… (eyalkoren)
2b77e3c Complete review change proposals (eyalkoren)
4773cf6 fix typo in test name (eyalkoren)
054a76f [CI] Auto commit changes from spotless
4afacd0 Applying review suggestions (eyalkoren)
d67fa2c Merge remote-tracking branch 'eyalkoren/ECS-namespacing-processor' in… (eyalkoren)
bef3643 Silence a warning from Intellij (joegallo)
49d439f Rename this variable (joegallo)
e15984b Fix some typos and reflow a comment (joegallo)
e09fa14 Use ofEntries for increased clarity (joegallo)
b71fa3f Save a rehash and some traversals (joegallo)
e8338f8 Merge branch 'main' into ECS-namespacing-processor (joegallo)
98880ca This can be static (joegallo)
d37587d Rely on mutability for these tests (joegallo)
21f2f9e Rename some variables (joegallo)
b0037c3 Drop the top-level warnings suppression (joegallo)
96ff183 Merge branch 'main' into ECS-namespacing-processor (joegallo)
65f7ea2 Rewrite this test (joegallo)
3639c77 Merge branch 'main' into ECS-namespacing-processor (joegallo)
f9421ac Drop yaml-rest-compat-test (joegallo)
b2dd61d Refactor ECS Namespacing to Normalize to OTel (eyalkoren)
c66663b Apply review comments (eyalkoren)
7d536a7 Adding accurate resource attributes handling (eyalkoren)
9493a73 Merge remote-tracking branch 'upstream/main' into ECS-namespacing-pro… (eyalkoren)
bb538a6 Suppress warning for forbidden usage of System.out in tests (eyalkoren)
e7c1f9d Eliminating some more forbidden APIs (eyalkoren)
605bdc9 [CI] Auto commit changes from spotless
e65c0ef Reverting shameful refactoring errors (eyalkoren)
916eaca Merge remote-tracking branch 'eyalkoren/ECS-namespacing-processor' in… (eyalkoren)
07e78a2 Merge remote-tracking branch 'upstream/main' into ECS-namespacing-pro… (eyalkoren)
5c34f94 Disabling ResourceAttributesTests (eyalkoren)
e11ea51 Adding local disk crawler (eyalkoren)
c85ddf1 Disabling tests with @LuceneTestCase.Nightly() (eyalkoren)
717472b Disabling forbidden APIs (eyalkoren)
d93f043 Disabling test and adding javadoc (eyalkoren)
1c4bcaa Merge remote-tracking branch 'upstream/main' into ECS-namespacing-pro… (eyalkoren)
9d0da6a Fix Logger#warn usage (eyalkoren)
0a6a5d4 Adding documentation (eyalkoren)
0f12253 Merge remote-tracking branch 'upstream/main' into ECS-namespacing-pro… (eyalkoren)
cd90ca9 Fix typo (eyalkoren)
c1c4d1f Merge remote-tracking branch 'upstream/main' into ECS-namespacing-pro… (eyalkoren)
a259ce6 Disabling the tests temporarily with @Ignore (eyalkoren)
9d77fcd Merge remote-tracking branch 'upstream/main' into ECS-namespacing-pro… (eyalkoren)
45f02b6 Adding to toc (eyalkoren)
675ef90 Merge remote-tracking branch 'upstream/main' into ECS-namespacing-pro… (eyalkoren)
2b8441d Remove GitHub-API-based crawler (eyalkoren)
bf906e2 Merge remote-tracking branch 'upstream/main' into ECS-namespacing-pro… (eyalkoren)
f279b7a Refactor: renaming to normalize_for_stream (eyalkoren)
44ac64b Merge branch 'main' into ECS-namespacing-processor (joegallo)
55885a8 Merge branch 'main' into ECS-namespacing-processor (joegallo)
978b163 Merge branch 'main' into ECS-namespacing-processor (joegallo)
bc4a015 Merge branch 'main' into ECS-namespacing-processor (eyalkoren)
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| pr: 125699 | ||
| summary: Adding `EcsNamespacingProcessor` | ||
| area: Ingest Node | ||
| type: feature | ||
| issues: [] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,22 @@ | ||
| /* | ||
| * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
| * or more contributor license agreements. Licensed under the "Elastic License | ||
| * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side | ||
| * Public License v 1"; you may not use this file except in compliance with, at | ||
| * your election, the "Elastic License 2.0", the "GNU Affero General Public | ||
| * License v3.0 only", or the "Server Side Public License, v 1". | ||
| */ | ||
|
|
||
| apply plugin: 'elasticsearch.internal-yaml-rest-test' | ||
| apply plugin: 'elasticsearch.yaml-rest-compat-test' | ||
|
|
||
| esplugin { | ||
| description = 'Ingest processor that applies ECS namespacing' | ||
| classname ='org.elasticsearch.ingest.ecs.EcsNamespacingPlugin' | ||
| } | ||
|
|
||
| restResources { | ||
| restApi { | ||
| include '_common', 'indices', 'index', 'cluster', 'nodes', 'get', 'ingest' | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,13 @@ | ||
| /* | ||
| * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
| * or more contributor license agreements. Licensed under the "Elastic License | ||
| * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side | ||
| * Public License v 1"; you may not use this file except in compliance with, at | ||
| * your election, the "Elastic License 2.0", the "GNU Affero General Public | ||
| * License v3.0 only", or the "Server Side Public License, v 1". | ||
| */ | ||
|
|
||
| module org.elasticsearch.ingest.ecs { | ||
| requires org.elasticsearch.base; | ||
| requires org.elasticsearch.server; | ||
| } |
24 changes: 24 additions & 0 deletions
modules/ingest-ecs/src/main/java/org/elasticsearch/ingest/ecs/EcsNamespacingPlugin.java
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,24 @@ | ||
| /* | ||
| * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
| * or more contributor license agreements. Licensed under the "Elastic License | ||
| * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side | ||
| * Public License v 1"; you may not use this file except in compliance with, at | ||
| * your election, the "Elastic License 2.0", the "GNU Affero General Public | ||
| * License v3.0 only", or the "Server Side Public License, v 1". | ||
| */ | ||
|
|
||
| package org.elasticsearch.ingest.ecs; | ||
|
|
||
| import org.elasticsearch.ingest.Processor; | ||
| import org.elasticsearch.plugins.IngestPlugin; | ||
| import org.elasticsearch.plugins.Plugin; | ||
|
|
||
| import java.util.Map; | ||
|
|
||
| public class EcsNamespacingPlugin extends Plugin implements IngestPlugin { | ||
|
|
||
| @Override | ||
| public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) { | ||
| return Map.of(EcsNamespacingProcessor.TYPE, new EcsNamespacingProcessor.Factory()); | ||
| } | ||
| } |
217 changes: 217 additions & 0 deletions
modules/ingest-ecs/src/main/java/org/elasticsearch/ingest/ecs/EcsNamespacingProcessor.java
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,217 @@ | ||
| /* | ||
| * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
| * or more contributor license agreements. Licensed under the "Elastic License | ||
| * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side | ||
| * Public License v 1"; you may not use this file except in compliance with, at | ||
| * your election, the "Elastic License 2.0", the "GNU Affero General Public | ||
| * License v3.0 only", or the "Server Side Public License, v 1". | ||
| */ | ||
|
|
||
| package org.elasticsearch.ingest.ecs; | ||
|
|
||
| import org.elasticsearch.cluster.metadata.ProjectId; | ||
| import org.elasticsearch.common.util.Maps; | ||
| import org.elasticsearch.common.util.set.Sets; | ||
| import org.elasticsearch.ingest.AbstractProcessor; | ||
| import org.elasticsearch.ingest.IngestDocument; | ||
| import org.elasticsearch.ingest.Processor; | ||
|
|
||
| import java.util.HashMap; | ||
| import java.util.HashSet; | ||
| import java.util.Map; | ||
| import java.util.Set; | ||
|
|
||
| public class EcsNamespacingProcessor extends AbstractProcessor { | ||
|
|
||
| public static final String TYPE = "ecs_namespacing"; | ||
|
|
||
| private static final Map<String, String> RENAME_KEYS = Map.of( | ||
| "span.id", | ||
| "span_id", | ||
| "message", | ||
| "body.text", | ||
| "log.level", | ||
| "severity_text", | ||
| "trace.id", | ||
| "trace_id" | ||
| ); | ||
|
|
||
| private static final Set<String> KEEP_KEYS; | ||
| static { | ||
| Set<String> keepKeys = new HashSet<>(Set.of("@timestamp", "attributes", "resource")); | ||
| Set<String> renamedTopLevelFields = new HashSet<>(); | ||
| for (String value : RENAME_KEYS.values()) { | ||
| int dotIndex = value.indexOf('.'); | ||
| if (dotIndex != -1) { | ||
| renamedTopLevelFields.add(value.substring(0, dotIndex)); | ||
| } else { | ||
| renamedTopLevelFields.add(value); | ||
| } | ||
| } | ||
| keepKeys.addAll(renamedTopLevelFields); | ||
| KEEP_KEYS = Set.copyOf(keepKeys); | ||
| } | ||
|
|
||
| private static final String AGENT_PREFIX = "agent."; | ||
| private static final String CLOUD_PREFIX = "cloud."; | ||
| private static final String HOST_PREFIX = "host."; | ||
|
|
||
| private static final String ATTRIBUTES_KEY = "attributes"; | ||
| private static final String RESOURCE_KEY = "resource"; | ||
| private static final String SCOPE_KEY = "scope"; | ||
| private static final String BODY_KEY = "body"; | ||
| private static final String TEXT_KEY = "text"; | ||
| private static final String STRUCTURED_KEY = "structured"; | ||
|
|
||
| EcsNamespacingProcessor(String tag, String description) { | ||
| super(tag, description); | ||
| } | ||
|
|
||
| @Override | ||
| public String getType() { | ||
| return TYPE; | ||
| } | ||
|
|
||
| @Override | ||
| public IngestDocument execute(IngestDocument document) { | ||
| Map<String, Object> source = document.getSource(); | ||
|
|
||
| boolean isOTel = isOTelDocument(source); | ||
| if (isOTel) { | ||
| return document; | ||
| } | ||
|
|
||
| // non-OTel document | ||
|
|
||
| Map<String, Object> newAttributes = new HashMap<>(); | ||
| // The keep keys indicate the fields that should be kept at the top level later on when applying the namespacing. | ||
| // However, at this point we need to move their original values to the new attributes namespace, except for the @timestamp field. | ||
| for (String keepKey : KEEP_KEYS) { | ||
| if (keepKey.equals("@timestamp")) { | ||
| continue; | ||
| } | ||
| Object value = source.remove(keepKey); | ||
| if (value != null) { | ||
| newAttributes.put(keepKey, value); | ||
| } | ||
| } | ||
|
|
||
| Map<String, Object> newResource = new HashMap<>(); | ||
| Map<String, Object> newResourceAttributes = new HashMap<>(); | ||
| newResource.put(ATTRIBUTES_KEY, newResourceAttributes); | ||
|
|
||
| source.put(ATTRIBUTES_KEY, newAttributes); | ||
| source.put(RESOURCE_KEY, newResource); | ||
|
|
||
| renameSpecialKeys(document); | ||
|
|
||
| // Iterate through all top level keys and move them to the appropriate namespace | ||
| for (String key : Sets.newHashSet(source.keySet())) { | ||
| if (KEEP_KEYS.contains(key)) { | ||
| continue; | ||
| } | ||
| if (shouldMoveToResourceAttributes(key)) { | ||
| Object value = source.remove(key); | ||
| newResourceAttributes.put(key, value); | ||
| } else { | ||
| Object value = source.remove(key); | ||
| newAttributes.put(key, value); | ||
| } | ||
| } | ||
|
|
||
| // Flatten attributes | ||
| source.replace(ATTRIBUTES_KEY, Maps.flatten(newAttributes, false, false)); | ||
| newResource.replace(ATTRIBUTES_KEY, Maps.flatten(newResourceAttributes, false, false)); | ||
|
|
||
| return document; | ||
| } | ||
|
|
||
| @SuppressWarnings("unchecked") | ||
| boolean isOTelDocument(Map<String, Object> source) { | ||
| Object resource = source.get(RESOURCE_KEY); | ||
| if (resource instanceof Map == false) { | ||
| return false; | ||
| } else { | ||
| Object resourceAttributes = ((Map<String, Object>) resource).get(ATTRIBUTES_KEY); | ||
| if (resourceAttributes != null && (resourceAttributes instanceof Map) == false) { | ||
| return false; | ||
| } | ||
| } | ||
|
|
||
| Object scope = source.get(SCOPE_KEY); | ||
| if (scope != null && scope instanceof Map == false) { | ||
| return false; | ||
| } | ||
|
|
||
| Object attributes = source.get(ATTRIBUTES_KEY); | ||
| if (attributes != null && attributes instanceof Map == false) { | ||
| return false; | ||
| } | ||
|
|
||
| Object body = source.get(BODY_KEY); | ||
| if (body != null) { | ||
| if (body instanceof Map == false) { | ||
| return false; | ||
| } | ||
| Object bodyText = ((Map<String, Object>) body).get(TEXT_KEY); | ||
| if (bodyText != null && (bodyText instanceof String) == false) { | ||
| return false; | ||
| } | ||
| Object bodyStructured = ((Map<String, Object>) body).get(STRUCTURED_KEY); | ||
| return (bodyStructured instanceof String) == false; | ||
| } | ||
| return true; | ||
| } | ||
|
|
||
| private void renameSpecialKeys(IngestDocument document) { | ||
| RENAME_KEYS.forEach((nonOtelName, otelName) -> { | ||
| // first look assuming dot notation for nested fields | ||
| Object value = document.getFieldValue(nonOtelName, Object.class, true); | ||
| if (value != null) { | ||
| document.removeField(nonOtelName); | ||
| // recursively remove empty parent fields | ||
| int lastDot = nonOtelName.lastIndexOf('.'); | ||
| while (lastDot > 0) { | ||
| String parentName = nonOtelName.substring(0, lastDot); | ||
| // parent should never be null and must be a map if we are here | ||
| @SuppressWarnings("unchecked") | ||
| Map<String, Object> parent = (Map<String, Object>) document.getFieldValue(parentName, Map.class); | ||
| if (parent.isEmpty()) { | ||
| document.removeField(parentName); | ||
| } else { | ||
| break; | ||
| } | ||
| lastDot = parentName.lastIndexOf('.'); | ||
| } | ||
| } else if (nonOtelName.contains(".")) { | ||
| // look for dotted field names | ||
| value = document.getSource().remove(nonOtelName); | ||
| } | ||
| if (value != null) { | ||
| document.setFieldValue(otelName, value); | ||
| } | ||
| }); | ||
| } | ||
|
|
||
| private boolean shouldMoveToResourceAttributes(String key) { | ||
| return key.startsWith(AGENT_PREFIX) | ||
| || key.equals("agent") | ||
| || key.startsWith(CLOUD_PREFIX) | ||
| || key.equals("cloud") | ||
| || key.startsWith(HOST_PREFIX) | ||
| || key.equals("host"); | ||
| } | ||
|
|
||
| public static final class Factory implements Processor.Factory { | ||
| @Override | ||
| public Processor create( | ||
| Map<String, Processor.Factory> processorFactories, | ||
| String tag, | ||
| String description, | ||
| Map<String, Object> config, | ||
| ProjectId projectId | ||
| ) throws Exception { | ||
| return new EcsNamespacingProcessor(tag, description); | ||
| } | ||
| } | ||
| } | ||
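The routing rules in `EcsNamespacingProcessor` at this revision can be tried without any Elasticsearch dependency. The following is a minimal sketch, not the code from the PR: the class `NamespacingSketch` and the helpers `keepKeys`, `isResourceKey`, and `route` are hypothetical names introduced here for illustration. It assumes only what the diff shows: the `RENAME_KEYS` mapping, the way `KEEP_KEYS` is derived from it in the static initializer, and the prefix check in `shouldMoveToResourceAttributes`.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, dependency-free sketch of the key-routing rules shown in the diff.
final class NamespacingSketch {

    // Same entries as RENAME_KEYS in the diff: non-OTel name -> OTel name.
    static final Map<String, String> RENAME_KEYS = Map.of(
        "span.id", "span_id",
        "message", "body.text",
        "log.level", "severity_text",
        "trace.id", "trace_id"
    );

    // Mirrors the static initializer: three fixed keys plus the top-level
    // segment of every rename target (so "body.text" contributes "body").
    static Set<String> keepKeys() {
        Set<String> keep = new HashSet<>(Set.of("@timestamp", "attributes", "resource"));
        for (String target : RENAME_KEYS.values()) {
            int dot = target.indexOf('.');
            keep.add(dot == -1 ? target : target.substring(0, dot));
        }
        return Set.copyOf(keep);
    }

    // Mirrors shouldMoveToResourceAttributes: exact "agent"/"cloud"/"host"
    // or any key starting with one of those names followed by a dot.
    static boolean isResourceKey(String key) {
        for (String namespace : List.of("agent", "cloud", "host")) {
            if (key.equals(namespace) || key.startsWith(namespace + ".")) {
                return true;
            }
        }
        return false;
    }

    // Hypothetical helper: names the destination of a top-level key once
    // the renames have already been applied.
    static String route(String key) {
        if (keepKeys().contains(key)) {
            return "top-level";
        }
        return isResourceKey(key) ? "resource.attributes" : "attributes";
    }

    public static void main(String[] args) {
        for (String key : List.of("@timestamp", "body", "host.name", "agent", "hostname", "event.dataset")) {
            System.out.println(key + " -> " + route(key));
        }
    }
}
```

In this sketch `hostname` is routed to `attributes` rather than `resource.attributes`, because the prefix check requires either an exact match on `host` or the dotted prefix `host.`.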