Skip to content

Commit 3672f39

Browse files
revert back to original tracing.py implementation
1 parent 33cdc6c commit 3672f39

File tree

2 files changed

+80
-81
lines changed

2 files changed

+80
-81
lines changed

datadog_lambda/tracing.py

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -66,28 +66,27 @@
6666
LOWER_64_BITS = "LOWER_64_BITS"
6767

6868

69-
def _extract_context_with_data_streams(context_json, event_type, arn):
69+
def _dsm_set_checkpoint(context_json, event_type, arn):
7070
from ddtrace.data_streams import set_consume_checkpoint
71+
from ddtrace.data_streams import PROPAGATION_KEY_BASE_64
7172

7273
"""
7374
Extracts the context from a JSON carrier and optionally sets a dsm consume checkpoint
7475
if the context is complete and data streams are enabled.
7576
"""
76-
context = propagator.extract(context_json)
7777

78-
if not config.data_streams_enabled:
79-
return context
78+
if PROPAGATION_KEY_BASE_64 not in context_json:
79+
return
8080

81-
if "dd-pathway-ctx-base64" not in context_json:
82-
return context
81+
if not config.data_streams_enabled:
82+
return
8383
try:
8484
carrier_get = _create_carrier_get(context_json)
8585
set_consume_checkpoint(event_type, arn, carrier_get, manual_checkpoint=False)
8686
except Exception as e:
8787
logger.debug(
8888
f"DSM:Failed to set consume checkpoint for {event_type} {arn}: {e}"
8989
)
90-
return context
9190

9291

9392
def _create_carrier_get(context_json):
@@ -232,7 +231,9 @@ def create_sns_event(message):
232231
}
233232

234233

235-
def extract_context_from_sqs_or_sns_event_or_context(event, lambda_context):
234+
def extract_context_from_sqs_or_sns_event_or_context_and_set_dsm_checkpoint_if_enabled(
235+
event, lambda_context
236+
):
236237
"""
237238
Extract Datadog trace context from an SQS event.
238239
@@ -244,6 +245,7 @@ def extract_context_from_sqs_or_sns_event_or_context(event, lambda_context):
244245
Lambda Context.
245246
246247
Falls back to lambda context if no trace data is found in the SQS message attributes.
248+
Set a DSM checkpoint if DSM is enabled and the method for context propagation is supported.
247249
"""
248250
is_sqs = False
249251

@@ -308,9 +310,9 @@ def extract_context_from_sqs_or_sns_event_or_context(event, lambda_context):
308310
logger.debug(
309311
"Failed to extract Step Functions context from SQS/SNS event."
310312
)
311-
return _extract_context_with_data_streams(
312-
dd_data, "sqs" if is_sqs else "sns", arn
313-
)
313+
context = propagator.extract(dd_data)
314+
_dsm_set_checkpoint(dd_data, "sqs" if is_sqs else "sns", arn)
315+
return context
314316
else:
315317
# Handle case where trace context is injected into attributes.AWSTraceHeader
316318
# example: Root=1-654321ab-000000001234567890abcdef;Parent=0123456789abcdef;Sampled=1
@@ -391,9 +393,12 @@ def extract_context_from_eventbridge_event(event, lambda_context):
391393
return extract_context_from_lambda_context(lambda_context)
392394

393395

394-
def extract_context_from_kinesis_event(event, lambda_context):
396+
def extract_context_from_kinesis_event_and_set_dsm_checkpoint_if_enabled(
397+
event, lambda_context
398+
):
395399
"""
396400
Extract datadog trace context from a Kinesis Stream's base64 encoded data string
401+
Set a DSM checkpoint if DSM is enabled and the method for context propagation is supported.
397402
"""
398403
try:
399404
record = get_first_record(event)
@@ -411,7 +416,9 @@ def extract_context_from_kinesis_event(event, lambda_context):
411416
data_obj = json.loads(data_str)
412417
dd_ctx = data_obj.get("_datadog")
413418
if dd_ctx:
414-
return _extract_context_with_data_streams(dd_ctx, "kinesis", arn)
419+
context = propagator.extract(dd_ctx)
420+
_dsm_set_checkpoint(dd_ctx, "kinesis", arn)
421+
return context
415422
except Exception as e:
416423
logger.debug("The trace extractor returned with error %s", e)
417424

@@ -631,13 +638,15 @@ def extract_dd_trace_context(
631638
event, lambda_context, event_source, decode_authorizer_context
632639
)
633640
elif event_source.equals(EventTypes.SNS) or event_source.equals(EventTypes.SQS):
634-
context = extract_context_from_sqs_or_sns_event_or_context(
641+
context = extract_context_from_sqs_or_sns_event_or_context_and_set_dsm_checkpoint_if_enabled(
635642
event, lambda_context
636643
)
637644
elif event_source.equals(EventTypes.EVENTBRIDGE):
638645
context = extract_context_from_eventbridge_event(event, lambda_context)
639646
elif event_source.equals(EventTypes.KINESIS):
640-
context = extract_context_from_kinesis_event(event, lambda_context)
647+
context = extract_context_from_kinesis_event_and_set_dsm_checkpoint_if_enabled(
648+
event, lambda_context
649+
)
641650
elif event_source.equals(EventTypes.STEPFUNCTIONS):
642651
context = extract_context_from_step_functions(event, lambda_context)
643652
else:

tests/test_tracing.py

Lines changed: 56 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,10 @@
4242
service_mapping as global_service_mapping,
4343
propagator,
4444
emit_telemetry_on_exception_outside_of_handler,
45-
_extract_context_with_data_streams,
45+
_dsm_set_checkpoint,
4646
_create_carrier_get,
47-
extract_context_from_sqs_or_sns_event_or_context,
48-
extract_context_from_kinesis_event,
47+
extract_context_from_kinesis_event_and_set_dsm_checkpoint_if_enabled,
48+
extract_context_from_sqs_or_sns_event_or_context_and_set_dsm_checkpoint_if_enabled,
4949
)
5050

5151
from tests.utils import get_mock_context
@@ -2445,12 +2445,8 @@ def test_exception_outside_handler_tracing_disabled(
24452445
mock_trace.assert_not_called()
24462446

24472447

2448-
class TestExtractContext(unittest.TestCase):
2448+
class TestDsmSetCheckpoint(unittest.TestCase):
24492449
def setUp(self):
2450-
patcher = patch("datadog_lambda.tracing.propagator.extract")
2451-
self.mock_extract = patcher.start()
2452-
self.addCleanup(patcher.stop)
2453-
24542450
checkpoint_patcher = patch("ddtrace.data_streams.set_consume_checkpoint")
24552451
self.mock_checkpoint = checkpoint_patcher.start()
24562452
self.addCleanup(checkpoint_patcher.stop)
@@ -2460,73 +2456,53 @@ def setUp(self):
24602456
self.addCleanup(logger_patcher.stop)
24612457

24622458
@patch("datadog_lambda.config.Config.data_streams_enabled", False)
2463-
def test_extract_context_data_streams_disabled(self):
2459+
def test_dsm_set_checkpoint_data_streams_disabled(self):
24642460
context_json = {"dd-pathway-ctx-base64": "12345"}
24652461
event_type = "sqs"
24662462
arn = "arn:aws:sqs:us-east-1:123456789012:test-queue"
24672463

2468-
mock_context = Context(trace_id=12345, span_id=67890, sampling_priority=1)
2469-
self.mock_extract.return_value = mock_context
2470-
2471-
result = _extract_context_with_data_streams(context_json, event_type, arn)
2464+
_dsm_set_checkpoint(context_json, event_type, arn)
24722465

2473-
self.mock_extract.assert_called_once_with(context_json)
24742466
self.mock_checkpoint.assert_not_called()
2475-
self.assertEqual(result, mock_context)
24762467

24772468
@patch("datadog_lambda.config.Config.data_streams_enabled", True)
2478-
def test_extract_context_data_streams_enabled_complete_context(self):
2469+
def test_dsm_set_checkpoint_data_streams_enabled_complete_context(self):
24792470
context_json = {"dd-pathway-ctx-base64": "12345"}
24802471
event_type = "sqs"
24812472
arn = "arn:aws:sqs:us-east-1:123456789012:test-queue"
24822473

2483-
mock_context = Context(trace_id=12345, span_id=67890, sampling_priority=1)
2484-
self.mock_extract.return_value = mock_context
2485-
2486-
result = _extract_context_with_data_streams(context_json, event_type, arn)
2474+
_dsm_set_checkpoint(context_json, event_type, arn)
24872475

2488-
self.mock_extract.assert_called_once_with(context_json)
24892476
self.mock_checkpoint.assert_called_once()
24902477
args, kwargs = self.mock_checkpoint.call_args
24912478
self.assertEqual(args[0], event_type)
24922479
self.assertEqual(args[1], arn)
24932480
self.assertTrue(callable(args[2]))
24942481
self.assertEqual(kwargs["manual_checkpoint"], False)
2495-
self.assertEqual(result, mock_context)
24962482

24972483
@patch("datadog_lambda.config.Config.data_streams_enabled", True)
2498-
def test_extract_context_data_streams_enabled_invalid_context(self):
2484+
def test_dsm_set_checkpoint_data_streams_enabled_invalid_context(self):
24992485
context_json = {"something-malformed": "12345"}
25002486
event_type = "sqs"
25012487
arn = "arn:aws:sqs:us-east-1:123456789012:test-queue"
25022488

2503-
mock_context = Context(trace_id=12345, span_id=12345, sampling_priority=1)
2504-
self.mock_extract.return_value = mock_context
2489+
_dsm_set_checkpoint(context_json, event_type, arn)
25052490

2506-
result = _extract_context_with_data_streams(context_json, event_type, arn)
2507-
2508-
self.mock_extract.assert_called_once_with(context_json)
25092491
self.mock_checkpoint.assert_not_called()
2510-
self.assertEqual(result, mock_context)
25112492

25122493
@patch("datadog_lambda.config.Config.data_streams_enabled", True)
2513-
def test_extract_context_exception_path(self):
2494+
def test_dsm_set_checkpoint_exception_path(self):
25142495
context_json = {"dd-pathway-ctx-base64": "12345"}
25152496
event_type = "sqs"
25162497
arn = "arn:aws:sqs:us-east-1:123456789012:test-queue"
25172498

2518-
mock_context = Context(trace_id=12345, span_id=67890, sampling_priority=1)
2519-
self.mock_extract.return_value = mock_context
2520-
25212499
test_exception = Exception("Test exception")
25222500
self.mock_checkpoint.side_effect = test_exception
25232501

2524-
result = _extract_context_with_data_streams(context_json, event_type, arn)
2502+
_dsm_set_checkpoint(context_json, event_type, arn)
25252503

2526-
self.mock_extract.assert_called_once_with(context_json)
25272504
self.mock_checkpoint.assert_called_once()
25282505
self.mock_logger.debug.assert_called_once()
2529-
self.assertEqual(result, mock_context)
25302506

25312507

25322508
class TestCreateCarrierGet(unittest.TestCase):
@@ -2566,9 +2542,10 @@ class TestExtractContextFromSqsOrSnsEvent(unittest.TestCase):
25662542
def setUp(self):
25672543
self.lambda_context = get_mock_context()
25682544

2569-
@patch("datadog_lambda.tracing._extract_context_with_data_streams")
2545+
@patch("datadog_lambda.tracing._dsm_set_checkpoint")
2546+
@patch("datadog_lambda.tracing.propagator.extract")
25702547
def test_sqs_event_with_datadog_message_attributes(
2571-
self, mock_extract_context_with_data_streams
2548+
self, mock_extract, mock_dsm_set_checkpoint
25722549
):
25732550
dd_data = {"dd-pathway-ctx-base64": "12345"}
25742551
dd_json_data = json.dumps(dd_data)
@@ -2585,20 +2562,22 @@ def test_sqs_event_with_datadog_message_attributes(
25852562
}
25862563

25872564
mock_context = Context(trace_id=12345, span_id=67890, sampling_priority=1)
2588-
mock_extract_context_with_data_streams.return_value = mock_context
2565+
mock_extract.return_value = mock_context
25892566

2590-
result = extract_context_from_sqs_or_sns_event_or_context(
2567+
result = extract_context_from_sqs_or_sns_event_or_context_and_set_dsm_checkpoint_if_enabled(
25912568
event, self.lambda_context
25922569
)
25932570

2594-
mock_extract_context_with_data_streams.assert_called_once_with(
2571+
mock_extract.assert_called_once_with(dd_data)
2572+
mock_dsm_set_checkpoint.assert_called_once_with(
25952573
dd_data, "sqs", "arn:aws:sqs:us-east-1:123456789012:test-queue"
25962574
)
25972575
self.assertEqual(result, mock_context)
25982576

2599-
@patch("datadog_lambda.tracing._extract_context_with_data_streams")
2577+
@patch("datadog_lambda.tracing._dsm_set_checkpoint")
2578+
@patch("datadog_lambda.tracing.propagator.extract")
26002579
def test_sqs_event_with_binary_datadog_message_attributes(
2601-
self, mock_extract_context_with_data_streams
2580+
self, mock_extract, mock_dsm_set_checkpoint
26022581
):
26032582
dd_data = {"dd-pathway-ctx-base64": "12345"}
26042583
dd_json_data = json.dumps(dd_data)
@@ -2616,20 +2595,22 @@ def test_sqs_event_with_binary_datadog_message_attributes(
26162595
}
26172596

26182597
mock_context = Context(trace_id=12345, span_id=67890, sampling_priority=1)
2619-
mock_extract_context_with_data_streams.return_value = mock_context
2598+
mock_extract.return_value = mock_context
26202599

2621-
result = extract_context_from_sqs_or_sns_event_or_context(
2600+
result = extract_context_from_sqs_or_sns_event_or_context_and_set_dsm_checkpoint_if_enabled(
26222601
event, self.lambda_context
26232602
)
26242603

2625-
mock_extract_context_with_data_streams.assert_called_once_with(
2604+
mock_extract.assert_called_once_with(dd_data)
2605+
mock_dsm_set_checkpoint.assert_called_once_with(
26262606
dd_data, "sqs", "arn:aws:sqs:us-east-1:123456789012:test-queue"
26272607
)
26282608
self.assertEqual(result, mock_context)
26292609

2630-
@patch("datadog_lambda.tracing._extract_context_with_data_streams")
2610+
@patch("datadog_lambda.tracing._dsm_set_checkpoint")
2611+
@patch("datadog_lambda.tracing.propagator.extract")
26312612
def test_sns_event_with_datadog_message_attributes(
2632-
self, mock_extract_context_with_data_streams
2613+
self, mock_extract, mock_dsm_set_checkpoint
26332614
):
26342615
dd_data = {"dd-pathway-ctx-base64": "12345"}
26352616
dd_json_data = json.dumps(dd_data)
@@ -2649,20 +2630,22 @@ def test_sns_event_with_datadog_message_attributes(
26492630
}
26502631

26512632
mock_context = Context(trace_id=12345, span_id=67890, sampling_priority=1)
2652-
mock_extract_context_with_data_streams.return_value = mock_context
2633+
mock_extract.return_value = mock_context
26532634

2654-
result = extract_context_from_sqs_or_sns_event_or_context(
2635+
result = extract_context_from_sqs_or_sns_event_or_context_and_set_dsm_checkpoint_if_enabled(
26552636
event, self.lambda_context
26562637
)
26572638

2658-
mock_extract_context_with_data_streams.assert_called_once_with(
2639+
mock_extract.assert_called_once_with(dd_data)
2640+
mock_dsm_set_checkpoint.assert_called_once_with(
26592641
dd_data, "sns", "arn:aws:sns:us-east-1:123456789012:test-topic"
26602642
)
26612643
self.assertEqual(result, mock_context)
26622644

2663-
@patch("datadog_lambda.tracing._extract_context_with_data_streams")
2645+
@patch("datadog_lambda.tracing._dsm_set_checkpoint")
2646+
@patch("datadog_lambda.tracing.propagator.extract")
26642647
def test_sqs_event_determines_is_sqs_true_when_event_source_arn_present(
2665-
self, mock_extract_context_with_data_streams
2648+
self, mock_extract, mock_dsm_set_checkpoint
26662649
):
26672650
"""Test that is_sqs = True when eventSourceARN is present in first record"""
26682651
dd_data = {"dd-pathway-ctx-base64": "12345"}
@@ -2680,20 +2663,22 @@ def test_sqs_event_determines_is_sqs_true_when_event_source_arn_present(
26802663
}
26812664

26822665
mock_context = Context(trace_id=12345, span_id=67890, sampling_priority=1)
2683-
mock_extract_context_with_data_streams.return_value = mock_context
2666+
mock_extract.return_value = mock_context
26842667

2685-
result = extract_context_from_sqs_or_sns_event_or_context(
2668+
result = extract_context_from_sqs_or_sns_event_or_context_and_set_dsm_checkpoint_if_enabled(
26862669
event, self.lambda_context
26872670
)
26882671

2689-
mock_extract_context_with_data_streams.assert_called_once_with(
2672+
mock_extract.assert_called_once_with(dd_data)
2673+
mock_dsm_set_checkpoint.assert_called_once_with(
26902674
dd_data, "sqs", "arn:aws:sqs:us-east-1:123456789012:test-queue"
26912675
)
26922676
self.assertEqual(result, mock_context)
26932677

2694-
@patch("datadog_lambda.tracing._extract_context_with_data_streams")
2678+
@patch("datadog_lambda.tracing._dsm_set_checkpoint")
2679+
@patch("datadog_lambda.tracing.propagator.extract")
26952680
def test_sns_to_sqs_event_detection_and_processing(
2696-
self, mock_extract_context_with_data_streams
2681+
self, mock_extract, mock_dsm_set_checkpoint
26972682
):
26982683
"""Test SNS->SQS case where SQS body contains SNS notification"""
26992684
dd_data = {"dd-pathway-ctx-base64": "12345"}
@@ -2719,13 +2704,14 @@ def test_sns_to_sqs_event_detection_and_processing(
27192704
}
27202705

27212706
mock_context = Context(trace_id=12345, span_id=67890, sampling_priority=1)
2722-
mock_extract_context_with_data_streams.return_value = mock_context
2707+
mock_extract.return_value = mock_context
27232708

2724-
result = extract_context_from_sqs_or_sns_event_or_context(
2709+
result = extract_context_from_sqs_or_sns_event_or_context_and_set_dsm_checkpoint_if_enabled(
27252710
event, self.lambda_context
27262711
)
27272712

2728-
mock_extract_context_with_data_streams.assert_called_once_with(
2713+
mock_extract.assert_called_once_with(dd_data)
2714+
mock_dsm_set_checkpoint.assert_called_once_with(
27292715
dd_data, "sqs", "arn:aws:sqs:us-east-1:123456789012:test-queue"
27302716
)
27312717
self.assertEqual(result, mock_context)
@@ -2735,9 +2721,10 @@ class TestExtractContextFromKinesisEvent(unittest.TestCase):
27352721
def setUp(self):
27362722
self.lambda_context = get_mock_context()
27372723

2738-
@patch("datadog_lambda.tracing._extract_context_with_data_streams")
2724+
@patch("datadog_lambda.tracing._dsm_set_checkpoint")
2725+
@patch("datadog_lambda.tracing.propagator.extract")
27392726
def test_kinesis_event_with_datadog_data(
2740-
self, mock_extract_context_with_data_streams
2727+
self, mock_extract, mock_dsm_set_checkpoint
27412728
):
27422729
dd_data = {"dd-pathway-ctx-base64": "12345"}
27432730
kinesis_data = {"_datadog": dd_data, "message": "test"}
@@ -2754,11 +2741,14 @@ def test_kinesis_event_with_datadog_data(
27542741
}
27552742

27562743
mock_context = Context(trace_id=12345, span_id=67890, sampling_priority=1)
2757-
mock_extract_context_with_data_streams.return_value = mock_context
2744+
mock_extract.return_value = mock_context
27582745

2759-
result = extract_context_from_kinesis_event(event, self.lambda_context)
2746+
result = extract_context_from_kinesis_event_and_set_dsm_checkpoint_if_enabled(
2747+
event, self.lambda_context
2748+
)
27602749

2761-
mock_extract_context_with_data_streams.assert_called_once_with(
2750+
mock_extract.assert_called_once_with(dd_data)
2751+
mock_dsm_set_checkpoint.assert_called_once_with(
27622752
dd_data,
27632753
"kinesis",
27642754
"arn:aws:kinesis:us-east-1:123456789012:stream/test-stream",

0 commit comments

Comments
 (0)