diff --git a/src/trace/context/extractors/kinesis.spec.ts b/src/trace/context/extractors/kinesis.spec.ts index aa68a082..17bf28a9 100644 --- a/src/trace/context/extractors/kinesis.spec.ts +++ b/src/trace/context/extractors/kinesis.spec.ts @@ -2,6 +2,15 @@ import { TracerWrapper } from "../../tracer-wrapper"; import { KinesisEventTraceExtractor } from "./kinesis"; let mockSpanContext: any = null; +let mockDataStreamsCheckpointer: any = { + setConsumeCheckpoint: jest.fn(), +}; + +jest.mock("dd-trace/packages/dd-trace/src/datastreams/checkpointer", () => { + return { + DataStreamsCheckpointer: jest.fn().mockImplementation(() => mockDataStreamsCheckpointer), + }; +}); // Mocking extract is needed, due to dd-trace being a No-op // if the detected environment is testing. This is expected, since @@ -12,6 +21,7 @@ jest.mock("dd-trace", () => { ...ddTrace, _tracer: { _service: {} }, extract: (_carrier: any, _headers: any) => mockSpanContext, + dataStreamsCheckpointer: mockDataStreamsCheckpointer, }; }); const spyTracerWrapper = jest.spyOn(TracerWrapper.prototype, "extract"); @@ -20,10 +30,13 @@ describe("KinesisEventTraceExtractor", () => { describe("extract", () => { beforeEach(() => { mockSpanContext = null; + mockDataStreamsCheckpointer.setConsumeCheckpoint.mockClear(); + process.env["DD_DATA_STREAMS_ENABLED"] = "true"; }); afterEach(() => { jest.resetModules(); + delete process.env["DD_DATA_STREAMS_ENABLED"]; }); it("extracts trace context with valid payload", () => { @@ -43,7 +56,7 @@ describe("KinesisEventTraceExtractor", () => { kinesisSchemaVersion: "1.0", partitionKey: "cdbfd750-cec0-4f0f-a4b0-82ae6152c7fb", sequenceNumber: "49625698045709644136382874226371117765484751339579768834", - data: "eyJJJ20gbWFkZSBvZiB3YXgsIExhcnJ5IjoiV2hhdCBhcmUgeW91IG1hZGUgb2Y/IiwiX2RhdGFkb2ciOnsieC1kYXRhZG9nLXRyYWNlLWlkIjoiNjY3MzA5NTE0MjIxMDM1NTM4IiwieC1kYXRhZG9nLXBhcmVudC1pZCI6IjEzNTA3MzUwMzU0OTc4MTE4MjgiLCJ4LWRhdGFkb2ctc2FtcGxlZCI6IjEiLCJ4LWRhdGFkb2ctc2FtcGxpbmctcHJpb3JpdHkiOiIxIn19", + data: "eyJJJ20gbWFkZSBvZiB3YXgsIExhcnJ5IjoiV2hhdCBhcmUgeW91IG1hZGUgb2Y/IiwiX2RhdGFkb2ciOnsieC1kYXRhZG9nLXRyYWNlLWlkIjoiNjY3MzA5NTE0MjIxMDM1NTM4IiwieC1kYXRhZG9nLXBhcmVudC1pZCI6IjEzNTA3MzUwMzU0OTc4MTE4MjgiLCJ4LWRhdGFkb2ctc2FtcGxlZCI6IjEiLCJ4LWRhdGFkb2ctc2FtcGxpbmctcHJpb3JpdHkiOiIxIiwiZGQtcGF0aHdheS1jdHgtYmFzZTY0Ijoic29tZS1iYXNlNjQtZW5jb2RlZC1jb250ZXh0In19Cg==", approximateArrivalTimestamp: 1642518727.248, }, eventSource: "aws:kinesis", @@ -67,25 +80,50 @@ describe("KinesisEventTraceExtractor", () => { "x-datadog-sampled": "1", "x-datadog-sampling-priority": "1", "x-datadog-trace-id": "667309514221035538", + "dd-pathway-ctx-base64": "some-base64-encoded-context", }); expect(traceContext?.toTraceId()).toBe("667309514221035538"); expect(traceContext?.toSpanId()).toBe("1350735035497811828"); expect(traceContext?.sampleMode()).toBe("1"); expect(traceContext?.source).toBe("event"); + + expect(mockDataStreamsCheckpointer.setConsumeCheckpoint).toHaveBeenCalledWith( + "kinesis", + "arn:aws:kinesis:EXAMPLE", + { + "x-datadog-parent-id": "1350735035497811828", + "x-datadog-sampled": "1", + "x-datadog-sampling-priority": "1", + "x-datadog-trace-id": "667309514221035538", + "dd-pathway-ctx-base64": "some-base64-encoded-context", + }, + false, + ); }); it.each([ - ["Records", {}], - ["Records first entry", { Records: [] }], - ["valid data in kinesis", { Records: [{ kinesis: { data: "{" } }] }], // JSON.parse should fail - ["_datadog in data", { Records: [{ kinesis: { data: "e30=" } }] }], - ])("returns null and skips extracting when payload is missing '%s'", (_, payload) => { + ["Records", {}, 0], + ["Records first entry", { Records: [] }, 0], + ["valid data in kinesis", { Records: [{ kinesis: { data: "{" }, eventSourceARN: "arn:aws:kinesis:test" }] }, 1], // JSON.parse should fail + ["_datadog in data", { Records: [{ kinesis: { data: "e30=" }, eventSourceARN: "arn:aws:kinesis:test" }] }, 1], + ])("returns null and skips extracting when payload is missing '%s'", (_, payload, dsmCalls) => { const tracerWrapper = new TracerWrapper(); const extractor = new KinesisEventTraceExtractor(tracerWrapper); const traceContext = extractor.extract(payload as any); expect(traceContext).toBeNull(); + + expect(mockDataStreamsCheckpointer.setConsumeCheckpoint).toHaveBeenCalledTimes(dsmCalls); + + if (dsmCalls > 0) { + expect(mockDataStreamsCheckpointer.setConsumeCheckpoint).toHaveBeenCalledWith( + "kinesis", + "arn:aws:kinesis:test", + null, + false, + ); + } }); it("returns null when extracted span context by tracer is null", () => { diff --git a/src/trace/context/extractors/kinesis.ts b/src/trace/context/extractors/kinesis.ts index 5f5d0b1f..78f498e2 100644 --- a/src/trace/context/extractors/kinesis.ts +++ b/src/trace/context/extractors/kinesis.ts @@ -8,6 +8,7 @@ export class KinesisEventTraceExtractor implements EventTraceExtractor { constructor(private tracerWrapper: TracerWrapper) {} extract(event: KinesisStreamEvent): SpanContextWrapper | null { + const sourceARN = event?.Records?.[0]?.eventSourceARN; const kinesisData = event?.Records?.[0]?.kinesis.data; if (kinesisData === undefined) return null; @@ -17,6 +18,7 @@ export class KinesisEventTraceExtractor implements EventTraceExtractor { const headers = parsedBody?._datadog; if (headers) { const traceContext = this.tracerWrapper.extract(headers); + this.tracerWrapper.setConsumeCheckpoint(headers, "kinesis", sourceARN); if (traceContext === null) return null; logDebug(`Extracted trace context from Kinesis event`, { traceContext, headers }); @@ -28,6 +30,8 @@ export class KinesisEventTraceExtractor implements EventTraceExtractor { } } + // Still want to set a DSM checkpoint even if DSM context not propagated + this.tracerWrapper.setConsumeCheckpoint(null, "kinesis", sourceARN); return null; } } diff --git a/src/trace/context/extractors/sns-sqs.spec.ts b/src/trace/context/extractors/sns-sqs.spec.ts index 82b39c32..f3f7c1b1 100644 --- a/src/trace/context/extractors/sns-sqs.spec.ts +++ b/src/trace/context/extractors/sns-sqs.spec.ts @@ -3,6 +3,15 @@ import { SNSSQSEventTraceExtractor } from "./sns-sqs"; import { StepFunctionContextService } from "../../step-function-service"; let mockSpanContext: any = null; +let mockDataStreamsCheckpointer: any = { + setConsumeCheckpoint: jest.fn(), +}; + +jest.mock("dd-trace/packages/dd-trace/src/datastreams/checkpointer", () => { + return { + DataStreamsCheckpointer: jest.fn().mockImplementation(() => mockDataStreamsCheckpointer), + }; +}); // Mocking extract is needed, due to dd-trace being a No-op // if the detected environment is testing. This is expected, since @@ -13,6 +22,7 @@ jest.mock("dd-trace", () => { ...ddTrace, _tracer: { _service: {} }, extract: (_carrier: any, _headers: any) => mockSpanContext, + dataStreamsCheckpointer: mockDataStreamsCheckpointer, }; }); const spyTracerWrapper = jest.spyOn(TracerWrapper.prototype, "extract"); @@ -22,10 +32,13 @@ describe("SNSSQSEventTraceExtractor", () => { beforeEach(() => { mockSpanContext = null; spyTracerWrapper.mockClear(); + mockDataStreamsCheckpointer.setConsumeCheckpoint.mockClear(); + process.env["DD_DATA_STREAMS_ENABLED"] = "true"; }); afterEach(() => { jest.resetModules(); + delete process.env["DD_DATA_STREAMS_ENABLED"]; }); it("extracts trace context with valid payload with String Value", () => { @@ -44,7 +57,7 @@ describe("SNSSQSEventTraceExtractor", () => { messageId: "64812b68-4d9b-4dca-b3fb-9b18f255ee51", receiptHandle: "AQEBER6aRkfG8092GvkL7FRwCwbQ7LLDW9Tlk/CembqHe+suS2kfFxXiukomvaIN61QoyQMoRgWuV52SDkiQno2u+5hP64BDbmw+e/KR9ayvIfHJ3M6RfyQLaWNWm3hDFBCKTnBMVIxtdx0N9epZZewyokjKcrNYtmCghFgTCvZzsQkowi5rnoHAVHJ3je1c3bDnQ1KLrZFgajDnootYXDwEPuMq5FIxrf4EzTe0S7S+rnRm+GaQfeBLBVAY6dASL9usV3/AFRqDtaI7GKI+0F2NCgLlqj49VlPRz4ldhkGknYlKTZTluAqALWLJS62/J1GQo53Cs3nneJcmu5ajB2zzmhhRXoXINEkLhCD5ujZfcsw9H4xqW69Or4ECvlqx14bUU2rtMIW0QM2p7pEeXnyocymQv6m1te113eYWTVmaJ4I=", - body: '{\n "Type" : "Notification",\n "MessageId" : "0a0ab23e-4861-5447-82b7-e8094ff3e332",\n "TopicArn" : "arn:aws:sns:eu-west-1:601427279990:js-library-test-dev-demoTopic-15WGUVRCBMPAA",\n "Message" : "{\\"hello\\":\\"harv\\",\\"nice of you to join us\\":\\"david\\",\\"anotherThing\\":{\\"foo\\":\\"bar\\",\\"blah\\":null,\\"harv\\":123},\\"vals\\":[{\\"thingOne\\":1},{\\"thingTwo\\":2}],\\"ajTimestamp\\":1639777617957}",\n "Timestamp" : "2021-12-17T21:46:58.040Z",\n "SignatureVersion" : "1",\n "Signature" : "FR35/7E8C3LHEVk/rC4XxXlXwV/5mNkFNPgDhHSnJ2I6hIoSrTROAm7h5xm1PuBkAeFDvq0zofw91ouk9zZyvhdrMLFIIgrjEyNayRmEffmoEAkzLFUsgtQX7MmTl644r4NuWiM0Oiz7jueRvIcKXcZr7Nc6GJcWV1ymec8oOmuHNMisnPMxI07LIQVYSyAfv6P9r2jEWMVIukRoCzwTnRk4bUUYhPSGHI7OC3AsxxXBbv8snqTrLM/4z2rXCf6jHCKNxWeLlm9/45PphCkEyx5BWS4/71KaoMWUWy8+6CCsy+uF3XTCVmvSEYLyEwTSzOY+vCUjazrRW93498i70g==",\n "SigningCertUrl" : "https://sns.eu-west-1.amazonaws.com/SimpleNotificationService-************************33ab7e69.pem",\n "UnsubscribeUrl" : "https://sns.eu-west-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:eu-west-1:601427279990:js-library-test-dev-demoTopic-15WGUVRCBMPAA:1290f550-9a8a-4e8f-a900-8f5f96dcddda",\n "MessageAttributes" : {\n "_datadog" : {"Type":"String","Value":"{\\"x-datadog-trace-id\\":\\"2776434475358637757\\",\\"x-datadog-parent-id\\":\\"4493917105238181843\\",\\"x-datadog-sampled\\":\\"1\\",\\"x-datadog-sampling-priority\\":\\"1\\"}"}\n }\n}', + body: '{\n "Type" : "Notification",\n "MessageId" : "0a0ab23e-4861-5447-82b7-e8094ff3e332",\n "TopicArn" : "arn:aws:sns:eu-west-1:601427279990:js-library-test-dev-demoTopic-15WGUVRCBMPAA",\n "Message" : "{\\"hello\\":\\"harv\\",\\"nice of you to join us\\":\\"david\\",\\"anotherThing\\":{\\"foo\\":\\"bar\\",\\"blah\\":null,\\"harv\\":123},\\"vals\\":[{\\"thingOne\\":1},{\\"thingTwo\\":2}],\\"ajTimestamp\\":1639777617957}",\n "Timestamp" : "2021-12-17T21:46:58.040Z",\n "SignatureVersion" : "1",\n "Signature" : "FR35/7E8C3LHEVk/rC4XxXlXwV/5mNkFNPgDhHSnJ2I6hIoSrTROAm7h5xm1PuBkAeFDvq0zofw91ouk9zZyvhdrMLFIIgrjEyNayRmEffmoEAkzLFUsgtQX7MmTl644r4NuWiM0Oiz7jueRvIcKXcZr7Nc6GJcWV1ymec8oOmuHNMisnPMxI07LIQVYSyAfv6P9r2jEWMVIukRoCzwTnRk4bUUYhPSGHI7OC3AsxxXBbv8snqTrLM/4z2rXCf6jHCKNxWeLlm9/45PphCkEyx5BWS4/71KaoMWUWy8+6CCsy+uF3XTCVmvSEYLyEwTSzOY+vCUjazrRW93498i70g==",\n "SigningCertUrl" : "https://sns.eu-west-1.amazonaws.com/SimpleNotificationService-************************33ab7e69.pem",\n "UnsubscribeUrl" : "https://sns.eu-west-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:eu-west-1:601427279990:js-library-test-dev-demoTopic-15WGUVRCBMPAA:1290f550-9a8a-4e8f-a900-8f5f96dcddda",\n "MessageAttributes" : {\n "_datadog" : {"Type":"String","Value":"{\\"x-datadog-trace-id\\":\\"2776434475358637757\\",\\"x-datadog-parent-id\\":\\"4493917105238181843\\",\\"x-datadog-sampled\\":\\"1\\",\\"x-datadog-sampling-priority\\":\\"1\\",\\"dd-pathway-ctx-base64\\":\\"some-base64-encoded-context\\"}"}\n }\n}', attributes: { ApproximateReceiveCount: "1", SentTimestamp: "1639777618130", @@ -70,12 +83,26 @@ describe("SNSSQSEventTraceExtractor", () => { "x-datadog-sampled": "1", "x-datadog-sampling-priority": "1", "x-datadog-trace-id": "2776434475358637757", + "dd-pathway-ctx-base64": "some-base64-encoded-context", }); expect(traceContext?.toTraceId()).toBe("2776434475358637757"); expect(traceContext?.toSpanId()).toBe("4493917105238181843"); expect(traceContext?.sampleMode()).toBe("1"); expect(traceContext?.source).toBe("event"); + + expect(mockDataStreamsCheckpointer.setConsumeCheckpoint).toHaveBeenCalledWith( + "sqs", + "arn:aws:sqs:eu-west-1:601427279990:aj-js-library-test-dev-demo-queue", + { + "x-datadog-parent-id": "4493917105238181843", + "x-datadog-sampled": "1", + "x-datadog-sampling-priority": "1", + "x-datadog-trace-id": "2776434475358637757", + "dd-pathway-ctx-base64": "some-base64-encoded-context", + }, + false, + ); }); it("extracts trace context with valid payload with Binary Value", () => { @@ -94,7 +121,7 @@ describe("SNSSQSEventTraceExtractor", () => { messageId: "64812b68-4d9b-4dca-b3fb-9b18f255ee51", receiptHandle: "AQEBER6aRkfG8092GvkL7FRwCwbQ7LLDW9Tlk/CembqHe+suS2kfFxXiukomvaIN61QoyQMoRgWuV52SDkiQno2u+5hP64BDbmw+e/KR9ayvIfHJ3M6RfyQLaWNWm3hDFBCKTnBMVIxtdx0N9epZZewyokjKcrNYtmCghFgTCvZzsQkowi5rnoHAVHJ3je1c3bDnQ1KLrZFgajDnootYXDwEPuMq5FIxrf4EzTe0S7S+rnRm+GaQfeBLBVAY6dASL9usV3/AFRqDtaI7GKI+0F2NCgLlqj49VlPRz4ldhkGknYlKTZTluAqALWLJS62/J1GQo53Cs3nneJcmu5ajB2zzmhhRXoXINEkLhCD5ujZfcsw9H4xqW69Or4ECvlqx14bUU2rtMIW0QM2p7pEeXnyocymQv6m1te113eYWTVmaJ4I=", - body: '{\n "Type" : "Notification",\n "MessageId" : "0a0ab23e-4861-5447-82b7-e8094ff3e332",\n "TopicArn" : "arn:aws:sns:eu-west-1:601427279990:js-library-test-dev-demoTopic-15WGUVRCBMPAA",\n "Message" : "{\\"hello\\":\\"harv\\",\\"nice of you to join us\\":\\"david\\",\\"anotherThing\\":{\\"foo\\":\\"bar\\",\\"blah\\":null,\\"harv\\":123},\\"vals\\":[{\\"thingOne\\":1},{\\"thingTwo\\":2}],\\"ajTimestamp\\":1639777617957}",\n "Timestamp" : "2021-12-17T21:46:58.040Z",\n "SignatureVersion" : "1",\n "Signature" : "FR35/7E8C3LHEVk/rC4XxXlXwV/5mNkFNPgDhHSnJ2I6hIoSrTROAm7h5xm1PuBkAeFDvq0zofw91ouk9zZyvhdrMLFIIgrjEyNayRmEffmoEAkzLFUsgtQX7MmTl644r4NuWiM0Oiz7jueRvIcKXcZr7Nc6GJcWV1ymec8oOmuHNMisnPMxI07LIQVYSyAfv6P9r2jEWMVIukRoCzwTnRk4bUUYhPSGHI7OC3AsxxXBbv8snqTrLM/4z2rXCf6jHCKNxWeLlm9/45PphCkEyx5BWS4/71KaoMWUWy8+6CCsy+uF3XTCVmvSEYLyEwTSzOY+vCUjazrRW93498i70g==",\n "SigningCertUrl" : "https://sns.eu-west-1.amazonaws.com/SimpleNotificationService-7ff5318490ec183fbaddaa2a969abfda.pem",\n "UnsubscribeUrl" : "https://sns.eu-west-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:eu-west-1:601427279990:js-library-test-dev-demoTopic-15WGUVRCBMPAA:1290f550-9a8a-4e8f-a900-8f5f96dcddda",\n "MessageAttributes" : {\n "_datadog" : {"Type":"Binary","Value":"eyJ4LWRhdGFkb2ctdHJhY2UtaWQiOiI3MTAyMjkxNjI4NDQzMTM0OTE5IiwieC1kYXRhZG9nLXBhcmVudC1pZCI6IjQyNDc1NTAxMDE2NDg2MTg2MTgiLCJ4LWRhdGFkb2ctc2FtcGxpbmctcHJpb3JpdHkiOiIxIn0="}\n }\n}', + body: '{\n "Type" : "Notification",\n "MessageId" : "0a0ab23e-4861-5447-82b7-e8094ff3e332",\n "TopicArn" : "arn:aws:sns:eu-west-1:601427279990:js-library-test-dev-demoTopic-15WGUVRCBMPAA",\n "Message" : "{\\"hello\\":\\"harv\\",\\"nice of you to join us\\":\\"david\\",\\"anotherThing\\":{\\"foo\\":\\"bar\\",\\"blah\\":null,\\"harv\\":123},\\"vals\\":[{\\"thingOne\\":1},{\\"thingTwo\\":2}],\\"ajTimestamp\\":1639777617957}",\n "Timestamp" : "2021-12-17T21:46:58.040Z",\n "SignatureVersion" : "1",\n "Signature" : "FR35/7E8C3LHEVk/rC4XxXlXwV/5mNkFNPgDhHSnJ2I6hIoSrTROAm7h5xm1PuBkAeFDvq0zofw91ouk9zZyvhdrMLFIIgrjEyNayRmEffmoEAkzLFUsgtQX7MmTl644r4NuWiM0Oiz7jueRvIcKXcZr7Nc6GJcWV1ymec8oOmuHNMisnPMxI07LIQVYSyAfv6P9r2jEWMVIukRoCzwTnRk4bUUYhPSGHI7OC3AsxxXBbv8snqTrLM/4z2rXCf6jHCKNxWeLlm9/45PphCkEyx5BWS4/71KaoMWUWy8+6CCsy+uF3XTCVmvSEYLyEwTSzOY+vCUjazrRW93498i70g==",\n "SigningCertUrl" : "https://sns.eu-west-1.amazonaws.com/SimpleNotificationService-7ff5318490ec183fbaddaa2a969abfda.pem",\n "UnsubscribeUrl" : "https://sns.eu-west-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:eu-west-1:601427279990:js-library-test-dev-demoTopic-15WGUVRCBMPAA:1290f550-9a8a-4e8f-a900-8f5f96dcddda",\n "MessageAttributes" : {\n "_datadog" : {"Type":"Binary","Value":"eyJ4LWRhdGFkb2ctdHJhY2UtaWQiOiI3MTAyMjkxNjI4NDQzMTM0OTE5IiwieC1kYXRhZG9nLXBhcmVudC1pZCI6IjQyNDc1NTAxMDE2NDg2MTg2MTgiLCJ4LWRhdGFkb2ctc2FtcGxpbmctcHJpb3JpdHkiOiIxIiwiZGQtcGF0aHdheS1jdHgtYmFzZTY0Ijoic29tZS1iYXNlNjQtZW5jb2RlZC1jb250ZXh0In0="}\n }\n}', attributes: { ApproximateReceiveCount: "1", SentTimestamp: "1639777618130", @@ -119,28 +146,53 @@ describe("SNSSQSEventTraceExtractor", () => { "x-datadog-parent-id": "4247550101648618618", "x-datadog-sampling-priority": "1", "x-datadog-trace-id": "7102291628443134919", + "dd-pathway-ctx-base64": "some-base64-encoded-context", }); expect(traceContext?.toTraceId()).toBe("7102291628443134919"); expect(traceContext?.toSpanId()).toBe("4247550101648618618"); expect(traceContext?.sampleMode()).toBe("1"); expect(traceContext?.source).toBe("event"); + + expect(mockDataStreamsCheckpointer.setConsumeCheckpoint).toHaveBeenCalledWith( + "sqs", + "arn:aws:sqs:eu-west-1:601427279990:aj-js-library-test-dev-demo-queue", + { + "x-datadog-parent-id": "4247550101648618618", + "x-datadog-sampling-priority": "1", + "x-datadog-trace-id": "7102291628443134919", + "dd-pathway-ctx-base64": "some-base64-encoded-context", + }, + false, + ); }); + // prettier-ignore it.each([ - ["Records", {}], - ["Records first entry", { Records: [] }], - ["Records first entry body", { Records: [{}] }], - ["valid data in body", { Records: [{ body: "{" }] }], // JSON.parse should fail - ["MessageAttributes in body", { Records: [{ body: "{}" }] }], - ["_datadog in MessageAttributes", { Records: [{ body: '{"MessageAttributes":{"text":"Hello, world!"}}' }] }], - ["Value in _datadog", { Records: [{ body: '{"MessageAttributes":{"_datadog":{}}}' }] }], - ])("returns null and skips extracting when payload is missing '%s'", (_, payload) => { + ["Records", {}, 0], + ["Records first entry", { Records: [] }, 0], + ["Records first entry body", { Records: [{}] }, 0], + ["valid data in body", { Records: [{ body: "{", eventSourceARN: "arn:aws:sqs:us-east-1:test" }] }, 1], // JSON.parse should fail + ["MessageAttributes in body", { Records: [{ body: "{}", eventSourceARN: "arn:aws:sqs:us-east-1:test" }] }, 1], + ["_datadog in MessageAttributes", { Records: [{ body: '{"MessageAttributes":{"text":"Hello, world!"}}', eventSourceARN: "arn:aws:sqs:us-east-1:test" }] }, 1], + ["Value in _datadog", { Records: [{ body: '{"MessageAttributes":{"_datadog":{}}}', eventSourceARN: "arn:aws:sqs:us-east-1:test" }] }, 1], + ])("returns null and skips extracting when payload is missing '%s'", (_, payload, dsmCalls) => { const tracerWrapper = new TracerWrapper(); const extractor = new SNSSQSEventTraceExtractor(tracerWrapper); const traceContext = extractor.extract(payload as any); expect(traceContext).toBeNull(); + + expect(mockDataStreamsCheckpointer.setConsumeCheckpoint).toHaveBeenCalledTimes(dsmCalls); + + if (dsmCalls > 0) { + expect(mockDataStreamsCheckpointer.setConsumeCheckpoint).toHaveBeenCalledWith( + "sqs", + "arn:aws:sqs:us-east-1:test", + null, + false, + ); + } }); it("returns null when extracted span context by tracer is null", () => { diff --git a/src/trace/context/extractors/sns-sqs.ts b/src/trace/context/extractors/sns-sqs.ts index fd23ad9e..8257d3b7 100644 --- a/src/trace/context/extractors/sns-sqs.ts +++ b/src/trace/context/extractors/sns-sqs.ts @@ -10,6 +10,7 @@ export class SNSSQSEventTraceExtractor implements EventTraceExtractor { extract(event: SQSEvent): SpanContextWrapper | null { logDebug("SNS-SQS Extractor Being Used"); + const sourceARN = event?.Records?.[0]?.eventSourceARN; try { // Try to extract trace context from SNS wrapped in SQS const body = event?.Records?.[0]?.body; @@ -27,6 +28,7 @@ export class SNSSQSEventTraceExtractor implements EventTraceExtractor { } const traceContext = extractTraceContext(headers, this.tracerWrapper); + this.tracerWrapper.setConsumeCheckpoint(headers, "sqs", sourceARN); if (traceContext) { return traceContext; } @@ -39,6 +41,7 @@ export class SNSSQSEventTraceExtractor implements EventTraceExtractor { if (sqsMessageAttribute?.stringValue) { const headers = JSON.parse(sqsMessageAttribute.stringValue); const traceContext = extractTraceContext(headers, this.tracerWrapper); + this.tracerWrapper.setConsumeCheckpoint(headers, "sqs", sourceARN); if (traceContext) { return traceContext; } @@ -53,7 +56,8 @@ export class SNSSQSEventTraceExtractor implements EventTraceExtractor { } catch (error) { handleExtractionError(error, "SQS"); } - + // Still want to set a DSM checkpoint even if DSM context not propagated + this.tracerWrapper.setConsumeCheckpoint(null, "sqs", sourceARN); return null; } } diff --git a/src/trace/context/extractors/sns.spec.ts b/src/trace/context/extractors/sns.spec.ts index a3ef1159..7bbd6ab6 100644 --- a/src/trace/context/extractors/sns.spec.ts +++ b/src/trace/context/extractors/sns.spec.ts @@ -4,6 +4,15 @@ import { SNSEventTraceExtractor } from "./sns"; import { StepFunctionContextService } from "../../step-function-service"; let mockSpanContext: any = null; +let mockDataStreamsCheckpointer: any = { + setConsumeCheckpoint: jest.fn(), +}; + +jest.mock("dd-trace/packages/dd-trace/src/datastreams/checkpointer", () => { + return { + DataStreamsCheckpointer: jest.fn().mockImplementation(() => mockDataStreamsCheckpointer), + }; +}); // Mocking extract is needed, due to dd-trace being a No-op // if the detected environment is testing. This is expected, since @@ -14,6 +23,7 @@ jest.mock("dd-trace", () => { ...ddTrace, _tracer: { _service: {} }, extract: (_carrier: any, _headers: any) => mockSpanContext, + dataStreamsCheckpointer: mockDataStreamsCheckpointer, }; }); const spyTracerWrapper = jest.spyOn(TracerWrapper.prototype, "extract"); @@ -23,10 +33,13 @@ describe("SNSEventTraceExtractor", () => { beforeEach(() => { mockSpanContext = null; spyTracerWrapper.mockClear(); + mockDataStreamsCheckpointer.setConsumeCheckpoint.mockClear(); + process.env["DD_DATA_STREAMS_ENABLED"] = "true"; }); afterEach(() => { jest.resetModules(); + delete process.env["DD_DATA_STREAMS_ENABLED"]; }); it("extracts trace context with valid payload with String Value", () => { @@ -64,7 +77,7 @@ describe("SNSEventTraceExtractor", () => { _datadog: { Type: "String", Value: - '{"x-datadog-trace-id":"6966585609680374559","x-datadog-parent-id":"4297634551783724228","x-datadog-sampled":"1","x-datadog-sampling-priority":"1"}', + '{"x-datadog-trace-id":"6966585609680374559","x-datadog-parent-id":"4297634551783724228","x-datadog-sampled":"1","x-datadog-sampling-priority":"1","dd-pathway-ctx-base64":"some-base64-encoded-context"}', }, }, }, @@ -82,12 +95,26 @@ describe("SNSEventTraceExtractor", () => { "x-datadog-sampled": "1", "x-datadog-sampling-priority": "1", "x-datadog-trace-id": "6966585609680374559", + "dd-pathway-ctx-base64": "some-base64-encoded-context", }); expect(traceContext?.toTraceId()).toBe("6966585609680374559"); expect(traceContext?.toSpanId()).toBe("4297634551783724228"); expect(traceContext?.sampleMode()).toBe("1"); expect(traceContext?.source).toBe("event"); + + expect(mockDataStreamsCheckpointer.setConsumeCheckpoint).toHaveBeenCalledWith( + "sns", + "arn:aws:sns:eu-west-1:601427279990:aj-js-library-test-dev-solo-topic", + { + "x-datadog-parent-id": "4297634551783724228", + "x-datadog-sampled": "1", + "x-datadog-sampling-priority": "1", + "x-datadog-trace-id": "6966585609680374559", + "dd-pathway-ctx-base64": "some-base64-encoded-context", + }, + false, + ); }); it("extracts trace context with valid payload with Binary Value", () => { @@ -125,7 +152,7 @@ describe("SNSEventTraceExtractor", () => { _datadog: { Type: "Binary", Value: - "eyJ4LWRhdGFkb2ctdHJhY2UtaWQiOiI3MTAyMjkxNjI4NDQzMTM0OTE5IiwieC1kYXRhZG9nLXBhcmVudC1pZCI6IjQyNDc1NTAxMDE2NDg2MTg2MTgiLCJ4LWRhdGFkb2ctc2FtcGxpbmctcHJpb3JpdHkiOiIxIn0=", + "eyJ4LWRhdGFkb2ctdHJhY2UtaWQiOiI3MTAyMjkxNjI4NDQzMTM0OTE5IiwieC1kYXRhZG9nLXBhcmVudC1pZCI6IjQyNDc1NTAxMDE2NDg2MTg2MTgiLCJ4LWRhdGFkb2ctc2FtcGxpbmctcHJpb3JpdHkiOiIxIiwiZGQtcGF0aHdheS1jdHgtYmFzZTY0Ijoic29tZS1iYXNlNjQtZW5jb2RlZC1jb250ZXh0In0=", }, }, }, @@ -142,27 +169,52 @@ describe("SNSEventTraceExtractor", () => { "x-datadog-parent-id": "4247550101648618618", "x-datadog-sampling-priority": "1", "x-datadog-trace-id": "7102291628443134919", + "dd-pathway-ctx-base64": "some-base64-encoded-context", }); expect(traceContext?.toTraceId()).toBe("7102291628443134919"); expect(traceContext?.toSpanId()).toBe("4247550101648618618"); expect(traceContext?.sampleMode()).toBe("1"); expect(traceContext?.source).toBe("event"); + + expect(mockDataStreamsCheckpointer.setConsumeCheckpoint).toHaveBeenCalledWith( + "sns", + "arn:aws:sns:eu-west-1:601427279990:aj-js-library-test-dev-solo-topic", + { + "x-datadog-parent-id": "4247550101648618618", + "x-datadog-sampling-priority": "1", + "x-datadog-trace-id": "7102291628443134919", + "dd-pathway-ctx-base64": "some-base64-encoded-context", + }, + false, + ); }); + // prettier-ignore it.each([ - ["Records", {}], - ["Records first entry", { Records: [] }], - ["Records first entry Sns", { Records: [{}] }], - ["MessageAttributes in Sns", { Records: [{ Sns: "{}" }] }], - ["_datadog in MessageAttributes", { Records: [{ Sns: '{"MessageAttributes":{"text":"Hello, world!"}}' }] }], - ["Value in _datadog", { Records: [{ Sns: '{"MessageAttributes":{"_datadog":{}}}' }] }], - ])("returns null and skips extracting when payload is missing '%s'", (_, payload) => { + ["Records", {}, 0], + ["Records first entry", { Records: [] }, 0], + ["Records first entry Sns", { Records: [{}] }, 0], + ["MessageAttributes in Sns", { Records: [{ Sns: "{TopicArn: 'arn:aws:sns:eu-west-1:test'}" }] }, 0], + ["_datadog in MessageAttributes", { Records: [{ Sns: { MessageAttributes: { text: "Hello, world!" }, TopicArn: "arn:aws:sns:eu-west-1:test" } }] }, 1], + ["Value in _datadog", { Records: [{ Sns: { MessageAttributes: { _datadog: {} }, TopicArn: "arn:aws:sns:eu-west-1:test" } }] }, 1], + ])("returns null and skips extracting when payload is missing '%s'", (_, payload, dsmCalls) => { const tracerWrapper = new TracerWrapper(); const extractor = new SNSEventTraceExtractor(tracerWrapper); const traceContext = extractor.extract(payload as any); expect(traceContext).toBeNull(); + + expect(mockDataStreamsCheckpointer.setConsumeCheckpoint).toHaveBeenCalledTimes(dsmCalls); + + if (dsmCalls > 0) { + expect(mockDataStreamsCheckpointer.setConsumeCheckpoint).toHaveBeenCalledWith( + "sns", + "arn:aws:sns:eu-west-1:test", + null, + false, + ); + } }); it("returns null when extracted span context by tracer is null", () => { diff --git a/src/trace/context/extractors/sns.ts b/src/trace/context/extractors/sns.ts index 7336f552..ddf93f5a 100644 --- a/src/trace/context/extractors/sns.ts +++ b/src/trace/context/extractors/sns.ts @@ -10,6 +10,7 @@ export class SNSEventTraceExtractor implements EventTraceExtractor { constructor(private tracerWrapper: TracerWrapper) {} extract(event: SNSEvent): SpanContextWrapper | null { + const sourceARN = event?.Records?.[0]?.Sns?.TopicArn; try { // First try to extract trace context from message attributes const messageAttribute = event?.Records?.[0]?.Sns?.MessageAttributes?._datadog; @@ -24,6 +25,7 @@ export class SNSEventTraceExtractor implements EventTraceExtractor { } const traceContext = extractTraceContext(headers, this.tracerWrapper); + this.tracerWrapper.setConsumeCheckpoint(headers, "sns", sourceARN); if (traceContext) { return traceContext; } @@ -39,6 +41,8 @@ export class SNSEventTraceExtractor implements EventTraceExtractor { handleExtractionError(error, "SNS"); } + // Still want to set a DSM checkpoint even if DSM context not propagated + this.tracerWrapper.setConsumeCheckpoint(null, "sns", sourceARN); return null; } } diff --git a/src/trace/context/extractors/sqs.spec.ts b/src/trace/context/extractors/sqs.spec.ts index 4a1d41cd..2ddf8c13 100644 --- a/src/trace/context/extractors/sqs.spec.ts +++ b/src/trace/context/extractors/sqs.spec.ts @@ -4,6 +4,15 @@ import { SQSEventTraceExtractor } from "./sqs"; import { StepFunctionContextService } from "../../step-function-service"; let mockSpanContext: any = null; +let mockDataStreamsCheckpointer: any = { + setConsumeCheckpoint: jest.fn(), +}; + +jest.mock("dd-trace/packages/dd-trace/src/datastreams/checkpointer", () => { + return { + DataStreamsCheckpointer: jest.fn().mockImplementation(() => mockDataStreamsCheckpointer), + }; +}); // Mocking extract is needed, due to dd-trace being a No-op // if the detected environment is testing. This is expected, since @@ -14,6 +23,7 @@ jest.mock("dd-trace", () => { ...ddTrace, _tracer: { _service: {} }, extract: (_carrier: any, _headers: any) => mockSpanContext, + dataStreamsCheckpointer: mockDataStreamsCheckpointer, }; }); const spyTracerWrapper = jest.spyOn(TracerWrapper.prototype, "extract"); @@ -23,10 +33,13 @@ describe("SQSEventTraceExtractor", () => { beforeEach(() => { mockSpanContext = null; spyTracerWrapper.mockClear(); + mockDataStreamsCheckpointer.setConsumeCheckpoint.mockClear(); + process.env["DD_DATA_STREAMS_ENABLED"] = "true"; }); afterEach(() => { jest.resetModules(); + delete process.env["DD_DATA_STREAMS_ENABLED"]; }); it("extracts trace context with valid payload", () => { @@ -52,7 +65,7 @@ describe("SQSEventTraceExtractor", () => { messageAttributes: { _datadog: { stringValue: - '{"x-datadog-trace-id":"4555236104497098341","x-datadog-parent-id":"3369753143434738315","x-datadog-sampled":"1","x-datadog-sampling-priority":"1"}', + '{"x-datadog-trace-id":"4555236104497098341","x-datadog-parent-id":"3369753143434738315","x-datadog-sampled":"1","x-datadog-sampling-priority":"1","dd-pathway-ctx-base64":"some-base64-encoded-context"}', stringListValues: undefined, binaryListValues: undefined, dataType: "String", @@ -78,12 +91,26 @@ describe("SQSEventTraceExtractor", () => { "x-datadog-sampled": "1", "x-datadog-sampling-priority": "1", "x-datadog-trace-id": "4555236104497098341", + "dd-pathway-ctx-base64": "some-base64-encoded-context", }); expect(traceContext?.toTraceId()).toBe("4555236104497098341"); expect(traceContext?.toSpanId()).toBe("3369753143434738315"); expect(traceContext?.sampleMode()).toBe("1"); expect(traceContext?.source).toBe("event"); + + expect(mockDataStreamsCheckpointer.setConsumeCheckpoint).toHaveBeenCalledWith( + "sqs", + "arn:aws:sqs:eu-west-1:601427279990:metal-queue", + { + "x-datadog-parent-id": "3369753143434738315", + "x-datadog-sampled": "1", + "x-datadog-sampling-priority": "1", + "x-datadog-trace-id": "4555236104497098341", + "dd-pathway-ctx-base64": "some-base64-encoded-context", + }, + false, + ); }); it("extracts trace context from _datadog binaryValue when raw message delivery is used", () => { @@ -101,6 +128,7 @@ describe("SQSEventTraceExtractor", () => { "x-datadog-parent-id": "0987654321", "x-datadog-sampled": "1", "x-datadog-sampling-priority": "1", + "dd-pathway-ctx-base64": "some-base64-encoded-context", }; const ddHeadersString = JSON.stringify(ddHeaders); const ddHeadersBase64 = Buffer.from(ddHeadersString, "ascii").toString("base64"); @@ -141,20 +169,39 @@ describe("SQSEventTraceExtractor", () => { expect(traceContext?.toSpanId()).toBe("0987654321"); expect(traceContext?.sampleMode()).toBe("1"); expect(traceContext?.source).toBe("event"); + + expect(mockDataStreamsCheckpointer.setConsumeCheckpoint).toHaveBeenCalledWith( + "sqs", + "arn:aws:sqs:us-east-1:123456789012:MyQueue", + ddHeaders, + false, + ); }); + // prettier-ignore it.each([ - ["Records", {}], - ["Records first entry", { Records: [] }], - ["messageAttributes in first entry", { Records: [{ messageAttributes: "{}" }] }], - ["_datadog in messageAttributes", { Records: [{ messageAttributes: {} }] }], - ["stringValue in _datadog", { Records: [{ messageAttributes: { _datadog: {} } }] }], - ])("returns null and skips extracting when payload is missing '%s'", (_, payload) => { + ["Records", {}, 0], + ["Records first entry", { Records: [] }, 0], + ["messageAttributes in first entry", { Records: [{ messageAttributes: "{}", eventSourceARN: "arn:aws:sqs:us-east-1:MyQueue" }] }, 1], + ["_datadog in messageAttributes", { Records: [{ messageAttributes: {}, eventSourceARN: "arn:aws:sqs:us-east-1:MyQueue" }] }, 1], + ["stringValue in _datadog", { Records: [{ messageAttributes: { _datadog: {} }, eventSourceARN: "arn:aws:sqs:us-east-1:MyQueue" }] }, 1], + ])("returns null and skips extracting when payload is missing '%s'", (_, payload, dsmCalls) => { const tracerWrapper = new TracerWrapper(); const extractor = new SQSEventTraceExtractor(tracerWrapper); const traceContext = extractor.extract(payload as any); expect(traceContext).toBeNull(); + + expect(mockDataStreamsCheckpointer.setConsumeCheckpoint).toHaveBeenCalledTimes(dsmCalls); + + if (dsmCalls > 0) { + expect(mockDataStreamsCheckpointer.setConsumeCheckpoint).toHaveBeenCalledWith( + "sqs", + "arn:aws:sqs:us-east-1:MyQueue", + null, + false, + ); + } }); it("returns null when extracted span context by tracer is null", () => { diff --git a/src/trace/context/extractors/sqs.ts b/src/trace/context/extractors/sqs.ts index f4b883e4..a386165e 100644 --- a/src/trace/context/extractors/sqs.ts +++ b/src/trace/context/extractors/sqs.ts @@ -10,6 +10,7 @@ export class SQSEventTraceExtractor implements EventTraceExtractor { extract(event: SQSEvent): SpanContextWrapper | null { logDebug("SQS Extractor Being Used"); + const sourceARN = event?.Records?.[0]?.eventSourceARN; try { // First try to extract trace context from message attributes let headers = event?.Records?.[0]?.messageAttributes?._datadog?.stringValue; @@ -26,7 +27,7 @@ export class SQSEventTraceExtractor implements EventTraceExtractor { if (headers) { const parsedHeaders = JSON.parse(headers); - + this.tracerWrapper.setConsumeCheckpoint(parsedHeaders, "sqs", sourceARN); const traceContext = extractTraceContext(parsedHeaders, this.tracerWrapper); if (traceContext) { return traceContext; @@ -44,6 +45,8 @@ export class SQSEventTraceExtractor implements EventTraceExtractor { handleExtractionError(error, "SQS"); } + // Still want to set a DSM checkpoint even if DSM context not propagated + this.tracerWrapper.setConsumeCheckpoint(null, "sqs", sourceARN); return null; } } diff --git a/src/trace/tracer-wrapper.spec.ts b/src/trace/tracer-wrapper.spec.ts index acd43b22..5a388603 100644 --- a/src/trace/tracer-wrapper.spec.ts +++ b/src/trace/tracer-wrapper.spec.ts @@ -3,6 +3,15 @@ import { TracerWrapper } from "./tracer-wrapper"; let mockNoTracer = false; let mockTracerInitialised = false; let mockSpan: any = null; +let mockDataStreamsCheckpointer: any = { + setConsumeCheckpoint: jest.fn(), +}; +jest.mock("dd-trace/packages/dd-trace/src/datastreams/checkpointer", () => { + return { + DataStreamsCheckpointer: jest.fn().mockImplementation(() => mockDataStreamsCheckpointer), + }; +}); + const mockSpanContext = { toTraceId: () => "1234", toSpanId: () => "45678", @@ -18,6 +27,7 @@ jest.mock("dd-trace", () => { scope: () => ({ active: () => mockSpan, }), + dataStreamsCheckpointer: mockDataStreamsCheckpointer, }; } }); @@ -28,10 +38,12 @@ describe("TracerWrapper", () => { mockNoTracer = false; mockTracerInitialised = true; mockSpan = null; + mockDataStreamsCheckpointer.setConsumeCheckpoint.mockClear(); }); afterEach(() => { jest.resetModules(); delete process.env["AWS_LAMBDA_FUNCTION_NAME"]; + delete process.env["DD_DATA_STREAMS_ENABLED"]; }); it("isTracerAvailable should return true when dd-trace is present and initialised", () => { const wrapper = new TracerWrapper(); @@ -81,4 +93,36 @@ describe("TracerWrapper", () => { const traceContext = wrapper.traceContext(); expect(traceContext).toBeNull(); }); + it("should not call internal setConsumeCheckpoint when arn is not provided", () => { + process.env["DD_DATA_STREAMS_ENABLED"] = "true"; + const wrapper = new TracerWrapper(); + + wrapper.setConsumeCheckpoint({ test: "context" }, "kinesis", ""); + + expect(mockDataStreamsCheckpointer.setConsumeCheckpoint).not.toHaveBeenCalled(); + }); + + it("should call internal setConsumeCheckpoint when DD_DATA_STREAMS_ENABLED is on and arn is provided", () => { + process.env["DD_DATA_STREAMS_ENABLED"] = "true"; + const wrapper = new TracerWrapper(); + const contextJson = { test: "context" }; + const eventType = "kinesis"; + const arn = "arn:aws:kinesis:us-east-1:123456789:stream/test-stream"; + + wrapper.setConsumeCheckpoint(contextJson, eventType, arn); + + expect(mockDataStreamsCheckpointer.setConsumeCheckpoint).toHaveBeenCalledWith(eventType, arn, contextJson, false); + }); + + it("should not call internal setConsumeCheckpoint when DD_DATA_STREAMS_ENABLED is off", () => { + process.env["DD_DATA_STREAMS_ENABLED"] = "false"; + const wrapper = new TracerWrapper(); + const contextJson = { test: "context" }; + const eventType = "kinesis"; + const arn = "arn:aws:kinesis:us-east-1:123456789:stream/test-stream"; + + wrapper.setConsumeCheckpoint(contextJson, eventType, arn); + + expect(mockDataStreamsCheckpointer.setConsumeCheckpoint).not.toHaveBeenCalled(); + }); }); diff --git a/src/trace/tracer-wrapper.ts b/src/trace/tracer-wrapper.ts index 97097b6e..014d656e 100644 --- a/src/trace/tracer-wrapper.ts +++ b/src/trace/tracer-wrapper.ts @@ -1,3 +1,4 @@ +import { getEnvValue } from "../index"; import { logDebug } from "../utils"; import { SpanContextWrapper } from "./span-context-wrapper"; import { TraceSource } from "./trace-context-service"; @@ -99,4 +100,23 @@ export class TracerWrapper { this.tracer.inject(span, "text_map", dest); return dest; } + + public setConsumeCheckpoint(contextJson: any, eventType: string, arn: string): void { + if (!arn) { + logDebug("DSM: No ARN provided, skipping setConsumeCheckpoint"); + return; + } + + if (getEnvValue("DD_DATA_STREAMS_ENABLED", "false").toLowerCase() !== "true") { + return; + } + + try { + this.tracer.dataStreamsCheckpointer.setConsumeCheckpoint(eventType, arn, contextJson, false); + } catch (err) { + if (err instanceof Object || err instanceof Error) { + logDebug(`DSM: Failed to set consume checkpoint for ${eventType} ${arn}:`, err); + } + } + } }