Skip to content

Commit 93bb5dd

Browse files
committed
feat: enhance receiver & handler behaviour
Signed-off-by: Kavindu Dodanduwa <[email protected]>
1 parent bc67b7a commit 93bb5dd

File tree

11 files changed

+229
-318
lines changed

11 files changed

+229
-318
lines changed

receiver/awslambdareceiver/README.md

Lines changed: 20 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -54,44 +54,19 @@ The `awslambdareceiver` operates as follows:
5454

5555
The following receiver configuration parameters are supported.
5656

57-
| Name | Description | Required |
58-
|:--------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|
59-
| `s3_encoding` | Name of the encoding extension to use for S3 objects. For example, `awslogs_encoding` to use AWS logs encoding extensions. When unspecified, falls back to CloudWatch subscription filter handling | Optional |
57+
| Name | Description |
58+
|:---------------------|:-------------------------------------------------------------------------------------------------------------------|
59+
| `encoding_extension` | Required: The name of the encoding extension to load which will decode the data received through supported sources |
6060

6161
### Example Configuration
6262

63-
```yaml
64-
receivers:
65-
awslambda:
66-
s3_encoding: awslogs_encoding
67-
68-
extensions:
69-
awslogs_encoding:
70-
format: vpcflow
71-
vpcflow:
72-
file_format: plain-text
73-
74-
exporters:
75-
otlphttp:
76-
endpoint: "https://my-backend:443"
77-
78-
service:
79-
extensions:
80-
- awslogs_encoding
81-
pipelines:
82-
logs:
83-
receivers: [awslambda]
84-
exporters: [otlphttp]
85-
```
86-
87-
## Examples
8863

8964
### Example 1: VPC Flow Logs from S3
9065

9166
```yaml
9267
receivers:
9368
awslambda:
94-
s3_encoding: awslogs_encoding
69+
encoding_extension: awslogs_encoding
9570

9671
extensions:
9772
awslogs_encoding:
@@ -112,14 +87,16 @@ service:
11287
exporters: [otlphttp]
11388
```
11489
115-
In this example, `awslambdareceiver` receives a notification when a new VPC flow log file is stored in an S3 bucket. The receiver fetches the log file from S3 and parses it using the `awslogs_encoding` extension with vpcflow format. The parsed logs are then sent to an OTLP listener using the `otlphttp` exporter.
90+
In this example, the `awslambdareceiver` is expected to be triggered when a VPC flow log is created at S3 bucket.
91+
The receiver retrieves the log file from S3 and decodes it using the `awslogs_encoding` extension with the vpcflow format.
92+
Parsed logs are forwarded to an OTLP listener via the `otlphttp` exporter.
11693

11794
### Example 2: ELB Access Logs from S3
11895

11996
```yaml
12097
receivers:
12198
awslambda:
122-
s3_encoding: awslogs_encoding
99+
encoding_extension: awslogs_encoding
123100
124101
extensions:
125102
awslogs_encoding:
@@ -140,32 +117,37 @@ service:
140117
exporters: [otlphttp]
141118
```
142119

143-
### Example 3: CloudWatch Logs Subscription
120+
### Example 3: CloudWatch Logs mode
144121

145122
```yaml
146123
receivers:
147124
awslambda:
125+
encoding_extension: awslogs_encoding
126+
127+
extensions:
128+
awslogs_encoding:
129+
format: cloudwatch
148130
149131
exporters:
150132
otlphttp:
151133
endpoint: "https://my-backend:443"
152134
153135
service:
136+
extensions:
137+
- awslogs_encoding
154138
pipelines:
155139
logs:
156140
receivers: [awslambda]
157141
exporters: [otlphttp]
158142
```
159143

160-
In this example, `awslambdareceiver` is invoked by a CloudWatch Logs subscription filter.
161-
`s3_encoding` configuration is omitted since it is not needed for CloudWatch Logs.
162-
The receiver automatically parses the CloudWatch Logs data using the default `awslogs_encoding` extension with cloudwatch format.
163-
No explicit encoding configuration is needed. The parsed logs are then sent to an OTLP listener using the `otlphttp` exporter.
144+
In this example, `awslambdareceiver` is expected to be triggered by a CloudWatch Logs subscription filter.
145+
The receiver retrieves the logs from the event payload and decodes them using the `awslogs_encoding` extension with the cloudwatch format.
164146

165147
## Supported Data Types
166148

167-
- **Logs** (Primary support)
168-
- **Metrics** (Future consideration)
149+
- **Logs**: Supported through S3 and CloudWatch Logs event sources
150+
- **Metrics**: Supported through S3
169151

