Skip to content

feat: lambda support for DSM #672

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 21 commits into
base: main
Choose a base branch
from
Draft

Conversation

michael-zhao459
Copy link
Collaborator

What does this PR do?

This PR adds lambda support for Data Streams Monitoring (DSM), sets a checkpoint after extracting context from trace propagation headers, DSM context gets packaged in with the trace prop headers. Queues supported by DSM include SQS, SNS, SNS->SQS, and Kinesis

Motivation

Consume calls of lambdas are set as triggers to queue and do not have any explicit calls the tracers can hook into to set a checkpoint

Testing Guidelines

Tried all configurations with sandbox AWS account. Wrote unit tests ensuring all cases were covered.

Additional Notes

Types of Changes

  • Bug fix
  • New feature
  • Breaking change
  • Misc (docs, refactoring, dependency upgrade, etc.)

Check all that apply

  • This PR's description is comprehensive
  • This PR contains breaking changes that are documented in the description
  • This PR introduces new APIs or parameters that are documented and unlikely to change in the foreseeable future
  • This PR impacts documentation, and it has been updated (or a ticket has been logged)
  • This PR's changes are covered by the automated tests
  • This PR collects user input/sensitive content into Datadog
  • This PR passes the integration tests (ask a Datadog member to run the tests)

}

try {
this.tracer.dataStreamsCheckpointer.setConsumeCheckpoint(eventType, arn, contextJson, false);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to wait for a tracer release that has this manual_checkpoint parameter

const kinesisData = event?.Records?.[0]?.kinesis.data;
if (kinesisData === undefined) return null;

sourceARN = event.Records[0].eventSourceARN;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what happens if event is null or Records is null or it has no records? i would use optional chaining like above.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are using records[0] (so the first message). Can a lambda be called with many messages? What do we want to do in that case? Set many checkpoints or just one for the first checkpoint ? What do we do in the Python lambda?

try {
const decodedData = Buffer.from(kinesisData, "base64").toString("ascii");
const parsedBody = JSON.parse(decodedData);
const headers = parsedBody?._datadog;
if (headers) {
const traceContext = this.tracerWrapper.extract(headers);
this.tracerWrapper.setConsumeCheckpoint(headers, "kinesis", sourceARN);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we fail to capture the sourceARN, we probably want a debug log & not set a checkpoint

try {
// First try to extract trace context from message attributes
if (event?.Records?.[0]?.body) {
sourceARN = event.Records[0].eventSourceARN;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same comments as for Kinesis

try {
// First try to extract trace context from message attributes
const messageAttribute = event?.Records?.[0]?.Sns?.MessageAttributes?._datadog;
sourceARN = event.Records[0].Sns.TopicArn;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same comment as for Kinesis.

return;
}

if (getEnvValue("DD_DATA_STREAMS_ENABLED", "false").toLowerCase() !== "true") {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is that the way javascript tests for env variables? In other languages, we sometimes have a helper function. (so you can set: DD_DATA_STREAMS_ENABLED=1 for example

Copy link
Collaborator Author

@michael-zhao459 michael-zhao459 Jul 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this behavior we want?
VARIABLE = result === "true" || result === "1"; is a pattern used, but not sure we want this.

@@ -98,4 +99,22 @@ export class TracerWrapper {
this.tracer.inject(span, "text_map", dest);
return dest;
}

public setConsumeCheckpoint(contextJson: any, eventType: string, arn: string): void {
if (!arn) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see we don't set a checkpoint if arn is not set. Let's just add a debug log in this case.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants