27 changes: 27 additions & 0 deletions .chloggen/feat_awslambdareceiver-cloudwatch-signal.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog)
component: receiver/awslambda

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add support for triggering the AWS Lambda receiver via CloudWatch Logs subscription filters

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [43504]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
11 changes: 7 additions & 4 deletions receiver/awslambdareceiver/README.md
@@ -54,9 +54,9 @@ The `awslambdareceiver` operates as follows:

The following receiver configuration parameters are supported.

| Name | Description | Default | Required |
|:----------------------|:-------------------------------------------------------------------------------------------------------------------------------------------|-------------|----------|
| `s3_encoding` | Name of the encoding extension to use for S3 objects | "awslogs_encoding" | Optional |
| Name | Description | Required |
|:--------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|
| `s3_encoding` | Name of the encoding extension to use for S3 objects. For example, `awslogs_encoding` to use AWS logs encoding extensions. When unspecified, the receiver falls back to CloudWatch subscription filter handling. | Optional |
Contributor:

This is a bit confusing to me. CloudWatch subscription filter handling isn't relevant when decoding S3 objects, right? So maybe reword to:

Name of the encoding extension to use for S3 objects. This is only used for handling S3 events, and not for CloudWatch Log events.

Contributor (author):

Thanks @axw, yes, this is something I thought about, but I upstreamed the existing mechanism as-is. However, we now have a chance to improve this usage.

As a next step, I will come up with a proposal to improve this.
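
The fallback behavior discussed above — use the configured encoding for S3 objects, otherwise treat the event as a CloudWatch Logs subscription delivery — can be sketched as follows. This is a hypothetical simplification with invented names (`eventKind`, `chooseHandler`), not the receiver's actual dispatch code:

```go
package main

import "fmt"

// eventKind stands in for the receiver's internal event type (assumed name).
type eventKind string

const (
	s3Kind eventKind = "s3"
	cwKind eventKind = "cloudwatch"
)

// chooseHandler sketches the fallback: an explicit s3_encoding selects the
// S3 object path; otherwise CloudWatch Logs subscription handling is assumed.
func chooseHandler(s3Encoding string) eventKind {
	if s3Encoding != "" {
		return s3Kind
	}
	return cwKind
}

func main() {
	fmt.Println(chooseHandler("awslogs_encoding")) // s3
	fmt.Println(chooseHandler(""))                 // cloudwatch
}
```

As the reviewer notes, coupling these two concerns in one setting is debatable; the sketch only illustrates the behavior as currently documented.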


### Example Configuration

@@ -157,7 +157,10 @@ service:
exporters: [otlphttp]
```

In this example, `awslambdareceiver` is invoked by a CloudWatch Logs subscription filter. The receiver automatically parses the CloudWatch Logs data using the default `awslogs_encoding` extension with cloudwatch format. No explicit encoding configuration is needed. The parsed logs are then sent to an OTLP listener using the `otlphttp` exporter.
In this example, `awslambdareceiver` is invoked by a CloudWatch Logs subscription filter.
The `s3_encoding` configuration is omitted, since it is not needed for CloudWatch Logs.
The receiver automatically parses the CloudWatch Logs data using the default `awslogs_encoding` extension with the `cloudwatch` format, so no explicit encoding configuration is needed.
The parsed logs are then sent to an OTLP endpoint using the `otlphttp` exporter.

## Supported Data Types

6 changes: 3 additions & 3 deletions receiver/awslambdareceiver/config.go
@@ -15,10 +15,10 @@ const (
type Config struct {
// S3Encoding identifies the encoding of the S3 objects that trigger the Lambda.
//
// If S3Encoding is unspecified, the receiver will return an error for any S3 event notifications.
// If S3 data is in multiple formats (e.g., VPC flow logs, CloudTrail logs), you should deploy
// separate Lambda functions with specific extension configurations.
//
// If you have objects with multiple different encodings to handle, you should deploy
// separate Lambda functions with different configurations.
// If unspecified, the receiver falls back to working with the CloudWatch Logs subscription encoding extension.
Comment on lines -18 to +21
Contributor:

I know we went back and forth on this internally, but I'd really like to keep these things independent. Can we maintain the existing doc comment please? It's confusing to talk about CloudWatch logs in the same breath as S3 encoding - they're totally separate events.

Contributor (author):

I am exploring the option described here: #44562 (comment). Let me know your view on this.

S3Encoding string `mapstructure:"s3_encoding"`

_ struct{} // Prevent unkeyed literal initialization
1 change: 1 addition & 0 deletions receiver/awslambdareceiver/factory_test.go
@@ -14,6 +14,7 @@ import (

func TestCreateLogsReceiver(t *testing.T) {
cfg := createDefaultConfig()
require.Empty(t, cfg.(*Config).S3Encoding)

r, err := createLogsReceiver(t.Context(), receivertest.NewNopSettings(metadata.Type), cfg, consumertest.NewNop())
require.NoError(t, err)
17 changes: 17 additions & 0 deletions receiver/awslambdareceiver/go.mod
@@ -7,6 +7,9 @@ require (
github.com/aws/aws-sdk-go-v2/config v1.32.1
github.com/aws/aws-sdk-go-v2/service/s3 v1.92.0
github.com/goccy/go-json v0.10.5
github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/awslogsencodingextension v0.140.1
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.140.1
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.140.1
github.com/stretchr/testify v1.11.1
go.opentelemetry.io/collector/component v1.46.1-0.20251120204106-2e9c82787618
go.opentelemetry.io/collector/component/componenttest v0.140.1-0.20251120204106-2e9c82787618
@@ -43,6 +46,7 @@ require (
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.9 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.41.1 // indirect
github.com/aws/smithy-go v1.23.2 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/go-logr/logr v1.4.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
@@ -51,13 +55,16 @@
github.com/google/uuid v1.6.0 // indirect
github.com/hashicorp/go-version v1.7.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.18.1 // indirect
github.com/knadh/koanf/maps v0.1.2 // indirect
github.com/knadh/koanf/providers/confmap v1.0.0 // indirect
github.com/knadh/koanf/v2 v2.3.0 // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding v0.140.1 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.140.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
go.opentelemetry.io/collector/consumer/xconsumer v0.140.1-0.20251120204106-2e9c82787618 // indirect
@@ -77,3 +84,13 @@ require (
google.golang.org/protobuf v1.36.10 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/awslogsencodingextension => ../../extension/encoding/awslogsencodingextension

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../pkg/pdatautil

replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding => ../../extension/encoding

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden
10 changes: 8 additions & 2 deletions receiver/awslambdareceiver/go.sum

Some generated files are not rendered by default.

55 changes: 52 additions & 3 deletions receiver/awslambdareceiver/handler.go
@@ -5,6 +5,7 @@ package awslambdareceiver // import "github.com/open-telemetry/opentelemetry-col

import (
"context"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
@@ -20,9 +21,10 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awslambdareceiver/internal"
)

type s3EventConsumerFunc[T any] func(context.Context, events.S3EventRecord, T) error

type unmarshalFunc[T any] func([]byte) (T, error)
type (
unmarshalFunc[T any] func([]byte) (T, error)
s3EventConsumerFunc[T any] func(context.Context, events.S3EventRecord, T) error
)

type lambdaEventHandler interface {
handlerType() eventType
@@ -115,6 +117,53 @@ func (*s3Handler[T]) parseEvent(raw json.RawMessage) (event events.S3EventRecord
return message.Records[0], nil
}

// cwLogsSubscriptionHandler handles CloudWatch Logs subscription filter events
type cwLogsSubscriptionHandler struct {
logger *zap.Logger
unmarshal unmarshalFunc[plog.Logs]
consumer func(context.Context, plog.Logs) error
}

func newCWLogsSubscriptionHandler(
baseLogger *zap.Logger,
unmarshal unmarshalFunc[plog.Logs],
consumer func(context.Context, plog.Logs) error,
) *cwLogsSubscriptionHandler {
return &cwLogsSubscriptionHandler{
logger: baseLogger.Named("cw-logs-subscription"),
unmarshal: unmarshal,
consumer: consumer,
}
}

func (*cwLogsSubscriptionHandler) handlerType() eventType {
return cwEvent
}

func (c *cwLogsSubscriptionHandler) handle(ctx context.Context, event json.RawMessage) error {
var log events.CloudwatchLogsEvent
if err := gojson.Unmarshal(event, &log); err != nil {
return fmt.Errorf("failed to unmarshal cloudwatch event log: %w", err)
}

decoded, err := base64.StdEncoding.DecodeString(log.AWSLogs.Data)
if err != nil {
return fmt.Errorf("failed to decode data from cloudwatch logs event: %w", err)
}

data, err := c.unmarshal(decoded)
if err != nil {
return fmt.Errorf("failed to unmarshal logs: %w", err)
}

if err := c.consumer(ctx, data); err != nil {
// consumer errors are marked for retrying
return consumererror.NewRetryableError(err)
}

return nil
}

// setObservedTimestampForAllLogs adds observedTimestamp to all logs
func setObservedTimestampForAllLogs(logs plog.Logs, observedTimestamp time.Time) {
for _, resourceLogs := range logs.ResourceLogs().All() {
95 changes: 95 additions & 0 deletions receiver/awslambdareceiver/handler_test.go
@@ -3,9 +3,14 @@
package awslambdareceiver

import (
"bytes"
"compress/gzip"
"context"
"encoding/base64"
"encoding/json"
"errors"
"os"
"path/filepath"
"testing"
"time"

@@ -19,9 +24,14 @@ import (
"go.uber.org/mock/gomock"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/awslogsencodingextension"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/plogtest"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awslambdareceiver/internal"
)

const testDataDirectory = "testdata"

func TestProcessLambdaEvent_S3Notification(t *testing.T) {
t.Parallel()

@@ -220,6 +230,56 @@ func TestS3HandlerParseEvent(t *testing.T) {
}
}

func TestHandleCloudwatchLogEvent(t *testing.T) {
t.Parallel()

tests := map[string]struct {
eventData string
expectedErr string
eventConsumer consumer.Logs
}{
"valid_cloudwatch_log_event": {
eventData: loadCompressedData(t, filepath.Join(testDataDirectory, "cloudwatch_log.json")),
eventConsumer: &logConsumerWithGoldenValidation{logsExpectedPath: filepath.Join(testDataDirectory, "cloudwatch_log_expected.yaml")},
},
"invalid_base64_data": {
eventData: "#",
expectedErr: "failed to decode data from cloudwatch logs event",
eventConsumer: &noOpLogsConsumer{},
},
"invalid_cloudwatch_log_data": {
eventData: "test",
expectedErr: "failed to unmarshal logs",
eventConsumer: &noOpLogsConsumer{},
},
}

unmarshaler, err := loadSubFilterLogUnmarshaler(t.Context(), awslogsencodingextension.NewFactory())
require.NoError(t, err)

for name, test := range tests {
t.Run(name, func(t *testing.T) {
cwEvent := events.CloudwatchLogsEvent{
AWSLogs: events.CloudwatchLogsRawData{
Data: test.eventData,
},
}

var lambdaEvent json.RawMessage
lambdaEvent, err = json.Marshal(cwEvent)
require.NoError(t, err)

handler := newCWLogsSubscriptionHandler(zap.NewNop(), unmarshaler.UnmarshalLogs, test.eventConsumer.ConsumeLogs)
err := handler.handle(t.Context(), lambdaEvent)
if test.expectedErr != "" {
require.ErrorContains(t, err, test.expectedErr)
} else {
require.NoError(t, err)
}
})
}
}

func TestSetObservedTimestampForAllLogs(t *testing.T) {
t.Parallel()

@@ -328,6 +388,23 @@ func TestConsumerErrorHandling(t *testing.T) {
}
}

type logConsumerWithGoldenValidation struct {
logsExpectedPath string
}

func (logConsumerWithGoldenValidation) Capabilities() consumer.Capabilities {
return consumer.Capabilities{}
}

func (l logConsumerWithGoldenValidation) ConsumeLogs(_ context.Context, logs plog.Logs) error {
expectedLogs, err := golden.ReadLogs(l.logsExpectedPath)
if err != nil {
return err
}

return plogtest.CompareLogs(expectedLogs, logs)
}

type noOpLogsConsumer struct {
consumeCount int
err error
@@ -368,3 +445,21 @@ func (n *mockPlogEventHandler) handle(context.Context, json.RawMessage) error {
n.handleCount++
return nil
}

func loadCompressedData(t *testing.T, file string) string {
data, err := os.ReadFile(file)
require.NoError(t, err)

compressed := compressData(t, data)
return base64.StdEncoding.EncodeToString(compressed)
}

func compressData(t *testing.T, data []byte) []byte {
var buf bytes.Buffer
gzipWriter := gzip.NewWriter(&buf)
_, err := gzipWriter.Write(data)
require.NoError(t, err)
err = gzipWriter.Close()
require.NoError(t, err)
return buf.Bytes()
}