Skip to content

Commit a64f3d3

Browse files
authored
Feature: Generate function and extension logs via Telemetry API receiver (open-telemetry#1347)
* Added WithLogs and its handling * nits * Added extensions * Fixed unit tests * Added unit test cases * Added config (open-telemetry#26) * Added severityTextToNumber function * Corrected README.md * Handled empty types array * Added CRITICAL & ALL * Removed invalid test case * Fixed code after rebase * Updated README.md * Used time.RFC3339 format * Applied review comments * Added WARNING, Updated test cases, Added String.ToUpper
1 parent aeb3e92 commit a64f3d3

File tree

14 files changed

+798
-80
lines changed

14 files changed

+798
-80
lines changed

collector/internal/lifecycle/manager.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ func NewManager(ctx context.Context, logger *zap.Logger, version string) (contex
7575
}
7676

7777
telemetryClient := telemetryapi.NewClient(logger)
78-
_, err = telemetryClient.Subscribe(ctx, res.ExtensionID, addr)
78+
_, err = telemetryClient.Subscribe(ctx, []telemetryapi.EventType{telemetryapi.Platform}, res.ExtensionID, addr)
7979
if err != nil {
8080
logger.Fatal("Cannot register Telemetry API client", zap.Error(err))
8181
}

collector/internal/telemetryapi/client.go

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -49,13 +49,7 @@ func NewClient(logger *zap.Logger) *Client {
4949
}
5050
}
5151

52-
func (c *Client) Subscribe(ctx context.Context, extensionID string, listenerURI string) (string, error) {
53-
eventTypes := []EventType{
54-
Platform,
55-
// Function,
56-
// Extension,
57-
}
58-
52+
func (c *Client) Subscribe(ctx context.Context, eventTypes []EventType, extensionID string, listenerURI string) (string, error) {
5953
bufferingConfig := BufferingCfg{
6054
MaxItems: 1000,
6155
MaxBytes: 256 * 1024,

collector/receiver/telemetryapireceiver/README.md

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
# Telemetry API Receiver
22

3-
| Status | |
4-
| ------------------------ |-----------------|
5-
| Stability | [alpha] |
6-
| Supported pipeline types | traces |
7-
| Distributions | [extension] |
3+
| Status | |
4+
| ------------------------ |--------------|
5+
| Stability | [alpha] |
6+
| Supported pipeline types | traces, logs |
7+
| Distributions | [extension] |
88

99
This receiver generates telemetry in response to events from the [Telemetry API](https://docs.aws.amazon.com/lambda/latest/dg/telemetry-api.html). It does this by setting up an endpoint and registering itself with the Telemetry API on startup.
1010

@@ -15,11 +15,25 @@ Supported events:
1515

1616
## Configuration
1717

18-
There are currently no configuration parameters available for this receiver. It can be enabled via the following configuration:
18+
| Field | Default | Description |
19+
|---------|---------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------|
20+
| `port` | 4325 | HTTP server port to receive Telemetry API data. |
21+
| `types` | ["platform", "function", "extension"] | [Types](https://docs.aws.amazon.com/lambda/latest/dg/telemetry-api-reference.html#telemetry-subscribe-api) of telemetry to subscribe to |
22+
1923

2024
```yaml
2125
receivers:
2226
telemetryapi:
27+
telemetryapi/1:
28+
port: 4326
29+
telemetryapi/2:
30+
port: 4327
31+
types:
32+
- platform
33+
- function
34+
telemetryapi/3:
35+
port: 4328
36+
types: ["platform", "function"]
2337
```
2438
2539
[alpha]: https://github.com/open-telemetry/opentelemetry-collector#alpha

collector/receiver/telemetryapireceiver/config.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,23 @@
1414

1515
package telemetryapireceiver // import "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver"
1616

17+
import (
18+
"fmt"
19+
)
20+
1721
// Config defines the configuration for the various elements of the receiver agent.
1822
type Config struct {
1923
extensionID string
24+
Port int `mapstructure:"port"`
25+
Types []string `mapstructure:"types"`
2026
}
2127

2228
// Validate validates the configuration by checking for missing or invalid fields
2329
func (cfg *Config) Validate() error {
30+
for _, t := range cfg.Types {
31+
if t != platform && t != function && t != extension {
32+
return fmt.Errorf("unknown extension type: %s", t)
33+
}
34+
}
2435
return nil
2536
}

collector/receiver/telemetryapireceiver/config_test.go

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,107 @@
1515
package telemetryapireceiver // import "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver"
1616

1717
import (
18+
"fmt"
19+
"path/filepath"
1820
"testing"
1921

2022
"github.com/stretchr/testify/require"
23+
"go.opentelemetry.io/collector/component"
24+
"go.opentelemetry.io/collector/confmap/confmaptest"
2125
)
2226

27+
func TestLoadConfig(t *testing.T) {
28+
t.Parallel()
29+
30+
// Helper function to create expected Config
31+
createExpectedConfig := func(types []string) *Config {
32+
return &Config{
33+
extensionID: "extensionID",
34+
Port: 12345,
35+
Types: types,
36+
}
37+
}
38+
39+
tests := []struct {
40+
name string
41+
id component.ID
42+
expected component.Config
43+
}{
44+
{
45+
name: "default",
46+
id: component.NewID(component.MustNewType("telemetryapi")),
47+
expected: NewFactory("extensionID").CreateDefaultConfig(),
48+
},
49+
{
50+
name: "all types",
51+
id: component.NewIDWithName(component.MustNewType("telemetryapi"), "1"),
52+
expected: createExpectedConfig([]string{platform, function, extension}),
53+
},
54+
{
55+
name: "platform only",
56+
id: component.NewIDWithName(component.MustNewType("telemetryapi"), "2"),
57+
expected: createExpectedConfig([]string{platform}),
58+
},
59+
{
60+
name: "function only",
61+
id: component.NewIDWithName(component.MustNewType("telemetryapi"), "3"),
62+
expected: createExpectedConfig([]string{function}),
63+
},
64+
{
65+
name: "extension only",
66+
id: component.NewIDWithName(component.MustNewType("telemetryapi"), "4"),
67+
expected: createExpectedConfig([]string{extension}),
68+
},
69+
{
70+
name: "platform and function",
71+
id: component.NewIDWithName(component.MustNewType("telemetryapi"), "5"),
72+
expected: createExpectedConfig([]string{platform, function}),
73+
},
74+
{
75+
name: "platform and extension",
76+
id: component.NewIDWithName(component.MustNewType("telemetryapi"), "6"),
77+
expected: createExpectedConfig([]string{platform, extension}),
78+
},
79+
{
80+
name: "function and extension",
81+
id: component.NewIDWithName(component.MustNewType("telemetryapi"), "7"),
82+
expected: createExpectedConfig([]string{function, extension}),
83+
},
84+
{
85+
name: "empty types",
86+
id: component.NewIDWithName(component.MustNewType("telemetryapi"), "8"),
87+
expected: createExpectedConfig([]string{}),
88+
},
89+
{
90+
name: "function and extension (alternative syntax)",
91+
id: component.NewIDWithName(component.MustNewType("telemetryapi"), "9"),
92+
expected: createExpectedConfig([]string{function, extension}),
93+
},
94+
{
95+
name: "function and extension (another syntax)",
96+
id: component.NewIDWithName(component.MustNewType("telemetryapi"), "10"),
97+
expected: createExpectedConfig([]string{function, extension}),
98+
},
99+
}
100+
101+
for _, tt := range tests {
102+
t.Run(tt.name, func(t *testing.T) {
103+
cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml"))
104+
require.NoError(t, err)
105+
106+
factory := NewFactory("extensionID")
107+
cfg := factory.CreateDefaultConfig()
108+
109+
sub, err := cm.Sub(tt.id.String())
110+
require.NoError(t, err)
111+
require.NoError(t, sub.Unmarshal(cfg))
112+
require.NoError(t, component.ValidateConfig(cfg))
113+
114+
require.Equal(t, tt.expected, cfg)
115+
})
116+
}
117+
}
118+
23119
func TestValidate(t *testing.T) {
24120
testCases := []struct {
25121
desc string
@@ -31,6 +127,13 @@ func TestValidate(t *testing.T) {
31127
cfg: &Config{},
32128
expectedErr: nil,
33129
},
130+
{
131+
desc: "invalid config",
132+
cfg: &Config{
133+
Types: []string{"invalid"},
134+
},
135+
expectedErr: fmt.Errorf("unknown extension type: invalid"),
136+
},
34137
}
35138

36139
for _, tc := range testCases {

collector/receiver/telemetryapireceiver/factory.go

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,19 @@ import (
1818
"context"
1919
"errors"
2020

21+
"github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver/internal/sharedcomponent"
2122
"go.opentelemetry.io/collector/component"
2223
"go.opentelemetry.io/collector/consumer"
2324
"go.opentelemetry.io/collector/receiver"
2425
)
2526

2627
const (
27-
typeStr = "telemetryapi"
28-
stability = component.StabilityLevelDevelopment
28+
typeStr = "telemetryapi"
29+
stability = component.StabilityLevelDevelopment
30+
defaultPort = 4325
31+
platform = "platform"
32+
function = "function"
33+
extension = "extension"
2934
)
3035

3136
var errConfigNotTelemetryAPI = errors.New("config was not a Telemetry API receiver config")
@@ -37,16 +42,38 @@ func NewFactory(extensionID string) receiver.Factory {
3742
func() component.Config {
3843
return &Config{
3944
extensionID: extensionID,
45+
Port: defaultPort,
46+
Types: []string{platform, function, extension},
4047
}
4148
},
42-
receiver.WithTraces(createTracesReceiver, stability))
49+
receiver.WithTraces(createTracesReceiver, stability),
50+
receiver.WithLogs(createLogsReceiver, stability))
4351
}
4452

4553
func createTracesReceiver(ctx context.Context, params receiver.Settings, rConf component.Config, next consumer.Traces) (receiver.Traces, error) {
4654
cfg, ok := rConf.(*Config)
4755
if !ok {
4856
return nil, errConfigNotTelemetryAPI
4957
}
58+
r := receivers.GetOrAdd(cfg, func() component.Component {
59+
t, _ := newTelemetryAPIReceiver(cfg, params)
60+
return t
61+
})
62+
r.Unwrap().(*telemetryAPIReceiver).registerTracesConsumer(next)
63+
return r, nil
64+
}
5065

51-
return newTelemetryAPIReceiver(cfg, next, params)
66+
func createLogsReceiver(ctx context.Context, params receiver.Settings, rConf component.Config, next consumer.Logs) (receiver.Logs, error) {
67+
cfg, ok := rConf.(*Config)
68+
if !ok {
69+
return nil, errConfigNotTelemetryAPI
70+
}
71+
r := receivers.GetOrAdd(cfg, func() component.Component {
72+
t, _ := newTelemetryAPIReceiver(cfg, params)
73+
return t
74+
})
75+
r.Unwrap().(*telemetryAPIReceiver).registerLogsConsumer(next)
76+
return r, nil
5277
}
78+
79+
var receivers = sharedcomponent.NewSharedComponents()

collector/receiver/telemetryapireceiver/factory_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ func TestNewFactory(t *testing.T) {
4141
testFunc: func(t *testing.T) {
4242
factory := NewFactory("test")
4343

44-
var expectedCfg component.Config = &Config{extensionID: "test"}
44+
var expectedCfg component.Config = &Config{extensionID: "test", Port: defaultPort, Types: []string{platform, function, extension}}
4545

4646
require.Equal(t, expectedCfg, factory.CreateDefaultConfig())
4747
},

collector/receiver/telemetryapireceiver/go.mod

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,12 @@ require (
1111
github.com/open-telemetry/opentelemetry-lambda/collector v0.98.0
1212
github.com/stretchr/testify v1.9.0
1313
go.opentelemetry.io/collector/component v0.107.0
14-
go.opentelemetry.io/collector/consumer v0.106.1
15-
go.opentelemetry.io/collector/consumer/consumertest v0.106.1
14+
go.opentelemetry.io/collector/confmap v0.107.0
15+
go.opentelemetry.io/collector/consumer v0.107.0
16+
go.opentelemetry.io/collector/consumer/consumertest v0.107.0
1617
go.opentelemetry.io/collector/pdata v1.13.0
17-
go.opentelemetry.io/collector/receiver v0.106.1
18-
go.opentelemetry.io/collector/semconv v0.106.1
18+
go.opentelemetry.io/collector/receiver v0.107.0
19+
go.opentelemetry.io/collector/semconv v0.107.0
1920
go.uber.org/zap v1.27.0
2021
)
2122

@@ -25,9 +26,16 @@ require (
2526
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
2627
github.com/go-logr/logr v1.4.2 // indirect
2728
github.com/go-logr/stdr v1.2.2 // indirect
29+
github.com/go-viper/mapstructure/v2 v2.0.0 // indirect
2830
github.com/gogo/protobuf v1.3.2 // indirect
2931
github.com/google/uuid v1.6.0 // indirect
32+
github.com/hashicorp/go-version v1.7.0 // indirect
3033
github.com/json-iterator/go v1.1.12 // indirect
34+
github.com/knadh/koanf/maps v0.1.1 // indirect
35+
github.com/knadh/koanf/providers/confmap v0.1.0 // indirect
36+
github.com/knadh/koanf/v2 v2.1.1 // indirect
37+
github.com/mitchellh/copystructure v1.2.0 // indirect
38+
github.com/mitchellh/reflectwalk v1.0.2 // indirect
3139
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
3240
github.com/modern-go/reflect2 v1.0.2 // indirect
3341
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
@@ -37,18 +45,20 @@ require (
3745
github.com/prometheus/common v0.55.0 // indirect
3846
github.com/prometheus/procfs v0.15.1 // indirect
3947
go.opentelemetry.io/collector/config/configtelemetry v0.107.0 // indirect
40-
go.opentelemetry.io/collector/consumer/consumerprofiles v0.106.1 // indirect
41-
go.opentelemetry.io/collector/pdata/pprofile v0.106.1 // indirect
48+
go.opentelemetry.io/collector/consumer/consumerprofiles v0.107.0 // indirect
49+
go.opentelemetry.io/collector/featuregate v1.13.0 // indirect
50+
go.opentelemetry.io/collector/internal/globalgates v0.107.0 // indirect
51+
go.opentelemetry.io/collector/pdata/pprofile v0.107.0 // indirect
4252
go.opentelemetry.io/otel v1.28.0 // indirect
4353
go.opentelemetry.io/otel/exporters/prometheus v0.50.0 // indirect
4454
go.opentelemetry.io/otel/metric v1.28.0 // indirect
4555
go.opentelemetry.io/otel/sdk v1.28.0 // indirect
4656
go.opentelemetry.io/otel/sdk/metric v1.28.0 // indirect
4757
go.opentelemetry.io/otel/trace v1.28.0 // indirect
4858
go.uber.org/multierr v1.11.0 // indirect
49-
golang.org/x/net v0.27.0 // indirect
50-
golang.org/x/sys v0.22.0 // indirect
51-
golang.org/x/text v0.16.0 // indirect
59+
golang.org/x/net v0.28.0 // indirect
60+
golang.org/x/sys v0.23.0 // indirect
61+
golang.org/x/text v0.17.0 // indirect
5262
google.golang.org/genproto/googleapis/rpc v0.0.0-20240701130421-f6361c86f094 // indirect
5363
google.golang.org/grpc v1.65.0 // indirect
5464
google.golang.org/protobuf v1.34.2 // indirect

0 commit comments

Comments
 (0)