Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/feat_awslambdareceiver-cloudwatch-signal.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog)
component: receiver/awslambda

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add support for AWS Lambda receiver to trigger by CloudWatch logs subscription filters for Lambda

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [43504]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
55 changes: 20 additions & 35 deletions receiver/awslambdareceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,44 +54,19 @@ The `awslambdareceiver` operates as follows:

The following receiver configuration parameters are supported.

| Name | Description | Default | Required |
|:----------------------|:-------------------------------------------------------------------------------------------------------------------------------------------|-------------|----------|
| `s3_encoding` | Name of the encoding extension to use for S3 objects | "awslogs_encoding" | Optional |
| Name | Description |
|:---------------------|:-------------------------------------------------------------------------------------------------------------------|
| `encoding_extension` | Required: The name of the encoding extension to load which will decode the data received through supported sources |

### Example Configuration

```yaml
receivers:
awslambda:
s3_encoding: awslogs_encoding

extensions:
awslogs_encoding:
format: vpcflow
vpcflow:
file_format: plain-text

exporters:
otlphttp:
endpoint: "https://my-backend:443"

service:
extensions:
- awslogs_encoding
pipelines:
logs:
receivers: [awslambda]
exporters: [otlphttp]
```

## Examples

### Example 1: VPC Flow Logs from S3

```yaml
receivers:
awslambda:
s3_encoding: awslogs_encoding
encoding_extension: awslogs_encoding

extensions:
awslogs_encoding:
Expand All @@ -112,14 +87,16 @@ service:
exporters: [otlphttp]
```

In this example, `awslambdareceiver` receives a notification when a new VPC flow log file is stored in an S3 bucket. The receiver fetches the log file from S3 and parses it using the `awslogs_encoding` extension with vpcflow format. The parsed logs are then sent to an OTLP listener using the `otlphttp` exporter.
In this example, the `awslambdareceiver` is expected to be triggered when a VPC flow log is created at S3 bucket.
The receiver retrieves the log file from S3 and decodes it using the `awslogs_encoding` extension with the vpcflow format.
Parsed logs are forwarded to an OTLP listener via the `otlphttp` exporter.

### Example 2: ELB Access Logs from S3

```yaml
receivers:
awslambda:
s3_encoding: awslogs_encoding
encoding_extension: awslogs_encoding

extensions:
awslogs_encoding:
Expand All @@ -140,29 +117,37 @@ service:
exporters: [otlphttp]
```

### Example 3: CloudWatch Logs Subscription
### Example 3: CloudWatch Logs mode

```yaml
receivers:
awslambda:
encoding_extension: awslogs_encoding

extensions:
awslogs_encoding:
format: cloudwatch

exporters:
otlphttp:
endpoint: "https://my-backend:443"

service:
extensions:
- awslogs_encoding
pipelines:
logs:
receivers: [awslambda]
exporters: [otlphttp]
```

In this example, `awslambdareceiver` is invoked by a CloudWatch Logs subscription filter. The receiver automatically parses the CloudWatch Logs data using the default `awslogs_encoding` extension with cloudwatch format. No explicit encoding configuration is needed. The parsed logs are then sent to an OTLP listener using the `otlphttp` exporter.
In this example, `awslambdareceiver` is expected to be triggered by a CloudWatch Logs subscription filter.
The receiver retrieves the logs from the event payload and decodes them using the `awslogs_encoding` extension with the cloudwatch format.

## Supported Data Types

- **Logs** (Primary support)
- **Metrics** (Future consideration)
- **Logs**: Supported through S3 and CloudWatch Logs event sources
- **Metrics**: Supported through S3

## AWS Permissions

Expand Down
5 changes: 3 additions & 2 deletions receiver/awslambdareceiver/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"encoding/json"
"testing"
"time"

"github.com/aws/aws-lambda-go/events"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -40,8 +41,8 @@ func BenchmarkHandleS3Notification(b *testing.B) {

consumer := noOpLogsConsumer{}
// Wrap the consumer to match the new s3EventConsumerFunc signature
logsConsumer := func(ctx context.Context, event events.S3EventRecord, logs plog.Logs) error {
setObservedTimestampForAllLogs(logs, event.EventTime)
logsConsumer := func(ctx context.Context, time time.Time, logs plog.Logs) error {
setObservedTimestampForAllLogs(logs, time)
return consumer.ConsumeLogs(ctx, logs)
}
handler := newS3Handler(service, zap.NewNop(), mockS3LogUnmarshaler{}.UnmarshalLogs, logsConsumer)
Expand Down
23 changes: 12 additions & 11 deletions receiver/awslambdareceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,19 @@
package awslambdareceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awslambdareceiver"

import (
"go.opentelemetry.io/collector/component"
)
"errors"

const (
awsLogsEncoding = "awslogs_encoding"
s3ARNPrefix = "arn:aws:s3:::"
"go.opentelemetry.io/collector/component"
)

type Config struct {
// S3Encoding identifies the encoding of the S3 objects that trigger the Lambda.
// EncodingExtension defines the encoding extension the receiver must use to decode receiving data
//
// If S3Encoding is unspecified, the receiver will return an error for any S3 event notifications.
// If receiving data is in different formats(ex:- VPC flow logs, CloudTrail logs), receiver must be deployed in
// separate Lambda functions with specific extension configurations.
//
// If you have objects with multiple different encodings to handle, you should deploy
// separate Lambda functions with different configurations.
S3Encoding string `mapstructure:"s3_encoding"`
// This property is mandatory and must refer to a valid encoding extension configured in the collector configurations.
EncodingExtension string `mapstructure:"encoding_extension"`

_ struct{} // Prevent unkeyed literal initialization
}
Expand All @@ -30,6 +27,10 @@ func createDefaultConfig() component.Config {
return &Config{}
}

func (*Config) Validate() error {
func (c *Config) Validate() error {
if c.EncodingExtension == "" {
return errors.New("encoding_extension is mandatory, please use a valid encoding extension name configured in the collector configurations")
}

return nil
}
9 changes: 5 additions & 4 deletions receiver/awslambdareceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,22 +28,23 @@ func TestLoadConfig(t *testing.T) {
expectError string
}{
{
id: component.NewIDWithName(metadata.Type, awsLogsEncoding),
id: component.NewIDWithName(metadata.Type, "awslogs_encoding"),
expected: &Config{
S3Encoding: awsLogsEncoding,
EncodingExtension: "awslogs_encoding",
},
},
{
id: component.NewIDWithName(metadata.Type, "json_log_encoding"),
expected: &Config{
S3Encoding: "json_log_encoding",
EncodingExtension: "json_log_encoding",
},
},
{
id: component.NewIDWithName(metadata.Type, "empty_encoding"),
expected: &Config{
S3Encoding: "",
EncodingExtension: "",
},
expectError: "encoding_extension is mandatory",
},
}

Expand Down
1 change: 1 addition & 0 deletions receiver/awslambdareceiver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

func TestCreateLogsReceiver(t *testing.T) {
cfg := createDefaultConfig()
cfg.(*Config).EncodingExtension = "awslogs_encoding"

r, err := createLogsReceiver(t.Context(), receivertest.NewNopSettings(metadata.Type), cfg, consumertest.NewNop())
require.NoError(t, err)
Expand Down
17 changes: 17 additions & 0 deletions receiver/awslambdareceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ require (
github.com/aws/aws-sdk-go-v2/config v1.32.1
github.com/aws/aws-sdk-go-v2/service/s3 v1.92.0
github.com/goccy/go-json v0.10.5
github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/awslogsencodingextension v0.140.1
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.140.1
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.140.1
github.com/stretchr/testify v1.11.1
go.opentelemetry.io/collector/component v1.46.1-0.20251127085441-73f090cd0d59
go.opentelemetry.io/collector/component/componenttest v0.140.1-0.20251127085441-73f090cd0d59
Expand Down Expand Up @@ -43,6 +46,7 @@ require (
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.9 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.41.1 // indirect
github.com/aws/smithy-go v1.23.2 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/go-logr/logr v1.4.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
Expand All @@ -51,13 +55,16 @@ require (
github.com/google/uuid v1.6.0 // indirect
github.com/hashicorp/go-version v1.7.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.18.1 // indirect
github.com/knadh/koanf/maps v0.1.2 // indirect
github.com/knadh/koanf/providers/confmap v1.0.0 // indirect
github.com/knadh/koanf/v2 v2.3.0 // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding v0.140.1 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.140.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
go.opentelemetry.io/collector/consumer/xconsumer v0.140.1-0.20251127085441-73f090cd0d59 // indirect
Expand All @@ -77,3 +84,13 @@ require (
google.golang.org/protobuf v1.36.10 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/awslogsencodingextension => ../../extension/encoding/awslogsencodingextension

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../pkg/pdatautil

replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding => ../../extension/encoding

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden
10 changes: 8 additions & 2 deletions receiver/awslambdareceiver/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading