Skip to content

Commit d3d2d9b

Browse files
authored
Adding NormalizeForStreamProcessor (#125699)
1 parent 66733cb commit d3d2d9b

File tree

15 files changed

+1625
-1
lines changed

15 files changed

+1625
-1
lines changed

docs/changelog/125699.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 125699
2+
summary: Adding `NormalizeForStreamProcessor`
3+
area: Ingest Node
4+
type: feature
5+
issues: []

docs/reference/enrich-processor/index.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,9 @@ Refer to [Enrich your data](docs-content://manage-data/ingest/transform-enrich/d
8484
[`network_direction` processor](/reference/enrich-processor/network-direction-processor.md)
8585
: Calculates the network direction given a source IP address, destination IP address, and a list of internal networks.
8686

87+
[`normalize_for_stream` processor](/reference/enrich-processor/normalize-for-stream.md)
88+
: Normalizes non-OpenTelemetry documents to be OpenTelemetry-compliant.
89+
8790
[`registered_domain` processor](/reference/enrich-processor/registered-domain-processor.md)
8891
: Extracts the registered domain (also known as the effective top-level domain or eTLD), sub-domain, and top-level domain from a fully qualified domain name (FQDN).
8992

Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
---
2+
navigation_title: "Normalize for Stream"
3+
mapped_pages:
4+
- https://www.elastic.co/guide/en/elasticsearch/reference/current/normalize-for-stream-processor.html
5+
---
6+
7+
# Normalize-for-Stream processor [normalize-for-stream-processor]
8+
9+
10+
Detects whether a document is OpenTelemetry-compliant and if not -
11+
normalizes it as described below. If used in combination with the OTel-related
12+
mappings such as the ones defined in `logs-otel@template`, the resulting
13+
document can be queried seamlessly by clients that expect either [ECS](https://www.elastic.co/guide/en/ecs/current/index.html) or OpenTelemetry-[Semantic-Conventions](https://github.com/open-telemetry/semantic-conventions) formats.
14+
15+
::::{note}
16+
This processor is in tech preview and is not available in our serverless offering.
17+
::::
18+
19+
## Detecting OpenTelemetry compliance
20+
21+
The processor detects OpenTelemetry compliance by checking the following fields:
22+
* `resource` exists as a key and the value is a map
23+
* `resource` either doesn't contain an `attributes` field, or contains an `attributes` field of type map
24+
* `scope` is either missing or a map
25+
* `attributes` is either missing or a map
26+
* `body` is either missing or a map
27+
* `body` either doesn't contain a `text` field, or contains a `text` field of type `String`
28+
* `body` either doesn't contain a `structured` field, or contains a `structured` field that is not of type `String`
29+
30+
If all of these conditions are met, the document is considered OpenTelemetry-compliant and is not modified by the processor.
31+
32+
## Normalization
33+
34+
If the document is not OpenTelemetry-compliant, the processor normalizes it as follows:
35+
* Specific ECS fields are renamed to have their corresponding OpenTelemetry Semantic Conventions attribute names. These include the following:
36+
37+
| ECS Field | Semantic Conventions Attribute |
38+
|-------------|--------------------------------|
39+
| `span.id` | `span_id` |
40+
| `trace.id` | `trace_id` |
41+
| `message` | `body.text` |
42+
| `log.level` | `severity_text` |
43+
The processor first looks for the nested form of the ECS field and if such does not exist, it looks for a top-level field with the dotted field name.
44+
* Other specific ECS fields that describe resources and have corresponding counterparts in the OpenTelemetry Semantic Conventions are moved to the `resource.attribtues` map. Fields that are considered resource attributes are such that conform to the following conditions:
45+
* They are ECS fields that have corresponding counterparts (either with
46+
the same name or with a different name) in OpenTelemetry Semantic Conventions.
47+
* The corresponding OpenTelemetry attribute is defined in
48+
[Semantic Conventions](https://github.com/open-telemetry/semantic-conventions/tree/main/model)
49+
within a group that is defined as `type: enitity`.
50+
* All other fields, except for `@timestamp`, are moved to the `attributes` map.
51+
* All non-array entries of the `attributes` and `resource.attributes` maps are flattened. Flattening means that nested objects are merged into their parent object, and the keys are concatenated with a dot. See examples below.
52+
53+
## Examples
54+
55+
If an OpenTelemetry-compliant document is detected, the processor does nothing. For example, the following document will stay unchanged:
56+
57+
```json
58+
{
59+
"resource": {
60+
"attributes": {
61+
"service.name": "my-service"
62+
}
63+
},
64+
"scope": {
65+
"name": "my-library",
66+
"version": "1.0.0"
67+
},
68+
"attributes": {
69+
"http.method": "GET"
70+
},
71+
"body": {
72+
"text": "Hello, world!"
73+
}
74+
}
75+
```
76+
77+
If a non-OpenTelemetry-compliant document is detected, the processor normalizes it. For example, the following document:
78+
79+
```json
80+
{
81+
"@timestamp": "2023-10-01T12:00:00Z",
82+
"service": {
83+
"name": "my-service",
84+
"version": "1.0.0",
85+
"environment": "production",
86+
"language": {
87+
"name": "python",
88+
"version": "3.8"
89+
}
90+
},
91+
"log": {
92+
"level": "INFO"
93+
},
94+
"message": "Hello, world!",
95+
"http": {
96+
"method": "GET",
97+
"url": {
98+
"path": "/api/v1/resource"
99+
},
100+
"headers": [
101+
{
102+
"name": "Authorization",
103+
"value": "Bearer token"
104+
},
105+
{
106+
"name": "User-Agent",
107+
"value": "my-client/1.0"
108+
}
109+
]
110+
},
111+
"span" : {
112+
"id": "1234567890abcdef"
113+
},
114+
"span.id": "abcdef1234567890",
115+
"trace.id": "abcdef1234567890abcdef1234567890"
116+
}
117+
```
118+
will be normalized into the following form:
119+
120+
```json
121+
{
122+
"@timestamp": "2023-10-01T12:00:00Z",
123+
"resource": {
124+
"attributes": {
125+
"service.name": "my-service",
126+
"service.version": "1.0.0",
127+
"service.environment": "production"
128+
}
129+
},
130+
"attributes": {
131+
"service.language.name": "python",
132+
"service.language.version": "3.8",
133+
"http.method": "GET",
134+
"http.url.path": "/api/v1/resource",
135+
"http.headers": [
136+
{
137+
"name": "Authorization",
138+
"value": "Bearer token"
139+
},
140+
{
141+
"name": "User-Agent",
142+
"value": "my-client/1.0"
143+
}
144+
]
145+
},
146+
"body": {
147+
"text": "Hello, world!"
148+
},
149+
"span_id": "1234567890abcdef",
150+
"trace_id": "abcdef1234567890abcdef1234567890"
151+
}
152+
```

docs/reference/enrich-processor/toc.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ toc:
2828
- file: kv-processor.md
2929
- file: lowercase-processor.md
3030
- file: network-direction-processor.md
31+
- file: normalize-for-stream.md
3132
- file: pipeline-processor.md
3233
- file: redact-processor.md
3334
- file: registered-domain-processor.md
@@ -44,4 +45,4 @@ toc:
4445
- file: uppercase-processor.md
4546
- file: urldecode-processor.md
4647
- file: uri-parts-processor.md
47-
- file: user-agent-processor.md
48+
- file: user-agent-processor.md

modules/ingest-otel/build.gradle

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
apply plugin: 'elasticsearch.internal-yaml-rest-test'
11+
12+
esplugin {
13+
description = 'Ingest processor that normalizes ECS documents to OpenTelemetry-compatible namespaces'
14+
classname ='org.elasticsearch.ingest.otel.NormalizeForStreamPlugin'
15+
}
16+
17+
restResources {
18+
restApi {
19+
include '_common', 'indices', 'index', 'cluster', 'nodes', 'get', 'ingest'
20+
}
21+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
module org.elasticsearch.ingest.otel {
11+
requires org.elasticsearch.base;
12+
requires org.elasticsearch.server;
13+
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.ingest.otel;
11+
12+
import java.util.Set;
13+
14+
final class EcsOTelResourceAttributes {
15+
16+
/**
17+
* The set of ECS (Elastic Common Schema) field names that are mapped to OpenTelemetry resource attributes,
18+
* as defined by the OpenTelemetry Semantic Conventions.
19+
* The list is produced by the {@code ResourceAttributesTests#testAttributesSetUpToDate} test.
20+
*
21+
* @see <a href="https://github.com/open-telemetry/semantic-conventions">OpenTelemetry Semantic Conventions</a>
22+
*/
23+
static final Set<String> LATEST = Set.of(
24+
"agent.type",
25+
"agent.build.original",
26+
"agent.name",
27+
"agent.id",
28+
"agent.ephemeral_id",
29+
"agent.version",
30+
"container.image.tag",
31+
"device.model.identifier",
32+
"container.image.hash.all",
33+
"service.node.name",
34+
"process.pid",
35+
"device.id",
36+
"host.mac",
37+
"host.type",
38+
"container.id",
39+
"cloud.availability_zone",
40+
"host.ip",
41+
"container.name",
42+
"container.image.name",
43+
"device.model.name",
44+
"host.name",
45+
"host.id",
46+
"process.executable",
47+
"user_agent.original",
48+
"service.environment",
49+
"cloud.region",
50+
"service.name",
51+
"faas.name",
52+
"device.manufacturer",
53+
"process.args",
54+
"host.architecture",
55+
"cloud.provider",
56+
"container.runtime",
57+
"service.version",
58+
"cloud.service.name",
59+
"cloud.account.id",
60+
"process.command_line",
61+
"faas.version"
62+
);
63+
64+
private EcsOTelResourceAttributes() {}
65+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.ingest.otel;
11+
12+
import org.elasticsearch.ingest.Processor;
13+
import org.elasticsearch.plugins.IngestPlugin;
14+
import org.elasticsearch.plugins.Plugin;
15+
16+
import java.util.Map;
17+
18+
public class NormalizeForStreamPlugin extends Plugin implements IngestPlugin {
19+
20+
@Override
21+
public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
22+
return Map.of(NormalizeForStreamProcessor.TYPE, new NormalizeForStreamProcessor.Factory());
23+
}
24+
}

0 commit comments

Comments
 (0)