Skip to content

Commit edf7c0a

Browse files
extract side effects into own function
1 parent 1e81cd0 commit edf7c0a

File tree

2 files changed

+63
-73
lines changed

2 files changed

+63
-73
lines changed

datadog_lambda/tracing.py

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -66,28 +66,22 @@
6666
LOWER_64_BITS = "LOWER_64_BITS"
6767

6868

69-
def _extract_context_with_data_streams(context_json, event_type, arn):
70-
from ddtrace.data_streams import set_consume_checkpoint
71-
72-
"""
73-
Extracts the context from a JSON carrier and optionally sets a dsm consume checkpoint
74-
if the context is complete and data streams are enabled.
75-
"""
76-
context = propagator.extract(context_json)
77-
69+
def _set_data_streams_checkpoint(context_json, event_type, arn):
7870
if not config.data_streams_enabled:
79-
return context
71+
return
8072

8173
if "dd-pathway-ctx-base64" not in context_json:
82-
return context
74+
return
75+
8376
try:
77+
from ddtrace.data_streams import set_consume_checkpoint
78+
8479
carrier_get = _create_carrier_get(context_json)
8580
set_consume_checkpoint(event_type, arn, carrier_get, manual_checkpoint=False)
8681
except Exception as e:
8782
logger.debug(
8883
f"DSM:Failed to set consume checkpoint for {event_type} {arn}: {e}"
8984
)
90-
return context
9185

9286

9387
def _create_carrier_get(context_json):
@@ -308,9 +302,10 @@ def extract_context_from_sqs_or_sns_event_or_context(event, lambda_context):
308302
logger.debug(
309303
"Failed to extract Step Functions context from SQS/SNS event."
310304
)
311-
return _extract_context_with_data_streams(
312-
dd_data, "sqs" if is_sqs else "sns", arn
313-
)
305+
event_type = "sqs" if is_sqs else "sns"
306+
context = propagator.extract(dd_data)
307+
_set_data_streams_checkpoint(dd_data, event_type, arn)
308+
return context
314309
else:
315310
# Handle case where trace context is injected into attributes.AWSTraceHeader
316311
# example: Root=1-654321ab-000000001234567890abcdef;Parent=0123456789abcdef;Sampled=1
@@ -411,7 +406,9 @@ def extract_context_from_kinesis_event(event, lambda_context):
411406
data_obj = json.loads(data_str)
412407
dd_ctx = data_obj.get("_datadog")
413408
if dd_ctx:
414-
return _extract_context_with_data_streams(dd_ctx, "kinesis", arn)
409+
context = propagator.extract(dd_ctx)
410+
_set_data_streams_checkpoint(dd_ctx, "kinesis", arn)
411+
return context
415412
except Exception as e:
416413
logger.debug("The trace extractor returned with error %s", e)
417414

tests/test_tracing.py