170152
## AWS Permissions
171153

receiver/awslambdareceiver/benchmark_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"context"
88
"encoding/json"
99
"testing"
10+
"time"
1011

1112
"github.com/aws/aws-lambda-go/events"
1213
"github.com/stretchr/testify/require"
@@ -40,8 +41,8 @@ func BenchmarkHandleS3Notification(b *testing.B) {
4041

4142
consumer := noOpLogsConsumer{}
4243
// Wrap the consumer to match the new s3EventConsumerFunc signature
43-
logsConsumer := func(ctx context.Context, event events.S3EventRecord, logs plog.Logs) error {
44-
setObservedTimestampForAllLogs(logs, event.EventTime)
44+
logsConsumer := func(ctx context.Context, time time.Time, logs plog.Logs) error {
45+
setObservedTimestampForAllLogs(logs, time)
4546
return consumer.ConsumeLogs(ctx, logs)
4647
}
4748
handler := newS3Handler(service, zap.NewNop(), mockS3LogUnmarshaler{}.UnmarshalLogs, logsConsumer)

receiver/awslambdareceiver/config.go

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,22 +4,19 @@
44
package awslambdareceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awslambdareceiver"
55

66
import (
7-
"go.opentelemetry.io/collector/component"
8-
)
7+
"errors"
98

10-
const (
11-
awsLogsEncoding = "awslogs_encoding"
12-
s3ARNPrefix = "arn:aws:s3:::"
9+
"go.opentelemetry.io/collector/component"
1310
)
1411

1512
type Config struct {
16-
// S3Encoding identifies the encoding of the S3 objects that trigger the Lambda.
13+
// EncodingExtension defines the encoding extension the receiver must use to decode receiving data
1714
//
18-
// If S3 data is in multiple formats (ex:- VPC flow logs, CloudTrail logs), you should deploy
15+
// If receiving data is in different formats(ex:- VPC flow logs, CloudTrail logs), receiver must be deployed in
1916
// separate Lambda functions with specific extension configurations.
2017
//
21-
// If unspecified, the receiver falls back to work with CloudWatch Log subscription encoding extension.
22-
S3Encoding string `mapstructure:"s3_encoding"`
18+
// This property is mandatory and must refer to a valid encoding extension configured in the collector configurations.
19+
EncodingExtension string `mapstructure:"encoding_extension"`
2320

2421
_ struct{} // Prevent unkeyed literal initialization
2522
}
@@ -30,6 +27,10 @@ func createDefaultConfig() component.Config {
3027
return &Config{}
3128
}
3229

33-
func (*Config) Validate() error {
30+
func (c *Config) Validate() error {
31+
if c.EncodingExtension == "" {
32+
return errors.New("encoding_extension is mandatory, please use a valid encoding extension name configured in the collector configurations")
33+
}
34+
3435
return nil
3536
}

receiver/awslambdareceiver/config_test.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,22 +28,23 @@ func TestLoadConfig(t *testing.T) {
2828
expectError string
2929
}{
3030
{
31-
id: component.NewIDWithName(metadata.Type, awsLogsEncoding),
31+
id: component.NewIDWithName(metadata.Type, "awslogs_encoding"),
3232
expected: &Config{
33-
S3Encoding: awsLogsEncoding,
33+
EncodingExtension: "awslogs_encoding",
3434
},
3535
},
3636
{
3737
id: component.NewIDWithName(metadata.Type, "json_log_encoding"),
3838
expected: &Config{
39-
S3Encoding: "json_log_encoding",
39+
EncodingExtension: "json_log_encoding",
4040
},
4141
},
4242
{
4343
id: component.NewIDWithName(metadata.Type, "empty_encoding"),
4444
expected: &Config{
45-
S3Encoding: "",
45+
EncodingExtension: "",
4646
},
47+
expectError: "encoding_extension is mandatory",
4748
},
4849
}
4950

receiver/awslambdareceiver/factory_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import (
1414

1515
func TestCreateLogsReceiver(t *testing.T) {
1616
cfg := createDefaultConfig()
17-
require.Empty(t, cfg.(*Config).S3Encoding)
17+
cfg.(*Config).EncodingExtension = "awslogs_encoding"
1818

1919
r, err := createLogsReceiver(t.Context(), receivertest.NewNopSettings(metadata.Type), cfg, consumertest.NewNop())
2020
require.NoError(t, err)

receiver/awslambdareceiver/handler.go

Lines changed: 46 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"encoding/json"
1010
"errors"
1111
"fmt"
12+
"strings"
1213
"time"
1314

1415
"github.com/aws/aws-lambda-go/events"
@@ -23,9 +24,51 @@ import (
2324

2425
type (
2526
unmarshalFunc[T any] func([]byte) (T, error)
26-
s3EventConsumerFunc[T any] func(context.Context, events.S3EventRecord, T) error
27+
s3EventConsumerFunc[T any] func(context.Context, time.Time, T) error
28+
handlerRegistry map[eventType]func() lambdaEventHandler
2729
)
2830

31+
type handlerProvider interface {
32+
getHandler(eventType eventType) (lambdaEventHandler, error)
33+
}
34+
35+
// handlerProvider is responsible for providing event handlers based on event types.
36+
// It operates with a registry of handler factories and caches loadedHandlers for reuse.
37+
type handlerProviderImpl struct {
38+
registry handlerRegistry
39+
loadedHandlers map[eventType]lambdaEventHandler
40+
knownTypes []string
41+
}
42+
43+
func newHandlerProvider(registry handlerRegistry) handlerProvider {
44+
var types []string
45+
for t := range registry {
46+
types = append(types, string(t))
47+
}
48+
49+
return &handlerProviderImpl{
50+
loadedHandlers: map[eventType]lambdaEventHandler{},
51+
registry: registry,
52+
knownTypes: types,
53+
}
54+
}
55+
56+
func (h *handlerProviderImpl) getHandler(eventType eventType) (lambdaEventHandler, error) {
57+
if loaded, exists := h.loadedHandlers[eventType]; exists {
58+
return loaded, nil
59+
}
60+
61+
factory, exists := h.registry[eventType]
62+
if !exists {
63+
return nil, fmt.Errorf("no handler registered for event type %s, known types: '%s'", eventType, strings.Join(h.knownTypes, ","))
64+
}
65+
66+
handler := factory()
67+
h.loadedHandlers[eventType] = handler
68+
return handler, nil
69+
}
70+
71+
// lambdaEventHandler defines the contract for AWS Lambda event handlers
2972
type lambdaEventHandler interface {
3073
handlerType() eventType
3174
handle(ctx context.Context, event json.RawMessage) error
@@ -68,7 +111,7 @@ func (s *s3Handler[T]) handle(ctx context.Context, event json.RawMessage) error
68111
zap.String("S3Bucket", parsedEvent.S3.Bucket.Arn),
69112
)
70113

71-
// Skip processing zero length objects. This includes events from folder creation and empty object .
114+
// Skip processing zero length objects. This includes events from folder creation and empty object.
72115
if parsedEvent.S3.Object.Size == 0 {
73116
s.logger.Info("Empty object, skipping download", zap.String("File", parsedEvent.S3.Object.Key))
74117
return nil
@@ -84,7 +127,7 @@ func (s *s3Handler[T]) handle(ctx context.Context, event json.RawMessage) error
84127
return fmt.Errorf("failed to unmarshal logs: %w", err)
85128
}
86129

87-
if err := s.consumer(ctx, parsedEvent, data); err != nil {
130+
if err := s.consumer(ctx, parsedEvent.EventTime, data); err != nil {
88131
// If permanent, return as-is (don't retry)
89132
if consumererror.IsPermanent(err) {
90133
return err

receiver/awslambdareceiver/handler_test.go

Lines changed: 39 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,10 @@ import (
1616

1717
"github.com/aws/aws-lambda-go/events"
1818
"github.com/stretchr/testify/require"
19+
"go.opentelemetry.io/collector/component"
1920
"go.opentelemetry.io/collector/consumer"
2021
"go.opentelemetry.io/collector/consumer/consumererror"
22+
"go.opentelemetry.io/collector/extension"
2123
"go.opentelemetry.io/collector/pdata/pcommon"
2224
"go.opentelemetry.io/collector/pdata/plog"
2325
conventions "go.opentelemetry.io/otel/semconv/v1.27.0"
@@ -114,8 +116,8 @@ func TestProcessLambdaEvent_S3Notification(t *testing.T) {
114116
s3Service.EXPECT().ReadObject(gomock.Any(), gomock.Any(), gomock.Any()).Return(test.mockContent, nil).AnyTimes()
115117

116118
// Wrap the consumer to match the new s3EventConsumerFunc signature
117-
logsConsumer := func(ctx context.Context, event events.S3EventRecord, logs plog.Logs) error {
118-
setObservedTimestampForAllLogs(logs, event.EventTime)
119+
logsConsumer := func(ctx context.Context, time time.Time, logs plog.Logs) error {
120+
setObservedTimestampForAllLogs(logs, time)
119121
return test.eventConsumer.ConsumeLogs(ctx, logs)
120122
}
121123

@@ -207,8 +209,8 @@ func TestS3HandlerParseEvent(t *testing.T) {
207209

208210
var consumer noOpLogsConsumer
209211
// Wrap the consumer to match the new s3EventConsumerFunc signature
210-
logsConsumer := func(ctx context.Context, event events.S3EventRecord, logs plog.Logs) error {
211-
setObservedTimestampForAllLogs(logs, event.EventTime)
212+
logsConsumer := func(ctx context.Context, time time.Time, logs plog.Logs) error {
213+
setObservedTimestampForAllLogs(logs, time)
212214
return consumer.ConsumeLogs(ctx, logs)
213215
}
214216
handler := newS3Handler(s3Service, zap.NewNop(), mockS3LogUnmarshaler{}.UnmarshalLogs, logsConsumer)
@@ -254,8 +256,27 @@ func TestHandleCloudwatchLogEvent(t *testing.T) {
254256
},
255257
}
256258

257-
unmarshaler, err := loadSubFilterLogUnmarshaler(t.Context(), awslogsencodingextension.NewFactory())
258-
require.NoError(t, err)
259+
var extensionID component.ID
260+
err := extensionID.UnmarshalText([]byte("awslogs_encoding"))
261+
if err != nil {
262+
t.Errorf("failed to unmarshal identifier: %v", err)
263+
return
264+
}
265+
266+
factory := awslogsencodingextension.NewFactory()
267+
ext, err := factory.Create(t.Context(), extension.Settings{ID: extensionID}, &awslogsencodingextension.Config{
268+
Format: "cloudwatch",
269+
})
270+
if err != nil {
271+
t.Errorf("failed to create awslogs encoding extension: %v", err)
272+
return
273+
}
274+
275+
subscriptionFilterLogUnmarshaler, ok := ext.(plog.Unmarshaler)
276+
if !ok {
277+
t.Errorf("extension does not implement log.Unmarshaler")
278+
return
279+
}
259280

260281
for name, test := range tests {
261282
t.Run(name, func(t *testing.T) {
@@ -266,11 +287,11 @@ func TestHandleCloudwatchLogEvent(t *testing.T) {
266287
}
267288

268289
var lambdaEvent json.RawMessage
269-
lambdaEvent, err = json.Marshal(cwEvent)
290+
lambdaEvent, err := json.Marshal(cwEvent)
270291
require.NoError(t, err)
271292

272-
handler := newCWLogsSubscriptionHandler(zap.NewNop(), unmarshaler.UnmarshalLogs, test.eventConsumer.ConsumeLogs)
273-
err := handler.handle(t.Context(), lambdaEvent)
293+
handler := newCWLogsSubscriptionHandler(zap.NewNop(), subscriptionFilterLogUnmarshaler.UnmarshalLogs, test.eventConsumer.ConsumeLogs)
294+
err = handler.handle(t.Context(), lambdaEvent)
274295
if test.expectedErr != "" {
275296
require.ErrorContains(t, err, test.expectedErr)
276297
} else {
@@ -366,7 +387,7 @@ func TestConsumerErrorHandling(t *testing.T) {
366387
s3Service.EXPECT().ReadObject(gomock.Any(), gomock.Any(), gomock.Any()).Return([]byte(mockContent), nil).Times(1)
367388

368389
// Consumer that returns the test error
369-
logsConsumer := func(_ context.Context, _ events.S3EventRecord, _ plog.Logs) error {
390+
logsConsumer := func(_ context.Context, _ time.Time, _ plog.Logs) error {
370391
return test.consumerErr
371392
}
372393

@@ -432,6 +453,14 @@ func (mockS3LogUnmarshaler) UnmarshalLogs(data []byte) (plog.Logs, error) {
432453
return plog.Logs{}, errors.New("logs not in the correct format")
433454
}
434455

456+
type mockHandlerProvider struct {
457+
handler lambdaEventHandler
458+
}
459+
460+
func (m mockHandlerProvider) getHandler(_ eventType) (lambdaEventHandler, error) {
461+
return m.handler, nil
462+
}
463+
435464
type mockPlogEventHandler struct {
436465
handleCount int
437466
event eventType

receiver/awslambdareceiver/metadata.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,4 @@ tests:
1212
# TODO Fix this
1313
skip_lifecycle: true
1414
config:
15-
s3_encoding: awslogs
15+
encoding_extension: awslogs

0 commit comments

Comments
 (0)