Skip to content

Commit c15e845

Browse files
[elasticsearchexporter] Support 'encoding.format' scope attribute for dataset routing (#42844)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description Adds an additional way to set dataset based on scope attributes set by econding extensions. The main use case we are supporting with this right now is awslogsencodingextension which sets `encoding.format` like `aws.cloudtrail`, `aws.vpcflow`, `aws.elbaccess`. See #42899 for more detail. <!-- Issue number (e.g. #1234) or full URL to issue, if applicable. --> #### Link to tracking issue Fixes <!--Describe what testing was performed and which tests were added.--> #### Testing Unit test added. <!--Describe the documentation added.--> #### Documentation README updated. <!--Please delete paragraphs that you did not use before submitting.--> --------- Co-authored-by: Carson Ip <[email protected]>
1 parent 1af8d44 commit c15e845

File tree

4 files changed

+240
-79
lines changed

4 files changed

+240
-79
lines changed
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: exporter/elasticsearch
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Support experimental 'encoding.format' scope attribute for dataset routing.
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [42844]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]
28+

exporter/elasticsearchexporter/README.md

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,8 +111,11 @@ where `data_stream.type` is `logs` for log records, `metrics` for data points, a
111111
In a special case with `mapping::mode: bodymap`, `data_stream.type` field (valid values: `logs`, `metrics`) can be dynamically set from attributes.
112112
The resulting documents will contain the corresponding `data_stream.*` fields, see restrictions applied to [Data Stream Fields](https://www.elastic.co/guide/en/ecs/current/ecs-data_stream.html).
113113
1. `data_stream.dataset` or `data_stream.namespace` in attributes (precedence: log record / data point / span attribute > scope attribute > resource attribute)
114-
2. Otherwise, if scope name matches regex `/receiver/(\w*receiver)`, `data_stream.dataset` will be capture group #1
115-
3. Otherwise, `data_stream.dataset` falls back to `generic` and `data_stream.namespace` falls back to `default`.
114+
2. Otherwise, if a scope attribute with the name `encoding.format` exists and contains a string value, `data_stream.dataset` will be set to this value.
115+
116+
Note that whilst enabled by default, this behaviour is considered experimental. Some encoding extensions set this field (e.g. [awslogsencodingextension](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/extension/encoding/awslogsencodingextension)), but it is not yet part of Semantic Conventions. There is the potential that the name of this routing field evolves as the [discussion progresses in SemConv](https://github.com/open-telemetry/semantic-conventions/issues/2854).
117+
3. Otherwise, if scope name matches regex `/receiver/(\w*receiver)`, `data_stream.dataset` will be capture group #1
118+
4. Otherwise, `data_stream.dataset` falls back to `generic` and `data_stream.namespace` falls back to `default`.
116119

117120
[^3]: See additional handling in [Document routing exceptions for OTel data mode](#document-routing-exceptions-for-otel-data-mode)
118121

exporter/elasticsearchexporter/data_stream_router.go

Lines changed: 42 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,10 @@ var selfTelemetryScopeNames = map[string]bool{
2626
}
2727

2828
const (
29-
maxDataStreamBytes = 100
30-
disallowedNamespaceRunes = "\\/*?\"<>| ,#:"
31-
disallowedDatasetRunes = "-\\/*?\"<>| ,#:"
29+
maxDataStreamBytes = 100
30+
disallowedNamespaceRunes = "\\/*?\"<>| ,#:"
31+
disallowedDatasetRunes = "-\\/*?\"<>| ,#:"
32+
encodingFormatAttributeName = "encoding.format"
3233
)
3334

3435
// Sanitize the datastream fields (dataset, namespace) to apply restrictions
@@ -166,7 +167,7 @@ func routeRecord(
166167
// Order:
167168
// 1. elasticsearch.index from attributes
168169
// 2. read data_stream.* from attributes
169-
// 3. receiver-based routing
170+
// 3. scope-based routing
170171
// 4. use default hardcoded data_stream.*
171172
if esIndex, esIndexExists := getFromAttributes(elasticsearch.IndexAttributeName, "", recordAttr, scopeAttr, resourceAttr); esIndexExists {
172173
// Advanced users can route documents by setting IndexAttributeName in a processor earlier in the pipeline.
@@ -186,20 +187,10 @@ func routeRecord(
186187
}
187188
}
188189

189-
// Only use receiver-based routing if dataset is not specified.
190+
// Only use scope-based routing if dataset is not specified.
190191
if !datasetExists {
191-
if selfTelemetryScopeNames[scope.Name()] {
192-
// For collector self-telemetry, use a fixed dataset name
193-
dataset = collectorSelfTelemetryDataStreamDataset
194-
} else {
195-
// Receiver-based routing
196-
// For example, hostmetricsreceiver (or hostmetricsreceiver.otel in the OTel output mode)
197-
// for the scope name
198-
// github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver/internal/scraper/cpuscraper
199-
loc := receiverRegex.FindStringSubmatchIndex(scope.Name())
200-
if len(loc) == 4 {
201-
dataset = scope.Name()[loc[2]:loc[3]]
202-
}
192+
if ds, ok := applyScopeRouting(scope); ok {
193+
dataset = ds
203194
}
204195
}
205196

@@ -214,3 +205,37 @@ func routeRecord(
214205
namespace = sanitizeDataStreamField(namespace, disallowedNamespaceRunes, "")
215206
return elasticsearch.NewDataStreamIndex(dsType, dataset, namespace), nil
216207
}
208+
209+
func applyScopeRouting(scope pcommon.InstrumentationScope) (string, bool) {
210+
// Priority:
211+
// 1. self-telemetry
212+
// 2. encoding-based routing
213+
// 3. receiver-based routing
214+
215+
// For collector self-telemetry, use a fixed dataset name
216+
if selfTelemetryScopeNames[scope.Name()] {
217+
return collectorSelfTelemetryDataStreamDataset, true
218+
}
219+
220+
// Encoding-based routing
221+
// Encoding extensions may set the `encoding.format` scope attribute according to log types.
222+
// For example, awslogsencodingextension sets `aws.elbaccess`, `aws.vpcflow`, etc.
223+
if format, ok := scope.Attributes().Get(encodingFormatAttributeName); ok {
224+
if format.Type() == pcommon.ValueTypeStr {
225+
if stringVal := format.Str(); stringVal != "" {
226+
return stringVal, true
227+
}
228+
}
229+
}
230+
231+
// Receiver-based routing
232+
// For example, hostmetricsreceiver (or hostmetricsreceiver.otel in the OTel output mode)
233+
// for the scope name
234+
// github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver/internal/scraper/cpuscraper
235+
loc := receiverRegex.FindStringSubmatchIndex(scope.Name())
236+
if len(loc) == 4 {
237+
return scope.Name()[loc[2]:loc[3]], true
238+
}
239+
240+
return "", false
241+
}

exporter/elasticsearchexporter/data_stream_router_test.go

Lines changed: 165 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ type routeTestCase struct {
1717
name string
1818
mode MappingMode
1919
scopeName string
20+
scopeAttrs map[string]any
2021
recordAttrs map[string]any
2122
want elasticsearch.Index
2223
}
@@ -40,66 +41,6 @@ func createRouteTests(dsType string) []routeTestCase {
4041
mode: MappingOTel,
4142
want: renderWantRoute(dsType, defaultDataStreamDataset, defaultDataStreamNamespace, MappingOTel),
4243
},
43-
{
44-
name: "default with receiver scope name",
45-
mode: MappingNone,
46-
scopeName: "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver/internal/scraper/cpuscraper",
47-
want: renderWantRoute(dsType, "hostmetricsreceiver", defaultDataStreamNamespace, MappingNone),
48-
},
49-
{
50-
name: "otel with receiver scope name",
51-
mode: MappingOTel,
52-
scopeName: "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver/internal/scraper/cpuscraper",
53-
want: renderWantRoute(dsType, "hostmetricsreceiver", defaultDataStreamNamespace, MappingOTel),
54-
},
55-
{
56-
name: "default with non-receiver scope name",
57-
mode: MappingNone,
58-
scopeName: "some_other_scope_name",
59-
want: renderWantRoute(dsType, defaultDataStreamDataset, defaultDataStreamNamespace, MappingNone),
60-
},
61-
{
62-
name: "otel with non-receiver scope name",
63-
mode: MappingOTel,
64-
scopeName: "some_other_scope_name",
65-
want: renderWantRoute(dsType, defaultDataStreamDataset, defaultDataStreamNamespace, MappingOTel),
66-
},
67-
{
68-
name: "receiver without a receiver name",
69-
mode: MappingOTel,
70-
scopeName: "some.scope.name/receiver/receiver/should/be/ignored",
71-
want: renderWantRoute(dsType, defaultDataStreamDataset, defaultDataStreamNamespace, MappingOTel),
72-
},
73-
{
74-
name: "otel collector self-telemetry for receivers",
75-
mode: MappingOTel,
76-
scopeName: "go.opentelemetry.io/collector/receiver/receiverhelper",
77-
want: renderWantRoute(dsType, collectorSelfTelemetryDataStreamDataset, defaultDataStreamNamespace, MappingOTel),
78-
},
79-
{
80-
name: "otel collector self-telemetry for scrapers",
81-
mode: MappingOTel,
82-
scopeName: "go.opentelemetry.io/collector/scraper/scraperhelper",
83-
want: renderWantRoute(dsType, collectorSelfTelemetryDataStreamDataset, defaultDataStreamNamespace, MappingOTel),
84-
},
85-
{
86-
name: "otel collector self-telemetry for processors",
87-
mode: MappingOTel,
88-
scopeName: "go.opentelemetry.io/collector/processor/processorhelper",
89-
want: renderWantRoute(dsType, collectorSelfTelemetryDataStreamDataset, defaultDataStreamNamespace, MappingOTel),
90-
},
91-
{
92-
name: "otel collector self-telemetry for exporters",
93-
mode: MappingOTel,
94-
scopeName: "go.opentelemetry.io/collector/exporter/exporterhelper",
95-
want: renderWantRoute(dsType, collectorSelfTelemetryDataStreamDataset, defaultDataStreamNamespace, MappingOTel),
96-
},
97-
{
98-
name: "otel collector self-telemetry for service",
99-
mode: MappingOTel,
100-
scopeName: "go.opentelemetry.io/collector/service",
101-
want: renderWantRoute(dsType, collectorSelfTelemetryDataStreamDataset, defaultDataStreamNamespace, MappingOTel),
102-
},
10344
{
10445
name: "otel with elasticsearch.index",
10546
mode: MappingOTel,
@@ -121,6 +62,27 @@ func createRouteTests(dsType string) []routeTestCase {
12162
},
12263
want: renderWantRoute(dsType, "foo", "bar", MappingOTel),
12364
},
65+
{
66+
name: "default with scope-based routing for self-telemetry (sanity)",
67+
mode: MappingNone,
68+
scopeName: "go.opentelemetry.io/collector/receiver/receiverhelper",
69+
want: renderWantRoute(dsType, collectorSelfTelemetryDataStreamDataset, defaultDataStreamNamespace, MappingNone),
70+
},
71+
{
72+
name: "otel with scope-based routing for encoding (sanity)",
73+
mode: MappingOTel,
74+
scopeName: "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/awslogsencodingextension",
75+
scopeAttrs: map[string]any{
76+
"encoding.format": "aws.cloudtrail",
77+
},
78+
want: renderWantRoute(dsType, "aws.cloudtrail", defaultDataStreamNamespace, MappingOTel),
79+
},
80+
{
81+
name: "otel with scope-based routing for receivers (sanity)",
82+
mode: MappingOTel,
83+
scopeName: "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver/internal/scraper/cpuscraper",
84+
want: renderWantRoute(dsType, "hostmetricsreceiver", defaultDataStreamNamespace, MappingOTel),
85+
},
12486
}
12587
}
12688

@@ -132,6 +94,7 @@ func TestRouteLogRecord(t *testing.T) {
13294
router := dynamicDocumentRouter{mode: tc.mode}
13395
scope := pcommon.NewInstrumentationScope()
13496
scope.SetName(tc.scopeName)
97+
fillAttributeMap(scope.Attributes(), tc.scopeAttrs)
13598

13699
recordAttrMap := pcommon.NewMap()
137100
fillAttributeMap(recordAttrMap, tc.recordAttrs)
@@ -179,6 +142,7 @@ func TestRouteDataPoint(t *testing.T) {
179142
router := dynamicDocumentRouter{mode: tc.mode}
180143
scope := pcommon.NewInstrumentationScope()
181144
scope.SetName(tc.scopeName)
145+
fillAttributeMap(scope.Attributes(), tc.scopeAttrs)
182146

183147
recordAttrMap := pcommon.NewMap()
184148
fillAttributeMap(recordAttrMap, tc.recordAttrs)
@@ -198,6 +162,7 @@ func TestRouteSpan(t *testing.T) {
198162
router := dynamicDocumentRouter{mode: tc.mode}
199163
scope := pcommon.NewInstrumentationScope()
200164
scope.SetName(tc.scopeName)
165+
fillAttributeMap(scope.Attributes(), tc.scopeAttrs)
201166

202167
recordAttrMap := pcommon.NewMap()
203168
fillAttributeMap(recordAttrMap, tc.recordAttrs)
@@ -208,3 +173,143 @@ func TestRouteSpan(t *testing.T) {
208173
})
209174
}
210175
}
176+
177+
func TestApplyRouting(t *testing.T) {
178+
tests := []struct {
179+
name string
180+
scopeName string
181+
scopeAttrs map[string]any
182+
wantDataset string
183+
wantFound bool
184+
}{
185+
{
186+
name: "no routing applied for default scope",
187+
scopeName: "",
188+
wantDataset: "",
189+
wantFound: false,
190+
},
191+
{
192+
name: "no routing applied for non-receiver scope name",
193+
scopeName: "some_other_scope_name",
194+
wantDataset: "",
195+
wantFound: false,
196+
},
197+
{
198+
name: "receiver-based routing with hostmetricsreceiver",
199+
scopeName: "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver/internal/scraper/cpuscraper",
200+
wantDataset: "hostmetricsreceiver",
201+
wantFound: true,
202+
},
203+
{
204+
name: "receiver without a receiver name",
205+
scopeName: "some.scope.name/receiver/receiver/should/be/ignored",
206+
wantDataset: "",
207+
wantFound: false,
208+
},
209+
{
210+
name: "otel collector self-telemetry for receivers",
211+
scopeName: "go.opentelemetry.io/collector/receiver/receiverhelper",
212+
wantDataset: collectorSelfTelemetryDataStreamDataset,
213+
wantFound: true,
214+
},
215+
{
216+
name: "otel collector self-telemetry for scrapers",
217+
scopeName: "go.opentelemetry.io/collector/scraper/scraperhelper",
218+
wantDataset: collectorSelfTelemetryDataStreamDataset,
219+
wantFound: true,
220+
},
221+
{
222+
name: "otel collector self-telemetry for processors",
223+
scopeName: "go.opentelemetry.io/collector/processor/processorhelper",
224+
wantDataset: collectorSelfTelemetryDataStreamDataset,
225+
wantFound: true,
226+
},
227+
{
228+
name: "otel collector self-telemetry for exporters",
229+
scopeName: "go.opentelemetry.io/collector/exporter/exporterhelper",
230+
wantDataset: collectorSelfTelemetryDataStreamDataset,
231+
wantFound: true,
232+
},
233+
{
234+
name: "otel collector self-telemetry for service",
235+
scopeName: "go.opentelemetry.io/collector/service",
236+
wantDataset: collectorSelfTelemetryDataStreamDataset,
237+
wantFound: true,
238+
},
239+
{
240+
name: "encoding-based routing with aws.cloudtrail",
241+
scopeName: "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/awslogsencodingextension",
242+
scopeAttrs: map[string]any{
243+
"encoding.format": "aws.cloudtrail",
244+
},
245+
wantDataset: "aws.cloudtrail",
246+
wantFound: true,
247+
},
248+
{
249+
name: "encoding-based routing with aws.vpcflow",
250+
scopeName: "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/awslogsencodingextension",
251+
scopeAttrs: map[string]any{
252+
"encoding.format": "aws.vpcflow",
253+
},
254+
wantDataset: "aws.vpcflow",
255+
wantFound: true,
256+
},
257+
{
258+
name: "encoding-based routing takes precedence over receiver-based routing",
259+
scopeName: "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver/internal/scraper/cpuscraper",
260+
scopeAttrs: map[string]any{
261+
"encoding.format": "aws.vpcflow",
262+
},
263+
wantDataset: "aws.vpcflow",
264+
wantFound: true,
265+
},
266+
{
267+
name: "self-telemetry takes precedence over encoding-based routing",
268+
scopeName: "go.opentelemetry.io/collector/receiver/receiverhelper",
269+
scopeAttrs: map[string]any{
270+
"encoding.format": "aws.cloudtrail",
271+
},
272+
wantDataset: collectorSelfTelemetryDataStreamDataset,
273+
wantFound: true,
274+
},
275+
{
276+
name: "encoding format that is wrong type is ignored",
277+
scopeName: "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/awslogsencodingextension",
278+
scopeAttrs: map[string]any{
279+
"encoding.format": true,
280+
},
281+
wantDataset: "",
282+
wantFound: false,
283+
},
284+
{
285+
name: "non-encoding scope attributes are ignored",
286+
scopeName: "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/awslogsencodingextension",
287+
scopeAttrs: map[string]any{
288+
"some_other_attr": "should_be_ignored",
289+
},
290+
wantDataset: "",
291+
wantFound: false,
292+
},
293+
{
294+
name: "empty encoding.format scope attribute is ignored",
295+
scopeName: "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/awslogsencodingextension",
296+
scopeAttrs: map[string]any{
297+
"encoding.format": "",
298+
},
299+
wantDataset: "",
300+
wantFound: false,
301+
},
302+
}
303+
304+
for _, tc := range tests {
305+
t.Run(tc.name, func(t *testing.T) {
306+
scope := pcommon.NewInstrumentationScope()
307+
scope.SetName(tc.scopeName)
308+
fillAttributeMap(scope.Attributes(), tc.scopeAttrs)
309+
310+
dataset, found := applyScopeRouting(scope)
311+
assert.Equal(t, tc.wantDataset, dataset)
312+
assert.Equal(t, tc.wantFound, found)
313+
})
314+
}
315+
}

0 commit comments

Comments
 (0)