Skip to content

Commit 2a44381

Browse files
committed
feat: auto-inject AWS account ID into telemetry from Lambda extension registration
Automatically extract the AWS account ID from the Lambda Extensions API registration response and inject it as the 'cloud.account.id' attribute into all telemetry (traces, logs, metrics) via a confmap converter. Changes: - Add AccountId field to RegisterResponse struct - Request Lambda-Extension-Accept-Feature header with 'accountId' value - Create accountidprocessor converter to inject cloud.account.id attribute - Update lambdacomponents.Components to accept accountID and return converters - Update collector.NewCollector to accept and register custom converters - Update manager to pass account ID through the initialization flow Includes: - Comprehensive tests for JSON unmarshaling with leading zero preservation - Tests for converter behavior across different pipeline configurations - Tests for account ID handling in the extension API client Benefits: - Account ID automatically available in all telemetry without configuration - No environment variables needed, uses AWS Lambda API response - Follows OpenTelemetry Collector converter pattern - Static injection at startup, no runtime overhead
1 parent 93e779f commit 2a44381

File tree

7 files changed

+552
-13
lines changed

7 files changed

+552
-13
lines changed

collector/internal/collector/collector.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -69,17 +69,22 @@ func getConfig(logger *zap.Logger) string {
6969
return defaultVal
7070
}
7171

72-
func NewCollector(logger *zap.Logger, factories otelcol.Factories, version string) *Collector {
72+
func NewCollector(logger *zap.Logger, factories otelcol.Factories, version string, customConverters []confmap.ConverterFactory) *Collector {
7373
l := logger.Named("NewCollector")
74+
75+
// Combine built-in converters with custom converters
76+
converters := []confmap.ConverterFactory{
77+
confmap.NewConverterFactory(func(set confmap.ConverterSettings) confmap.Converter {
78+
return disablequeuedretryconverter.New()
79+
}),
80+
}
81+
converters = append(converters, customConverters...)
82+
7483
cfgSet := otelcol.ConfigProviderSettings{
7584
ResolverSettings: confmap.ResolverSettings{
7685
URIs: []string{getConfig(l)},
7786
ProviderFactories: []confmap.ProviderFactory{fileprovider.NewFactory(), envprovider.NewFactory(), yamlprovider.NewFactory(), httpsprovider.NewFactory(), httpprovider.NewFactory(), s3provider.NewFactory(), secretsmanagerprovider.NewFactory()},
78-
ConverterFactories: []confmap.ConverterFactory{
79-
confmap.NewConverterFactory(func(set confmap.ConverterSettings) confmap.Converter {
80-
return disablequeuedretryconverter.New()
81-
}),
82-
},
87+
ConverterFactories: converters,
8388
},
8489
}
8590

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
// Copyright The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
// The accountidprocessor implements the Converter for mutating Collector
16+
// configurations to automatically inject the cloud.account.id attribute
17+
// via a resource processor into all pipelines.
18+
package accountidprocessor
19+
20+
import (
21+
"context"
22+
"fmt"
23+
24+
"go.opentelemetry.io/collector/confmap"
25+
)
26+
27+
const (
28+
serviceKey = "service"
29+
pipelinesKey = "pipelines"
30+
processorsKey = "processors"
31+
resourceProc = "resource/aws-account-id"
32+
accountIDAttrKey = "cloud.account.id"
33+
)
34+
35+
type converter struct {
36+
accountID string
37+
}
38+
39+
// New returns a confmap.Converter that injects cloud.account.id into all pipelines
40+
func New(accountID string) confmap.Converter {
41+
return &converter{accountID: accountID}
42+
}
43+
44+
func (c converter) Convert(_ context.Context, conf *confmap.Conf) error {
45+
if c.accountID == "" {
46+
return nil // Skip if no account ID
47+
}
48+
49+
// Navigate to service.pipelines
50+
serviceVal := conf.Get(serviceKey)
51+
service, ok := serviceVal.(map[string]interface{})
52+
if !ok {
53+
return nil
54+
}
55+
56+
pipelinesVal, ok := service[pipelinesKey]
57+
if !ok {
58+
return nil
59+
}
60+
61+
pipelines, ok := pipelinesVal.(map[string]interface{})
62+
if !ok {
63+
return nil
64+
}
65+
66+
updates := make(map[string]interface{})
67+
68+
// For each pipeline, add resource processor to beginning
69+
for telemetryType, pipelineVal := range pipelines {
70+
pipeline, ok := pipelineVal.(map[string]interface{})
71+
if !ok {
72+
continue
73+
}
74+
75+
processorsVal, _ := pipeline[processorsKey]
76+
processors, ok := processorsVal.([]interface{})
77+
if !ok {
78+
processors = []interface{}{}
79+
}
80+
81+
// Prepend resource/aws-account-id processor
82+
processors = append([]interface{}{resourceProc}, processors...)
83+
updates[fmt.Sprintf("%s::%s::%s::%s", serviceKey, pipelinesKey, telemetryType, processorsKey)] = processors
84+
}
85+
86+
// Configure the resource processor with cloud.account.id attribute
87+
updates[fmt.Sprintf("processors::%s::attributes", resourceProc)] = []map[string]interface{}{
88+
{
89+
"key": accountIDAttrKey,
90+
"value": c.accountID,
91+
"action": "insert",
92+
},
93+
}
94+
95+
// Apply all updates
96+
if len(updates) > 0 {
97+
if err := conf.Merge(confmap.NewFromStringMap(updates)); err != nil {
98+
return err
99+
}
100+
}
101+
102+
return nil
103+
}

0 commit comments

Comments
 (0)