Skip to content

Commit 7d536a7

Browse files
committed
Adding accurate resource attributes handling
1 parent c66663b commit 7d536a7

File tree

7 files changed

+673
-142
lines changed

7 files changed

+673
-142
lines changed
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.ingest.otel;
11+
12+
import java.util.Set;
13+
14+
final class EcsOTelResourceAttributes {
15+
16+
/**
17+
* The set of ECS (Elastic Common Schema) field names that are mapped to OpenTelemetry resource attributes,
18+
* as defined by the OpenTelemetry Semantic Conventions.
19+
* The list is produced by the {@code ResourceAttributesTests#testAttributesSetUpToDate} test.
20+
*
21+
* @see <a href="https://github.com/open-telemetry/semantic-conventions">OpenTelemetry Semantic Conventions</a>
22+
*/
23+
static final Set<String> LATEST = Set.of(
24+
"agent.type",
25+
"agent.build.original",
26+
"agent.name",
27+
"agent.id",
28+
"agent.ephemeral_id",
29+
"agent.version",
30+
"container.image.tag",
31+
"device.model.identifier",
32+
"container.image.hash.all",
33+
"service.node.name",
34+
"process.pid",
35+
"device.id",
36+
"host.mac",
37+
"host.type",
38+
"container.id",
39+
"cloud.availability_zone",
40+
"host.ip",
41+
"container.name",
42+
"container.image.name",
43+
"device.model.name",
44+
"host.name",
45+
"host.id",
46+
"process.executable",
47+
"user_agent.original",
48+
"service.environment",
49+
"cloud.region",
50+
"service.name",
51+
"faas.name",
52+
"device.manufacturer",
53+
"process.args",
54+
"host.architecture",
55+
"cloud.provider",
56+
"container.runtime",
57+
"service.version",
58+
"cloud.service.name",
59+
"cloud.account.id",
60+
"process.command_line",
61+
"faas.version"
62+
);
63+
64+
private EcsOTelResourceAttributes() {}
65+
}

modules/ingest-otel/src/main/java/org/elasticsearch/ingest/otel/NormalizeToOTelProcessor.java

Lines changed: 26 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import java.util.HashMap;
1919
import java.util.HashSet;
20+
import java.util.Iterator;
2021
import java.util.Map;
2122
import java.util.Set;
2223

@@ -30,8 +31,9 @@
3031
* <p>More specifically, this processor performs the following operations:
3132
* <ul>
3233
* <li>Renames specific ECS fields to their corresponding OpenTelemetry-compatible counterparts.</li>
33-
* <li>Moves fields to the "attributes" and "resource.attributes" namespaces.</li>
34-
* <li>Flattens the "attributes" and "resource.attributes" maps.</li>
34+
* <li>Moves all other fields to the "attributes" namespace.</li>
35+
* <li>Flattens all attributes in the "attributes" namespace.</li>
36+
* <li>Moves resource fields from the "attributes" namespace to the "resource.attributes" namespace.</li>
3537
* </ul>
3638
*
3739
* <p>If a document is identified as OpenTelemetry-compatible, no transformation is performed.
@@ -74,10 +76,6 @@ public class NormalizeToOTelProcessor extends AbstractProcessor {
7476
KEEP_KEYS = Set.copyOf(keepKeys);
7577
}
7678

77-
private static final String AGENT_PREFIX = "agent";
78-
private static final String CLOUD_PREFIX = "cloud";
79-
private static final String HOST_PREFIX = "host";
80-
8179
private static final String ATTRIBUTES_KEY = "attributes";
8280
private static final String RESOURCE_KEY = "resource";
8381
private static final String SCOPE_KEY = "scope";
@@ -119,35 +117,29 @@ public IngestDocument execute(IngestDocument document) {
119117
}
120118
}
121119

122-
Map<String, Object> newResource = new HashMap<>();
123-
Map<String, Object> newResourceAttributes = new HashMap<>();
124-
newResource.put(ATTRIBUTES_KEY, newResourceAttributes);
125-
126120
source.put(ATTRIBUTES_KEY, newAttributes);
127-
source.put(RESOURCE_KEY, newResource);
128121

129122
renameSpecialKeys(document);
130123

131-
// Iterate through all top level keys and move them to the appropriate namespace
124+
// move all top level keys except from specific ones to the "attributes" namespace
132125
final var sourceItr = source.entrySet().iterator();
133126
while (sourceItr.hasNext()) {
134127
final var entry = sourceItr.next();
135-
final var key = entry.getKey();
136-
final var value = entry.getValue();
137-
if (KEEP_KEYS.contains(key)) {
138-
continue;
139-
}
140-
if (shouldMoveToResourceAttributes(key)) {
141-
newResourceAttributes.put(key, value);
142-
} else {
143-
newAttributes.put(key, value);
128+
if (KEEP_KEYS.contains(entry.getKey()) == false) {
129+
newAttributes.put(entry.getKey(), entry.getValue());
130+
sourceItr.remove();
144131
}
145-
sourceItr.remove();
146132
}
147133

148134
// Flatten attributes
149-
source.put(ATTRIBUTES_KEY, Maps.flatten(newAttributes, false, false));
150-
newResource.put(ATTRIBUTES_KEY, Maps.flatten(newResourceAttributes, false, false));
135+
Map<String, Object> flattenAttributes = Maps.flatten(newAttributes, false, false);
136+
source.put(ATTRIBUTES_KEY, flattenAttributes);
137+
138+
Map<String, Object> newResource = new HashMap<>();
139+
Map<String, Object> newResourceAttributes = new HashMap<>();
140+
newResource.put(ATTRIBUTES_KEY, newResourceAttributes);
141+
source.put(RESOURCE_KEY, newResource);
142+
moveResourceAttributes(flattenAttributes, newResourceAttributes);
151143

152144
return document;
153145
}
@@ -257,13 +249,16 @@ static void renameSpecialKeys(IngestDocument document) {
257249
});
258250
}
259251

260-
private static boolean shouldMoveToResourceAttributes(String key) {
261-
return key.startsWith(AGENT_PREFIX + ".")
262-
|| key.equals(AGENT_PREFIX)
263-
|| key.startsWith(CLOUD_PREFIX + ".")
264-
|| key.equals(CLOUD_PREFIX)
265-
|| key.startsWith(HOST_PREFIX + ".")
266-
|| key.equals(HOST_PREFIX);
252+
private static void moveResourceAttributes(Map<String, Object> attributes, Map<String, Object> resourceAttributes) {
253+
Set<String> ecsResourceFields = EcsOTelResourceAttributes.LATEST;
254+
Iterator<Map.Entry<String, Object>> attributeIterator = attributes.entrySet().iterator();
255+
while (attributeIterator.hasNext()) {
256+
Map.Entry<String, Object> entry = attributeIterator.next();
257+
if (ecsResourceFields.contains(entry.getKey())) {
258+
resourceAttributes.put(entry.getKey(), entry.getValue());
259+
attributeIterator.remove();
260+
}
261+
}
267262
}
268263

269264
public static final class Factory implements Processor.Factory {
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.ingest.otel;
11+
12+
import org.elasticsearch.xcontent.XContentFactory;
13+
import org.elasticsearch.xcontent.XContentParser;
14+
import org.elasticsearch.xcontent.XContentParserConfiguration;
15+
import org.elasticsearch.xcontent.XContentType;
16+
17+
import java.io.IOException;
18+
import java.io.InputStream;
19+
import java.net.URI;
20+
import java.net.http.HttpClient;
21+
import java.net.http.HttpRequest;
22+
import java.net.http.HttpResponse;
23+
import java.util.HashMap;
24+
import java.util.HashSet;
25+
import java.util.List;
26+
import java.util.Map;
27+
import java.util.Set;
28+
29+
class EcsFieldsDiscoverer {
30+
31+
private static final String ECS_FLAT_FILE_URL = "https://raw.githubusercontent.com/elastic/ecs/main/generated/ecs/ecs_flat.yml";
32+
private static final String AGENT_FIELDS_PREFIX = "agent.";
33+
34+
private static final EcsFieldsDiscoverer INSTANCE = new EcsFieldsDiscoverer();
35+
36+
private final Map<String, String> ecsToOTelAttributeNames = new HashMap<>();
37+
private final Set<String> ecsResourceFields = new HashSet<>();
38+
39+
private EcsFieldsDiscoverer() {
40+
try {
41+
collectEcsAttributeNames();
42+
} catch (IOException | InterruptedException e) {
43+
throw new RuntimeException("Failed to load ECS to OpenTelemetry attribute names", e);
44+
}
45+
}
46+
47+
Map<String, String> getEcsToOTelAttributeNames() {
48+
return ecsToOTelAttributeNames;
49+
}
50+
51+
Set<String> getEcsResourceFields() {
52+
return ecsResourceFields;
53+
}
54+
55+
static EcsFieldsDiscoverer getInstance() {
56+
return INSTANCE;
57+
}
58+
59+
private void collectEcsAttributeNames() throws IOException, InterruptedException {
60+
Map<String, Object> ecsFields = loadEcsFields();
61+
for (Map.Entry<String, Object> entry : ecsFields.entrySet()) {
62+
String ecsName = entry.getKey();
63+
@SuppressWarnings("unchecked")
64+
Map<String, Object> fieldData = (Map<String, Object>) entry.getValue();
65+
@SuppressWarnings("unchecked")
66+
List<Map<String, String>> otelDataEntries = (List<Map<String, String>>) fieldData.get("otel");
67+
if (otelDataEntries != null) {
68+
for (Map<String, String> otelData : otelDataEntries) {
69+
String relation = otelData.get("relation");
70+
if ("match".equals(relation)) {
71+
ecsToOTelAttributeNames.put(ecsName, ecsName);
72+
} else if ("equivalent".equals(relation)) {
73+
String attribute = otelData.get("attribute");
74+
if (attribute != null) {
75+
ecsToOTelAttributeNames.put(ecsName, attribute);
76+
}
77+
}
78+
}
79+
}
80+
if (ecsName.startsWith(AGENT_FIELDS_PREFIX)) {
81+
// for now, we consider all agent.* fields as resource attributes, but this may change in the future
82+
ecsResourceFields.add(ecsName);
83+
}
84+
}
85+
}
86+
87+
private static Map<String, Object> loadEcsFields() throws IOException, InterruptedException {
88+
try (HttpClient httpClient = HttpClient.newHttpClient()) {
89+
HttpRequest request = HttpRequest.newBuilder().uri(URI.create(ECS_FLAT_FILE_URL)).build();
90+
HttpResponse<InputStream> response = httpClient.send(request, HttpResponse.BodyHandlers.ofInputStream());
91+
92+
try (
93+
InputStream is = response.body();
94+
XContentParser parser = XContentFactory.xContent(XContentType.YAML).createParser(XContentParserConfiguration.EMPTY, is)
95+
) {
96+
return parser.map();
97+
}
98+
}
99+
}
100+
}

0 commit comments

Comments
 (0)