Skip to content

Commit 8ebf6f2

Browse files
made dsm context extraction more explicit
1 parent 69071b4 commit 8ebf6f2

File tree

4 files changed

+201
-240
lines changed

4 files changed

+201
-240
lines changed

datadog_lambda/tracing.py

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,7 @@ def extract_context_from_sqs_or_sns_event_or_context(event, lambda_context):
215215
216216
Falls back to lambda context if no trace data is found in the SQS message attributes.
217217
"""
218+
data_streams_ctx = {}
218219

219220
# EventBridge => SQS
220221
try:
@@ -265,6 +266,15 @@ def extract_context_from_sqs_or_sns_event_or_context(event, lambda_context):
265266
if dd_json_data:
266267
dd_data = json.loads(dd_json_data)
267268

269+
if config.data_streams_enabled:
270+
from ddtrace.data_streams import PROPAGATION_KEY_BASE_64
271+
272+
data_streams_ctx = (
273+
{PROPAGATION_KEY_BASE_64: dd_data[PROPAGATION_KEY_BASE_64]}
274+
if PROPAGATION_KEY_BASE_64 in dd_data
275+
else {}
276+
)
277+
268278
if is_step_function_event(dd_data):
269279
try:
270280
return (
@@ -275,7 +285,7 @@ def extract_context_from_sqs_or_sns_event_or_context(event, lambda_context):
275285
logger.debug(
276286
"Failed to extract Step Functions context from SQS/SNS event."
277287
)
278-
return propagator.extract(dd_data), dd_data
288+
return propagator.extract(dd_data), data_streams_ctx.get
279289
else:
280290
# Handle case where trace context is injected into attributes.AWSTraceHeader
281291
# example: Root=1-654321ab-000000001234567890abcdef;Parent=0123456789abcdef;Sampled=1
@@ -301,7 +311,7 @@ def extract_context_from_sqs_or_sns_event_or_context(event, lambda_context):
301311
),
302312
None,
303313
)
304-
return extract_context_from_lambda_context(lambda_context), None
314+
return extract_context_from_lambda_context(lambda_context), data_streams_ctx.get
305315
except Exception as e:
306316
logger.debug("The trace extractor returned with error %s", e)
307317
return extract_context_from_lambda_context(lambda_context), None
@@ -363,6 +373,7 @@ def extract_context_from_kinesis_event(event, lambda_context):
363373
"""
364374
Extract datadog trace context from a Kinesis Stream's base64 encoded data string
365375
"""
376+
data_streams_ctx = {}
366377
try:
367378
record = get_first_record(event)
368379
kinesis = record.get("kinesis")
@@ -378,11 +389,19 @@ def extract_context_from_kinesis_event(event, lambda_context):
378389
data_obj = json.loads(data_str)
379390
dd_ctx = data_obj.get("_datadog")
380391
if dd_ctx:
381-
return propagator.extract(dd_ctx), dd_ctx
392+
if config.data_streams_enabled:
393+
from ddtrace.data_streams import PROPAGATION_KEY_BASE_64
394+
395+
data_streams_ctx = (
396+
{PROPAGATION_KEY_BASE_64: dd_ctx[PROPAGATION_KEY_BASE_64]}
397+
if PROPAGATION_KEY_BASE_64 in dd_ctx
398+
else {}
399+
)
400+
return propagator.extract(dd_ctx), data_streams_ctx.get
382401
except Exception as e:
383402
logger.debug("The trace extractor returned with error %s", e)
384403

385-
return extract_context_from_lambda_context(lambda_context), None
404+
return extract_context_from_lambda_context(lambda_context), data_streams_ctx.get
386405

387406

388407
def _deterministic_sha256_hash(s: str, part: str) -> int:
@@ -590,7 +609,7 @@ def extract_dd_trace_context(
590609
global dd_trace_context
591610
trace_context_source = None
592611
event_source = parse_event_source(event)
593-
dd_json_data = None
612+
dsm_carrier = None
594613

595614
if extractor is not None:
596615
context = extract_context_custom_extractor(extractor, event, lambda_context)
@@ -599,15 +618,13 @@ def extract_dd_trace_context(
599618
event, lambda_context, event_source, decode_authorizer_context
600619
)
601620
elif event_source.equals(EventTypes.SNS) or event_source.equals(EventTypes.SQS):
602-
context, dd_json_data = extract_context_from_sqs_or_sns_event_or_context(
621+
context, dsm_carrier = extract_context_from_sqs_or_sns_event_or_context(
603622
event, lambda_context
604623
)
605624
elif event_source.equals(EventTypes.EVENTBRIDGE):
606625
context = extract_context_from_eventbridge_event(event, lambda_context)
607626
elif event_source.equals(EventTypes.KINESIS):
608-
context, dd_json_data = extract_context_from_kinesis_event(
609-
event, lambda_context
610-
)
627+
context, dsm_carrier = extract_context_from_kinesis_event(event, lambda_context)
611628
elif event_source.equals(EventTypes.STEPFUNCTIONS):
612629
context = extract_context_from_step_functions(event, lambda_context)
613630
else:
@@ -624,7 +641,7 @@ def extract_dd_trace_context(
624641
if dd_trace_context:
625642
trace_context_source = TraceContextSource.XRAY
626643
logger.debug("extracted dd trace context %s", dd_trace_context)
627-
return dd_trace_context, trace_context_source, event_source, dd_json_data
644+
return dd_trace_context, trace_context_source, event_source, dsm_carrier
628645

629646

630647
def get_dd_trace_context_obj():

datadog_lambda/wrapper.py

Lines changed: 10 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
from datadog_lambda.trigger import (
4545
extract_trigger_tags,
4646
extract_http_status_code_tag,
47+
EventTypes,
4748
)
4849

4950
if config.profiling_enabled:
@@ -219,7 +220,7 @@ def _before(self, event, context):
219220
dd_context,
220221
trace_context_source,
221222
event_source,
222-
dd_json_data,
223+
dsm_carrier,
223224
) = extract_dd_trace_context(
224225
event,
225226
context,
@@ -245,10 +246,14 @@ def _before(self, event, context):
245246
event, context, event_source, config.decode_authorizer_context
246247
)
247248
if config.data_streams_enabled:
248-
if dd_json_data:
249+
if (
250+
event_source.equals(EventTypes.SQS)
251+
or event_source.equals(EventTypes.SNS)
252+
or event_source.equals(EventTypes.KINESIS)
253+
):
249254
source_arn = extract_source_arn(event)
250255
set_dsm_checkpoint(
251-
dd_json_data, event_source.to_string(), source_arn
256+
dsm_carrier, event_source.to_string(), source_arn
252257
)
253258

254259
self.span = create_function_execution_span(
@@ -355,28 +360,11 @@ def format_err_with_traceback(e):
355360
return f"Error {e}. Traceback: {tb}"
356361

357362

358-
def set_dsm_checkpoint(dd_json_data, event_source, arn):
363+
def set_dsm_checkpoint(dsm_carrier, event_source, arn):
359364
from ddtrace.data_streams import set_consume_checkpoint
360365

361366
try:
362-
if type(dd_json_data) is not dict:
363-
logger.debug("Failed to set DSM checkpoint: context is not a dict")
364-
return
365-
366-
if "dd-pathway-ctx-base64" not in dd_json_data:
367-
logger.debug(
368-
"Failed to set DSM checkpoint: dd-pathway-ctx-base64 not found"
369-
)
370-
return
371-
372-
def create_carrier_get(dd_json_data):
373-
def carrier_get(key):
374-
return dd_json_data.get(key)
375-
376-
return carrier_get
377-
378-
carrier_get = create_carrier_get(dd_json_data)
379-
set_consume_checkpoint(event_source, arn, carrier_get, manual_checkpoint=False)
367+
set_consume_checkpoint(event_source, arn, dsm_carrier)
380368
except Exception as e:
381369
logger.debug(f"Failed to set DSM checkpoint: {e}")
382370

tests/test_tracing.py

Lines changed: 28 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from ddtrace._trace._span_pointer import _SpanPointer
1616
from ddtrace._trace._span_pointer import _SpanPointerDirection
1717
from ddtrace._trace._span_pointer import _SpanPointerDescription
18+
from ddtrace.data_streams import PROPAGATION_KEY_BASE_64
1819

1920
from datadog_lambda.constants import (
2021
SamplingPriority,
@@ -262,6 +263,9 @@ def setUp(self):
262263
self.mock_is_lambda_context = patcher.start()
263264
self.mock_is_lambda_context.return_value = True
264265
self.addCleanup(patcher.stop)
266+
patcher = patch("datadog_lambda.config.Config.data_streams_enabled", True)
267+
self.mock_data_streams_enabled = patcher.start()
268+
self.addCleanup(patcher.stop)
265269

266270
def tearDown(self):
267271
del os.environ["_X_AMZN_TRACE_ID"]
@@ -2443,46 +2447,53 @@ def test_exception_outside_handler_tracing_disabled(
24432447
mock_trace.assert_not_called()
24442448

24452449

2446-
class TestExtractContextHaveAddedDDContext(unittest.TestCase):
2450+
class TestExtractContextHaveDSMContext(unittest.TestCase):
2451+
@patch("datadog_lambda.config.Config.trace_enabled", True)
2452+
@patch("datadog_lambda.config.Config.data_streams_enabled", True)
24472453
def test_extract_context_from_sqs_event_with_datadog_context(self):
24482454
"""Test SQS event extraction with datadog context returns expected values."""
24492455
lambda_ctx = get_mock_context()
2450-
expected_trace_data = {
2456+
header = {
24512457
TraceHeader.TRACE_ID: "456",
24522458
TraceHeader.PARENT_ID: "789",
24532459
TraceHeader.SAMPLING_PRIORITY: "1",
2460+
PROPAGATION_KEY_BASE_64: "test-data",
24542461
}
2462+
24552463
sqs_event = {
24562464
"Records": [
24572465
{
24582466
"messageId": "test-message-id",
24592467
"eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:test-queue",
24602468
"messageAttributes": {
24612469
"_datadog": {
2462-
"stringValue": json.dumps(expected_trace_data),
2470+
"stringValue": json.dumps(header),
24632471
"dataType": "String",
24642472
}
24652473
},
24662474
}
24672475
]
24682476
}
24692477

2470-
(context, dd_json_data) = extract_context_from_sqs_or_sns_event_or_context(
2478+
(context, dsm_carrier) = extract_context_from_sqs_or_sns_event_or_context(
24712479
sqs_event, lambda_ctx
24722480
)
24732481

24742482
self.assertEqual(context.trace_id, 456)
24752483
self.assertEqual(context.span_id, 789)
24762484
self.assertEqual(context.sampling_priority, 1)
2477-
self.assertEqual(dd_json_data, expected_trace_data)
2485+
self.assertEqual(dsm_carrier(PROPAGATION_KEY_BASE_64), "test-data")
24782486

2487+
@patch("datadog_lambda.config.Config.trace_enabled", True)
2488+
@patch("datadog_lambda.config.Config.data_streams_enabled", True)
24792489
def test_extract_context_from_sns_event_with_datadog_context(self):
24802490
"""Test SNS event extraction with datadog context returns expected values."""
24812491
lambda_ctx = get_mock_context()
24822492
expected_trace_data = {
24832493
TraceHeader.TRACE_ID: "111",
24842494
TraceHeader.PARENT_ID: "222",
24852495
TraceHeader.SAMPLING_PRIORITY: "2",
2496+
PROPAGATION_KEY_BASE_64: "test-data",
24862497
}
24872498
sns_event = {
24882499
"Records": [
@@ -2500,21 +2511,24 @@ def test_extract_context_from_sns_event_with_datadog_context(self):
25002511
]
25012512
}
25022513

2503-
(context, dd_json_data) = extract_context_from_sqs_or_sns_event_or_context(
2514+
(context, dsm_carrier) = extract_context_from_sqs_or_sns_event_or_context(
25042515
sns_event, lambda_ctx
25052516
)
25062517

25072518
self.assertEqual(context.trace_id, 111)
25082519
self.assertEqual(context.span_id, 222)
25092520
self.assertEqual(context.sampling_priority, 2)
2510-
self.assertEqual(dd_json_data, expected_trace_data)
2521+
self.assertEqual(dsm_carrier(PROPAGATION_KEY_BASE_64), "test-data")
25112522

2523+
@patch("datadog_lambda.config.Config.trace_enabled", True)
2524+
@patch("datadog_lambda.config.Config.data_streams_enabled", True)
25122525
def test_extract_context_from_kinesis_event_with_datadog_context(self):
25132526
"""Test Kinesis event extraction with datadog context returns expected values."""
25142527
lambda_ctx = get_mock_context()
25152528
expected_trace_data = {
25162529
TraceHeader.TRACE_ID: "333",
25172530
TraceHeader.PARENT_ID: "444",
2531+
PROPAGATION_KEY_BASE_64: "test-data",
25182532
}
25192533
kinesis_event = {
25202534
"Records": [
@@ -2529,50 +2543,24 @@ def test_extract_context_from_kinesis_event_with_datadog_context(self):
25292543
]
25302544
}
25312545

2532-
context, dd_json_data = extract_context_from_kinesis_event(
2546+
context, dsm_carrier = extract_context_from_kinesis_event(
25332547
kinesis_event, lambda_ctx
25342548
)
25352549

25362550
self.assertEqual(context.trace_id, 333)
25372551
self.assertEqual(context.span_id, 444)
2538-
self.assertEqual(dd_json_data, expected_trace_data)
2539-
2540-
def test_step_function_context_in_sqs_calls_step_function_extractor(self):
2541-
"""Test that Step Function context in SQS calls the step function extractor."""
2542-
lambda_ctx = get_mock_context()
2543-
sfn_context = {
2544-
"Execution": {"Id": "test-execution-id"},
2545-
"StateMachine": {"Id": "test-state-machine-id"},
2546-
"State": {"Name": "test-state"},
2547-
}
2548-
sfn_sqs_event = {
2549-
"Records": [
2550-
{
2551-
"eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:test-queue",
2552-
"messageAttributes": {
2553-
"_datadog": {
2554-
"stringValue": json.dumps(sfn_context),
2555-
"dataType": "String",
2556-
}
2557-
},
2558-
}
2559-
]
2560-
}
2561-
2562-
(context, dd_json_data) = extract_context_from_sqs_or_sns_event_or_context(
2563-
sfn_sqs_event, lambda_ctx
2564-
)
2565-
2566-
self.assertIsNotNone(context)
2567-
self.assertIsNone(dd_json_data)
2552+
self.assertEqual(dsm_carrier(PROPAGATION_KEY_BASE_64), "test-data")
25682553

2554+
@patch("datadog_lambda.config.Config.trace_enabled", True)
2555+
@patch("datadog_lambda.config.Config.data_streams_enabled", True)
25692556
def test_extract_context_from_sns_to_sqs_event_with_datadog_context(self):
25702557
"""Test SNS -> SQS event extraction with datadog context returns expected values."""
25712558
lambda_ctx = get_mock_context()
25722559
expected_trace_data = {
25732560
TraceHeader.TRACE_ID: "555",
25742561
TraceHeader.PARENT_ID: "666",
25752562
TraceHeader.SAMPLING_PRIORITY: "1",
2563+
PROPAGATION_KEY_BASE_64: "test-data",
25762564
}
25772565

25782566
sns_body = {
@@ -2599,11 +2587,11 @@ def test_extract_context_from_sns_to_sqs_event_with_datadog_context(self):
25992587
]
26002588
}
26012589

2602-
(context, dd_json_data) = extract_context_from_sqs_or_sns_event_or_context(
2590+
(context, dsm_carrier) = extract_context_from_sqs_or_sns_event_or_context(
26032591
sns_to_sqs_event, lambda_ctx
26042592
)
26052593

26062594
self.assertEqual(context.trace_id, 555)
26072595
self.assertEqual(context.span_id, 666)
26082596
self.assertEqual(context.sampling_priority, 1)
2609-
self.assertEqual(dd_json_data, expected_trace_data)
2597+
self.assertEqual(dsm_carrier(PROPAGATION_KEY_BASE_64), "test-data")

0 commit comments

Comments
 (0)