Skip to content

Commit e9aa82a

Browse files
committed
feat: add cloudwatch log handling support
Signed-off-by: Kavindu Dodanduwa <[email protected]>
1 parent 241cb29 commit e9aa82a

File tree

12 files changed

+369
-19
lines changed

12 files changed

+369
-19
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog)
7+
component: receiver/awslambda
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Add support for AWS Lambda receiver to trigger by CloudWatch logs subscription filters for Lambda
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [43504]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: []

receiver/awslambdareceiver/README.md

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,9 @@ The `awslambdareceiver` operates as follows:
5454

5555
The following receiver configuration parameters are supported.
5656

57-
| Name | Description | Default | Required |
58-
|:----------------------|:-------------------------------------------------------------------------------------------------------------------------------------------|-------------|----------|
59-
| `s3_encoding` | Name of the encoding extension to use for S3 objects | "awslogs_encoding" | Optional |
57+
| Name | Description | Required |
58+
|:--------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|
59+
| `s3_encoding` | Name of the encoding extension to use for S3 objects. For example, `awslogs_encoding` to use AWS logs encoding extensions. When unspecified, falls back to CloudWatch subscription filter handling | Optional |
6060

6161
### Example Configuration
6262

@@ -157,7 +157,10 @@ service:
157157
exporters: [otlphttp]
158158
```
159159

160-
In this example, `awslambdareceiver` is invoked by a CloudWatch Logs subscription filter. The receiver automatically parses the CloudWatch Logs data using the default `awslogs_encoding` extension with cloudwatch format. No explicit encoding configuration is needed. The parsed logs are then sent to an OTLP listener using the `otlphttp` exporter.
160+
In this example, `awslambdareceiver` is invoked by a CloudWatch Logs subscription filter.
161+
`s3_encoding` configuration is omitted since it is not needed for CloudWatch Logs.
162+
The receiver automatically parses the CloudWatch Logs data using the default `awslogs_encoding` extension with cloudwatch format.
163+
No explicit encoding configuration is needed. The parsed logs are then sent to an OTLP listener using the `otlphttp` exporter.
161164

162165
## Supported Data Types
163166

receiver/awslambdareceiver/config.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,10 @@ const (
1515
type Config struct {
1616
// S3Encoding identifies the encoding of the S3 objects that trigger the Lambda.
1717
//
18-
// If S3Encoding is unspecified, the receiver will return an error for any S3 event notifications.
18+
// If S3 data is in multiple formats (ex:- VPC flow logs, CloudTrail logs), you should deploy
19+
// separate Lambda functions with specific extension configurations.
1920
//
20-
// If you have objects with multiple different encodings to handle, you should deploy
21-
// separate Lambda functions with different configurations.
21+
// If unspecified, the receiver falls back to work with CloudWatch Log subscription encoding extension.
2222
S3Encoding string `mapstructure:"s3_encoding"`
2323

2424
_ struct{} // Prevent unkeyed literal initialization

receiver/awslambdareceiver/factory_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414

1515
func TestCreateLogsReceiver(t *testing.T) {
1616
cfg := createDefaultConfig()
17+
require.Empty(t, cfg.(*Config).S3Encoding)
1718

1819
r, err := createLogsReceiver(t.Context(), receivertest.NewNopSettings(metadata.Type), cfg, consumertest.NewNop())
1920
require.NoError(t, err)

receiver/awslambdareceiver/go.mod

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@ require (
77
github.com/aws/aws-sdk-go-v2/config v1.32.1
88
github.com/aws/aws-sdk-go-v2/service/s3 v1.92.0
99
github.com/goccy/go-json v0.10.5
10+
github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/awslogsencodingextension v0.140.1
11+
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.140.1
12+
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.140.1
1013
github.com/stretchr/testify v1.11.1
1114
go.opentelemetry.io/collector/component v1.46.1-0.20251120204106-2e9c82787618
1215
go.opentelemetry.io/collector/component/componenttest v0.140.1-0.20251120204106-2e9c82787618
@@ -43,6 +46,7 @@ require (
4346
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.9 // indirect
4447
github.com/aws/aws-sdk-go-v2/service/sts v1.41.1 // indirect
4548
github.com/aws/smithy-go v1.23.2 // indirect
49+
github.com/cespare/xxhash/v2 v2.3.0 // indirect
4650
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
4751
github.com/go-logr/logr v1.4.3 // indirect
4852
github.com/go-logr/stdr v1.2.2 // indirect
@@ -51,13 +55,16 @@ require (
5155
github.com/google/uuid v1.6.0 // indirect
5256
github.com/hashicorp/go-version v1.7.0 // indirect
5357
github.com/json-iterator/go v1.1.12 // indirect
58+
github.com/klauspost/compress v1.18.1 // indirect
5459
github.com/knadh/koanf/maps v0.1.2 // indirect
5560
github.com/knadh/koanf/providers/confmap v1.0.0 // indirect
5661
github.com/knadh/koanf/v2 v2.3.0 // indirect
5762
github.com/mitchellh/copystructure v1.2.0 // indirect
5863
github.com/mitchellh/reflectwalk v1.0.2 // indirect
5964
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
6065
github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee // indirect
66+
github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding v0.140.1 // indirect
67+
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.140.1 // indirect
6168
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
6269
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
6370
go.opentelemetry.io/collector/consumer/xconsumer v0.140.1-0.20251120204106-2e9c82787618 // indirect
@@ -77,3 +84,13 @@ require (
7784
google.golang.org/protobuf v1.36.10 // indirect
7885
gopkg.in/yaml.v3 v3.0.1 // indirect
7986
)
87+
88+
replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/awslogsencodingextension => ../../extension/encoding/awslogsencodingextension
89+
90+
replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../pkg/pdatautil
91+
92+
replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding => ../../extension/encoding
93+
94+
replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest
95+
96+
replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden

receiver/awslambdareceiver/go.sum

Lines changed: 8 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

receiver/awslambdareceiver/handler.go

Lines changed: 52 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package awslambdareceiver // import "github.com/open-telemetry/opentelemetry-col
55

66
import (
77
"context"
8+
"encoding/base64"
89
"encoding/json"
910
"errors"
1011
"fmt"
@@ -20,9 +21,10 @@ import (
2021
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awslambdareceiver/internal"
2122
)
2223

23-
type s3EventConsumerFunc[T any] func(context.Context, events.S3EventRecord, T) error
24-
25-
type unmarshalFunc[T any] func([]byte) (T, error)
24+
type (
25+
unmarshalFunc[T any] func([]byte) (T, error)
26+
s3EventConsumerFunc[T any] func(context.Context, events.S3EventRecord, T) error
27+
)
2628

2729
type lambdaEventHandler interface {
2830
handlerType() eventType
@@ -115,6 +117,53 @@ func (*s3Handler[T]) parseEvent(raw json.RawMessage) (event events.S3EventRecord
115117
return message.Records[0], nil
116118
}
117119

120+
// cwLogsSubscriptionHandler is specialized in CloudWatch log stream subscription filter events
121+
type cwLogsSubscriptionHandler struct {
122+
logger *zap.Logger
123+
unmarshal unmarshalFunc[plog.Logs]
124+
consumer func(context.Context, plog.Logs) error
125+
}
126+
127+
func newCWLogsSubscriptionHandler(
128+
baseLogger *zap.Logger,
129+
unmarshal unmarshalFunc[plog.Logs],
130+
consumer func(context.Context, plog.Logs) error,
131+
) *cwLogsSubscriptionHandler {
132+
return &cwLogsSubscriptionHandler{
133+
logger: baseLogger.Named("cw-logs-subscription"),
134+
unmarshal: unmarshal,
135+
consumer: consumer,
136+
}
137+
}
138+
139+
func (*cwLogsSubscriptionHandler) handlerType() eventType {
140+
return cwEvent
141+
}
142+
143+
func (c *cwLogsSubscriptionHandler) handle(ctx context.Context, event json.RawMessage) error {
144+
var log events.CloudwatchLogsEvent
145+
if err := gojson.Unmarshal(event, &log); err != nil {
146+
return fmt.Errorf("failed to unmarshal cloudwatch event log: %w", err)
147+
}
148+
149+
decoded, err := base64.StdEncoding.DecodeString(log.AWSLogs.Data)
150+
if err != nil {
151+
return fmt.Errorf("failed to decode data from cloudwatch logs event: %w", err)
152+
}
153+
154+
data, err := c.unmarshal(decoded)
155+
if err != nil {
156+
return fmt.Errorf("failed to unmarshal logs: %w", err)
157+
}
158+
159+
if err := c.consumer(ctx, data); err != nil {
160+
// consumer errors are marked for retrying
161+
return consumererror.NewRetryableError(err)
162+
}
163+
164+
return nil
165+
}
166+
118167
// setObservedTimestampForAllLogs adds observedTimestamp to all logs
119168
func setObservedTimestampForAllLogs(logs plog.Logs, observedTimestamp time.Time) {
120169
for _, resourceLogs := range logs.ResourceLogs().All() {

receiver/awslambdareceiver/handler_test.go

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,14 @@
33
package awslambdareceiver
44

55
import (
6+
"bytes"
7+
"compress/gzip"
68
"context"
9+
"encoding/base64"
710
"encoding/json"
811
"errors"
12+
"os"
13+
"path/filepath"
914
"testing"
1015
"time"
1116

@@ -19,9 +24,14 @@ import (
1924
"go.uber.org/mock/gomock"
2025
"go.uber.org/zap"
2126

27+
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/awslogsencodingextension"
28+
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden"
29+
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/plogtest"
2230
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awslambdareceiver/internal"
2331
)
2432

33+
const testDataDirectory = "testdata"
34+
2535
func TestProcessLambdaEvent_S3Notification(t *testing.T) {
2636
t.Parallel()
2737

@@ -220,6 +230,56 @@ func TestS3HandlerParseEvent(t *testing.T) {
220230
}
221231
}
222232

233+
func TestHandleCloudwatchLogEvent(t *testing.T) {
234+
t.Parallel()
235+
236+
tests := map[string]struct {
237+
eventData string
238+
expectedErr string
239+
eventConsumer consumer.Logs
240+
}{
241+
"valid_cloudwatch_log_event": {
242+
eventData: loadCompressedData(t, filepath.Join(testDataDirectory, "cloudwatch_log.json")),
243+
eventConsumer: &logConsumerWithGoldenValidation{logsExpectedPath: filepath.Join(testDataDirectory, "cloudwatch_log_expected.yaml")},
244+
},
245+
"invalid_base64_data": {
246+
eventData: "#",
247+
expectedErr: "failed to decode data from cloudwatch logs event",
248+
eventConsumer: &noOpLogsConsumer{},
249+
},
250+
"invalid_cloudwatch_log_data": {
251+
eventData: "test",
252+
expectedErr: "failed to unmarshal logs",
253+
eventConsumer: &noOpLogsConsumer{},
254+
},
255+
}
256+
257+
unmarshaler, err := loadSubFilterLogUnmarshaler(t.Context(), awslogsencodingextension.NewFactory())
258+
require.NoError(t, err)
259+
260+
for name, test := range tests {
261+
t.Run(name, func(t *testing.T) {
262+
cwEvent := events.CloudwatchLogsEvent{
263+
AWSLogs: events.CloudwatchLogsRawData{
264+
Data: test.eventData,
265+
},
266+
}
267+
268+
var lambdaEvent json.RawMessage
269+
lambdaEvent, err = json.Marshal(cwEvent)
270+
require.NoError(t, err)
271+
272+
handler := newCWLogsSubscriptionHandler(zap.NewNop(), unmarshaler.UnmarshalLogs, test.eventConsumer.ConsumeLogs)
273+
err := handler.handle(t.Context(), lambdaEvent)
274+
if test.expectedErr != "" {
275+
require.ErrorContains(t, err, test.expectedErr)
276+
} else {
277+
require.NoError(t, err)
278+
}
279+
})
280+
}
281+
}
282+
223283
func TestSetObservedTimestampForAllLogs(t *testing.T) {
224284
t.Parallel()
225285

@@ -328,6 +388,23 @@ func TestConsumerErrorHandling(t *testing.T) {
328388
}
329389
}
330390

391+
type logConsumerWithGoldenValidation struct {
392+
logsExpectedPath string
393+
}
394+
395+
func (logConsumerWithGoldenValidation) Capabilities() consumer.Capabilities {
396+
return consumer.Capabilities{}
397+
}
398+
399+
func (l logConsumerWithGoldenValidation) ConsumeLogs(_ context.Context, logs plog.Logs) error {
400+
expectedLogs, err := golden.ReadLogs(l.logsExpectedPath)
401+
if err != nil {
402+
return err
403+
}
404+
405+
return plogtest.CompareLogs(expectedLogs, logs)
406+
}
407+
331408
type noOpLogsConsumer struct {
332409
consumeCount int
333410
err error
@@ -368,3 +445,21 @@ func (n *mockPlogEventHandler) handle(context.Context, json.RawMessage) error {
368445
n.handleCount++
369446
return nil
370447
}
448+
449+
func loadCompressedData(t *testing.T, file string) string {
450+
data, err := os.ReadFile(file)
451+
require.NoError(t, err)
452+
453+
compressed := compressData(t, data)
454+
return base64.StdEncoding.EncodeToString(compressed)
455+
}
456+
457+
func compressData(t *testing.T, data []byte) []byte {
458+
var buf bytes.Buffer
459+
gzipWriter := gzip.NewWriter(&buf)
460+
_, err := gzipWriter.Write(data)
461+
require.NoError(t, err)
462+
err = gzipWriter.Close()
463+
require.NoError(t, err)
464+
return buf.Bytes()
465+
}

0 commit comments

Comments
 (0)