Conversation
@Kavindu-Dodan Kavindu-Dodan commented Nov 26, 2025

Description

This PR adds CloudWatch logs Lambda subscription handling support to awslambdareceiver.


With this change, the receiver can now be deployed as a Lambda and handle either of the signal sources: S3 triggers or CloudWatch Logs subscriptions.

Note: the receiver can only handle one signal type per deployment.
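For illustration, a hedged sketch of a deployment configuration under this PR's design (keys follow the receiver's existing s3_encoding option; treat them as indicative rather than authoritative):

```yaml
receivers:
  awslambda:
    # Configuring an S3 encoding selects the S3 trigger mode; leaving it
    # out makes the receiver handle CloudWatch Logs subscription events.
    s3_encoding: awslogs_encoding
```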

Link to tracking issue

Completes part of #43504

Testing

I have added unit tests to cover most of the new functionality. One of them (TestHandleCloudwatchLogEvent) provides end-to-end validation of the CloudWatch event to plogs conversion.

Documentation

Existing documentation already covered some aspects. However, I have updated it where needed, along with code-level comments.

@Kavindu-Dodan Kavindu-Dodan requested review from a team, axw and pjanotti as code owners November 26, 2025 22:51
@Kavindu-Dodan Kavindu-Dodan force-pushed the feat/awslambdareceiver-cloudwatch-signal branch 2 times, most recently from 2ae6e5d to e9aa82a Compare November 26, 2025 23:08

@axw axw left a comment

Code looks fine, but I'd like to revisit the config. Implicitly making it CloudWatch-based because the S3 encoding isn't configured is a bit unintuitive.

Kavindu-Dodan commented Nov 27, 2025

Code looks fine, but I'd like to revisit the config. Implicitly making it CloudWatch-based because the S3 encoding isn't configured is a bit unintuitive.

Thanks @axw for this remark. Personally, I also found this misleading (detecting the CW Logs operation mode from a missing S3 encoder felt wrong). I am exploring a new strategy, but it will require considerable refactoring, and I think this is the right time to fix it.

What I am considering is the following:

  • Instead of using s3_encoding, we will migrate to a generic configuration named encoding_extension
  • This allows the user to define any valid encoding extension to decode/process the incoming data
  • Along with this, we will construct the lambdaEventHandler dynamically based on the signal that arrives (initialized lazily and then kept for the runtime)
    • This is essential because we currently use the missing s3_encoding configuration to fall back to cwLogsSubscriptionHandler
    • Going forward, we will not build the lambdaEventHandler until we receive the first event
    • Based on that event, we will generate the event handler. For example, if it's S3, we construct and use s3Handler; if the event is from CW Logs, we build the cwLogsSubscriptionHandler. Once initialized, handlers will be reused
  • As a side effect of this improvement, we get the following:
    • The Lambda receiver can now use any valid encoding extension
    • The Lambda receiver can handle multiple sources (e.g. both S3 & CW Logs) at once (note: this applies to logs; for metrics it's only S3 mode, as CW-Logs-backed metrics do not make sense)
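A minimal sketch of that lazy selection (all names here are illustrative stand-ins, not the actual awslambdareceiver types):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// handler stands in for the receiver's lambdaEventHandler interface.
type handler interface{ name() string }

type s3Handler struct{}

func (s3Handler) name() string { return "s3Handler" }

type cwLogsSubscriptionHandler struct{}

func (cwLogsSubscriptionHandler) name() string { return "cwLogsSubscriptionHandler" }

// receiver picks a handler from the first event it sees and reuses it.
// A Lambda execution environment processes one event at a time, so the
// unsynchronized lazy initialization is safe in that setting.
type receiver struct{ h handler }

func (r *receiver) handlerFor(raw []byte) handler {
	if r.h == nil {
		var probe map[string]json.RawMessage
		_ = json.Unmarshal(raw, &probe)
		if _, ok := probe["awslogs"]; ok {
			r.h = cwLogsSubscriptionHandler{}
		} else {
			r.h = s3Handler{}
		}
	}
	return r.h
}

func main() {
	r := &receiver{}
	fmt.Println(r.handlerFor([]byte(`{"awslogs":{"data":"..."}}`)).name())
	// Subsequent events reuse the handler chosen for the first one.
	fmt.Println(r.handlerFor([]byte(`{"Records":[]}`)).name())
}
```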

I am actively working on this improvement and will push it as a single commit to keep the review focused. But let me know if you think of a different approach.

axw commented Nov 28, 2025

Instead of using s3_encoding, we will migrate to a generic configuration named encoding_extension

How would that encoding extension be used for CloudWatch Logs? They have a specific structure -- they must be decoded in the CloudWatch log subscription filter format.

I think it would make sense to use it for decoding the message field, so e.g. you could use awslogs_encoding to decode CloudTrail logs sent to CloudWatch.

You could support both S3 and CloudWatch events in the one Lambda, but they would have to have the same encoding, which still adds a little bit of cognitive overhead.

Another option would be to have two separate attribute groups, along these lines:

receivers:
  awslambda:
    s3:
      encoding: ... # required for handling S3 events
    cloudwatch_logs:
      message_encoding: ... # optional; defaults to using the message field as the log record body

I personally think that's easier to reason about, since it keeps the two event types completely separate. Configuring the encoding for one event type has no impact on the other.

@MichaelKatsoulis

Reading your suggestions, I agree that keeping the configuration for s3 and cloudwatch_logs separate is the cleaner approach.

How would that encoding extension be used for CloudWatch Logs? ... I think it would make sense to use it for decoding the message field.

Yes, exactly. If message_encoding is provided, we should use it to decode the message field of the CloudWatch Log Events.

However, there is a significant technical challenge here regarding the input format expectations.

  1. Mismatch: The CloudWatch Subscription Filter sends us a JSON payload where logEvents is an array of individual messages:
"logEvents": [
        {
            "id": "31953106606966983378809025079804211143289615424298221568",
            "timestamp": 1432826855000,
            "message": "{\"eventVersion\":\"1.03\",\"userIdentity\":{\"type\":\"Root\"}"
        },
        {
            "id": "31953106606966983378809025079804211143289615424298221569",
            "timestamp": 1432826855000,
            "message": "{\"eventVersion\":\"1.03\",\"userIdentity\":{\"type\":\"Root\"}"
        },
        {
            "id": "31953106606966983378809025079804211143289615424298221570",
            "timestamp": 1432826855000,
            "message": "{\"eventVersion\":\"1.03\",\"userIdentity\":{\"type\":\"Root\"}"
        }
    ]

However, the awslogs encoding extension (e.g., for CloudTrail) expects a specific root-level structure that mirrors the original S3 file format:

{ 
   "Records": [        
       { "eventVersion": "1.08"... },        
       { "eventVersion": "1.08"... }    
   ]
}
  2. Transformation requirement: We cannot simply stream the message fields into the encoding extension.
     For VPC Flow Logs, concatenating messages with \n might work (as they are line-delimited).
     For CloudTrail, we would need to explicitly construct a valid JSON wrapper {"Records": [ ... ]} around the concatenated messages before passing them to the unmarshaler.
  3. Efficiency concern: Ideally, if message_encoding is set, we should indeed bypass the full subscriptionFilter unmarshaler to avoid the overhead of Bytes -> PLogs (SubFilter) -> Bytes (Re-encoded) -> PLogs (Target). We can parse the raw CloudWatch JSON, extract the message strings directly, apply the necessary wrapping (e.g. {"Records":[ + joined messages + ]}), and feed that byte stream to the target unmarshaler.

I agree this implementation detail can be tackled in a follow-up PR, but it's important to note that supporting message_encoding might require the receiver to have some logic to "adapt" the stream (delimiter/wrapping) based on the target encoding type.

@Kavindu-Dodan Kavindu-Dodan force-pushed the feat/awslambdareceiver-cloudwatch-signal branch from ed8908a to 93bb5dd Compare November 28, 2025 19:41
@Kavindu-Dodan Kavindu-Dodan force-pushed the feat/awslambdareceiver-cloudwatch-signal branch from 93bb5dd to fbd585f Compare November 28, 2025 20:02

Kavindu-Dodan commented Nov 28, 2025

@axw @MichaelKatsoulis thanks for the feedback. Let me clarify my view and let's see if we can come to a conclusion on this.

First, let's break down the receiver operation modes. The Lambda receiver can operate for both logs & metrics. Each operation mode can then have sub-trigger modes. Currently, we support the following:

  • Logs: trigger from S3 & trigger from CloudWatch
  • Metrics: trigger from S3

The detection of the trigger type is easy and is already done by the detectTriggerType implementation. Once the trigger type is detected, we have well-defined event formats from which to extract the payload we must process. For example, for the S3 trigger, the event format is known [1], and that's what we use to obtain the content of the S3 object.
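Purely as an illustration (the receiver's actual detectTriggerType may differ), trigger detection can probe the well-known top-level keys of each event shape:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// triggerType is an illustrative enum, not the receiver's actual type.
type triggerType int

const (
	triggerUnknown triggerType = iota
	triggerS3
	triggerCloudWatchLogs
)

// detectTrigger inspects the raw Lambda event: S3 notifications carry a
// top-level "Records" array, while CloudWatch subscription deliveries
// carry a top-level "awslogs" object.
func detectTrigger(raw []byte) triggerType {
	var probe map[string]json.RawMessage
	if err := json.Unmarshal(raw, &probe); err != nil {
		return triggerUnknown
	}
	if _, ok := probe["awslogs"]; ok {
		return triggerCloudWatchLogs
	}
	if _, ok := probe["Records"]; ok {
		return triggerS3
	}
	return triggerUnknown
}

func main() {
	fmt.Println(detectTrigger([]byte(`{"awslogs":{"data":"H4sIA..."}}`)) == triggerCloudWatchLogs)
	fmt.Println(detectTrigger([]byte(`{"Records":[{"s3":{}}]}`)) == triggerS3)
}
```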

Once we obtain the content, that's where we need the help of an encoding extension. So if the user is forwarding VPC Flow Logs to S3, then the user must use awslogs_encoding with the vpcflow format:

receivers:
  awslambda:
    encoding_extension: awslogs_encoding

extensions:
  awslogs_encoding:
    format: vpcflow
    vpcflow:
      file_format: plain-text

How would that encoding extension be used for CloudWatch Logs? They have a specific structure -- they must be decoded in the CloudWatch log subscription filter format.

If we accept the proposed change in this PR, then the user must define awslogs_encoding with the cloudwatch format:

receivers:
  awslambda:
    encoding_extension: awslogs_encoding

extensions:
  awslogs_encoding:
    format: cloudwatch

However, this is where we sort of misuse the CloudWatch subscription filter. In the implementation [2], we are just reading the content of the message and setting it as the scope log's log record body. In other words, if we applied the same pattern to the S3 event trigger, we would be performing the S3 object's content download through an encoding extension.

Because of this, we have the encoder-chaining need, and the complexity that comes with it is explained by @MichaelKatsoulis at #44562 (comment)


Improving CloudWatch subscription trigger mode

As the next step, I have a suggestion. For the CloudWatch trigger, we must move message extraction into the Lambda receiver. Once we have the message content, we can choose between two paths:

1. Decode the message further using the defined encoding_extension
2. Define an operation mode where CW Logs are converted as-is (we can do the same for S3 logs as well)

The first path fixes the encoding-chaining issue, and we can use improved encoding extensions (see footnote [3]) to decode CW Logs.

The second path exactly matches what we have now. Defining this operation mode needs some refactoring and can be specific to the CW event type (e.g. skip_cloudwatch_decoding: true).
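Expressed as configuration, the two paths might look like this (skip_cloudwatch_decoding is the hypothetical flag from the example above, not an implemented option):

```yaml
receivers:
  awslambda:
    # Path 1: decode the extracted CW messages with the configured extension
    encoding_extension: awslogs_encoding
    # Path 2 (hypothetical flag): forward CW messages as-is
    # skip_cloudwatch_decoding: true
```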

@axw @MichaelKatsoulis let me know your thoughts. Also check the current state, where commit fbd585f contains changes in this direction.

Footnotes

  1. https://docs.aws.amazon.com/lambda/latest/dg/with-s3.html

  2. https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/10c1e40324f32587e6c8c0b1bcced617486c6cb1/extension/encoding/awslogsencodingextension/internal/unmarshaler/subscription-filter/unmarshaler.go

  3. We still need improvements as encoding extensions are specialized to handle S3 triggers. For example, VPC flow logs expect the first line to contain field names - https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/10c1e40324f32587e6c8c0b1bcced617486c6cb1/extension/encoding/awslogsencodingextension/internal/unmarshaler/vpc-flow-log/unmarshaler.go#L96-L99
