diff --git a/docs/changelog/125699.yaml b/docs/changelog/125699.yaml
new file mode 100644
index 0000000000000..29ee24da4c974
--- /dev/null
+++ b/docs/changelog/125699.yaml
@@ -0,0 +1,5 @@
+pr: 125699
+summary: Adding `NormalizeForStreamProcessor`
+area: Ingest Node
+type: feature
+issues: []
diff --git a/docs/reference/enrich-processor/index.md b/docs/reference/enrich-processor/index.md
index cf5a21ea27e0e..e220e763024e3 100644
--- a/docs/reference/enrich-processor/index.md
+++ b/docs/reference/enrich-processor/index.md
@@ -84,6 +84,9 @@ Refer to [Enrich your data](docs-content://manage-data/ingest/transform-enrich/d
[`network_direction` processor](/reference/enrich-processor/network-direction-processor.md)
: Calculates the network direction given a source IP address, destination IP address, and a list of internal networks.
+[`normalize_for_stream` processor](/reference/enrich-processor/normalize-for-stream.md)
+: Normalizes non-OpenTelemetry documents to be OpenTelemetry-compliant.
+
[`registered_domain` processor](/reference/enrich-processor/registered-domain-processor.md)
: Extracts the registered domain (also known as the effective top-level domain or eTLD), sub-domain, and top-level domain from a fully qualified domain name (FQDN).
diff --git a/docs/reference/enrich-processor/normalize-for-stream.md b/docs/reference/enrich-processor/normalize-for-stream.md
new file mode 100644
index 0000000000000..0deb8b5d8abc3
--- /dev/null
+++ b/docs/reference/enrich-processor/normalize-for-stream.md
@@ -0,0 +1,152 @@
+---
+navigation_title: "Normalize for Stream"
+mapped_pages:
+ - https://www.elastic.co/guide/en/elasticsearch/reference/current/normalize-for-stream-processor.html
+---
+
+# Normalize-for-Stream processor [normalize-for-stream-processor]
+
+
+Detects whether a document is OpenTelemetry-compliant and, if it is not,
+normalizes it as described below. If used in combination with the OTel-related
+mappings such as the ones defined in `logs-otel@template`, the resulting
+document can be queried seamlessly by clients that expect either [ECS](https://www.elastic.co/guide/en/ecs/current/index.html) or OpenTelemetry-[Semantic-Conventions](https://github.com/open-telemetry/semantic-conventions) formats.
+
+::::{note}
+This processor is in tech preview and is not available in our serverless offering.
+::::
+
+## Detecting OpenTelemetry compliance
+
+The processor detects OpenTelemetry compliance by checking the following fields:
+* `resource` exists as a key and the value is a map
+* `resource` either doesn't contain an `attributes` field, or contains an `attributes` field of type map
+* `scope` is either missing or a map
+* `attributes` is either missing or a map
+* `body` is either missing or a map
+* `body` either doesn't contain a `text` field, or contains a `text` field of type `String`
+* `body` either doesn't contain a `structured` field, or contains a `structured` field that is not of type `String`
+
+If all of these conditions are met, the document is considered OpenTelemetry-compliant and is not modified by the processor.
+
+## Normalization
+
+If the document is not OpenTelemetry-compliant, the processor normalizes it as follows:
+* Specific ECS fields are renamed to have their corresponding OpenTelemetry Semantic Conventions attribute names. These include the following:
+
+ | ECS Field | Semantic Conventions Attribute |
+ |-------------|--------------------------------|
+ | `span.id` | `span_id` |
+ | `trace.id` | `trace_id` |
+ | `message` | `body.text` |
+ | `log.level` | `severity_text` |
+ The processor first looks for the nested form of the ECS field; if it does not exist, it looks for a top-level field with the dotted field name.
+* Other specific ECS fields that describe resources and have corresponding counterparts in the OpenTelemetry Semantic Conventions are moved to the `resource.attributes` map. Fields are considered resource attributes if they conform to the following conditions:
+ * They are ECS fields that have corresponding counterparts (either with
+ the same name or with a different name) in OpenTelemetry Semantic Conventions.
+ * The corresponding OpenTelemetry attribute is defined in
+ [Semantic Conventions](https://github.com/open-telemetry/semantic-conventions/tree/main/model)
+ within a group that is defined as `type: entity`.
+* All other fields, except for `@timestamp`, are moved to the `attributes` map.
+* All non-array entries of the `attributes` and `resource.attributes` maps are flattened. Flattening means that nested objects are merged into their parent object, and the keys are concatenated with a dot. See examples below.
+
+## Examples
+
+If an OpenTelemetry-compliant document is detected, the processor does nothing. For example, the following document will stay unchanged:
+
+```json
+{
+ "resource": {
+ "attributes": {
+ "service.name": "my-service"
+ }
+ },
+ "scope": {
+ "name": "my-library",
+ "version": "1.0.0"
+ },
+ "attributes": {
+ "http.method": "GET"
+ },
+ "body": {
+ "text": "Hello, world!"
+ }
+}
+```
+
+If a non-OpenTelemetry-compliant document is detected, the processor normalizes it. For example, the following document:
+
+```json
+{
+ "@timestamp": "2023-10-01T12:00:00Z",
+ "service": {
+ "name": "my-service",
+ "version": "1.0.0",
+ "environment": "production",
+ "language": {
+ "name": "python",
+ "version": "3.8"
+ }
+ },
+ "log": {
+ "level": "INFO"
+ },
+ "message": "Hello, world!",
+ "http": {
+ "method": "GET",
+ "url": {
+ "path": "/api/v1/resource"
+ },
+ "headers": [
+ {
+ "name": "Authorization",
+ "value": "Bearer token"
+ },
+ {
+ "name": "User-Agent",
+ "value": "my-client/1.0"
+ }
+ ]
+ },
+ "span" : {
+ "id": "1234567890abcdef"
+ },
+ "span.id": "abcdef1234567890",
+ "trace.id": "abcdef1234567890abcdef1234567890"
+}
+```
+will be normalized into the following form:
+
+```json
+{
+ "@timestamp": "2023-10-01T12:00:00Z",
+ "resource": {
+ "attributes": {
+ "service.name": "my-service",
+ "service.version": "1.0.0",
+ "service.environment": "production"
+ }
+ },
+ "attributes": {
+ "service.language.name": "python",
+ "service.language.version": "3.8",
+ "http.method": "GET",
+ "http.url.path": "/api/v1/resource",
+ "http.headers": [
+ {
+ "name": "Authorization",
+ "value": "Bearer token"
+ },
+ {
+ "name": "User-Agent",
+ "value": "my-client/1.0"
+ }
+ ]
+ },
+ "body": {
+ "text": "Hello, world!"
+ },
+ "severity_text": "INFO",
+ "span_id": "1234567890abcdef",
+ "trace_id": "abcdef1234567890abcdef1234567890"
+}
+```
diff --git a/docs/reference/enrich-processor/toc.yml b/docs/reference/enrich-processor/toc.yml
index f60fe7909d70f..7da271e6f0554 100644
--- a/docs/reference/enrich-processor/toc.yml
+++ b/docs/reference/enrich-processor/toc.yml
@@ -28,6 +28,7 @@ toc:
- file: kv-processor.md
- file: lowercase-processor.md
- file: network-direction-processor.md
+ - file: normalize-for-stream.md
- file: pipeline-processor.md
- file: redact-processor.md
- file: registered-domain-processor.md
@@ -44,4 +45,4 @@ toc:
- file: uppercase-processor.md
- file: urldecode-processor.md
- file: uri-parts-processor.md
- - file: user-agent-processor.md
\ No newline at end of file
+ - file: user-agent-processor.md
diff --git a/modules/ingest-otel/build.gradle b/modules/ingest-otel/build.gradle
new file mode 100644
index 0000000000000..54a00508a0a07
--- /dev/null
+++ b/modules/ingest-otel/build.gradle
@@ -0,0 +1,21 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+// NOTE(review): presumably wires up this module's YAML REST tests - confirm against the build-tools plugin docs.
+apply plugin: 'elasticsearch.internal-yaml-rest-test'
+
+// Module descriptor: a human-readable description and the fully qualified plugin entry-point class.
+esplugin {
+ description = 'Ingest processor that normalizes ECS documents to OpenTelemetry-compatible namespaces'
+ classname ='org.elasticsearch.ingest.otel.NormalizeForStreamPlugin'
+}
+
+// REST API groups included for this module's REST tests (assumed from the block name - TODO confirm).
+restResources {
+ restApi {
+ include '_common', 'indices', 'index', 'cluster', 'nodes', 'get', 'ingest'
+ }
+}
diff --git a/modules/ingest-otel/src/main/java/module-info.java b/modules/ingest-otel/src/main/java/module-info.java
new file mode 100644
index 0000000000000..20b349d930c85
--- /dev/null
+++ b/modules/ingest-otel/src/main/java/module-info.java
@@ -0,0 +1,13 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+/**
+ * Java module descriptor for the {@code org.elasticsearch.ingest.otel} module.
+ * It requires the Elasticsearch base and server modules and declares no exports.
+ */
+module org.elasticsearch.ingest.otel {
+ requires org.elasticsearch.base;
+ requires org.elasticsearch.server;
+}
diff --git a/modules/ingest-otel/src/main/java/org/elasticsearch/ingest/otel/EcsOTelResourceAttributes.java b/modules/ingest-otel/src/main/java/org/elasticsearch/ingest/otel/EcsOTelResourceAttributes.java
new file mode 100644
index 0000000000000..3453a2d55506e
--- /dev/null
+++ b/modules/ingest-otel/src/main/java/org/elasticsearch/ingest/otel/EcsOTelResourceAttributes.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.ingest.otel;
+
+import java.util.Set;
+
+/**
+ * Non-instantiable holder for the set of ECS field names that are treated as OpenTelemetry resource attributes.
+ */
+final class EcsOTelResourceAttributes {
+
+    /**
+     * The set of ECS (Elastic Common Schema) field names that are mapped to OpenTelemetry resource attributes,
+     * as defined by the OpenTelemetry Semantic Conventions.
+     * The list is produced by the {@code ResourceAttributesTests#testAttributesSetUpToDate} test.
+     *
+     * @see <a href="https://github.com/open-telemetry/semantic-conventions">OpenTelemetry Semantic Conventions</a>
+     */
+    static final Set<String> LATEST = Set.of(
+        "agent.type",
+        "agent.build.original",
+        "agent.name",
+        "agent.id",
+        "agent.ephemeral_id",
+        "agent.version",
+        "container.image.tag",
+        "device.model.identifier",
+        "container.image.hash.all",
+        "service.node.name",
+        "process.pid",
+        "device.id",
+        "host.mac",
+        "host.type",
+        "container.id",
+        "cloud.availability_zone",
+        "host.ip",
+        "container.name",
+        "container.image.name",
+        "device.model.name",
+        "host.name",
+        "host.id",
+        "process.executable",
+        "user_agent.original",
+        "service.environment",
+        "cloud.region",
+        "service.name",
+        "faas.name",
+        "device.manufacturer",
+        "process.args",
+        "host.architecture",
+        "cloud.provider",
+        "container.runtime",
+        "service.version",
+        "cloud.service.name",
+        "cloud.account.id",
+        "process.command_line",
+        "faas.version"
+    );
+
+    // Static holder only; never instantiated.
+    private EcsOTelResourceAttributes() {}
+}
diff --git a/modules/ingest-otel/src/main/java/org/elasticsearch/ingest/otel/NormalizeForStreamPlugin.java b/modules/ingest-otel/src/main/java/org/elasticsearch/ingest/otel/NormalizeForStreamPlugin.java
new file mode 100644
index 0000000000000..bd88603407ea5
--- /dev/null
+++ b/modules/ingest-otel/src/main/java/org/elasticsearch/ingest/otel/NormalizeForStreamPlugin.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.ingest.otel;
+
+import org.elasticsearch.ingest.Processor;
+import org.elasticsearch.plugins.IngestPlugin;
+import org.elasticsearch.plugins.Plugin;
+
+import java.util.Map;
+
+/**
+ * Ingest plugin that registers the {@link NormalizeForStreamProcessor} factory under the processor's type name.
+ */
+public class NormalizeForStreamPlugin extends Plugin implements IngestPlugin {
+
+    /**
+     * Returns a single-entry map from {@link NormalizeForStreamProcessor#TYPE} to a new processor factory.
+     *
+     * @param parameters processor parameters; not used by this plugin
+     * @return an immutable map holding the {@code normalize_for_stream} processor factory
+     */
+    @Override
+    public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
+        return Map.of(NormalizeForStreamProcessor.TYPE, new NormalizeForStreamProcessor.Factory());
+    }
+}
diff --git a/modules/ingest-otel/src/main/java/org/elasticsearch/ingest/otel/NormalizeForStreamProcessor.java b/modules/ingest-otel/src/main/java/org/elasticsearch/ingest/otel/NormalizeForStreamProcessor.java
new file mode 100644
index 0000000000000..d0b2385916823
--- /dev/null
+++ b/modules/ingest-otel/src/main/java/org/elasticsearch/ingest/otel/NormalizeForStreamProcessor.java
@@ -0,0 +1,276 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.ingest.otel;
+
+import org.elasticsearch.cluster.metadata.ProjectId;
+import org.elasticsearch.common.util.Maps;
+import org.elasticsearch.ingest.AbstractProcessor;
+import org.elasticsearch.ingest.IngestDocument;
+import org.elasticsearch.ingest.Processor;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import static java.util.Map.entry;
+
+/**
+ * This processor is responsible for transforming non-OpenTelemetry-compliant documents into a namespaced flavor of ECS
+ * that makes them compatible with OpenTelemetry.
+ * It DOES NOT translate the entire ECS schema into OpenTelemetry semantic conventions.
+ *
+ * <p>More specifically, this processor performs the following operations:
+ * <ul>
+ *   <li>Renames specific ECS fields to their corresponding OpenTelemetry-compatible counterparts.</li>
+ *   <li>Moves all other fields to the "attributes" namespace.</li>
+ *   <li>Flattens all attributes in the "attributes" namespace.</li>
+ *   <li>Moves resource fields from the "attributes" namespace to the "resource.attributes" namespace.</li>
+ * </ul>
+ *
+ * <p>If a document is identified as OpenTelemetry-compatible, no transformation is performed.
+ * @see org.elasticsearch.ingest.AbstractProcessor
+ */
+public class NormalizeForStreamProcessor extends AbstractProcessor {
+
+    public static final String TYPE = "normalize_for_stream";
+
+    /**
+     * Mapping of ECS field names to their corresponding OpenTelemetry-compatible counterparts.
+     */
+    private static final Map<String, String> RENAME_KEYS = Map.ofEntries(
+        entry("span.id", "span_id"),
+        entry("message", "body.text"),
+        entry("log.level", "severity_text"),
+        entry("trace.id", "trace_id")
+    );
+
+    /**
+     * A closed-set of keys that should be kept at the top level of the processed document after applying the namespacing.
+     * In essence, these are the fields that should not be moved to the "attributes" or "resource.attributes" namespaces.
+     * Besides the @timestamp field, this set obviously contains the attributes and the resource fields, as well as the
+     * OpenTelemetry-compatible fields that are renamed by the processor.
+     */
+    private static final Set<String> KEEP_KEYS;
+    static {
+        Set<String> keepKeys = new HashSet<>(Set.of("@timestamp", "attributes", "resource"));
+        Set<String> renamedTopLevelFields = new HashSet<>();
+        for (String value : RENAME_KEYS.values()) {
+            // if the renamed field is nested, we only need to know the top level field
+            int dotIndex = value.indexOf('.');
+            if (dotIndex != -1) {
+                renamedTopLevelFields.add(value.substring(0, dotIndex));
+            } else {
+                renamedTopLevelFields.add(value);
+            }
+        }
+        keepKeys.addAll(renamedTopLevelFields);
+        KEEP_KEYS = Set.copyOf(keepKeys);
+    }
+
+    private static final String ATTRIBUTES_KEY = "attributes";
+    private static final String RESOURCE_KEY = "resource";
+    private static final String SCOPE_KEY = "scope";
+    private static final String BODY_KEY = "body";
+    private static final String TEXT_KEY = "text";
+    private static final String STRUCTURED_KEY = "structured";
+
+    NormalizeForStreamProcessor(String tag, String description) {
+        super(tag, description);
+    }
+
+    @Override
+    public String getType() {
+        return TYPE;
+    }
+
+    /**
+     * Normalizes the source of the given document in place, unless the source is already OpenTelemetry-compliant
+     * according to {@link #isOTelDocument(Map)}.
+     *
+     * @param document the document to normalize
+     * @return the same {@code document} instance that was passed in
+     */
+    @Override
+    public IngestDocument execute(IngestDocument document) {
+        Map<String, Object> source = document.getSource();
+
+        if (isOTelDocument(source)) {
+            return document;
+        }
+
+        // non-OTel document
+
+        Map<String, Object> newAttributes = new HashMap<>();
+        // The keep keys indicate the fields that should be kept at the top level later on when applying the namespacing.
+        // However, at this point we need to move their original values (if they exist) to the one of the new attributes namespaces, except
+        // for the @timestamp field. The assumption is that at this point the document is not OTel compliant, so even if a valid top
+        // level field is found, we assume that it does not bear the OTel semantics.
+        for (String keepKey : KEEP_KEYS) {
+            if (keepKey.equals("@timestamp")) {
+                continue;
+            }
+            if (source.containsKey(keepKey)) {
+                newAttributes.put(keepKey, source.remove(keepKey));
+            }
+        }
+
+        source.put(ATTRIBUTES_KEY, newAttributes);
+
+        renameSpecialKeys(document);
+
+        // move all top level keys except from specific ones to the "attributes" namespace
+        final var sourceItr = source.entrySet().iterator();
+        while (sourceItr.hasNext()) {
+            final var entry = sourceItr.next();
+            if (KEEP_KEYS.contains(entry.getKey()) == false) {
+                newAttributes.put(entry.getKey(), entry.getValue());
+                sourceItr.remove();
+            }
+        }
+
+        // Flatten attributes
+        Map<String, Object> flattenAttributes = Maps.flatten(newAttributes, false, false);
+        source.put(ATTRIBUTES_KEY, flattenAttributes);
+
+        // Resource fields are picked out of the already-flattened attributes, so they are matched by their dotted names
+        Map<String, Object> newResource = new HashMap<>();
+        Map<String, Object> newResourceAttributes = new HashMap<>();
+        newResource.put(ATTRIBUTES_KEY, newResourceAttributes);
+        source.put(RESOURCE_KEY, newResource);
+        moveResourceAttributes(flattenAttributes, newResourceAttributes);
+
+        return document;
+    }
+
+    /**
+     * Checks if the given document is OpenTelemetry-compliant.
+     *
+     * <p>A document is considered OpenTelemetry-compliant if it meets the following criteria:
+     * <ul>
+     *   <li>The "resource" field is present and is a map</li>
+     *   <li>The resource field either doesn't contain an "attributes" field, or the "attributes" field is a map.</li>
+     *   <li>The "scope" field is either absent or a map.</li>
+     *   <li>The "attributes" field is either absent or a map.</li>
+     *   <li>The "body" field is either absent or a map.</li>
+     *   <li>If exists, the "body" either doesn't contain a "text" field, or the "text" field is a string.</li>
+     *   <li>If exists, the "body" either doesn't contain a "structured" field, or the "structured" field is not a string.</li>
+     * </ul>
+     *
+     * @param source the document to check
+     * @return {@code true} if the document is OpenTelemetry-compliant, {@code false} otherwise
+     */
+    static boolean isOTelDocument(Map<String, Object> source) {
+        Object resource = source.get(RESOURCE_KEY);
+        if (resource instanceof Map<?, ?> resourceMap) {
+            Object resourceAttributes = resourceMap.get(ATTRIBUTES_KEY);
+            if (resourceAttributes != null && (resourceAttributes instanceof Map) == false) {
+                return false;
+            }
+        } else {
+            return false;
+        }
+
+        Object scope = source.get(SCOPE_KEY);
+        if (scope != null && scope instanceof Map == false) {
+            return false;
+        }
+
+        Object attributes = source.get(ATTRIBUTES_KEY);
+        if (attributes != null && attributes instanceof Map == false) {
+            return false;
+        }
+
+        Object body = source.get(BODY_KEY);
+        if (body != null) {
+            if (body instanceof Map<?, ?> bodyMap) {
+                Object bodyText = bodyMap.get(TEXT_KEY);
+                if (bodyText != null && (bodyText instanceof String) == false) {
+                    return false;
+                }
+                Object bodyStructured = bodyMap.get(STRUCTURED_KEY);
+                return (bodyStructured instanceof String) == false;
+            } else {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Renames specific ECS keys in the given document to their OpenTelemetry-compatible counterparts, based on the {@code RENAME_KEYS} map.
+     *
+     * <p>This method performs the following operations:
+     * <ul>
+     *   <li>For each key in the {@code RENAME_KEYS} map, it checks if a corresponding field exists in the document. It first looks for the
+     *   field assuming dot notation for nested fields. If the field is not found, it looks for a top level field with a dotted name.</li>
+     *   <li>If the field exists, it removes it from the document and adds a new field with the corresponding name from the
+     *   {@code RENAME_KEYS} map and the same value.</li>
+     *   <li>If the key is nested (contains dots), it recursively removes empty parent fields after renaming.</li>
+     * </ul>
+     *
+     * @param document the document to process
+     */
+    static void renameSpecialKeys(IngestDocument document) {
+        RENAME_KEYS.forEach((nonOtelName, otelName) -> {
+            boolean fieldExists = false;
+            Object value = null;
+            // first look assuming dot notation for nested fields
+            if (document.hasField(nonOtelName)) {
+                fieldExists = true;
+                value = document.getFieldValue(nonOtelName, Object.class, true);
+                document.removeField(nonOtelName);
+                // recursively remove empty parent fields
+                int lastDot = nonOtelName.lastIndexOf('.');
+                while (lastDot > 0) {
+                    String parentName = nonOtelName.substring(0, lastDot);
+                    // parent should never be null and must be a map if we are here
+                    @SuppressWarnings("unchecked")
+                    Map<String, Object> parent = (Map<String, Object>) document.getFieldValue(parentName, Map.class);
+                    if (parent.isEmpty()) {
+                        document.removeField(parentName);
+                    } else {
+                        break;
+                    }
+                    lastDot = parentName.lastIndexOf('.');
+                }
+            } else if (nonOtelName.contains(".")) {
+                // look for dotted field names
+                Map<String, Object> source = document.getSource();
+                if (source.containsKey(nonOtelName)) {
+                    fieldExists = true;
+                    value = source.remove(nonOtelName);
+                }
+            }
+            if (fieldExists) {
+                document.setFieldValue(otelName, value);
+            }
+        });
+    }
+
+    /**
+     * Moves every entry whose key is listed in {@link EcsOTelResourceAttributes#LATEST} from {@code attributes}
+     * to {@code resourceAttributes}. Both maps are modified in place.
+     *
+     * @param attributes the flattened attributes map to take resource fields from
+     * @param resourceAttributes the map that receives the resource fields
+     */
+    private static void moveResourceAttributes(Map<String, Object> attributes, Map<String, Object> resourceAttributes) {
+        Set<String> ecsResourceFields = EcsOTelResourceAttributes.LATEST;
+        Iterator<Map.Entry<String, Object>> attributeIterator = attributes.entrySet().iterator();
+        while (attributeIterator.hasNext()) {
+            Map.Entry<String, Object> entry = attributeIterator.next();
+            if (ecsResourceFields.contains(entry.getKey())) {
+                resourceAttributes.put(entry.getKey(), entry.getValue());
+                attributeIterator.remove();
+            }
+        }
+    }
+
+    /**
+     * Factory for {@link NormalizeForStreamProcessor}. Only {@code tag} and {@code description} are used;
+     * nothing is read from {@code config}.
+     */
+    public static final class Factory implements Processor.Factory {
+        @Override
+        public Processor create(
+            Map<String, Processor.Factory> registry,
+            String tag,
+            String description,
+            Map<String, Object> config,
+            ProjectId projectId
+        ) {
+            return new NormalizeForStreamProcessor(tag, description);
+        }
+    }
+}
diff --git a/modules/ingest-otel/src/test/java/org/elasticsearch/ingest/otel/EcsFieldsDiscoverer.java b/modules/ingest-otel/src/test/java/org/elasticsearch/ingest/otel/EcsFieldsDiscoverer.java
new file mode 100644
index 0000000000000..46368de1ba642
--- /dev/null
+++ b/modules/ingest-otel/src/test/java/org/elasticsearch/ingest/otel/EcsFieldsDiscoverer.java
@@ -0,0 +1,100 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.ingest.otel;
+
+import org.elasticsearch.xcontent.XContentFactory;
+import org.elasticsearch.xcontent.XContentParser;
+import org.elasticsearch.xcontent.XContentParserConfiguration;
+import org.elasticsearch.xcontent.XContentType;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+class EcsFieldsDiscoverer {
+
+ private static final String ECS_FLAT_FILE_URL = "https://raw.githubusercontent.com/elastic/ecs/main/generated/ecs/ecs_flat.yml";
+ private static final String AGENT_FIELDS_PREFIX = "agent.";
+
+ private static final EcsFieldsDiscoverer INSTANCE = new EcsFieldsDiscoverer();
+
+ private final Map ecsToOTelAttributeNames = new HashMap<>();
+ private final Set ecsResourceFields = new HashSet<>();
+
+ private EcsFieldsDiscoverer() {
+ try {
+ collectEcsAttributeNames();
+ } catch (IOException | InterruptedException e) {
+ throw new RuntimeException("Failed to load ECS to OpenTelemetry attribute names", e);
+ }
+ }
+
+ Map getEcsToOTelAttributeNames() {
+ return ecsToOTelAttributeNames;
+ }
+
+ Set getEcsResourceFields() {
+ return ecsResourceFields;
+ }
+
+ static EcsFieldsDiscoverer getInstance() {
+ return INSTANCE;
+ }
+
+ private void collectEcsAttributeNames() throws IOException, InterruptedException {
+ Map ecsFields = loadEcsFields();
+ for (Map.Entry entry : ecsFields.entrySet()) {
+ String ecsName = entry.getKey();
+ @SuppressWarnings("unchecked")
+ Map fieldData = (Map) entry.getValue();
+ @SuppressWarnings("unchecked")
+ List