Commit cae14d0

[8.19] Adding NormalizeForStreamProcessor (#129092)
1 parent bc00b12 commit cae14d0

14 files changed, with 1622 additions and 0 deletions

docs/changelog/125699.yaml

Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
pr: 125699
summary: Adding `NormalizeForStreamProcessor`
area: Ingest Node
type: feature
issues: []

docs/reference/ingest/processors.asciidoc

Lines changed: 4 additions & 0 deletions
@@ -85,6 +85,9 @@ Adds information about the geographical location of an IPv4 or IPv6 address from
 <<network-direction-processor, `network_direction` processor>>::
 Calculates the network direction given a source IP address, destination IP address, and a list of internal networks.
 
+<<normalize-for-stream-processor, `normalize_for_stream` processor>>::
+Normalizes non-OpenTelemetry documents to be OpenTelemetry-compliant.
+
 <<registered-domain-processor, `registered_domain` processor>>::
 Extracts the registered domain (also known as the effective top-level domain or eTLD), sub-domain, and top-level domain from a fully qualified domain name (FQDN).
 
@@ -254,6 +257,7 @@ include::processors/json.asciidoc[]
 include::processors/kv.asciidoc[]
 include::processors/lowercase.asciidoc[]
 include::processors/network-direction.asciidoc[]
+include::processors/normalize-for-stream.asciidoc[]
 include::processors/pipeline.asciidoc[]
 include::processors/redact.asciidoc[]
 include::processors/registered-domain.asciidoc[]
Lines changed: 157 additions & 0 deletions
@@ -0,0 +1,157 @@
[role="xpack"]
[[normalize-for-stream-processor]]
=== Normalize for Stream processor
++++
<titleabbrev>Normalize for Stream</titleabbrev>
++++

Detects whether a document is OpenTelemetry-compliant and, if it is not,
normalizes it as described below. When used in combination with OTel-related
mappings, such as the ones defined in `logs-otel@template`, the resulting
document can be queried seamlessly by clients that expect either https://www.elastic.co/guide/en/ecs/current/index.html[ECS] or https://github.com/open-telemetry/semantic-conventions[OpenTelemetry Semantic Conventions] formats.

preview::["This processor is in tech preview and is not available in our serverless offering."]

[[normalize-for-stream-detecting]]
==== Detecting OpenTelemetry compliance

The processor detects OpenTelemetry compliance by checking the following conditions:

* `resource` exists as a key and the value is a map
* `resource` either doesn't contain an `attributes` field, or contains an `attributes` field of type map
* `scope` is either missing or a map
* `attributes` is either missing or a map
* `body` is either missing or a map
* `body` either doesn't contain a `text` field, or contains a `text` field of type `String`
* `body` either doesn't contain a `structured` field, or contains a `structured` field that is not of type `String`

If all of these conditions are met, the document is considered OpenTelemetry-compliant and is not modified by the processor.
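
The conditions listed above can be sketched in plain Java over a `Map<String, Object>` document. This is only an illustration of the documented checks, not the processor code added by this commit: the class and method names (`OTelComplianceSketch`, `isOTelCompliant`) are hypothetical, and treating a key mapped to `null` as missing is an assumption.

```java
import java.util.Map;

// Hypothetical sketch of the documented compliance checks; not the processor's code.
final class OTelComplianceSketch {

    // True when the value is absent (null, by assumption) or is a map.
    private static boolean missingOrMap(Object value) {
        return value == null || value instanceof Map;
    }

    static boolean isOTelCompliant(Map<String, Object> doc) {
        // `resource` must exist and must be a map.
        Object resource = doc.get("resource");
        if (!(resource instanceof Map)) {
            return false;
        }
        // `resource.attributes`, `scope` and `attributes` are each missing or a map.
        if (!missingOrMap(((Map<?, ?>) resource).get("attributes"))) {
            return false;
        }
        if (!missingOrMap(doc.get("scope")) || !missingOrMap(doc.get("attributes"))) {
            return false;
        }
        // `body` is missing or a map.
        Object body = doc.get("body");
        if (body == null) {
            return true;
        }
        if (!(body instanceof Map)) {
            return false;
        }
        Map<?, ?> bodyMap = (Map<?, ?>) body;
        // `body.text`, when present, must be a String.
        Object text = bodyMap.get("text");
        if (text != null && !(text instanceof String)) {
            return false;
        }
        // `body.structured`, when present, must not be a String.
        return !(bodyMap.get("structured") instanceof String);
    }

    public static void main(String[] args) {
        Map<String, Object> otel = Map.of("resource", Map.of(), "body", Map.of("text", "Hello, world!"));
        Map<String, Object> ecs = Map.of("message", "Hello, world!");
        System.out.println("otel-shaped document compliant: " + isOTelCompliant(otel));
        System.out.println("ecs-shaped document compliant: " + isOTelCompliant(ecs));
    }
}
```

Note that a document without a `resource` key fails the first check, so a typical ECS document is always routed to normalization.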

[[normalize-for-stream-normalization]]
==== Normalization

If the document is not OpenTelemetry-compliant, the processor normalizes it as follows:

* Specific ECS fields are renamed to their corresponding OpenTelemetry Semantic Conventions attribute names. These include the following:
+
|======
| ECS Field | Semantic Conventions Attribute |
| `span.id` | `span_id` |
| `trace.id` | `trace_id` |
| `message` | `body.text` |
| `log.level` | `severity_text` |
|======
+
The processor first looks for the nested form of the ECS field. If the nested form does not exist, it looks for a top-level field with the dotted field name.

* Other specific ECS fields that describe resources and have counterparts in the OpenTelemetry Semantic Conventions are moved to the `resource.attributes` map. A field is considered a resource attribute if it meets the following conditions:
** It is an ECS field that has a counterpart (either with
the same name or with a different name) in OpenTelemetry Semantic Conventions.
** The corresponding OpenTelemetry attribute is defined in
https://github.com/open-telemetry/semantic-conventions/tree/main/model[Semantic Conventions]
within a group that is defined as `type: entity`.
* All other fields, except for `@timestamp`, are moved to the `attributes` map.
* All non-array entries of the `attributes` and `resource.attributes` maps are flattened. Flattening means that nested objects are merged into their parent object, and the keys are concatenated with a dot. See examples below.
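
The flattening step described in the last bullet can be sketched as a small recursive function. This is a minimal illustration under stated assumptions, not the processor's implementation: `FlattenSketch` and `flatten` are hypothetical names, and arrays are modeled as `java.util.List` values that are copied through untouched.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the documented flattening rule; not the processor's code.
final class FlattenSketch {

    // Returns a new map in which nested maps are merged into dotted keys.
    static Map<String, Object> flatten(Map<String, Object> source) {
        Map<String, Object> target = new LinkedHashMap<>();
        flattenInto("", source, target);
        return target;
    }

    private static void flattenInto(String prefix, Map<?, ?> source, Map<String, Object> target) {
        for (Map.Entry<?, ?> entry : source.entrySet()) {
            String key = prefix.isEmpty() ? String.valueOf(entry.getKey()) : prefix + "." + entry.getKey();
            Object value = entry.getValue();
            if (value instanceof Map) {
                // Nested object: recurse, concatenating keys with a dot.
                flattenInto(key, (Map<?, ?>) value, target);
            } else {
                // Scalars and arrays (lists) are kept as they are.
                target.put(key, value);
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Object> url = Map.of("path", "/api/v1/resource");
        Map<String, Object> http = Map.of("method", "GET", "url", url, "headers", List.of("h1", "h2"));
        Map<String, Object> attributes = Map.of("http", http);
        System.out.println(flatten(attributes));
    }
}
```

Because array values are not descended into, any objects inside an array keep their nested shape.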

[[normalize-for-stream-examples]]
==== Examples

If an OpenTelemetry-compliant document is detected, the processor does nothing. For example, the following document will stay unchanged:

```json
{
  "resource": {
    "attributes": {
      "service.name": "my-service"
    }
  },
  "scope": {
    "name": "my-library",
    "version": "1.0.0"
  },
  "attributes": {
    "http.method": "GET"
  },
  "body": {
    "text": "Hello, world!"
  }
}
```

If a non-OpenTelemetry-compliant document is detected, the processor normalizes it. For example, the following document:

```json
{
  "@timestamp": "2023-10-01T12:00:00Z",
  "service": {
    "name": "my-service",
    "version": "1.0.0",
    "environment": "production",
    "language": {
      "name": "python",
      "version": "3.8"
    }
  },
  "log": {
    "level": "INFO"
  },
  "message": "Hello, world!",
  "http": {
    "method": "GET",
    "url": {
      "path": "/api/v1/resource"
    },
    "headers": [
      {
        "name": "Authorization",
        "value": "Bearer token"
      },
      {
        "name": "User-Agent",
        "value": "my-client/1.0"
      }
    ]
  },
  "span" : {
    "id": "1234567890abcdef"
  },
  "span.id": "abcdef1234567890",
  "trace.id": "abcdef1234567890abcdef1234567890"
}
```

will be normalized into the following form:

```json
{
  "@timestamp": "2023-10-01T12:00:00Z",
  "resource": {
    "attributes": {
      "service.name": "my-service",
      "service.version": "1.0.0",
      "service.environment": "production"
    }
  },
  "attributes": {
    "service.language.name": "python",
    "service.language.version": "3.8",
    "http.method": "GET",
    "http.url.path": "/api/v1/resource",
    "http.headers": [
      {
        "name": "Authorization",
        "value": "Bearer token"
      },
      {
        "name": "User-Agent",
        "value": "my-client/1.0"
      }
    ]
  },
  "body": {
    "text": "Hello, world!"
  },
  "span_id": "1234567890abcdef",
  "trace_id": "abcdef1234567890abcdef1234567890"
}
```
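
In the example above, `span_id` takes its value from the nested `span.id` object rather than from the top-level dotted `span.id` key, which matches the documented lookup order. That precedence can be sketched as follows. This is an assumption-laden illustration, not the processor's code: `RenameLookupSketch` and its methods are hypothetical, only top-level target names are handled (`body.text` would need a nested write), dropping a parent object once it becomes empty is an assumption, and the sketch leaves any remaining dotted duplicate in place because the source does not say what happens to it.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the nested-first lookup; not the processor's code.
final class RenameLookupSketch {

    // Moves the value of an ECS field to a top-level target key, if the field exists.
    static void rename(Map<String, Object> doc, String ecsName, String otelName) {
        Object value = removeField(doc, ecsName);
        if (value != null) {
            doc.put(otelName, value);
        }
    }

    // Prefers the nested form ("span" -> "id"), then falls back to the dotted key ("span.id").
    static Object removeField(Map<String, Object> doc, String dottedName) {
        Object nested = removeNested(doc, dottedName.split("\\."), 0);
        return nested != null ? nested : doc.remove(dottedName);
    }

    private static Object removeNested(Map<String, Object> map, String[] path, int index) {
        if (index == path.length - 1) {
            return map.remove(path[index]);
        }
        Object value = map.get(path[index]);
        if (!(value instanceof Map)) {
            return null;
        }
        @SuppressWarnings("unchecked")
        Map<String, Object> child = (Map<String, Object>) value;
        Object removed = removeNested(child, path, index + 1);
        if (removed != null && child.isEmpty()) {
            // Assumption: a parent object left empty by the rename is dropped.
            map.remove(path[index]);
        }
        return removed;
    }

    public static void main(String[] args) {
        Map<String, Object> span = new HashMap<>();
        span.put("id", "1234567890abcdef");
        Map<String, Object> doc = new HashMap<>();
        doc.put("span", span);
        doc.put("span.id", "abcdef1234567890");
        doc.put("trace.id", "abcdef1234567890abcdef1234567890");
        rename(doc, "span.id", "span_id");
        rename(doc, "trace.id", "trace_id");
        System.out.println(doc);
    }
}
```

The maps must be mutable for this sketch to work, which is why it builds them with `HashMap` instead of `Map.of`.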

modules/ingest-otel/build.gradle

Lines changed: 21 additions & 0 deletions
@@ -0,0 +1,21 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the "Elastic License
 * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
 * Public License v 1"; you may not use this file except in compliance with, at
 * your election, the "Elastic License 2.0", the "GNU Affero General Public
 * License v3.0 only", or the "Server Side Public License, v 1".
 */

apply plugin: 'elasticsearch.internal-yaml-rest-test'

esplugin {
  description = 'Ingest processor that normalizes ECS documents to OpenTelemetry-compatible namespaces'
  classname = 'org.elasticsearch.ingest.otel.NormalizeForStreamPlugin'
}

restResources {
  restApi {
    include '_common', 'indices', 'index', 'cluster', 'nodes', 'get', 'ingest'
  }
}
Lines changed: 13 additions & 0 deletions
@@ -0,0 +1,13 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the "Elastic License
 * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
 * Public License v 1"; you may not use this file except in compliance with, at
 * your election, the "Elastic License 2.0", the "GNU Affero General Public
 * License v3.0 only", or the "Server Side Public License, v 1".
 */

module org.elasticsearch.ingest.otel {
    requires org.elasticsearch.base;
    requires org.elasticsearch.server;
}
Lines changed: 65 additions & 0 deletions
@@ -0,0 +1,65 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the "Elastic License
 * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
 * Public License v 1"; you may not use this file except in compliance with, at
 * your election, the "Elastic License 2.0", the "GNU Affero General Public
 * License v3.0 only", or the "Server Side Public License, v 1".
 */

package org.elasticsearch.ingest.otel;

import java.util.Set;

final class EcsOTelResourceAttributes {

    /**
     * The set of ECS (Elastic Common Schema) field names that are mapped to OpenTelemetry resource attributes,
     * as defined by the OpenTelemetry Semantic Conventions.
     * The list is produced by the {@code ResourceAttributesTests#testAttributesSetUpToDate} test.
     *
     * @see <a href="https://github.com/open-telemetry/semantic-conventions">OpenTelemetry Semantic Conventions</a>
     */
    static final Set<String> LATEST = Set.of(
        "agent.type",
        "agent.build.original",
        "agent.name",
        "agent.id",
        "agent.ephemeral_id",
        "agent.version",
        "container.image.tag",
        "device.model.identifier",
        "container.image.hash.all",
        "service.node.name",
        "process.pid",
        "device.id",
        "host.mac",
        "host.type",
        "container.id",
        "cloud.availability_zone",
        "host.ip",
        "container.name",
        "container.image.name",
        "device.model.name",
        "host.name",
        "host.id",
        "process.executable",
        "user_agent.original",
        "service.environment",
        "cloud.region",
        "service.name",
        "faas.name",
        "device.manufacturer",
        "process.args",
        "host.architecture",
        "cloud.provider",
        "container.runtime",
        "service.version",
        "cloud.service.name",
        "cloud.account.id",
        "process.command_line",
        "faas.version"
    );

    private EcsOTelResourceAttributes() {}
}
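
A set like `LATEST` lends itself to a simple membership test when deciding whether a field belongs under `resource.attributes` or `attributes`. The sketch below illustrates that idea only and is not taken from this commit: `ResourceSplitSketch`, `split`, and the four-entry `RESOURCE_FIELDS` subset are hypothetical, and the input is assumed to be already flattened into dotted keys.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of routing fields by set membership; not the processor's code.
final class ResourceSplitSketch {

    // Illustrative subset of the ECS resource field names listed above.
    static final Set<String> RESOURCE_FIELDS = Set.of(
        "service.name",
        "service.version",
        "service.environment",
        "host.name"
    );

    // Splits flat, dotted fields into resource attributes and plain attributes.
    static Map<String, Map<String, Object>> split(Map<String, Object> flatFields) {
        Map<String, Object> resource = new LinkedHashMap<>();
        Map<String, Object> attributes = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : flatFields.entrySet()) {
            Map<String, Object> target = RESOURCE_FIELDS.contains(entry.getKey()) ? resource : attributes;
            target.put(entry.getKey(), entry.getValue());
        }
        return Map.of("resource.attributes", resource, "attributes", attributes);
    }

    public static void main(String[] args) {
        Map<String, Object> flat = new LinkedHashMap<>();
        flat.put("service.name", "my-service");
        flat.put("service.language.name", "python");
        flat.put("http.method", "GET");
        System.out.println(split(flat));
    }
}
```

With this kind of exact-name lookup, `service.language.name` is not treated as a resource attribute even though it shares the `service.` prefix, which is consistent with the documentation example in this commit.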
Lines changed: 24 additions & 0 deletions
@@ -0,0 +1,24 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the "Elastic License
 * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
 * Public License v 1"; you may not use this file except in compliance with, at
 * your election, the "Elastic License 2.0", the "GNU Affero General Public
 * License v3.0 only", or the "Server Side Public License, v 1".
 */

package org.elasticsearch.ingest.otel;

import org.elasticsearch.ingest.Processor;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.plugins.Plugin;

import java.util.Map;

public class NormalizeForStreamPlugin extends Plugin implements IngestPlugin {

    @Override
    public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
        return Map.of(NormalizeForStreamProcessor.TYPE, new NormalizeForStreamProcessor.Factory());
    }
}