Lines changed: 50 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@
4242
service_mapping as global_service_mapping,
4343
propagator,
4444
emit_telemetry_on_exception_outside_of_handler,
45-
_extract_context_with_data_streams,
4645
_create_carrier_get,
4746
extract_context_from_sqs_or_sns_event_or_context,
4847
extract_context_from_kinesis_event,
@@ -2445,12 +2444,8 @@ def test_exception_outside_handler_tracing_disabled(
24452444
mock_trace.assert_not_called()
24462445

24472446

2448-
class TestExtractContext(unittest.TestCase):
2447+
class TestSetDataStreamsCheckpoint(unittest.TestCase):
24492448
def setUp(self):
2450-
patcher = patch("datadog_lambda.tracing.propagator.extract")
2451-
self.mock_extract = patcher.start()
2452-
self.addCleanup(patcher.stop)
2453-
24542449
checkpoint_patcher = patch("ddtrace.data_streams.set_consume_checkpoint")
24552450
self.mock_checkpoint = checkpoint_patcher.start()
24562451
self.addCleanup(checkpoint_patcher.stop)
@@ -2460,73 +2455,61 @@ def setUp(self):
24602455
self.addCleanup(logger_patcher.stop)
24612456

24622457
@patch("datadog_lambda.config.Config.data_streams_enabled", False)
2463-
def test_extract_context_data_streams_disabled(self):
2458+
def test_set_checkpoint_data_streams_disabled(self):
24642459
context_json = {"dd-pathway-ctx-base64": "12345"}
24652460
event_type = "sqs"
24662461
arn = "arn:aws:sqs:us-east-1:123456789012:test-queue"
24672462

2468-
mock_context = Context(trace_id=12345, span_id=67890, sampling_priority=1)
2469-
self.mock_extract.return_value = mock_context
2463+
from datadog_lambda.tracing import _set_data_streams_checkpoint
24702464

2471-
result = _extract_context_with_data_streams(context_json, event_type, arn)
2465+
_set_data_streams_checkpoint(context_json, event_type, arn)
24722466

2473-
self.mock_extract.assert_called_once_with(context_json)
24742467
self.mock_checkpoint.assert_not_called()
2475-
self.assertEqual(result, mock_context)
24762468

24772469
@patch("datadog_lambda.config.Config.data_streams_enabled", True)
2478-
def test_extract_context_data_streams_enabled_complete_context(self):
2470+
def test_set_checkpoint_data_streams_enabled_complete_context(self):
24792471
context_json = {"dd-pathway-ctx-base64": "12345"}
24802472
event_type = "sqs"
24812473
arn = "arn:aws:sqs:us-east-1:123456789012:test-queue"
24822474

2483-
mock_context = Context(trace_id=12345, span_id=67890, sampling_priority=1)
2484-
self.mock_extract.return_value = mock_context
2475+
from datadog_lambda.tracing import _set_data_streams_checkpoint
24852476

2486-
result = _extract_context_with_data_streams(context_json, event_type, arn)
2477+
_set_data_streams_checkpoint(context_json, event_type, arn)
24872478

2488-
self.mock_extract.assert_called_once_with(context_json)
24892479
self.mock_checkpoint.assert_called_once()
24902480
args, kwargs = self.mock_checkpoint.call_args
24912481
self.assertEqual(args[0], event_type)
24922482
self.assertEqual(args[1], arn)
24932483
self.assertTrue(callable(args[2]))
24942484
self.assertEqual(kwargs["manual_checkpoint"], False)
2495-
self.assertEqual(result, mock_context)
24962485

24972486
@patch("datadog_lambda.config.Config.data_streams_enabled", True)
2498-
def test_extract_context_data_streams_enabled_invalid_context(self):
2487+
def test_set_checkpoint_data_streams_enabled_invalid_context(self):
24992488
context_json = {"something-malformed": "12345"}
25002489
event_type = "sqs"
25012490
arn = "arn:aws:sqs:us-east-1:123456789012:test-queue"
25022491

2503-
mock_context = Context(trace_id=12345, span_id=12345, sampling_priority=1)
2504-
self.mock_extract.return_value = mock_context
2492+
from datadog_lambda.tracing import _set_data_streams_checkpoint
25052493

2506-
result = _extract_context_with_data_streams(context_json, event_type, arn)
2494+
_set_data_streams_checkpoint(context_json, event_type, arn)
25072495

2508-
self.mock_extract.assert_called_once_with(context_json)
25092496
self.mock_checkpoint.assert_not_called()
2510-
self.assertEqual(result, mock_context)
25112497

25122498
@patch("datadog_lambda.config.Config.data_streams_enabled", True)
2513-
def test_extract_context_exception_path(self):
2499+
def test_set_checkpoint_exception_path(self):
25142500
context_json = {"dd-pathway-ctx-base64": "12345"}
25152501
event_type = "sqs"
25162502
arn = "arn:aws:sqs:us-east-1:123456789012:test-queue"
25172503

2518-
mock_context = Context(trace_id=12345, span_id=67890, sampling_priority=1)
2519-
self.mock_extract.return_value = mock_context
2520-
25212504
test_exception = Exception("Test exception")
25222505
self.mock_checkpoint.side_effect = test_exception
25232506

2524-
result = _extract_context_with_data_streams(context_json, event_type, arn)
2507+
from datadog_lambda.tracing import _set_data_streams_checkpoint
2508+
2509+
_set_data_streams_checkpoint(context_json, event_type, arn)
25252510

2526-
self.mock_extract.assert_called_once_with(context_json)
25272511
self.mock_checkpoint.assert_called_once()
25282512
self.mock_logger.debug.assert_called_once()
2529-
self.assertEqual(result, mock_context)
25302513

25312514

25322515
class TestCreateCarrierGet(unittest.TestCase):
@@ -2566,9 +2549,10 @@ class TestExtractContextFromSqsOrSnsEvent(unittest.TestCase):
25662549
def setUp(self):
25672550
self.lambda_context = get_mock_context()
25682551

2569-
@patch("datadog_lambda.tracing._extract_context_with_data_streams")
2552+
@patch("datadog_lambda.tracing._set_data_streams_checkpoint")
2553+
@patch("datadog_lambda.tracing.propagator.extract")
25702554
def test_sqs_event_with_datadog_message_attributes(
2571-
self, mock_extract_context_with_data_streams
2555+
self, mock_extract, mock_set_checkpoint
25722556
):
25732557
dd_data = {"dd-pathway-ctx-base64": "12345"}
25742558
dd_json_data = json.dumps(dd_data)
@@ -2585,20 +2569,22 @@ def test_sqs_event_with_datadog_message_attributes(
25852569
}
25862570

25872571
mock_context = Context(trace_id=12345, span_id=67890, sampling_priority=1)
2588-
mock_extract_context_with_data_streams.return_value = mock_context
2572+
mock_extract.return_value = mock_context
25892573

25902574
result = extract_context_from_sqs_or_sns_event_or_context(
25912575
event, self.lambda_context
25922576
)
25932577

2594-
mock_extract_context_with_data_streams.assert_called_once_with(
2578+
mock_extract.assert_called_once_with(dd_data)
2579+
mock_set_checkpoint.assert_called_once_with(
25952580
dd_data, "sqs", "arn:aws:sqs:us-east-1:123456789012:test-queue"
25962581
)
25972582
self.assertEqual(result, mock_context)
25982583

2599-
@patch("datadog_lambda.tracing._extract_context_with_data_streams")
2584+
@patch("datadog_lambda.tracing._set_data_streams_checkpoint")
2585+
@patch("datadog_lambda.tracing.propagator.extract")
26002586
def test_sqs_event_with_binary_datadog_message_attributes(
2601-
self, mock_extract_context_with_data_streams
2587+
self, mock_extract, mock_set_checkpoint
26022588
):
26032589
dd_data = {"dd-pathway-ctx-base64": "12345"}
26042590
dd_json_data = json.dumps(dd_data)
@@ -2616,20 +2602,22 @@ def test_sqs_event_with_binary_datadog_message_attributes(
26162602
}
26172603

26182604
mock_context = Context(trace_id=12345, span_id=67890, sampling_priority=1)
2619-
mock_extract_context_with_data_streams.return_value = mock_context
2605+
mock_extract.return_value = mock_context
26202606

26212607
result = extract_context_from_sqs_or_sns_event_or_context(
26222608
event, self.lambda_context
26232609
)
26242610

2625-
mock_extract_context_with_data_streams.assert_called_once_with(
2611+
mock_extract.assert_called_once_with(dd_data)
2612+
mock_set_checkpoint.assert_called_once_with(
26262613
dd_data, "sqs", "arn:aws:sqs:us-east-1:123456789012:test-queue"
26272614
)
26282615
self.assertEqual(result, mock_context)
26292616

2630-
@patch("datadog_lambda.tracing._extract_context_with_data_streams")
2617+
@patch("datadog_lambda.tracing._set_data_streams_checkpoint")
2618+
@patch("datadog_lambda.tracing.propagator.extract")
26312619
def test_sns_event_with_datadog_message_attributes(
2632-
self, mock_extract_context_with_data_streams
2620+
self, mock_extract, mock_set_checkpoint
26332621
):
26342622
dd_data = {"dd-pathway-ctx-base64": "12345"}
26352623
dd_json_data = json.dumps(dd_data)
@@ -2649,20 +2637,22 @@ def test_sns_event_with_datadog_message_attributes(
26492637
}
26502638

26512639
mock_context = Context(trace_id=12345, span_id=67890, sampling_priority=1)
2652-
mock_extract_context_with_data_streams.return_value = mock_context
2640+
mock_extract.return_value = mock_context
26532641

26542642
result = extract_context_from_sqs_or_sns_event_or_context(
26552643
event, self.lambda_context
26562644
)
26572645

2658-
mock_extract_context_with_data_streams.assert_called_once_with(
2646+
mock_extract.assert_called_once_with(dd_data)
2647+
mock_set_checkpoint.assert_called_once_with(
26592648
dd_data, "sns", "arn:aws:sns:us-east-1:123456789012:test-topic"
26602649
)
26612650
self.assertEqual(result, mock_context)
26622651

2663-
@patch("datadog_lambda.tracing._extract_context_with_data_streams")
2652+
@patch("datadog_lambda.tracing._set_data_streams_checkpoint")
2653+
@patch("datadog_lambda.tracing.propagator.extract")
26642654
def test_sqs_event_determines_is_sqs_true_when_event_source_arn_present(
2665-
self, mock_extract_context_with_data_streams
2655+
self, mock_extract, mock_set_checkpoint
26662656
):
26672657
"""Test that is_sqs = True when eventSourceARN is present in first record"""
26682658
dd_data = {"dd-pathway-ctx-base64": "12345"}
@@ -2680,20 +2670,22 @@ def test_sqs_event_determines_is_sqs_true_when_event_source_arn_present(
26802670
}
26812671

26822672
mock_context = Context(trace_id=12345, span_id=67890, sampling_priority=1)
2683-
mock_extract_context_with_data_streams.return_value = mock_context
2673+
mock_extract.return_value = mock_context
26842674

26852675
result = extract_context_from_sqs_or_sns_event_or_context(
26862676
event, self.lambda_context
26872677
)
26882678

2689-
mock_extract_context_with_data_streams.assert_called_once_with(
2679+
mock_extract.assert_called_once_with(dd_data)
2680+
mock_set_checkpoint.assert_called_once_with(
26902681
dd_data, "sqs", "arn:aws:sqs:us-east-1:123456789012:test-queue"
26912682
)
26922683
self.assertEqual(result, mock_context)
26932684

2694-
@patch("datadog_lambda.tracing._extract_context_with_data_streams")
2685+
@patch("datadog_lambda.tracing._set_data_streams_checkpoint")
2686+
@patch("datadog_lambda.tracing.propagator.extract")
26952687
def test_sns_to_sqs_event_detection_and_processing(
2696-
self, mock_extract_context_with_data_streams
2688+
self, mock_extract, mock_set_checkpoint
26972689
):
26982690
"""Test SNS->SQS case where SQS body contains SNS notification"""
26992691
dd_data = {"dd-pathway-ctx-base64": "12345"}
@@ -2719,13 +2711,14 @@ def test_sns_to_sqs_event_detection_and_processing(
27192711
}
27202712

27212713
mock_context = Context(trace_id=12345, span_id=67890, sampling_priority=1)
2722-
mock_extract_context_with_data_streams.return_value = mock_context
2714+
mock_extract.return_value = mock_context
27232715

27242716
result = extract_context_from_sqs_or_sns_event_or_context(
27252717
event, self.lambda_context
27262718
)
27272719

2728-
mock_extract_context_with_data_streams.assert_called_once_with(
2720+
mock_extract.assert_called_once_with(dd_data)
2721+
mock_set_checkpoint.assert_called_once_with(
27292722
dd_data, "sqs", "arn:aws:sqs:us-east-1:123456789012:test-queue"
27302723
)
27312724
self.assertEqual(result, mock_context)
@@ -2735,10 +2728,9 @@ class TestExtractContextFromKinesisEvent(unittest.TestCase):
27352728
def setUp(self):
27362729
self.lambda_context = get_mock_context()
27372730

2738-
@patch("datadog_lambda.tracing._extract_context_with_data_streams")
2739-
def test_kinesis_event_with_datadog_data(
2740-
self, mock_extract_context_with_data_streams
2741-
):
2731+
@patch("datadog_lambda.tracing._set_data_streams_checkpoint")
2732+
@patch("datadog_lambda.tracing.propagator.extract")
2733+
def test_kinesis_event_with_datadog_data(self, mock_extract, mock_set_checkpoint):
27422734
dd_data = {"dd-pathway-ctx-base64": "12345"}
27432735
kinesis_data = {"_datadog": dd_data, "message": "test"}
27442736
kinesis_data_str = json.dumps(kinesis_data)
@@ -2754,11 +2746,12 @@ def test_kinesis_event_with_datadog_data(
27542746
}
27552747

27562748
mock_context = Context(trace_id=12345, span_id=67890, sampling_priority=1)
2757-
mock_extract_context_with_data_streams.return_value = mock_context
2749+
mock_extract.return_value = mock_context
27582750

27592751
result = extract_context_from_kinesis_event(event, self.lambda_context)
27602752

2761-
mock_extract_context_with_data_streams.assert_called_once_with(
2753+
mock_extract.assert_called_once_with(dd_data)
2754+
mock_set_checkpoint.assert_called_once_with(
27622755
dd_data,
27632756
"kinesis",
27642757
"arn:aws:kinesis:us-east-1:123456789012:stream/test-stream",

0 commit comments

Comments
 (0)