diff --git a/.chloggen/feat_awslambdareceiver-cloudwatch-signal.yaml b/.chloggen/feat_awslambdareceiver-cloudwatch-signal.yaml new file mode 100644 index 0000000000000..eb4519218ba2e --- /dev/null +++ b/.chloggen/feat_awslambdareceiver-cloudwatch-signal.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog) +component: receiver/awslambda + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add support for AWS Lambda receiver to trigger by CloudWatch logs subscription filters for Lambda + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [43504] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/receiver/awslambdareceiver/README.md b/receiver/awslambdareceiver/README.md index 36a867a5c57c3..7a0a52fa3626a 100644 --- a/receiver/awslambdareceiver/README.md +++ b/receiver/awslambdareceiver/README.md @@ -54,9 +54,9 @@ The `awslambdareceiver` operates as follows: The following receiver configuration parameters are supported. -| Name | Description | Default | Required | -|:----------------------|:-------------------------------------------------------------------------------------------------------------------------------------------|-------------|----------| -| `s3_encoding` | Name of the encoding extension to use for S3 objects | "awslogs_encoding" | Optional | +| Name | Description | Required | +|:--------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------| +| `s3_encoding` | Name of the encoding extension to use for S3 objects. For example, `awslogs_encoding` to use AWS logs encoding extensions. When unspecified, falls back to CloudWatch subscription filter handling | Optional | ### Example Configuration @@ -157,7 +157,10 @@ service: exporters: [otlphttp] ``` -In this example, `awslambdareceiver` is invoked by a CloudWatch Logs subscription filter. The receiver automatically parses the CloudWatch Logs data using the default `awslogs_encoding` extension with cloudwatch format. No explicit encoding configuration is needed. The parsed logs are then sent to an OTLP listener using the `otlphttp` exporter. +In this example, `awslambdareceiver` is invoked by a CloudWatch Logs subscription filter. +`s3_encoding` configuration is omitted since it is not needed for CloudWatch Logs. +The receiver automatically parses the CloudWatch Logs data using the default `awslogs_encoding` extension with cloudwatch format. +No explicit encoding configuration is needed. The parsed logs are then sent to an OTLP listener using the `otlphttp` exporter. ## Supported Data Types diff --git a/receiver/awslambdareceiver/config.go b/receiver/awslambdareceiver/config.go index 7e6fd675827b2..8a5060d86413a 100644 --- a/receiver/awslambdareceiver/config.go +++ b/receiver/awslambdareceiver/config.go @@ -15,10 +15,10 @@ const ( type Config struct { // S3Encoding identifies the encoding of the S3 objects that trigger the Lambda. // - // If S3Encoding is unspecified, the receiver will return an error for any S3 event notifications. + // If S3 data is in multiple formats (ex:- VPC flow logs, CloudTrail logs), you should deploy + // separate Lambda functions with specific extension configurations. // - // If you have objects with multiple different encodings to handle, you should deploy - // separate Lambda functions with different configurations. + // If unspecified, the receiver falls back to work with CloudWatch Log subscription encoding extension. S3Encoding string `mapstructure:"s3_encoding"` _ struct{} // Prevent unkeyed literal initialization diff --git a/receiver/awslambdareceiver/factory_test.go b/receiver/awslambdareceiver/factory_test.go index 350b22e091417..a2692c8f98719 100644 --- a/receiver/awslambdareceiver/factory_test.go +++ b/receiver/awslambdareceiver/factory_test.go @@ -14,6 +14,7 @@ import ( func TestCreateLogsReceiver(t *testing.T) { cfg := createDefaultConfig() + require.Empty(t, cfg.(*Config).S3Encoding) r, err := createLogsReceiver(t.Context(), receivertest.NewNopSettings(metadata.Type), cfg, consumertest.NewNop()) require.NoError(t, err) diff --git a/receiver/awslambdareceiver/go.mod b/receiver/awslambdareceiver/go.mod index fd55a8e757cfd..c6980226d3e1e 100644 --- a/receiver/awslambdareceiver/go.mod +++ b/receiver/awslambdareceiver/go.mod @@ -7,6 +7,9 @@ require ( github.com/aws/aws-sdk-go-v2/config v1.32.1 github.com/aws/aws-sdk-go-v2/service/s3 v1.92.0 github.com/goccy/go-json v0.10.5 + github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/awslogsencodingextension v0.140.1 + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.140.1 + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.140.1 github.com/stretchr/testify v1.11.1 go.opentelemetry.io/collector/component v1.46.1-0.20251120204106-2e9c82787618 go.opentelemetry.io/collector/component/componenttest v0.140.1-0.20251120204106-2e9c82787618 @@ -43,6 +46,7 @@ require ( github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.9 // indirect github.com/aws/aws-sdk-go-v2/service/sts v1.41.1 // indirect github.com/aws/smithy-go v1.23.2 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/go-logr/logr v1.4.3 // indirect github.com/go-logr/stdr v1.2.2 // indirect @@ -51,6 +55,7 @@ require ( github.com/google/uuid v1.6.0 // indirect github.com/hashicorp/go-version v1.7.0 // indirect github.com/json-iterator/go v1.1.12 // indirect + github.com/klauspost/compress v1.18.1 // indirect github.com/knadh/koanf/maps v0.1.2 // indirect github.com/knadh/koanf/providers/confmap v1.0.0 // indirect github.com/knadh/koanf/v2 v2.3.0 // indirect @@ -58,6 +63,8 @@ require ( github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding v0.140.1 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.140.1 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect go.opentelemetry.io/auto/sdk v1.2.1 // indirect go.opentelemetry.io/collector/consumer/xconsumer v0.140.1-0.20251120204106-2e9c82787618 // indirect @@ -77,3 +84,13 @@ require ( google.golang.org/protobuf v1.36.10 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) + +replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/awslogsencodingextension => ../../extension/encoding/awslogsencodingextension + +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../pkg/pdatautil + +replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding => ../../extension/encoding + +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest + +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden diff --git a/receiver/awslambdareceiver/go.sum b/receiver/awslambdareceiver/go.sum index db4c0bdaf76f8..f15d1e78c5934 100644 --- a/receiver/awslambdareceiver/go.sum +++ b/receiver/awslambdareceiver/go.sum @@ -38,6 +38,8 @@ github.com/aws/aws-sdk-go-v2/service/sts v1.41.1 h1:GdGmKtG+/Krag7VfyOXV17xjTCz0 github.com/aws/aws-sdk-go-v2/service/sts v1.41.1/go.mod h1:6TxbXoDSgBQ225Qd8Q+MbxUxUh6TtNKwbRt/EPS9xso= github.com/aws/smithy-go v1.23.2 h1:Crv0eatJUQhaManss33hS5r40CG3ZFH+21XSkqMrIUM= github.com/aws/smithy-go v1.23.2/go.mod h1:LEj2LM3rBRQJxPZTB4KuzZkaZYnZPnvgIhb4pu07mx0= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= @@ -64,6 +66,8 @@ github.com/hashicorp/go-version v1.7.0 h1:5tqGy27NaOTB8yJKUZELlFAS/LTKJkrmONwQKe github.com/hashicorp/go-version v1.7.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/klauspost/compress v1.18.1 h1:bcSGx7UbpBqMChDtsF28Lw6v/G94LPrrbMbdC3JH2co= +github.com/klauspost/compress v1.18.1/go.mod h1:ZQFFVG+MdnR0P+l6wpXgIL4NTtwiKIdBnrBd8Nrxr+0= github.com/knadh/koanf/maps v0.1.2 h1:RBfmAW5CnZT+PJ1CVc1QSJKf4Xu9kxfQgYVQSu8hpbo= github.com/knadh/koanf/maps v0.1.2/go.mod h1:npD/QZY3V6ghQDdcQzl1W4ICNVTkohC8E73eI2xW4yI= github.com/knadh/koanf/providers/confmap v1.0.0 h1:mHKLJTE7iXEys6deO5p6olAiZdG5zwp8Aebir+/EaRE= @@ -113,14 +117,16 @@ go.opentelemetry.io/collector/consumer/xconsumer v0.140.1-0.20251120204106-2e9c8 go.opentelemetry.io/collector/consumer/xconsumer v0.140.1-0.20251120204106-2e9c82787618/go.mod h1:CtwSgAXVisCEJ+ElKeDa0yDo/Oie7l1vWAx1elFyWZc= go.opentelemetry.io/collector/extension v1.46.1-0.20251120204106-2e9c82787618 h1:dEOdTe91ephosris+SqcCNMigbTAVKVeTH5y/RKFWok= go.opentelemetry.io/collector/extension v1.46.1-0.20251120204106-2e9c82787618/go.mod h1:/NGiZQFF7hTyfRULTgtYw27cIW8i0hWUTp12lDftZS0= +go.opentelemetry.io/collector/extension/extensiontest v0.140.1-0.20251120204106-2e9c82787618 h1:fSpDfhL6/nbWUdGZusxKS37wjrpHJiuAejf2OnmwU1o= +go.opentelemetry.io/collector/extension/extensiontest v0.140.1-0.20251120204106-2e9c82787618/go.mod h1:0uBMV4lDDg1GF98ht5I/iJ6xpFFV1vW1xi1Y3HEG5To= go.opentelemetry.io/collector/featuregate v1.46.1-0.20251120204106-2e9c82787618 h1:kVcNLQHeTacufSIPasBJG7WbR02qxEBUWfrMAu/Un40= go.opentelemetry.io/collector/featuregate v1.46.1-0.20251120204106-2e9c82787618/go.mod h1:d0tiRzVYrytB6LkcYgz2ESFTv7OktRPQe0QEQcPt1L4= go.opentelemetry.io/collector/pdata v1.46.1-0.20251120204106-2e9c82787618 h1:w4Sd8D+T6wdekkBJlfjAsa7wpXDUmb3wQicikJ8vI9M= go.opentelemetry.io/collector/pdata v1.46.1-0.20251120204106-2e9c82787618/go.mod h1:AqZXTFkj01IxuiHZ1/I7UcGqaljvF5xiUXNYGxRqVp8= go.opentelemetry.io/collector/pdata/pprofile v0.140.1-0.20251120204106-2e9c82787618 h1:EPM+f1DSlHtcTT32N2tfIwXY58N5lOChdYNEEgBk5uA= go.opentelemetry.io/collector/pdata/pprofile v0.140.1-0.20251120204106-2e9c82787618/go.mod h1:01EwjIBpIcmJva7IoXPmHPmACGzsGxFi9xhZhY7W4q8= -go.opentelemetry.io/collector/pdata/testdata v0.140.0 h1:jMhHRS8HbiYwXeElnuTNT+17QGUF+5A5MPgdSOjpJrw= -go.opentelemetry.io/collector/pdata/testdata v0.140.0/go.mod h1:4BZo10Ua0sbxrqMOPzVU4J/EJdE3js472lskyPW4re8= +go.opentelemetry.io/collector/pdata/testdata v0.140.1-0.20251120204106-2e9c82787618 h1:TQAaixNDABYOOGEmg4ViEXxSTvBJuzeZqGyZoPBcz7I= +go.opentelemetry.io/collector/pdata/testdata v0.140.1-0.20251120204106-2e9c82787618/go.mod h1:4BZo10Ua0sbxrqMOPzVU4J/EJdE3js472lskyPW4re8= go.opentelemetry.io/collector/pipeline v1.46.1-0.20251120204106-2e9c82787618 h1:shpb1oV7YQgGaPP59WC/A72B+wdki9DfdJ315O2UtnY= go.opentelemetry.io/collector/pipeline v1.46.1-0.20251120204106-2e9c82787618/go.mod h1:xUrAqiebzYbrgxyoXSkk6/Y3oi5Sy3im2iCA51LwUAI= go.opentelemetry.io/collector/receiver v1.46.1-0.20251120204106-2e9c82787618 h1:lEhxCaW1jtBwV3lTPErL2YhfzH8J9FOn2zLtU6aqJt0= diff --git a/receiver/awslambdareceiver/handler.go b/receiver/awslambdareceiver/handler.go index b28fe17b35639..0783e3e6cbe2d 100644 --- a/receiver/awslambdareceiver/handler.go +++ b/receiver/awslambdareceiver/handler.go @@ -5,6 +5,7 @@ package awslambdareceiver // import "github.com/open-telemetry/opentelemetry-col import ( "context" + "encoding/base64" "encoding/json" "errors" "fmt" @@ -20,9 +21,10 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awslambdareceiver/internal" ) -type s3EventConsumerFunc[T any] func(context.Context, events.S3EventRecord, T) error - -type unmarshalFunc[T any] func([]byte) (T, error) +type ( + unmarshalFunc[T any] func([]byte) (T, error) + s3EventConsumerFunc[T any] func(context.Context, events.S3EventRecord, T) error +) type lambdaEventHandler interface { handlerType() eventType @@ -115,6 +117,53 @@ func (*s3Handler[T]) parseEvent(raw json.RawMessage) (event events.S3EventRecord return message.Records[0], nil } +// cwLogsSubscriptionHandler is specialized in CloudWatch log stream subscription filter events +type cwLogsSubscriptionHandler struct { + logger *zap.Logger + unmarshal unmarshalFunc[plog.Logs] + consumer func(context.Context, plog.Logs) error +} + +func newCWLogsSubscriptionHandler( + baseLogger *zap.Logger, + unmarshal unmarshalFunc[plog.Logs], + consumer func(context.Context, plog.Logs) error, +) *cwLogsSubscriptionHandler { + return &cwLogsSubscriptionHandler{ + logger: baseLogger.Named("cw-logs-subscription"), + unmarshal: unmarshal, + consumer: consumer, + } +} + +func (*cwLogsSubscriptionHandler) handlerType() eventType { + return cwEvent +} + +func (c *cwLogsSubscriptionHandler) handle(ctx context.Context, event json.RawMessage) error { + var log events.CloudwatchLogsEvent + if err := gojson.Unmarshal(event, &log); err != nil { + return fmt.Errorf("failed to unmarshal cloudwatch event log: %w", err) + } + + decoded, err := base64.StdEncoding.DecodeString(log.AWSLogs.Data) + if err != nil { + return fmt.Errorf("failed to decode data from cloudwatch logs event: %w", err) + } + + data, err := c.unmarshal(decoded) + if err != nil { + return fmt.Errorf("failed to unmarshal logs: %w", err) + } + + if err := c.consumer(ctx, data); err != nil { + // consumer errors are marked for retrying + return consumererror.NewRetryableError(err) + } + + return nil +} + // setObservedTimestampForAllLogs adds observedTimestamp to all logs func setObservedTimestampForAllLogs(logs plog.Logs, observedTimestamp time.Time) { for _, resourceLogs := range logs.ResourceLogs().All() { diff --git a/receiver/awslambdareceiver/handler_test.go b/receiver/awslambdareceiver/handler_test.go index d5d245175c983..e7510a1460928 100644 --- a/receiver/awslambdareceiver/handler_test.go +++ b/receiver/awslambdareceiver/handler_test.go @@ -3,9 +3,14 @@ package awslambdareceiver import ( + "bytes" + "compress/gzip" "context" + "encoding/base64" "encoding/json" "errors" + "os" + "path/filepath" "testing" "time" @@ -19,9 +24,14 @@ import ( "go.uber.org/mock/gomock" "go.uber.org/zap" + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/awslogsencodingextension" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/plogtest" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awslambdareceiver/internal" ) +const testDataDirectory = "testdata" + func TestProcessLambdaEvent_S3Notification(t *testing.T) { t.Parallel() @@ -220,6 +230,56 @@ func TestS3HandlerParseEvent(t *testing.T) { } } +func TestHandleCloudwatchLogEvent(t *testing.T) { + t.Parallel() + + tests := map[string]struct { + eventData string + expectedErr string + eventConsumer consumer.Logs + }{ + "valid_cloudwatch_log_event": { + eventData: loadCompressedData(t, filepath.Join(testDataDirectory, "cloudwatch_log.json")), + eventConsumer: &logConsumerWithGoldenValidation{logsExpectedPath: filepath.Join(testDataDirectory, "cloudwatch_log_expected.yaml")}, + }, + "invalid_base64_data": { + eventData: "#", + expectedErr: "failed to decode data from cloudwatch logs event", + eventConsumer: &noOpLogsConsumer{}, + }, + "invalid_cloudwatch_log_data": { + eventData: "test", + expectedErr: "failed to unmarshal logs", + eventConsumer: &noOpLogsConsumer{}, + }, + } + + unmarshaler, err := loadSubFilterLogUnmarshaler(t.Context(), awslogsencodingextension.NewFactory()) + require.NoError(t, err) + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + cwEvent := events.CloudwatchLogsEvent{ + AWSLogs: events.CloudwatchLogsRawData{ + Data: test.eventData, + }, + } + + var lambdaEvent json.RawMessage + lambdaEvent, err = json.Marshal(cwEvent) + require.NoError(t, err) + + handler := newCWLogsSubscriptionHandler(zap.NewNop(), unmarshaler.UnmarshalLogs, test.eventConsumer.ConsumeLogs) + err := handler.handle(t.Context(), lambdaEvent) + if test.expectedErr != "" { + require.ErrorContains(t, err, test.expectedErr) + } else { + require.NoError(t, err) + } + }) + } +} + func TestSetObservedTimestampForAllLogs(t *testing.T) { t.Parallel() @@ -328,6 +388,23 @@ func TestConsumerErrorHandling(t *testing.T) { } } +type logConsumerWithGoldenValidation struct { + logsExpectedPath string +} + +func (logConsumerWithGoldenValidation) Capabilities() consumer.Capabilities { + return consumer.Capabilities{} +} + +func (l logConsumerWithGoldenValidation) ConsumeLogs(_ context.Context, logs plog.Logs) error { + expectedLogs, err := golden.ReadLogs(l.logsExpectedPath) + if err != nil { + return err + } + + return plogtest.CompareLogs(expectedLogs, logs) +} + type noOpLogsConsumer struct { consumeCount int err error @@ -368,3 +445,21 @@ func (n *mockPlogEventHandler) handle(context.Context, json.RawMessage) error { n.handleCount++ return nil } + +func loadCompressedData(t *testing.T, file string) string { + data, err := os.ReadFile(file) + require.NoError(t, err) + + compressed := compressData(t, data) + return base64.StdEncoding.EncodeToString(compressed) +} + +func compressData(t *testing.T, data []byte) []byte { + var buf bytes.Buffer + gzipWriter := gzip.NewWriter(&buf) + _, err := gzipWriter.Write(data) + require.NoError(t, err) + err = gzipWriter.Close() + require.NoError(t, err) + return buf.Bytes() +} diff --git a/receiver/awslambdareceiver/receiver.go b/receiver/awslambdareceiver/receiver.go index 5b781fc72d66a..1e418aef831e6 100644 --- a/receiver/awslambdareceiver/receiver.go +++ b/receiver/awslambdareceiver/receiver.go @@ -10,17 +10,20 @@ import ( "errors" "fmt" "os" + "strings" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" + "go.opentelemetry.io/collector/extension" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/receiver" "go.uber.org/zap" + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/awslogsencodingextension" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awslambdareceiver/internal" ) @@ -91,12 +94,11 @@ func newMetricsReceiver(cfg *Config, set receiver.Settings, next consumer.Metric }, nil } -// Start registers the main handler function for -// when lambda is triggered +// Start registers the main handler function that get executed when lambda is triggered func (a *awsLambdaReceiver) Start(ctx context.Context, host component.Host) error { // Verify we're running in a Lambda environment - if os.Getenv("_LAMBDA_SERVER_PORT") == "" || os.Getenv("AWS_LAMBDA_RUNTIME_API") == "" { - return errors.New("receiver must be used in an AWS Lambda environment: required environment variables _LAMBDA_SERVER_PORT and AWS_LAMBDA_RUNTIME_API are not set") + if os.Getenv("AWS_EXECUTION_ENV") == "" || !strings.HasPrefix(os.Getenv("AWS_EXECUTION_ENV"), "AWS_Lambda_") { + return errors.New("awslambdareceiver must be used in an AWS Lambda environment: missing environment variable AWS_EXECUTION_ENV") } handler, err := a.newHandler(ctx, host, a.s3Provider) @@ -164,9 +166,14 @@ func newLogsHandler( next consumer.Logs, s3Provider internal.S3Provider, ) (lambdaEventHandler, error) { - // If S3Encoding is not set, return an error + // If S3Encoding is not set, then we consider this to be CloudWatch subscription mode if cfg.S3Encoding == "" { - return nil, errors.New("s3_encoding is required for logs stored in S3") + unmarshaler, err := loadSubFilterLogUnmarshaler(ctx, awslogsencodingextension.NewFactory()) + if err != nil { + return nil, err + } + + return newCWLogsSubscriptionHandler(set.Logger, unmarshaler.UnmarshalLogs, next.ConsumeLogs), nil } encodingExtension, err := loadEncodingExtension[plog.Unmarshaler](host, cfg.S3Encoding, "logs") @@ -212,7 +219,7 @@ func newMetricsHandler( return newS3Handler(s3Service, set.Logger, encodingExtension.UnmarshalMetrics, metricsConsumer), nil } -// loadEncodingExtension tries to load an available extension for the given encoding. +// loadEncodingExtension attempts to load an available extension for the given encoding. func loadEncodingExtension[T any](host component.Host, encoding, signalType string) (T, error) { var zero T var extensionID component.ID @@ -231,6 +238,36 @@ func loadEncodingExtension[T any](host component.Host, encoding, signalType stri return unmarshaler, nil } +type extensionFactory interface { + Create(ctx context.Context, set extension.Settings, cfg component.Config) (extension.Extension, error) +} + +// loadSubFilterLogUnmarshaler manually loads the encoding extension for cloudwatch subscription filter unmarshaler +func loadSubFilterLogUnmarshaler(ctx context.Context, logsEncodingFactory extensionFactory) (plog.Unmarshaler, error) { + var extensionID component.ID + err := extensionID.UnmarshalText([]byte(awsLogsEncoding)) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal identifier: %w", err) + } + + // Create the extension using the factory + ext, err := logsEncodingFactory.Create(ctx, extension.Settings{ID: extensionID}, &awslogsencodingextension.Config{ + Format: "cloudwatch", + }) + if err != nil { + return nil, fmt.Errorf("failed to create the extension: %w", err) + } + + // Assert that the extension implements plog.Unmarshaler + subscriptionFilterLogUnmarshaler, ok := ext.(plog.Unmarshaler) + if !ok { + return nil, fmt.Errorf("extension %q does not implement plog.Unmarshaler", extensionID.String()) + } + + // Return the unmarshaler + return subscriptionFilterLogUnmarshaler, nil +} + // detectTriggerType is a helper to derive the eventType based on the payload content. // Supported trigger types are: // - S3Event diff --git a/receiver/awslambdareceiver/receiver_test.go b/receiver/awslambdareceiver/receiver_test.go index 306d0700ad601..b44bd5ec74d01 100644 --- a/receiver/awslambdareceiver/receiver_test.go +++ b/receiver/awslambdareceiver/receiver_test.go @@ -5,6 +5,7 @@ package awslambdareceiver import ( "context" "encoding/json" + "errors" "reflect" "testing" @@ -25,8 +26,7 @@ import ( func TestCreateLogs(t *testing.T) { // Set Lambda environment variables required by Start() - t.Setenv("_LAMBDA_SERVER_PORT", "9001") - t.Setenv("AWS_LAMBDA_RUNTIME_API", "localhost:9001") + t.Setenv("AWS_EXECUTION_ENV", "AWS_Lambda_python3.12") // Create receiver using factory with S3 encoding config. // Note: The S3Encoding value must match the component ID used when registering the extension. @@ -95,8 +95,7 @@ func TestCreateLogs(t *testing.T) { func TestCreateMetrics(t *testing.T) { // Set Lambda environment variables required by Start() - t.Setenv("_LAMBDA_SERVER_PORT", "9001") - t.Setenv("AWS_LAMBDA_RUNTIME_API", "localhost:9001") + t.Setenv("AWS_EXECUTION_ENV", "AWS_Lambda_python3.12") // Create receiver using factory with a dummy encoding extension. factory := NewFactory() @@ -173,8 +172,7 @@ func TestCreateMetricsRequiresS3Encoding(t *testing.T) { func TestStartRequiresLambdaEnvironment(t *testing.T) { // Ensure Lambda environment variables are not set - t.Setenv("_LAMBDA_SERVER_PORT", "") - t.Setenv("AWS_LAMBDA_RUNTIME_API", "") + t.Setenv("AWS_EXECUTION_ENV", "") factory := NewFactory() cfg := factory.CreateDefaultConfig().(*Config) @@ -263,7 +261,15 @@ func TestLoadLogsHandler(t *testing.T) { isErr bool }{ { - name: "Prioritize ID based loading - success", + name: "Default to CW Subscription filter", + factoryMock: func(_ context.Context, _ extension.Settings, _ component.Config) (extension.Extension, error) { + return &mockExtensionWithPLogUnmarshaler{}, nil + }, + isErr: false, + expectedHandlerType: reflect.TypeOf(&cwLogsSubscriptionHandler{}), + }, + { + name: "Logs handler based on S3 encoding", s3Encoding: "my_encoding", hostMock: func() map[component.ID]component.Component { id := component.NewID(component.MustNewType("my_encoding")) @@ -276,7 +282,7 @@ func TestLoadLogsHandler(t *testing.T) { expectedHandlerType: reflect.TypeOf(&s3Handler[plog.Logs]{}), }, { - name: "Error if handler loading fails", + name: "Error if no matching S3 encoding extension found", s3Encoding: "custom_encoding", hostMock: func() map[component.ID]component.Component { // no registered extensions matching s3Encoding @@ -459,6 +465,50 @@ func TestExtractFirstKey(t *testing.T) { } } +func TestLoadSubFilterLogUnmarshaler(t *testing.T) { + tests := []struct { + name string + mockFactory func(ctx context.Context, settings extension.Settings, config component.Config) (extension.Extension, error) + expectedErr string + }{ + { + name: "successful_case", + mockFactory: func(_ context.Context, _ extension.Settings, _ component.Config) (extension.Extension, error) { + return &mockExtensionWithPLogUnmarshaler{}, nil + }, + }, + { + name: "create_extension_error", + mockFactory: func(_ context.Context, _ extension.Settings, _ component.Config) (extension.Extension, error) { + return nil, errors.New("mock factory creation error") + }, + expectedErr: "failed to create the extension", + }, + { + name: "invalid_unmarshaler", + mockFactory: func(_ context.Context, _ extension.Settings, _ component.Config) (extension.Extension, error) { + return &mockExtension{}, nil + }, + expectedErr: "does not implement plog.Unmarshaler", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + mockFactory := &mockExtFactory{ + CreateFunc: test.mockFactory, + } + + _, errP := loadSubFilterLogUnmarshaler(t.Context(), mockFactory) + if test.expectedErr != "" { + require.ErrorContains(t, errP, test.expectedErr) + } else { + require.NoError(t, errP) + } + }) + } +} + func TestDetectTriggerType(t *testing.T) { tests := []struct { name string @@ -498,6 +548,14 @@ func TestDetectTriggerType(t *testing.T) { } } +type mockExtFactory struct { + CreateFunc func(ctx context.Context, settings extension.Settings, config component.Config) (extension.Extension, error) +} + +func (m *mockExtFactory) Create(ctx context.Context, settings extension.Settings, config component.Config) (extension.Extension, error) { + return m.CreateFunc(ctx, settings, config) +} + type mockExtensionWithPLogUnmarshaler struct { mockExtension // Embed the base mock implementation. plog.Unmarshaler // Add the unmarshaler interface when needed. diff --git a/receiver/awslambdareceiver/testdata/cloudwatch_log.json b/receiver/awslambdareceiver/testdata/cloudwatch_log.json new file mode 100644 index 0000000000000..9b2baae5b5708 --- /dev/null +++ b/receiver/awslambdareceiver/testdata/cloudwatch_log.json @@ -0,0 +1,23 @@ +{ + "owner": "123456789012", + "logGroup": "/aws/cloudtrail/management", + "logStream": "123456789012_CloudTrail_us-east-1", + "messageType": "DATA_MESSAGE", + "logEvents": [ + { + "id": "evt-001", + "timestamp": 1732638456000, + "message": "{\"eventName\":\"ConsoleLogin\",\"userIdentity\":{\"type\":\"Root\"},\"sourceIPAddress\":\"99.84.22.14\",\"awsRegion\":\"us-east-1\"}" + }, + { + "id": "evt-002", + "timestamp": 1732638469000, + "message": "{\"eventName\":\"StartInstances\",\"userIdentity\":{\"type\":\"IAMUser\",\"userName\":\"devops\"},\"awsRegion\":\"us-east-1\"}" + }, + { + "id": "evt-003", + "timestamp": 1732638478000, + "message": "{\"eventName\":\"PutObject\",\"userIdentity\":{\"type\":\"AssumedRole\"},\"awsRegion\":\"us-east-1\",\"requestParameters\":{\"bucket\":\"app-logs\"}}" + } + ] +} diff --git a/receiver/awslambdareceiver/testdata/cloudwatch_log_expected.yaml b/receiver/awslambdareceiver/testdata/cloudwatch_log_expected.yaml new file mode 100644 index 0000000000000..62f7b81d639a8 --- /dev/null +++ b/receiver/awslambdareceiver/testdata/cloudwatch_log_expected.yaml @@ -0,0 +1,32 @@ +resourceLogs: + - resource: + attributes: + - key: cloud.provider + value: + stringValue: aws + - key: cloud.account.id + value: + stringValue: "123456789012" + - key: aws.log.group.names + value: + stringValue: /aws/cloudtrail/management + - key: aws.log.stream.names + value: + stringValue: 123456789012_CloudTrail_us-east-1 + scopeLogs: + - logRecords: + - body: + stringValue: '{"eventName":"ConsoleLogin","userIdentity":{"type":"Root"},"sourceIPAddress":"99.84.22.14","awsRegion":"us-east-1"}' + timeUnixNano: "1732638456000000000" + - body: + stringValue: '{"eventName":"StartInstances","userIdentity":{"type":"IAMUser","userName":"devops"},"awsRegion":"us-east-1"}' + timeUnixNano: "1732638469000000000" + - body: + stringValue: '{"eventName":"PutObject","userIdentity":{"type":"AssumedRole"},"awsRegion":"us-east-1","requestParameters":{"bucket":"app-logs"}}' + timeUnixNano: "1732638478000000000" + scope: + attributes: + - key: encoding.format + value: + stringValue: aws.cloudwatch + name: github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/awslogsencodingextension