Skip to content

Commit d84f2c0

Browse files
add get dsm context into lambda llayer
1 parent 8acce15 commit d84f2c0

File tree

2 files changed

+299
-2
lines changed

2 files changed

+299
-2
lines changed

datadog_lambda/dsm.py

Lines changed: 69 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,12 @@
1+
import json
2+
import base64
3+
4+
from ddtrace.internal.logger import get_logger
15
from datadog_lambda import logger
26
from datadog_lambda.trigger import EventTypes
37

8+
log = get_logger(__name__)
9+
410

511
def set_dsm_context(event, event_source):
612
if event_source.equals(EventTypes.SQS):
@@ -24,7 +30,6 @@ def _dsm_set_context_helper(
2430
from datadog_lambda.wrapper import format_err_with_traceback
2531
from ddtrace.internal.datastreams import data_streams_processor
2632
from ddtrace.internal.datastreams.processor import DsmPathwayCodec
27-
from ddtrace.internal.datastreams.botocore import get_datastreams_context
2833

2934
records = event.get("Records")
3035
if records is None:
@@ -34,7 +39,7 @@ def _dsm_set_context_helper(
3439
for record in records:
3540
try:
3641
arn = arn_extractor(record)
37-
context_json = get_datastreams_context(record)
42+
context_json = _get_dsm_context_from_lambda(record)
3843
payload_size = payload_size_calculator(record, context_json)
3944

4045
ctx = DsmPathwayCodec.decode(context_json, processor)
@@ -71,3 +76,65 @@ def sqs_arn_extractor(record):
7176
return record.get("eventSourceARN", "")
7277

7378
_dsm_set_context_helper(event, "sqs", sqs_arn_extractor, sqs_payload_calculator)
79+
80+
81+
def _get_dsm_context_from_lambda(message):
82+
"""
83+
Lambda-specific message formats:
84+
- message.messageAttributes._datadog.stringValue (SQS -> lambda)
85+
- message.Sns.MessageAttributes._datadog.Value.decode() (SNS -> lambda)
86+
- message.messageAttributes._datadog.binaryValue.decode() (SNS -> SQS -> lambda, raw)
87+
- message.body.MessageAttributes._datadog.Value.decode() (SNS -> SQS -> lambda)
88+
- message.kinesis.data.decode()._datadog (Kinesis -> lambda)
89+
"""
90+
context_json = None
91+
message_body = message
92+
93+
if "kinesis" in message:
94+
try:
95+
kinesis_data = json.loads(
96+
base64.b64decode(message["kinesis"]["data"]).decode()
97+
)
98+
return kinesis_data.get("_datadog")
99+
except (ValueError, TypeError, KeyError):
100+
log.debug("Unable to parse kinesis data for lambda message")
101+
return None
102+
elif "Sns" in message:
103+
message_body = message["Sns"]
104+
else:
105+
try:
106+
body = message.get("body")
107+
if body:
108+
message_body = json.loads(body)
109+
except (ValueError, TypeError):
110+
log.debug("Unable to parse lambda message body as JSON, treat as non-json")
111+
112+
message_attributes = message_body.get("MessageAttributes") or message_body.get(
113+
"messageAttributes"
114+
)
115+
if not message_attributes:
116+
log.debug("DataStreams skipped lambda message: %r", message)
117+
return None
118+
119+
if "_datadog" not in message_attributes:
120+
log.debug("DataStreams skipped lambda message: %r", message)
121+
return None
122+
123+
datadog_attr = message_attributes["_datadog"]
124+
125+
if message_body.get("Type") == "Notification":
126+
# SNS -> lambda notification
127+
if datadog_attr.get("Type") == "Binary":
128+
context_json = json.loads(base64.b64decode(datadog_attr["Value"]).decode())
129+
elif "stringValue" in datadog_attr:
130+
# SQS -> lambda
131+
context_json = json.loads(datadog_attr["stringValue"])
132+
elif "binaryValue" in datadog_attr:
133+
# SNS -> SQS -> lambda, raw message delivery
134+
context_json = json.loads(
135+
base64.b64decode(datadog_attr["binaryValue"]).decode()
136+
)
137+
else:
138+
log.debug("DataStreams did not handle lambda message: %r", message)
139+
140+
return context_json

tests/test_dsm.py

Lines changed: 230 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
import unittest
2+
import json
3+
import base64
24
from unittest.mock import patch, MagicMock
35

46
from datadog_lambda.dsm import (
57
set_dsm_context,
68
_dsm_set_sqs_context,
79
_dsm_set_sns_context,
10+
_get_dsm_context_from_lambda,
811
)
912
from datadog_lambda.trigger import EventTypes, _EventSource
1013

@@ -203,3 +206,230 @@ def test_sns_multiple_records_process_each_record(self):
203206
self.assertIn(f"topic:{expected_arns[i]}", tags)
204207
self.assertIn("type:sns", tags)
205208
self.assertEqual(kwargs["payload_size"], 150)
209+
210+
211+
class TestGetDSMContext(unittest.TestCase):
212+
def test_sqs_to_lambda_string_value_format(self):
213+
"""Test format: message.messageAttributes._datadog.stringValue (SQS -> lambda)"""
214+
trace_context = {
215+
"x-datadog-trace-id": "789123456",
216+
"x-datadog-parent-id": "321987654",
217+
"dd-pathway-ctx": "test-pathway-ctx",
218+
}
219+
220+
lambda_record = {
221+
"messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
222+
"receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...",
223+
"body": "Test message.",
224+
"attributes": {
225+
"ApproximateReceiveCount": "1",
226+
"SentTimestamp": "1545082649183",
227+
"SenderId": "AIDAIENQZJOLO23YVJ4VO",
228+
"ApproximateFirstReceiveTimestamp": "1545082649185",
229+
},
230+
"messageAttributes": {
231+
"_datadog": {
232+
"stringValue": json.dumps(trace_context),
233+
"stringListValues": [],
234+
"binaryListValues": [],
235+
"dataType": "String",
236+
},
237+
"myAttribute": {
238+
"stringValue": "myValue",
239+
"stringListValues": [],
240+
"binaryListValues": [],
241+
"dataType": "String",
242+
},
243+
},
244+
"md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
245+
"eventSource": "aws:sqs",
246+
"eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
247+
"awsRegion": "us-east-2",
248+
}
249+
250+
result = _get_dsm_context_from_lambda(lambda_record)
251+
252+
assert result is not None
253+
assert result == trace_context
254+
assert result["x-datadog-trace-id"] == "789123456"
255+
assert result["x-datadog-parent-id"] == "321987654"
256+
assert result["dd-pathway-ctx"] == "test-pathway-ctx"
257+
258+
def test_sns_to_lambda_format(self):
259+
"""Test format: message.Sns.MessageAttributes._datadog.Value.decode() (SNS -> lambda)"""
260+
trace_context = {
261+
"x-datadog-trace-id": "111111111",
262+
"x-datadog-parent-id": "222222222",
263+
"dd-pathway-ctx": "test-pathway-ctx",
264+
}
265+
binary_data = base64.b64encode(
266+
json.dumps(trace_context).encode("utf-8")
267+
).decode("utf-8")
268+
269+
sns_lambda_record = {
270+
"EventSource": "aws:sns",
271+
"EventSubscriptionArn": (
272+
"arn:aws:sns:us-east-1:123456789012:sns-topic:12345678-1234-1234-1234-123456789012"
273+
),
274+
"Sns": {
275+
"Type": "Notification",
276+
"MessageId": "95df01b4-ee98-5cb9-9903-4c221d41eb5e",
277+
"TopicArn": "arn:aws:sns:us-east-1:123456789012:sns-topic",
278+
"Subject": "Test Subject",
279+
"Message": "Hello from SNS!",
280+
"Timestamp": "2023-01-01T12:00:00.000Z",
281+
"MessageAttributes": {
282+
"_datadog": {"Type": "Binary", "Value": binary_data}
283+
},
284+
},
285+
}
286+
287+
result = _get_dsm_context_from_lambda(sns_lambda_record)
288+
289+
assert result is not None
290+
assert result == trace_context
291+
assert result["x-datadog-trace-id"] == "111111111"
292+
assert result["x-datadog-parent-id"] == "222222222"
293+
assert result["dd-pathway-ctx"] == "test-pathway-ctx"
294+
295+
def test_sns_to_sqs_to_lambda_binary_value_format(self):
296+
"""Test format: message.messageAttributes._datadog.binaryValue.decode() (SNS -> SQS -> lambda, raw)"""
297+
trace_context = {
298+
"x-datadog-trace-id": "777666555",
299+
"x-datadog-parent-id": "444333222",
300+
"dd-pathway-ctx": "test-pathway-ctx",
301+
}
302+
binary_data = base64.b64encode(
303+
json.dumps(trace_context).encode("utf-8")
304+
).decode("utf-8")
305+
306+
lambda_record = {
307+
"messageId": "test-message-id",
308+
"receiptHandle": "test-receipt-handle",
309+
"body": "Test message body",
310+
"messageAttributes": {
311+
"_datadog": {"binaryValue": binary_data, "dataType": "Binary"}
312+
},
313+
"eventSource": "aws:sqs",
314+
"eventSourceARN": "arn:aws:sqs:us-west-2:123456789012:test-queue",
315+
}
316+
317+
result = _get_dsm_context_from_lambda(lambda_record)
318+
319+
assert result is not None
320+
assert result == trace_context
321+
assert result["x-datadog-trace-id"] == "777666555"
322+
assert result["x-datadog-parent-id"] == "444333222"
323+
assert result["dd-pathway-ctx"] == "test-pathway-ctx"
324+
325+
def test_sns_to_sqs_to_lambda_body_format(self):
326+
"""Test format: message.body.MessageAttributes._datadog.Value.decode() (SNS -> SQS -> lambda)"""
327+
trace_context = {
328+
"x-datadog-trace-id": "123987456",
329+
"x-datadog-parent-id": "654321987",
330+
"x-datadog-sampling-priority": "1",
331+
"dd-pathway-ctx": "test-pathway-ctx",
332+
}
333+
334+
message_body = {
335+
"Type": "Notification",
336+
"MessageId": "test-message-id",
337+
"Message": "Test message from SNS",
338+
"MessageAttributes": {
339+
"_datadog": {
340+
"Type": "Binary",
341+
"Value": base64.b64encode(
342+
json.dumps(trace_context).encode("utf-8")
343+
).decode("utf-8"),
344+
}
345+
},
346+
}
347+
348+
lambda_record = {
349+
"messageId": "lambda-message-id",
350+
"body": json.dumps(message_body),
351+
"eventSource": "aws:sqs",
352+
"eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:sns-to-sqs-queue",
353+
}
354+
355+
result = _get_dsm_context_from_lambda(lambda_record)
356+
357+
assert result is not None
358+
assert result == trace_context
359+
assert result["x-datadog-trace-id"] == "123987456"
360+
assert result["x-datadog-parent-id"] == "654321987"
361+
assert result["dd-pathway-ctx"] == "test-pathway-ctx"
362+
363+
def test_kinesis_to_lambda_format(self):
364+
"""Test format: message.kinesis.data.decode()._datadog (Kinesis -> lambda)"""
365+
trace_context = {
366+
"x-datadog-trace-id": "555444333",
367+
"x-datadog-parent-id": "888777666",
368+
"dd-pathway-ctx": "test-pathway-ctx",
369+
}
370+
371+
# Create the kinesis data payload
372+
kinesis_payload = {
373+
"_datadog": trace_context,
374+
"actualData": "some business data",
375+
}
376+
encoded_kinesis_data = base64.b64encode(
377+
json.dumps(kinesis_payload).encode("utf-8")
378+
).decode("utf-8")
379+
380+
kinesis_lambda_record = {
381+
"eventSource": "aws:kinesis",
382+
"eventSourceARN": (
383+
"arn:aws:kinesis:us-east-1:123456789012:stream/my-stream"
384+
),
385+
"kinesis": {
386+
"data": encoded_kinesis_data,
387+
"partitionKey": "partition-key-1",
388+
"sequenceNumber": (
389+
"49590338271490256608559692538361571095921575989136588898"
390+
),
391+
},
392+
}
393+
394+
result = _get_dsm_context_from_lambda(kinesis_lambda_record)
395+
396+
assert result is not None
397+
assert result == trace_context
398+
assert result["x-datadog-trace-id"] == "555444333"
399+
assert result["x-datadog-parent-id"] == "888777666"
400+
assert result["dd-pathway-ctx"] == "test-pathway-ctx"
401+
402+
def test_no_message_attributes(self):
403+
"""Test message without MessageAttributes returns None."""
404+
message = {
405+
"messageId": "test-message-id",
406+
"body": "Test message without attributes",
407+
}
408+
409+
result = _get_dsm_context_from_lambda(message)
410+
411+
assert result is None
412+
413+
def test_no_datadog_attribute(self):
414+
"""Test message with MessageAttributes but no _datadog attribute returns None."""
415+
message = {
416+
"messageId": "test-message-id",
417+
"body": "Test message",
418+
"messageAttributes": {
419+
"customAttribute": {"stringValue": "custom-value", "dataType": "String"}
420+
},
421+
}
422+
423+
result = _get_dsm_context_from_lambda(message)
424+
assert result is None
425+
426+
def test_empty_datadog_attribute(self):
427+
"""Test message with empty _datadog attribute returns None."""
428+
message = {
429+
"messageId": "test-message-id",
430+
"messageAttributes": {"_datadog": {}},
431+
}
432+
433+
result = _get_dsm_context_from_lambda(message)
434+
435+
assert result is None

0 commit comments

Comments
 (0)