Merged
5 changes: 5 additions & 0 deletions docs/changelog/125699.yaml
@@ -0,0 +1,5 @@
pr: 125699
summary: Adding `NormalizeForStreamProcessor`
area: Ingest Node
type: feature
issues: []
4 changes: 4 additions & 0 deletions docs/reference/ingest/processors.asciidoc
Original file line number Diff line number Diff line change
@@ -85,6 +85,9 @@ Adds information about the geographical location of an IPv4 or IPv6 address from
<<network-direction-processor, `network_direction` processor>>::
Calculates the network direction given a source IP address, destination IP address, and a list of internal networks.

<<normalize-for-stream-processor, `normalize_for_stream` processor>>::
Normalizes non-OpenTelemetry documents to be OpenTelemetry-compliant.

<<registered-domain-processor, `registered_domain` processor>>::
Extracts the registered domain (also known as the effective top-level domain or eTLD), sub-domain, and top-level domain from a fully qualified domain name (FQDN).

@@ -254,6 +257,7 @@ include::processors/json.asciidoc[]
include::processors/kv.asciidoc[]
include::processors/lowercase.asciidoc[]
include::processors/network-direction.asciidoc[]
include::processors/normalize-for-stream.asciidoc[]
include::processors/pipeline.asciidoc[]
include::processors/redact.asciidoc[]
include::processors/registered-domain.asciidoc[]
157 changes: 157 additions & 0 deletions docs/reference/ingest/processors/normalize-for-stream.asciidoc
@@ -0,0 +1,157 @@
[role="xpack"]
[[normalize-for-stream-processor]]
=== Normalize for Stream processor
++++
<titleabbrev>Normalize for Stream</titleabbrev>
++++

Detects whether a document is OpenTelemetry-compliant and, if it is not,
normalizes it as described below. When used together with OTel-related
mappings, such as the ones defined in `logs-otel@template`, the resulting
document can be queried seamlessly by clients that expect either the https://www.elastic.co/guide/en/ecs/current/index.html[ECS] or the https://github.com/open-telemetry/semantic-conventions[OpenTelemetry Semantic Conventions] format.

preview::["This processor is in tech preview and is not available in our serverless offering."]

[[normalize-for-stream-detecting]]
==== Detecting OpenTelemetry compliance

The processor detects OpenTelemetry compliance by checking the following fields:

* `resource` exists as a key and the value is a map
* `resource` either doesn't contain an `attributes` field, or contains an `attributes` field of type map
* `scope` is either missing or a map
* `attributes` is either missing or a map
* `body` is either missing or a map
* `body` either doesn't contain a `text` field, or contains a `text` field of type `String`
* `body` either doesn't contain a `structured` field, or contains a `structured` field that is not of type `String`

If all of these conditions are met, the document is considered OpenTelemetry-compliant and is not modified by the processor.
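
The conditions above can be sketched as a single predicate over the document's source map. This is a minimal illustration, not the processor's actual implementation: the class and method names are hypothetical, and treating a `null` value the same as a missing field is an assumption.

```java
import java.util.Map;

/** Hypothetical helper that mirrors the compliance conditions listed above. */
final class OTelComplianceSketch {

    private OTelComplianceSketch() {}

    /** Assumption: a null value is treated the same as a missing field. */
    private static boolean isMissingOrMap(Object value) {
        return value == null || value instanceof Map;
    }

    static boolean isOTelCompliant(Map<String, Object> doc) {
        // `resource` must exist and must be a map
        Object resource = doc.get("resource");
        if (!(resource instanceof Map)) {
            return false;
        }
        // `resource.attributes`, when present, must be a map
        if (!isMissingOrMap(((Map<?, ?>) resource).get("attributes"))) {
            return false;
        }
        // `scope` and `attributes` are optional, but must be maps when present
        if (!isMissingOrMap(doc.get("scope")) || !isMissingOrMap(doc.get("attributes"))) {
            return false;
        }
        // `body` is optional, but must be a map when present
        Object body = doc.get("body");
        if (body == null) {
            return true;
        }
        if (!(body instanceof Map)) {
            return false;
        }
        // `body.text` must be a String when present; `body.structured` must not be a String
        Object text = ((Map<?, ?>) body).get("text");
        Object structured = ((Map<?, ?>) body).get("structured");
        return (text == null || text instanceof String) && !(structured instanceof String);
    }
}
```

A plain ECS document such as `{"message": "hi"}` fails the first check because it has no `resource` map, so it would be normalized.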

[[normalize-for-stream-normalization]]
==== Normalization

If the document is not OpenTelemetry-compliant, the processor normalizes it as follows:

* Specific ECS fields are renamed to have their corresponding OpenTelemetry Semantic Conventions attribute names. These include the following:
+
|======
| ECS Field | Semantic Conventions Attribute |
| `span.id` | `span_id` |
| `trace.id` | `trace_id` |
| `message` | `body.text` |
| `log.level` | `severity_text` |
|======
+
The processor first looks for the nested form of the ECS field. If the nested form does not exist, it looks for a top-level field with the dotted field name.

* Other specific ECS fields that describe resources and have corresponding counterparts in the OpenTelemetry Semantic Conventions are moved to the `resource.attributes` map. A field is considered a resource attribute if it meets both of the following conditions:
** It is an ECS field that has a corresponding counterpart (either with
the same name or with a different name) in the OpenTelemetry Semantic Conventions.
** The corresponding OpenTelemetry attribute is defined in
https://github.com/open-telemetry/semantic-conventions/tree/main/model[Semantic Conventions]
within a group that is defined as `type: entity`.
* All other fields, except for `@timestamp`, are moved to the `attributes` map.
* All non-array entries of the `attributes` and `resource.attributes` maps are flattened. Flattening means that nested objects are merged into their parent object, and the keys are concatenated with a dot. See examples below.
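
The flattening step described in the last item can be sketched as a small recursive routine. This is only an illustration under stated assumptions: `FlattenSketch` and its methods are hypothetical names, and arrays are modeled as any non-`Map` value, which is copied unchanged.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch of dot-concatenated flattening; not the processor's actual code. */
final class FlattenSketch {

    private FlattenSketch() {}

    /** Returns a new map in which nested maps are merged into dotted top-level keys. */
    static Map<String, Object> flatten(Map<String, Object> source) {
        Map<String, Object> target = new LinkedHashMap<>();
        flattenInto("", source, target);
        return target;
    }

    private static void flattenInto(String prefix, Map<?, ?> source, Map<String, Object> target) {
        for (Map.Entry<?, ?> entry : source.entrySet()) {
            String key = prefix.isEmpty() ? String.valueOf(entry.getKey()) : prefix + "." + entry.getKey();
            Object value = entry.getValue();
            if (value instanceof Map) {
                // Nested object: recurse and extend the dotted prefix
                flattenInto(key, (Map<?, ?>) value, target);
            } else {
                // Scalars and arrays (lists) are kept as-is under the dotted key
                target.put(key, value);
            }
        }
    }
}
```

Under these assumptions, `{"http": {"method": "GET", "url": {"path": "/api/v1/resource"}}}` becomes a map with the keys `http.method` and `http.url.path`, while a list such as `http.headers` is carried over unchanged.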

[[normalize-for-stream-examples]]
==== Examples

If an OpenTelemetry-compliant document is detected, the processor does nothing. For example, the following document will stay unchanged:

```json
{
  "resource": {
    "attributes": {
      "service.name": "my-service"
    }
  },
  "scope": {
    "name": "my-library",
    "version": "1.0.0"
  },
  "attributes": {
    "http.method": "GET"
  },
  "body": {
    "text": "Hello, world!"
  }
}
```

If a non-OpenTelemetry-compliant document is detected, the processor normalizes it. For example, the following document:

```json
{
  "@timestamp": "2023-10-01T12:00:00Z",
  "service": {
    "name": "my-service",
    "version": "1.0.0",
    "environment": "production",
    "language": {
      "name": "python",
      "version": "3.8"
    }
  },
  "log": {
    "level": "INFO"
  },
  "message": "Hello, world!",
  "http": {
    "method": "GET",
    "url": {
      "path": "/api/v1/resource"
    },
    "headers": [
      {
        "name": "Authorization",
        "value": "Bearer token"
      },
      {
        "name": "User-Agent",
        "value": "my-client/1.0"
      }
    ]
  },
  "span": {
    "id": "1234567890abcdef"
  },
  "span.id": "abcdef1234567890",
  "trace.id": "abcdef1234567890abcdef1234567890"
}
```

will be normalized into the following form:

```json
{
  "@timestamp": "2023-10-01T12:00:00Z",
  "resource": {
    "attributes": {
      "service.name": "my-service",
      "service.version": "1.0.0",
      "service.environment": "production"
    }
  },
  "attributes": {
    "service.language.name": "python",
    "service.language.version": "3.8",
    "http.method": "GET",
    "http.url.path": "/api/v1/resource",
    "http.headers": [
      {
        "name": "Authorization",
        "value": "Bearer token"
      },
      {
        "name": "User-Agent",
        "value": "my-client/1.0"
      }
    ]
  },
  "body": {
    "text": "Hello, world!"
  },
  "severity_text": "INFO",
  "span_id": "1234567890abcdef",
  "trace_id": "abcdef1234567890abcdef1234567890"
}
```
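
In the example above, `span_id` takes the value of the nested `span.id` field rather than the top-level dotted `span.id` key, while `trace_id` comes from the dotted `trace.id` key because no nested form exists. The lookup order can be sketched as follows. The class and method names are hypothetical, and the sketch only reads the value; it does not rename or remove anything.

```java
import java.util.Map;

/** Hypothetical sketch of the "nested first, then dotted" lookup order. */
final class EcsFieldLookupSketch {

    private EcsFieldLookupSketch() {}

    /** Returns the nested value if present, otherwise the top-level dotted key, otherwise null. */
    static Object lookup(Map<String, Object> doc, String dottedName) {
        Object current = doc;
        // Walk the nested path one segment at a time, e.g. "span" then "id"
        for (String part : dottedName.split("\\.")) {
            if (!(current instanceof Map)) {
                current = null;
                break;
            }
            current = ((Map<?, ?>) current).get(part);
        }
        if (current != null) {
            return current;
        }
        // Fall back to a top-level key whose name contains the dots literally
        return doc.get(dottedName);
    }
}
```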
21 changes: 21 additions & 0 deletions modules/ingest-otel/build.gradle
@@ -0,0 +1,21 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

apply plugin: 'elasticsearch.internal-yaml-rest-test'

esplugin {
  description = 'Ingest processor that normalizes ECS documents to OpenTelemetry-compatible namespaces'
  classname = 'org.elasticsearch.ingest.otel.NormalizeForStreamPlugin'
}

restResources {
  restApi {
    include '_common', 'indices', 'index', 'cluster', 'nodes', 'get', 'ingest'
  }
}
13 changes: 13 additions & 0 deletions modules/ingest-otel/src/main/java/module-info.java
@@ -0,0 +1,13 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

module org.elasticsearch.ingest.otel {
    requires org.elasticsearch.base;
    requires org.elasticsearch.server;
}
@@ -0,0 +1,65 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.ingest.otel;

import java.util.Set;

final class EcsOTelResourceAttributes {

    /**
     * The set of ECS (Elastic Common Schema) field names that are mapped to OpenTelemetry resource attributes,
     * as defined by the OpenTelemetry Semantic Conventions.
     * The list is produced by the {@code ResourceAttributesTests#testAttributesSetUpToDate} test.
     *
     * @see <a href="https://github.com/open-telemetry/semantic-conventions">OpenTelemetry Semantic Conventions</a>
     */
    static final Set<String> LATEST = Set.of(
        "agent.type",
        "agent.build.original",
        "agent.name",
        "agent.id",
        "agent.ephemeral_id",
        "agent.version",
        "container.image.tag",
        "device.model.identifier",
        "container.image.hash.all",
        "service.node.name",
        "process.pid",
        "device.id",
        "host.mac",
        "host.type",
        "container.id",
        "cloud.availability_zone",
        "host.ip",
        "container.name",
        "container.image.name",
        "device.model.name",
        "host.name",
        "host.id",
        "process.executable",
        "user_agent.original",
        "service.environment",
        "cloud.region",
        "service.name",
        "faas.name",
        "device.manufacturer",
        "process.args",
        "host.architecture",
        "cloud.provider",
        "container.runtime",
        "service.version",
        "cloud.service.name",
        "cloud.account.id",
        "process.command_line",
        "faas.version"
    );

    private EcsOTelResourceAttributes() {}
}
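
As a rough illustration of how a set like `LATEST` could drive the split between `resource.attributes` and `attributes`, the sketch below routes already-flattened fields by key. It is not taken from this PR: the class name, the `route` method, and the four-entry `RESOURCE_KEYS` subset are hypothetical, and the subset is copied inline only to keep the example self-contained.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of routing flattened ECS fields by resource-attribute membership. */
final class ResourceAttributeRoutingSketch {

    // Assumption: a small subset of the ECS resource attribute names, for illustration only
    static final Set<String> RESOURCE_KEYS = Set.of("service.name", "service.version", "service.environment", "host.name");

    private ResourceAttributeRoutingSketch() {}

    /** Splits flattened fields into a "resource.attributes" map and an "attributes" map. */
    static Map<String, Map<String, Object>> route(Map<String, Object> flattenedFields) {
        Map<String, Object> resourceAttributes = new LinkedHashMap<>();
        Map<String, Object> attributes = new LinkedHashMap<>();
        flattenedFields.forEach((key, value) -> {
            // Keys in the set go to the resource; everything else is a plain attribute
            Map<String, Object> destination = RESOURCE_KEYS.contains(key) ? resourceAttributes : attributes;
            destination.put(key, value);
        });
        return Map.of("resource.attributes", resourceAttributes, "attributes", attributes);
    }
}
```

With this subset, `service.name` lands under `resource.attributes`, while `service.language.name` stays under `attributes` because it is not in the set.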
@@ -0,0 +1,24 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.ingest.otel;

import org.elasticsearch.ingest.Processor;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.plugins.Plugin;

import java.util.Map;

public class NormalizeForStreamPlugin extends Plugin implements IngestPlugin {

    @Override
    public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
        return Map.of(NormalizeForStreamProcessor.TYPE, new NormalizeForStreamProcessor.Factory());
    }
}