diff --git a/docs/changelog/125699.yaml b/docs/changelog/125699.yaml new file mode 100644 index 0000000000000..29ee24da4c974 --- /dev/null +++ b/docs/changelog/125699.yaml @@ -0,0 +1,5 @@ +pr: 125699 +summary: Adding `NormalizeForStreamProcessor` +area: Ingest Node +type: feature +issues: [] diff --git a/docs/reference/ingest/processors.asciidoc b/docs/reference/ingest/processors.asciidoc index 55de1a8bee1a7..d21b873d9d9ec 100644 --- a/docs/reference/ingest/processors.asciidoc +++ b/docs/reference/ingest/processors.asciidoc @@ -85,6 +85,9 @@ Adds information about the geographical location of an IPv4 or IPv6 address from <>:: Calculates the network direction given a source IP address, destination IP address, and a list of internal networks. +<>:: +Normalizes non-OpenTelemetry documents to be OpenTelemetry-compliant. + <>:: Extracts the registered domain (also known as the effective top-level domain or eTLD), sub-domain, and top-level domain from a fully qualified domain name (FQDN). @@ -254,6 +257,7 @@ include::processors/json.asciidoc[] include::processors/kv.asciidoc[] include::processors/lowercase.asciidoc[] include::processors/network-direction.asciidoc[] +include::processors/normalize-for-stream.asciidoc[] include::processors/pipeline.asciidoc[] include::processors/redact.asciidoc[] include::processors/registered-domain.asciidoc[] diff --git a/docs/reference/ingest/processors/normalize-for-stream.asciidoc b/docs/reference/ingest/processors/normalize-for-stream.asciidoc new file mode 100644 index 0000000000000..a4bff595b4e76 --- /dev/null +++ b/docs/reference/ingest/processors/normalize-for-stream.asciidoc @@ -0,0 +1,157 @@ +[role="xpack"] +[[normalize-for-stream-processor]] +=== Normalize for Stream processor +++++ +Normalize for Stream +++++ + +Detects whether a document is OpenTelemetry-compliant and if not - +normalizes it as described below. 
If used in combination with the OTel-related +mappings such as the ones defined in `logs-otel@template`, the resulting +document can be queried seamlessly by clients that expect either https://www.elastic.co/guide/en/ecs/current/index.html[ECS] or https://github.com/open-telemetry/semantic-conventions[OpenTelemetry-Semantic-Conventions] formats. + +preview::["This processor is in tech preview and is not available in our serverless offering."] + +[[normalize-for-stream-detecting]] +==== Detecting OpenTelemetry compliance + +The processor detects OpenTelemetry compliance by checking the following fields: + +* `resource` exists as a key and the value is a map +* `resource` either doesn't contain an `attributes` field, or contains an `attributes` field of type map +* `scope` is either missing or a map +* `attributes` is either missing or a map +* `body` is either missing or a map +* `body` either doesn't contain a `text` field, or contains a `text` field of type `String` +* `body` either doesn't contain a `structured` field, or contains a `structured` field that is not of type `String` + +If all of these conditions are met, the document is considered OpenTelemetry-compliant and is not modified by the processor. + +[[normalize-for-stream-normalization]] +==== Normalization + +If the document is not OpenTelemetry-compliant, the processor normalizes it as follows: + +* Specific ECS fields are renamed to have their corresponding OpenTelemetry Semantic Conventions attribute names. These include the following: ++ +|====== +| ECS Field | Semantic Conventions Attribute | +| `span.id` | `span_id` | +| `trace.id` | `trace_id` | +| `message` | `body.text` | +| `log.level` | `severity_text` | +|====== ++ +The processor first looks for the nested form of the ECS field and if such does not exist, it looks for a top-level field with the dotted field name. 
+ +* Other specific ECS fields that describe resources and have corresponding counterparts in the OpenTelemetry Semantic Conventions are moved to the `resource.attributes` map. Fields that are considered resource attributes are those that conform to the following conditions: +** They are ECS fields that have corresponding counterparts (either with + the same name or with a different name) in OpenTelemetry Semantic Conventions. +** The corresponding OpenTelemetry attribute is defined in + https://github.com/open-telemetry/semantic-conventions/tree/main/model[Semantic Conventions] + within a group that is defined as `type: entity`. +* All other fields, except for `@timestamp`, are moved to the `attributes` map. +* All non-array entries of the `attributes` and `resource.attributes` maps are flattened. Flattening means that nested objects are merged into their parent object, and the keys are concatenated with a dot. See examples below. + +[[normalize-for-stream-examples]] +==== Examples + +If an OpenTelemetry-compliant document is detected, the processor does nothing. For example, the following document will stay unchanged: + +```json +{ + "resource": { + "attributes": { + "service.name": "my-service" + } + }, + "scope": { + "name": "my-library", + "version": "1.0.0" + }, + "attributes": { + "http.method": "GET" + }, + "body": { + "text": "Hello, world!" + } +} +``` + +If a non-OpenTelemetry-compliant document is detected, the processor normalizes it. 
For example, the following document: + +```json +{ + "@timestamp": "2023-10-01T12:00:00Z", + "service": { + "name": "my-service", + "version": "1.0.0", + "environment": "production", + "language": { + "name": "python", + "version": "3.8" + } + }, + "log": { + "level": "INFO" + }, + "message": "Hello, world!", + "http": { + "method": "GET", + "url": { + "path": "/api/v1/resource" + }, + "headers": [ + { + "name": "Authorization", + "value": "Bearer token" + }, + { + "name": "User-Agent", + "value": "my-client/1.0" + } + ] + }, + "span" : { + "id": "1234567890abcdef" + }, + "span.id": "abcdef1234567890", + "trace.id": "abcdef1234567890abcdef1234567890" +} +``` + +will be normalized into the following form: + +```json +{ + "@timestamp": "2023-10-01T12:00:00Z", + "resource": { + "attributes": { + "service.name": "my-service", + "service.version": "1.0.0", + "service.environment": "production" + } + }, + "attributes": { + "service.language.name": "python", + "service.language.version": "3.8", + "http.method": "GET", + "http.url.path": "/api/v1/resource", + "http.headers": [ + { + "name": "Authorization", + "value": "Bearer token" + }, + { + "name": "User-Agent", + "value": "my-client/1.0" + } + ] + }, + "body": { + "text": "Hello, world!" + }, + "span_id": "1234567890abcdef", + "trace_id": "abcdef1234567890abcdef1234567890" +} +``` diff --git a/modules/ingest-otel/build.gradle b/modules/ingest-otel/build.gradle new file mode 100644 index 0000000000000..54a00508a0a07 --- /dev/null +++ b/modules/ingest-otel/build.gradle @@ -0,0 +1,21 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +apply plugin: 'elasticsearch.internal-yaml-rest-test' + +esplugin { + description = 'Ingest processor that normalizes ECS documents to OpenTelemetry-compatible namespaces' + classname ='org.elasticsearch.ingest.otel.NormalizeForStreamPlugin' +} + +restResources { + restApi { + include '_common', 'indices', 'index', 'cluster', 'nodes', 'get', 'ingest' + } +} diff --git a/modules/ingest-otel/src/main/java/module-info.java b/modules/ingest-otel/src/main/java/module-info.java new file mode 100644 index 0000000000000..20b349d930c85 --- /dev/null +++ b/modules/ingest-otel/src/main/java/module-info.java @@ -0,0 +1,13 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +module org.elasticsearch.ingest.otel { + requires org.elasticsearch.base; + requires org.elasticsearch.server; +} diff --git a/modules/ingest-otel/src/main/java/org/elasticsearch/ingest/otel/EcsOTelResourceAttributes.java b/modules/ingest-otel/src/main/java/org/elasticsearch/ingest/otel/EcsOTelResourceAttributes.java new file mode 100644 index 0000000000000..3453a2d55506e --- /dev/null +++ b/modules/ingest-otel/src/main/java/org/elasticsearch/ingest/otel/EcsOTelResourceAttributes.java @@ -0,0 +1,65 @@ +/* + * Copyright Elasticsearch B.V. 
and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.ingest.otel; + +import java.util.Set; + +final class EcsOTelResourceAttributes { + + /** + * The set of ECS (Elastic Common Schema) field names that are mapped to OpenTelemetry resource attributes, + * as defined by the OpenTelemetry Semantic Conventions. + * The list is produced by the {@code ResourceAttributesTests#testAttributesSetUpToDate} test. + * + * @see OpenTelemetry Semantic Conventions + */ + static final Set LATEST = Set.of( + "agent.type", + "agent.build.original", + "agent.name", + "agent.id", + "agent.ephemeral_id", + "agent.version", + "container.image.tag", + "device.model.identifier", + "container.image.hash.all", + "service.node.name", + "process.pid", + "device.id", + "host.mac", + "host.type", + "container.id", + "cloud.availability_zone", + "host.ip", + "container.name", + "container.image.name", + "device.model.name", + "host.name", + "host.id", + "process.executable", + "user_agent.original", + "service.environment", + "cloud.region", + "service.name", + "faas.name", + "device.manufacturer", + "process.args", + "host.architecture", + "cloud.provider", + "container.runtime", + "service.version", + "cloud.service.name", + "cloud.account.id", + "process.command_line", + "faas.version" + ); + + private EcsOTelResourceAttributes() {} +} diff --git a/modules/ingest-otel/src/main/java/org/elasticsearch/ingest/otel/NormalizeForStreamPlugin.java b/modules/ingest-otel/src/main/java/org/elasticsearch/ingest/otel/NormalizeForStreamPlugin.java new file mode 100644 index 
0000000000000..bd88603407ea5 --- /dev/null +++ b/modules/ingest-otel/src/main/java/org/elasticsearch/ingest/otel/NormalizeForStreamPlugin.java @@ -0,0 +1,24 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.ingest.otel; + +import org.elasticsearch.ingest.Processor; +import org.elasticsearch.plugins.IngestPlugin; +import org.elasticsearch.plugins.Plugin; + +import java.util.Map; + +public class NormalizeForStreamPlugin extends Plugin implements IngestPlugin { + + @Override + public Map getProcessors(Processor.Parameters parameters) { + return Map.of(NormalizeForStreamProcessor.TYPE, new NormalizeForStreamProcessor.Factory()); + } +} diff --git a/modules/ingest-otel/src/main/java/org/elasticsearch/ingest/otel/NormalizeForStreamProcessor.java b/modules/ingest-otel/src/main/java/org/elasticsearch/ingest/otel/NormalizeForStreamProcessor.java new file mode 100644 index 0000000000000..224ba35a3a50b --- /dev/null +++ b/modules/ingest-otel/src/main/java/org/elasticsearch/ingest/otel/NormalizeForStreamProcessor.java @@ -0,0 +1,269 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". 
+ */ + +package org.elasticsearch.ingest.otel; + +import org.elasticsearch.common.util.Maps; +import org.elasticsearch.ingest.AbstractProcessor; +import org.elasticsearch.ingest.IngestDocument; +import org.elasticsearch.ingest.Processor; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; + +import static java.util.Map.entry; + +/** + * This processor is responsible for transforming non-OpenTelemetry-compliant documents into a namespaced flavor of ECS + * that makes them compatible with OpenTelemetry. + * It DOES NOT translate the entire ECS schema into OpenTelemetry semantic conventions. + * + *

More specifically, this processor performs the following operations: + *

    + *
  • Renames specific ECS fields to their corresponding OpenTelemetry-compatible counterparts.
  • + *
  • Moves all other fields to the "attributes" namespace.
  • + *
  • Flattens all attributes in the "attributes" namespace.
  • + *
  • Moves resource fields from the "attributes" namespace to the "resource.attributes" namespace.
  • + *
+ * + *

If a document is identified as OpenTelemetry-compatible, no transformation is performed. + * @see org.elasticsearch.ingest.AbstractProcessor + */ +public class NormalizeForStreamProcessor extends AbstractProcessor { + + public static final String TYPE = "normalize_for_stream"; + + /** + * Mapping of ECS field names to their corresponding OpenTelemetry-compatible counterparts. + */ + private static final Map RENAME_KEYS = Map.ofEntries( + entry("span.id", "span_id"), + entry("message", "body.text"), + entry("log.level", "severity_text"), + entry("trace.id", "trace_id") + ); + + /** + * A closed-set of keys that should be kept at the top level of the processed document after applying the namespacing. + * In essence, these are the fields that should not be moved to the "attributes" or "resource.attributes" namespaces. + * Besides the @timestamp field, this set obviously contains the attributes and the resource fields, as well as the + * OpenTelemetry-compatible fields that are renamed by the processor. 
+ */ + private static final Set KEEP_KEYS; + static { + Set keepKeys = new HashSet<>(Set.of("@timestamp", "attributes", "resource")); + Set renamedTopLevelFields = new HashSet<>(); + for (String value : RENAME_KEYS.values()) { + // if the renamed field is nested, we only need to know the top level field + int dotIndex = value.indexOf('.'); + if (dotIndex != -1) { + renamedTopLevelFields.add(value.substring(0, dotIndex)); + } else { + renamedTopLevelFields.add(value); + } + } + keepKeys.addAll(renamedTopLevelFields); + KEEP_KEYS = Set.copyOf(keepKeys); + } + + private static final String ATTRIBUTES_KEY = "attributes"; + private static final String RESOURCE_KEY = "resource"; + private static final String SCOPE_KEY = "scope"; + private static final String BODY_KEY = "body"; + private static final String TEXT_KEY = "text"; + private static final String STRUCTURED_KEY = "structured"; + + NormalizeForStreamProcessor(String tag, String description) { + super(tag, description); + } + + @Override + public String getType() { + return TYPE; + } + + @Override + public IngestDocument execute(IngestDocument document) { + Map source = document.getSource(); + + boolean isOTel = isOTelDocument(source); + if (isOTel) { + return document; + } + + // non-OTel document + + Map newAttributes = new HashMap<>(); + // The keep keys indicate the fields that should be kept at the top level later on when applying the namespacing. + // However, at this point we need to move their original values (if they exist) to the one of the new attributes namespaces, except + // for the @timestamp field. The assumption is that at this point the document is not OTel compliant, so even if a valid top + // level field is found, we assume that it does not bear the OTel semantics. 
+ for (String keepKey : KEEP_KEYS) { + if (keepKey.equals("@timestamp")) { + continue; + } + if (source.containsKey(keepKey)) { + newAttributes.put(keepKey, source.remove(keepKey)); + } + } + + source.put(ATTRIBUTES_KEY, newAttributes); + + renameSpecialKeys(document); + + // move all top level keys except from specific ones to the "attributes" namespace + final var sourceItr = source.entrySet().iterator(); + while (sourceItr.hasNext()) { + final var entry = sourceItr.next(); + if (KEEP_KEYS.contains(entry.getKey()) == false) { + newAttributes.put(entry.getKey(), entry.getValue()); + sourceItr.remove(); + } + } + + // Flatten attributes + Map flattenAttributes = Maps.flatten(newAttributes, false, false); + source.put(ATTRIBUTES_KEY, flattenAttributes); + + Map newResource = new HashMap<>(); + Map newResourceAttributes = new HashMap<>(); + newResource.put(ATTRIBUTES_KEY, newResourceAttributes); + source.put(RESOURCE_KEY, newResource); + moveResourceAttributes(flattenAttributes, newResourceAttributes); + + return document; + } + + /** + * Checks if the given document is OpenTelemetry-compliant. + * + *

A document is considered OpenTelemetry-compliant if it meets the following criteria: + *

    + *
  • The "resource" field is present and is a map + *
  • The resource field either doesn't contain an "attributes" field, or the "attributes" field is a map.
  • + *
  • The "scope" field is either absent or a map.
  • + *
  • The "attributes" field is either absent or a map.
  • + *
  • The "body" field is either absent or a map.
  • + *
  • If exists, the "body" either doesn't contain a "text" field, or the "text" field is a string.
  • + *
  • If exists, the "body" either doesn't contain a "structured" field, or the "structured" field is not a string.
  • + *
+ * + * @param source the document to check + * @return {@code true} if the document is OpenTelemetry-compliant, {@code false} otherwise + */ + static boolean isOTelDocument(Map source) { + Object resource = source.get(RESOURCE_KEY); + if (resource instanceof Map resourceMap) { + Object resourceAttributes = resourceMap.get(ATTRIBUTES_KEY); + if (resourceAttributes != null && (resourceAttributes instanceof Map) == false) { + return false; + } + } else { + return false; + } + + Object scope = source.get(SCOPE_KEY); + if (scope != null && scope instanceof Map == false) { + return false; + } + + Object attributes = source.get(ATTRIBUTES_KEY); + if (attributes != null && attributes instanceof Map == false) { + return false; + } + + Object body = source.get(BODY_KEY); + if (body != null) { + if (body instanceof Map bodyMap) { + Object bodyText = bodyMap.get(TEXT_KEY); + if (bodyText != null && (bodyText instanceof String) == false) { + return false; + } + Object bodyStructured = bodyMap.get(STRUCTURED_KEY); + return (bodyStructured instanceof String) == false; + } else { + return false; + } + } + return true; + } + + /** + * Renames specific ECS keys in the given document to their OpenTelemetry-compatible counterparts, based on the {@code RENAME_KEYS} map. + * + *

This method performs the following operations: + *

    + *
  • For each key in the {@code RENAME_KEYS} map, it checks if a corresponding field exists in the document. It first looks for the + * field assuming dot notation for nested fields. If the field is not found, it looks for a top level field with a dotted name.
  • + *
  • If the field exists, it removes it from the document and adds a new field with the corresponding name from the + * {@code RENAME_KEYS} map and the same value.
  • + *
  • If the key is nested (contains dots), it recursively removes empty parent fields after renaming.
  • + *
+ * + * @param document the document to process + */ + static void renameSpecialKeys(IngestDocument document) { + RENAME_KEYS.forEach((nonOtelName, otelName) -> { + boolean fieldExists = false; + Object value = null; + // first look assuming dot notation for nested fields + if (document.hasField(nonOtelName)) { + fieldExists = true; + value = document.getFieldValue(nonOtelName, Object.class, true); + document.removeField(nonOtelName); + // recursively remove empty parent fields + int lastDot = nonOtelName.lastIndexOf('.'); + while (lastDot > 0) { + String parentName = nonOtelName.substring(0, lastDot); + // parent should never be null and must be a map if we are here + @SuppressWarnings("unchecked") + Map parent = (Map) document.getFieldValue(parentName, Map.class); + if (parent.isEmpty()) { + document.removeField(parentName); + } else { + break; + } + lastDot = parentName.lastIndexOf('.'); + } + } else if (nonOtelName.contains(".")) { + // look for dotted field names + Map source = document.getSource(); + if (source.containsKey(nonOtelName)) { + fieldExists = true; + value = source.remove(nonOtelName); + } + } + if (fieldExists) { + document.setFieldValue(otelName, value); + } + }); + } + + private static void moveResourceAttributes(Map attributes, Map resourceAttributes) { + Set ecsResourceFields = EcsOTelResourceAttributes.LATEST; + Iterator> attributeIterator = attributes.entrySet().iterator(); + while (attributeIterator.hasNext()) { + Map.Entry entry = attributeIterator.next(); + if (ecsResourceFields.contains(entry.getKey())) { + resourceAttributes.put(entry.getKey(), entry.getValue()); + attributeIterator.remove(); + } + } + } + + public static final class Factory implements Processor.Factory { + @Override + public Processor create(Map registry, String tag, String description, Map config) { + return new NormalizeForStreamProcessor(tag, description); + } + } +} diff --git 
a/modules/ingest-otel/src/test/java/org/elasticsearch/ingest/otel/EcsFieldsDiscoverer.java b/modules/ingest-otel/src/test/java/org/elasticsearch/ingest/otel/EcsFieldsDiscoverer.java new file mode 100644 index 0000000000000..8a5d36ac1be7b --- /dev/null +++ b/modules/ingest-otel/src/test/java/org/elasticsearch/ingest/otel/EcsFieldsDiscoverer.java @@ -0,0 +1,99 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.ingest.otel; + +import org.elasticsearch.xcontent.XContentFactory; +import org.elasticsearch.xcontent.XContentParser; +import org.elasticsearch.xcontent.XContentParserConfiguration; +import org.elasticsearch.xcontent.XContentType; + +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +class EcsFieldsDiscoverer { + + private static final String ECS_FLAT_FILE_URL = "https://raw.githubusercontent.com/elastic/ecs/main/generated/ecs/ecs_flat.yml"; + private static final String AGENT_FIELDS_PREFIX = "agent."; + + private static final EcsFieldsDiscoverer INSTANCE = new EcsFieldsDiscoverer(); + + private final Map ecsToOTelAttributeNames = new HashMap<>(); + private final Set ecsResourceFields = new HashSet<>(); + + private EcsFieldsDiscoverer() { + try { + collectEcsAttributeNames(); + } catch (IOException | InterruptedException e) { + throw new 
RuntimeException("Failed to load ECS to OpenTelemetry attribute names", e); + } + } + + Map getEcsToOTelAttributeNames() { + return ecsToOTelAttributeNames; + } + + Set getEcsResourceFields() { + return ecsResourceFields; + } + + static EcsFieldsDiscoverer getInstance() { + return INSTANCE; + } + + private void collectEcsAttributeNames() throws IOException, InterruptedException { + Map ecsFields = loadEcsFields(); + for (Map.Entry entry : ecsFields.entrySet()) { + String ecsName = entry.getKey(); + @SuppressWarnings("unchecked") + Map fieldData = (Map) entry.getValue(); + @SuppressWarnings("unchecked") + List> otelDataEntries = (List>) fieldData.get("otel"); + if (otelDataEntries != null) { + for (Map otelData : otelDataEntries) { + String relation = otelData.get("relation"); + if ("match".equals(relation)) { + ecsToOTelAttributeNames.put(ecsName, ecsName); + } else if ("equivalent".equals(relation)) { + String attribute = otelData.get("attribute"); + if (attribute != null) { + ecsToOTelAttributeNames.put(ecsName, attribute); + } + } + } + } + if (ecsName.startsWith(AGENT_FIELDS_PREFIX)) { + // for now, we consider all agent.* fields as resource attributes, but this may change in the future + ecsResourceFields.add(ecsName); + } + } + } + + private static Map loadEcsFields() throws IOException, InterruptedException { + HttpClient httpClient = HttpClient.newHttpClient(); + HttpRequest request = HttpRequest.newBuilder().uri(URI.create(ECS_FLAT_FILE_URL)).build(); + HttpResponse response = httpClient.send(request, HttpResponse.BodyHandlers.ofInputStream()); + + try ( + InputStream is = response.body(); + XContentParser parser = XContentFactory.xContent(XContentType.YAML).createParser(XContentParserConfiguration.EMPTY, is) + ) { + return parser.map(); + } + } +} diff --git a/modules/ingest-otel/src/test/java/org/elasticsearch/ingest/otel/NormalizeForStreamProcessorTests.java 
b/modules/ingest-otel/src/test/java/org/elasticsearch/ingest/otel/NormalizeForStreamProcessorTests.java new file mode 100644 index 0000000000000..bf2ef92c23dcb --- /dev/null +++ b/modules/ingest-otel/src/test/java/org/elasticsearch/ingest/otel/NormalizeForStreamProcessorTests.java @@ -0,0 +1,448 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.ingest.otel; + +import org.elasticsearch.ingest.IngestDocument; +import org.elasticsearch.test.ESTestCase; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static java.util.Map.entry; + +public class NormalizeForStreamProcessorTests extends ESTestCase { + + private final NormalizeForStreamProcessor processor = new NormalizeForStreamProcessor("test", "test processor"); + + public void testIsOTelDocument_validMinimalOTelDocument() { + Map source = new HashMap<>(); + source.put("resource", new HashMap<>()); + assertTrue(NormalizeForStreamProcessor.isOTelDocument(source)); + } + + public void testIsOTelDocument_validOTelDocumentWithScopeAndAttributes() { + Map source = new HashMap<>(); + source.put("attributes", new HashMap<>()); + source.put("resource", new HashMap<>()); + source.put("scope", new HashMap<>()); + assertTrue(NormalizeForStreamProcessor.isOTelDocument(source)); + } + + public void testIsOTelDocument_missingResource() { + Map source = new HashMap<>(); + source.put("scope", new HashMap<>()); + assertFalse(NormalizeForStreamProcessor.isOTelDocument(source)); + } + + public void testIsOTelDocument_resourceNotMap() { + Map source = 
new HashMap<>(); + source.put("resource", "not a map"); + source.put("scope", new HashMap<>()); + assertFalse(NormalizeForStreamProcessor.isOTelDocument(source)); + } + + public void testIsOTelDocument_invalidResourceAttributes() { + Map resource = new HashMap<>(); + resource.put("attributes", "not a map"); + Map source = new HashMap<>(); + source.put("resource", resource); + source.put("scope", new HashMap<>()); + assertFalse(NormalizeForStreamProcessor.isOTelDocument(source)); + } + + public void testIsOTelDocument_scopeNotMap() { + Map source = new HashMap<>(); + source.put("resource", new HashMap<>()); + source.put("scope", "not a map"); + assertFalse(NormalizeForStreamProcessor.isOTelDocument(source)); + } + + public void testIsOTelDocument_invalidAttributes() { + Map source = new HashMap<>(); + source.put("resource", new HashMap<>()); + source.put("scope", new HashMap<>()); + source.put("attributes", "not a map"); + assertFalse(NormalizeForStreamProcessor.isOTelDocument(source)); + } + + public void testIsOTelDocument_invalidBody() { + Map source = new HashMap<>(); + source.put("resource", new HashMap<>()); + source.put("scope", new HashMap<>()); + source.put("body", "not a map"); + assertFalse(NormalizeForStreamProcessor.isOTelDocument(source)); + } + + public void testIsOTelDocument_invalidBodyText() { + Map body = new HashMap<>(); + body.put("text", 123); + Map source = new HashMap<>(); + source.put("resource", new HashMap<>()); + source.put("scope", new HashMap<>()); + source.put("body", body); + assertFalse(NormalizeForStreamProcessor.isOTelDocument(source)); + } + + public void testIsOTelDocument_invalidBodyStructured() { + Map body = new HashMap<>(); + body.put("structured", "a string"); + Map source = new HashMap<>(); + source.put("resource", new HashMap<>()); + source.put("scope", new HashMap<>()); + source.put("body", body); + assertFalse(NormalizeForStreamProcessor.isOTelDocument(source)); + } + + public void testIsOTelDocument_validBody() { + Map 
body = new HashMap<>(); + body.put("text", "a string"); + body.put("structured", new HashMap<>()); + Map source = new HashMap<>(); + source.put("resource", new HashMap<>()); + source.put("scope", new HashMap<>()); + source.put("body", body); + assertTrue(NormalizeForStreamProcessor.isOTelDocument(source)); + } + + public void testExecute_validOTelDocument() { + Map source = Map.ofEntries( + entry("resource", Map.of()), + entry("scope", Map.of()), + entry("body", Map.of("text", "a string", "structured", Map.of())), + entry("key1", "value1") + ); + IngestDocument document = new IngestDocument("index", "id", 1, null, null, source); + Map shallowCopy = new HashMap<>(source); + processor.execute(document); + // verify that top level keys are not moved when processing a valid OTel document + assertEquals(shallowCopy, document.getSource()); + } + + public void testExecute_nonOTelDocument() { + Map source = new HashMap<>(); + source.put("key1", "value1"); + source.put("key2", "value2"); + IngestDocument document = new IngestDocument("index", "id", 1, null, null, source); + + processor.execute(document); + + Map result = document.getSource(); + assertTrue(result.containsKey("attributes")); + assertTrue(result.containsKey("resource")); + + Map attributes = get(result, "attributes"); + assertEquals("value1", attributes.get("key1")); + assertEquals("value2", attributes.get("key2")); + assertFalse(source.containsKey("key1")); + assertFalse(source.containsKey("key2")); + + Map resource = get(result, "resource"); + assertTrue(resource.containsKey("attributes")); + Map resourceAttributes = get(resource, "attributes"); + assertTrue(resourceAttributes.isEmpty()); + } + + public void testExecute_nonOTelDocument_withExistingAttributes() { + Map source = new HashMap<>(); + Map existingAttributes = new HashMap<>(); + existingAttributes.put("existingKey", "existingValue"); + source.put("attributes", existingAttributes); + source.put("key1", "value1"); + IngestDocument document = new 
IngestDocument("index", "id", 1, null, null, source); + + processor.execute(document); + + Map result = document.getSource(); + assertTrue(result.containsKey("attributes")); + assertTrue(result.containsKey("resource")); + + Map attributes = get(result, "attributes"); + assertEquals("existingValue", attributes.get("attributes.existingKey")); + assertEquals("value1", attributes.get("key1")); + + Map resource = get(result, "resource"); + assertTrue(resource.containsKey("attributes")); + Map resourceAttributes = get(resource, "attributes"); + assertTrue(resourceAttributes.isEmpty()); + } + + public void testExecute_nonOTelDocument_withExistingResource() { + Map source = new HashMap<>(); + Map existingResource = new HashMap<>(); + existingResource.put("existingKey", "existingValue"); + source.put("resource", existingResource); + source.put("scope", "invalid scope"); + source.put("key1", "value1"); + IngestDocument document = new IngestDocument("index", "id", 1, null, null, source); + + processor.execute(document); + + Map result = document.getSource(); + assertTrue(result.containsKey("attributes")); + assertTrue(result.containsKey("resource")); + + Map attributes = get(result, "attributes"); + assertEquals("value1", attributes.get("key1")); + assertEquals("existingValue", attributes.get("resource.existingKey")); + assertEquals("invalid scope", attributes.get("scope")); + + Map resource = get(result, "resource"); + assertTrue(resource.containsKey("attributes")); + Map resourceAttributes = get(resource, "attributes"); + assertTrue(resourceAttributes.isEmpty()); + } + + public void testRenameSpecialKeys_nestedForm() { + Map source = new HashMap<>(); + Map span = new HashMap<>(); + span.put("id", "spanIdValue"); + source.put("span", span); + Map log = new HashMap<>(); + log.put("level", "logLevelValue"); + source.put("log", log); + Map trace = new HashMap<>(); + trace.put("id", "traceIdValue"); + source.put("trace", trace); + IngestDocument document = new 
IngestDocument("index", "id", 1, null, null, source); + + NormalizeForStreamProcessor.renameSpecialKeys(document); + + Map result = document.getSource(); + assertEquals("spanIdValue", result.get("span_id")); + assertFalse(result.containsKey("span")); + assertEquals("logLevelValue", result.get("severity_text")); + assertFalse(result.containsKey("log")); + assertEquals("traceIdValue", result.get("trace_id")); + assertFalse(result.containsKey("trace")); + } + + public void testRenameSpecialKeys_topLevelDottedField() { + Map source = new HashMap<>(); + source.put("span.id", "spanIdValue"); + source.put("log.level", "logLevelValue"); + source.put("trace.id", "traceIdValue"); + source.put("message", "this is a message"); + IngestDocument document = new IngestDocument("index", "id", 1, null, null, source); + + NormalizeForStreamProcessor.renameSpecialKeys(document); + + Map result = document.getSource(); + assertEquals("spanIdValue", result.get("span_id")); + assertEquals("logLevelValue", result.get("severity_text")); + assertEquals("traceIdValue", result.get("trace_id")); + Map body = get(result, "body"); + String text = get(body, "text"); + assertEquals("this is a message", text); + assertFalse(source.containsKey("span.id")); + assertFalse(source.containsKey("log.level")); + assertFalse(source.containsKey("trace.id")); + assertFalse(source.containsKey("message")); + } + + public void testRenameSpecialKeys_mixedForm() { + Map source = new HashMap<>(); + Map span = new HashMap<>(); + span.put("id", "nestedSpanIdValue"); + source.put("span", span); + source.put("span.id", "topLevelSpanIdValue"); + IngestDocument document = new IngestDocument("index", "id", 1, null, null, source); + + NormalizeForStreamProcessor.renameSpecialKeys(document); + + Map result = document.getSource(); + // nested form should take precedence + assertEquals("nestedSpanIdValue", result.get("span_id")); + } + + public void testExecute_moveFlatAttributes() { + Map source = new HashMap<>(); + Map 
expectedResourceAttributes = new HashMap<>(); + EcsOTelResourceAttributes.LATEST.forEach(attribute -> { + String value = randomAlphaOfLength(10); + source.put(attribute, value); + expectedResourceAttributes.put(attribute, value); + }); + Map expectedAttributes = Map.of("agent.non-resource", "value", "service.non-resource", "value", "foo", "bar"); + source.putAll(expectedAttributes); + IngestDocument document = new IngestDocument("index", "id", 1, null, null, source); + + processor.execute(document); + + assertTrue(source.containsKey("resource")); + Map resource = get(source, "resource"); + Map resourceAttributes = get(resource, "attributes"); + assertEquals(expectedResourceAttributes, resourceAttributes); + EcsOTelResourceAttributes.LATEST.forEach(attribute -> assertFalse(source.containsKey(attribute))); + + assertTrue(source.containsKey("attributes")); + Map attributes = get(source, "attributes"); + assertEquals(expectedAttributes, attributes); + assertFalse(source.containsKey("foo")); + assertFalse(source.containsKey("agent.non-resource")); + assertFalse(source.containsKey("service.non-resource")); + } + + public void testExecute_moveNestedAttributes() { + IngestDocument document = new IngestDocument("index", "id", 1, null, null, new HashMap<>()); + + Map expectedResourceAttributes = new HashMap<>(); + EcsOTelResourceAttributes.LATEST.forEach(attribute -> { + String value = randomAlphaOfLength(10); + // parses dots as object notations + document.setFieldValue(attribute, value); + expectedResourceAttributes.put(attribute, value); + }); + Map expectedAttributes = Map.of("agent.non-resource", "value", "service.non-resource", "value", "foo", "bar"); + expectedAttributes.forEach(document::setFieldValue); + + processor.execute(document); + + Map source = document.getSource(); + + assertTrue(source.containsKey("resource")); + Map resource = get(source, "resource"); + Map resourceAttributes = get(resource, "attributes"); + assertEquals(expectedResourceAttributes, 
resourceAttributes); + EcsOTelResourceAttributes.LATEST.forEach(attribute -> { + // parse first part of the key + String namespace = attribute.substring(0, attribute.indexOf('.')); + assertFalse(source.containsKey(namespace)); + }); + assertTrue(source.containsKey("attributes")); + Map attributes = get(source, "attributes"); + assertEquals(expectedAttributes, attributes); + assertFalse(source.containsKey("foo")); + assertFalse(source.containsKey("agent.non-resource")); + assertFalse(source.containsKey("service.non-resource")); + } + + public void testKeepNullValues() { + Map source = new HashMap<>(); + Map span = new HashMap<>(); + span.put("id", null); + source.put("span", span); + source.put("log.level", null); + source.put("trace_id", null); + source.put("foo", null); + source.put("agent.name", null); + IngestDocument document = new IngestDocument("index", "id", 1, null, null, source); + + processor.execute(document); + + assertFalse(source.containsKey("span")); + assertTrue(source.containsKey("span_id")); + assertNull(source.get("span_id")); + assertFalse(source.containsKey("log")); + assertTrue(source.containsKey("severity_text")); + assertNull(source.get("severity_text")); + assertFalse(source.containsKey("trace_id")); + Map expectedAttributes = new HashMap<>(); + expectedAttributes.put("foo", null); + expectedAttributes.put("trace_id", null); + assertEquals(expectedAttributes, get(source, "attributes")); + Map expectedResourceAttributes = new HashMap<>(); + expectedResourceAttributes.put("agent.name", null); + assertEquals(expectedResourceAttributes, get(get(source, "resource"), "attributes")); + } + + public void testExecute_deepFlattening() { + Map source = new HashMap<>(); + Map service = new HashMap<>(); + service.put("name", "serviceNameValue"); + Map node = new HashMap<>(); + node.put("name", "serviceNodeNameValue"); + node.put("type", "serviceNodeTypeValue"); + service.put("node", node); + source.put("service", service); + + Map top = new HashMap<>(); 
+ top.put("child", "childValue"); + Map nestedChild = new HashMap<>(); + nestedChild.put("grandchild", "grandchildValue"); + top.put("nested-child", nestedChild); + source.put("top", top); + + IngestDocument document = new IngestDocument("index", "id", 1, null, null, source); + + processor.execute(document); + + Map result = document.getSource(); + + Map expectedResourceAttributes = Map.of( + "service.name", + "serviceNameValue", + "service.node.name", + "serviceNodeNameValue" + ); + + assertTrue(result.containsKey("resource")); + Map resource = get(result, "resource"); + Map resourceAttributes = get(resource, "attributes"); + assertEquals(expectedResourceAttributes, resourceAttributes); + assertNull(resource.get("service")); + + Map expectedAttributes = Map.of( + "service.node.type", + "serviceNodeTypeValue", + "top.child", + "childValue", + "top.nested-child.grandchild", + "grandchildValue" + ); + + assertTrue(result.containsKey("attributes")); + Map attributes = get(result, "attributes"); + assertEquals(expectedAttributes, attributes); + assertNull(attributes.get("top")); + } + + public void testExecute_arraysNotFlattened() { + Map source = new HashMap<>(); + Map nestedAgent = new HashMap<>(); + nestedAgent.put("name", "agentNameValue"); + List agentArray = List.of("value1", "value2"); + nestedAgent.put("array", agentArray); + source.put("agent", nestedAgent); + + Map nestedService = new HashMap<>(); + List serviceNameArray = List.of("value1", "value2"); + nestedService.put("name", serviceNameArray); + source.put("service", nestedService); + + IngestDocument document = new IngestDocument("index", "id", 1, null, null, source); + + processor.execute(document); + + Map result = document.getSource(); + + Map expectedResourceAttributes = Map.of("agent.name", "agentNameValue", "service.name", serviceNameArray); + + assertTrue(result.containsKey("resource")); + Map resource = get(result, "resource"); + Map resourceAttributes = get(resource, "attributes"); + 
assertEquals(expectedResourceAttributes, resourceAttributes); + + assertTrue(result.containsKey("attributes")); + Map attributes = get(result, "attributes"); + assertEquals(Map.of("agent.array", agentArray), attributes); + + assertNull(resource.get("agent")); + assertNull(attributes.get("service")); + } + + /** + * A utility function for getting a key from a map and casting the result. + */ + @SuppressWarnings("unchecked") + private static T get(Map context, String key) { + return (T) context.get(key); + } +} diff --git a/modules/ingest-otel/src/test/java/org/elasticsearch/ingest/otel/OTelSemConvCrawler.java b/modules/ingest-otel/src/test/java/org/elasticsearch/ingest/otel/OTelSemConvCrawler.java new file mode 100644 index 0000000000000..4a4e316d2ae1d --- /dev/null +++ b/modules/ingest-otel/src/test/java/org/elasticsearch/ingest/otel/OTelSemConvCrawler.java @@ -0,0 +1,155 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". 
 */

package org.elasticsearch.ingest.otel;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.xcontent.XContentFactory;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentParserConfiguration;
import org.elasticsearch.xcontent.XContentType;

import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Stream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

/**
 * This class is responsible for crawling and extracting OpenTelemetry semantic convention
 * resource attributes from the OpenTelemetry GitHub repository. It handles downloading,
 * unzipping, and processing YAML files to extract specific referenced resource attribute names.
 * It eventually deletes the downloaded zip file and extracted repository directory.
+ */ +public class OTelSemConvCrawler { + + public static final String SEM_CONV_GITHUB_REPO_ZIP_URL = + "https://github.com/open-telemetry/semantic-conventions/archive/refs/heads/main.zip"; + + private static final Logger logger = LogManager.getLogger(OTelSemConvCrawler.class); + + @SuppressForbidden(reason = "writing the GitHub repo zip file to the test's runtime temp directory and deleting on exit") + static Set collectOTelSemConvResourceAttributes() { + Path semConvZipFilePath = null; + Path semConvExtractedTmpDirPath = null; + Set resourceAttributes = new HashSet<>(); + try { + HttpClient httpClient = HttpClient.newBuilder().followRedirects(HttpClient.Redirect.ALWAYS).build(); + + semConvZipFilePath = Files.createTempFile("otel-semconv-", ".zip"); + + // Download zip + HttpResponse response = httpClient.send( + HttpRequest.newBuilder(URI.create(SEM_CONV_GITHUB_REPO_ZIP_URL)).build(), + HttpResponse.BodyHandlers.ofFile(semConvZipFilePath) + ); + + if (response.statusCode() != 200) { + logger.error("failed to download semantic conventions zip file"); + return resourceAttributes; + } + + // Unzip + semConvExtractedTmpDirPath = Files.createTempDirectory("otel-semconv-extracted-"); + try (ZipInputStream zis = new ZipInputStream(Files.newInputStream(semConvZipFilePath))) { + ZipEntry entry; + while ((entry = zis.getNextEntry()) != null) { + if (entry.isDirectory() == false) { + Path outPath = semConvExtractedTmpDirPath.resolve(entry.getName()); + Files.createDirectories(outPath.getParent()); + Files.copy(zis, outPath, StandardCopyOption.REPLACE_EXISTING); + } + } + } + + // look for the model root at semantic-conventions-main/model + Path semConvModelRootDir = semConvExtractedTmpDirPath.resolve("semantic-conventions-main/model"); + if (Files.exists(semConvModelRootDir) == false) { + logger.error("model directory not found in the extracted zip"); + return resourceAttributes; + } + + try (Stream semConvFileStream = Files.walk(semConvModelRootDir)) { + 
semConvFileStream.filter(path -> path.toString().endsWith(".yaml") || path.toString().endsWith(".yml")) + .parallel() + .forEach(path -> { + try ( + InputStream inputStream = Files.newInputStream(path); + XContentParser parser = XContentFactory.xContent(XContentType.YAML) + .createParser(XContentParserConfiguration.EMPTY, inputStream) + ) { + Map yamlData = parser.map(); + Object groupsObj = yamlData.get("groups"); + if (groupsObj instanceof List groups) { + for (Object group : groups) { + if (group instanceof Map groupMap && "entity".equals(groupMap.get("type"))) { + Object attrs = groupMap.get("attributes"); + if (attrs instanceof List attrList) { + for (Object attr : attrList) { + if (attr instanceof Map attrMap) { + String refVal = (String) attrMap.get("ref"); + if (refVal != null) { + resourceAttributes.add(refVal); + } + } + } + } + } + } + } + } catch (IOException e) { + logger.error("error parsing yaml file", e); + } + }); + } + } catch (InterruptedException e) { + logger.error("interrupted", e); + } catch (IOException e) { + logger.error("IO exception", e); + } finally { + if (semConvZipFilePath != null) { + try { + Files.deleteIfExists(semConvZipFilePath); + } catch (IOException e) { + logger.warn("failed to delete semconv zip file", e); + } + } + if (semConvExtractedTmpDirPath != null) { + try (Stream semConvFileStream = Files.walk(semConvExtractedTmpDirPath)) { + semConvFileStream.sorted(Comparator.reverseOrder()) // delete files first + .forEach(path -> { + try { + Files.delete(path); + } catch (IOException e) { + logger.warn("failed to delete file: " + path, e); + } + }); + } catch (IOException e) { + logger.warn("failed to delete semconv zip file", e); + } + } + } + + return resourceAttributes; + } +} diff --git a/modules/ingest-otel/src/test/java/org/elasticsearch/ingest/otel/ResourceAttributesTests.java b/modules/ingest-otel/src/test/java/org/elasticsearch/ingest/otel/ResourceAttributesTests.java new file mode 100644 index 
0000000000000..ce7cde24dfee7 --- /dev/null +++ b/modules/ingest-otel/src/test/java/org/elasticsearch/ingest/otel/ResourceAttributesTests.java @@ -0,0 +1,101 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.ingest.otel; + +import org.elasticsearch.core.SuppressForbidden; +import org.elasticsearch.test.ESTestCase; +import org.junit.Ignore; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** + * DISABLED BY DEFAULT!

+ * These tests are not meant for CI, but rather to be run manually to check whether the static {@link EcsOTelResourceAttributes resource + * attributes set} is up to date with the latest ECS and/or OpenTelemetry Semantic Conventions. + * We may add them to CI in the future, but as such that run periodically (nightly/weekly) and used to notify whenever the resource + * attributes set is not up to date. + */ +@SuppressForbidden(reason = "Disabled temporarily until we set up the periodic CI pipeline") +@Ignore +public class ResourceAttributesTests extends ESTestCase { + + @SuppressForbidden(reason = "Used specifically for the output. Only meant to be run manually, not through CI.") + public void testCrawler() { + Set resourceAttributes = OTelSemConvCrawler.collectOTelSemConvResourceAttributes(); + System.out.println("Resource Attributes: " + resourceAttributes.size()); + for (String attribute : resourceAttributes) { + System.out.println(attribute); + } + } + + @SuppressForbidden(reason = "Used specifically for the output. 
Only meant to be run manually, not through CI.") + public void testEcsToOTelAttributeNames() { + Map attributes = EcsFieldsDiscoverer.getInstance().getEcsToOTelAttributeNames(); + System.out.println("ECS to OTel attribute mappings: " + attributes.size()); + for (Map.Entry entry : attributes.entrySet()) { + System.out.println(entry.getKey() + " --> " + entry.getValue()); + } + } + + public void testAttributesSetUpToDate() { + Map ecsToOTelAttributeNames = EcsFieldsDiscoverer.getInstance().getEcsToOTelAttributeNames(); + Set otelResourceAttributes = OTelSemConvCrawler.collectOTelSemConvResourceAttributes(); + Set latestEcsOTelResourceAttributes = new HashSet<>(); + ecsToOTelAttributeNames.forEach((ecsAttributeName, otelAttributeName) -> { + if (otelResourceAttributes.contains(otelAttributeName)) { + latestEcsOTelResourceAttributes.add(ecsAttributeName); + } + }); + latestEcsOTelResourceAttributes.addAll(EcsFieldsDiscoverer.getInstance().getEcsResourceFields()); + boolean upToDate = latestEcsOTelResourceAttributes.equals(EcsOTelResourceAttributes.LATEST); + if (upToDate == false) { + printComparisonResults(latestEcsOTelResourceAttributes); + } else { + System.out.println("Latest ECS-to-OTel resource attributes set in EcsOTelResourceAttributes is up to date."); + } + assertTrue("Latest ECS-to-OTel resource attributes set in EcsOTelResourceAttributes is not up to date.", upToDate); + } + + @SuppressForbidden( + reason = "Output is used for updating the resource attributes set. Running nightly and only prints when not up to date." 
+ ) + private static void printComparisonResults(Set latestEcsOTelResourceAttributes) { + // find and print the diff + Set addedAttributes = new HashSet<>(latestEcsOTelResourceAttributes); + addedAttributes.removeAll(EcsOTelResourceAttributes.LATEST); + if (addedAttributes.isEmpty() == false) { + System.out.println(); + System.out.println("The current resource attributes set doesn't contain the following attributes:"); + System.out.println("-----------------------------------------------------------------------------"); + for (String attribute : addedAttributes) { + System.out.println(attribute); + } + System.out.println("-----------------------------------------------------------------------------"); + System.out.println(); + } + Set removedAttributes = new HashSet<>(EcsOTelResourceAttributes.LATEST); + removedAttributes.removeAll(latestEcsOTelResourceAttributes); + if (removedAttributes.isEmpty() == false) { + System.out.println(); + System.out.println("The following attributes are no longer considered resource attributes:"); + System.out.println("----------------------------------------------------------------------"); + for (String attribute : removedAttributes) { + System.out.println(attribute); + } + System.out.println("----------------------------------------------------------------------"); + System.out.println(); + } + System.out.println("Consider updating EcsOTelResourceAttributes accordingly"); + System.out.println(); + fail("ECS to OTel resource attributes are not up to date"); + } +} diff --git a/modules/ingest-otel/src/yamlRestTest/java/org/elasticsearch/ingest/otel/IngestOtelClientYamlTestSuiteIT.java b/modules/ingest-otel/src/yamlRestTest/java/org/elasticsearch/ingest/otel/IngestOtelClientYamlTestSuiteIT.java new file mode 100644 index 0000000000000..85189616c2ea7 --- /dev/null +++ b/modules/ingest-otel/src/yamlRestTest/java/org/elasticsearch/ingest/otel/IngestOtelClientYamlTestSuiteIT.java @@ -0,0 +1,38 @@ +/* + * Copyright Elasticsearch B.V. 
and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the "Elastic License
 * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
 * Public License v 1"; you may not use this file except in compliance with, at
 * your election, the "Elastic License 2.0", the "GNU Affero General Public
 * License v3.0 only", or the "Server Side Public License, v 1".
 */

package org.elasticsearch.ingest.otel;

import com.carrotsearch.randomizedtesting.annotations.Name;
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;

import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.test.rest.yaml.ClientYamlTestCandidate;
import org.elasticsearch.test.rest.yaml.ESClientYamlSuiteTestCase;
import org.junit.ClassRule;

/**
 * Runs the YAML REST test suite for the {@code ingest-otel} module against a local test cluster
 * that has the module installed.
 */
public class IngestOtelClientYamlTestSuiteIT extends ESClientYamlSuiteTestCase {

    public IngestOtelClientYamlTestSuiteIT(@Name("yaml") ClientYamlTestCandidate testCandidate) {
        super(testCandidate);
    }

    // local test cluster with only the ingest-otel module, shared by all YAML test candidates
    @ClassRule
    public static ElasticsearchCluster cluster = ElasticsearchCluster.local().module("ingest-otel").build();

    @Override
    protected String getTestRestCluster() {
        return cluster.getHttpAddresses();
    }

    // one parameter set per YAML test discovered on the classpath
    @ParametersFactory
    public static Iterable parameters() throws Exception {
        return ESClientYamlSuiteTestCase.createParameters();
    }
}
diff --git a/modules/ingest-otel/src/yamlRestTest/resources/rest-api-spec/test/ingest-otel/10_normalize_for_stream.yml b/modules/ingest-otel/src/yamlRestTest/resources/rest-api-spec/test/ingest-otel/10_normalize_for_stream.yml
new file mode 100644
index 0000000000000..921c9991c1883
--- /dev/null
+++ b/modules/ingest-otel/src/yamlRestTest/resources/rest-api-spec/test/ingest-otel/10_normalize_for_stream.yml
@@ -0,0 +1,223 @@
---
setup:
  - do:
      ingest.put_pipeline:
        id: "normalize_for_stream_pipeline"
        body:
          processors:
            - normalize_for_stream: {}

---
teardown:
  - 
do: + ingest.delete_pipeline: + id: "normalize_for_stream_pipeline" + ignore: 404 + +--- +"Test attributes namespacing": + - do: + index: + index: normalize_for_stream_test + id: "nested_and_flat_attributes" + pipeline: "normalize_for_stream_pipeline" + body: { + "agent.name": "agentNameValue", + "agent": { + "type": "agentTypeValue", + "deep": { + "nested": "nestedValue", + "scalar-array": [ + "arrayValue1", + "arrayValue2" + ], + "object-array": [ + { + "key1": "value1" + }, + { + "key2": "value2" + } + ] + }, + "scalar-array": [ + "arrayValue1", + "arrayValue2" + ] + }, + "cloud.region": "cloudRegionValue", + "cloud": { + "service": { + "name": [ + "nameArrayValue1", + "nameArrayValue2" + ] + }, + "account.id": [ + { + "key1": "value1" + }, + { + "key2": "value2" + } + ], + }, + "host.name": "hostNameValue", + "host": { + "type": "hostTypeValue" + }, + "service.name": "serviceNameValue", + "service": { + "type": "serviceTypeValue", + } + } + + - do: + get: + index: normalize_for_stream_test + id: "nested_and_flat_attributes" + - match: { _source.resource.attributes.agent\.name: "agentNameValue" } + - match: { _source.resource.attributes.agent\.type: "agentTypeValue" } + - match: { _source.resource.attributes.cloud\.region: "cloudRegionValue" } + - match: { _source.resource.attributes.cloud\.service\.name: ["nameArrayValue1", "nameArrayValue2"] } + - match: { _source.resource.attributes.cloud\.service\.name.0: "nameArrayValue1" } + - match: { _source.resource.attributes.cloud\.service\.name.1: "nameArrayValue2" } + - match: { _source.resource.attributes.cloud\.account\.id: [{"key1" : "value1"}, {"key2" : "value2"}] } + - match: { _source.resource.attributes.cloud\.account\.id.0.key1: "value1" } + - match: { _source.resource.attributes.cloud\.account\.id.1.key2: "value2" } + - match: { _source.resource.attributes.host\.name: "hostNameValue" } + - match: { _source.resource.attributes.host\.type: "hostTypeValue" } + - match: { 
_source.resource.attributes.service\.name: "serviceNameValue" } + - match: { _source.attributes.agent\.scalar-array.0: "arrayValue1" } + - match: { _source.attributes.agent\.scalar-array.1: "arrayValue2" } + - match: { _source.attributes.agent\.deep\.nested: "nestedValue" } + - match: { _source.attributes.agent\.deep\.scalar-array.0: "arrayValue1" } + - match: { _source.attributes.agent\.deep\.scalar-array.1: "arrayValue2" } + - match: { _source.attributes.agent\.deep\.object-array.0.key1: "value1" } + - match: { _source.attributes.agent\.deep\.object-array.1.key2: "value2" } + - match: { _source.attributes.service\.type: "serviceTypeValue" } + - match: { _source.agent\.name: null } + - match: { _source.agent: null } + - match: { _source.agent.type: null } + - match: { _source.cloud\.region: null } + - match: { _source.cloud: null } + - match: { _source.host\.name: null } + - match: { _source.host: null } + - match: { _source.service\.name: null } + - match: { _source.service: null } + +--- +"Test rename special keys": + - do: + index: + index: normalize_for_stream_test + id: "rename_special_keys" + pipeline: "normalize_for_stream_pipeline" + body: { + "span": { + "id": "nestedSpanIdValue" + }, + "span.id": "topLevelSpanIdValue", + "log.level": "topLevelLogLevelValue", + "trace": { + "id": "traceIdValue" + }, + "trace.id": "topLevelTraceIdValue", + "message": "this is a message" + } + + - do: + get: + index: normalize_for_stream_test + id: "rename_special_keys" + - match: { _source.span_id: "nestedSpanIdValue" } + - match: { _source.severity_text: "topLevelLogLevelValue" } + - match: { _source.trace_id: "traceIdValue" } + - match: { _source.body.text: "this is a message" } + - match: { _source.span: null } + - match: { _source.span\.id: null } + - match: { _source.log\.level: null } + - match: { _source.trace: null } + - match: { _source.trace\.id: null } + - match: { _source.message: null } + +--- +"Test valid OTel document": + - do: + index: + index: 
normalize_for_stream_test + id: "valid_otel_document" + pipeline: "normalize_for_stream_pipeline" + body: { + "resource": { + "attributes": { + "foo": "bar" + } + }, + "scope": { + "foo": "bar" + }, + "attributes": { + "foo": "bar" + }, + "body": { + "text": "a string", + "structured": {} + }, + "span_id": "spanIdValue", + "trace_id": "traceIdValue", + "severity_text": "severityTextValue", + "foo": "bar" + } + + - do: + get: + index: normalize_for_stream_test + id: "valid_otel_document" + - match: { _source.resource.attributes.foo: "bar" } + - match: { _source.scope.foo: "bar" } + - match: { _source.attributes.foo: "bar" } + - match: { _source.body.text: "a string" } + - match: { _source.body.structured: {} } + - match: { _source.span_id: "spanIdValue" } + - match: { _source.trace_id: "traceIdValue" } + - match: { _source.severity_text: "severityTextValue" } + - match: { _source.foo: "bar" } + +--- +"Test invalid body field": + - do: + index: + index: normalize_for_stream_test + id: "invalid_body_field" + pipeline: "normalize_for_stream_pipeline" + body: { + "resource": {}, + "scope": { + "foo": "bar" + }, + "body": { + "text": 123, + "structured": { + "foo": "bar" + } + }, + "span_id": "spanIdValue", + "trace_id": "traceIdValue", + "severity_text": "severityTextValue", + "foo": "bar" + } + + - do: + get: + index: normalize_for_stream_test + id: "invalid_body_field" + - match: { _source.attributes.body\.text: 123 } + - match: { _source.attributes.body\.structured\.foo: "bar" } + - match: { _source.attributes.scope\.foo: "bar" } + - match: { _source.attributes.span_id: "spanIdValue" } + - match: { _source.attributes.trace_id: "traceIdValue" } + - match: { _source.attributes.severity_text: "severityTextValue" } + - match: { _source.attributes.foo: "bar" } + - match: { _source.body: null } + - match: { _source.scope: null }