Skip to content

Commit 9996fbb

Browse files
use lambda function, add checks before checkpoint
1 parent 4ad2bab commit 9996fbb

File tree

2 files changed

+24
-69
lines changed

2 files changed

+24
-69
lines changed

datadog_lambda/tracing.py

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -68,30 +68,28 @@
6868

6969

7070
def _dsm_set_checkpoint(context_json, event_type, arn):
71+
if not config.data_streams_enabled:
72+
return
73+
7174
if not isinstance(context_json, dict):
7275
return
7376

74-
if not config.data_streams_enabled:
77+
from ddtrace.data_streams import PROPAGATION_KEY_BASE_64
78+
79+
if context_json and PROPAGATION_KEY_BASE_64 not in context_json:
7580
return
7681

7782
try:
7883
from ddtrace.data_streams import set_consume_checkpoint
7984

80-
carrier_get = _create_carrier_get(context_json)
85+
carrier_get = lambda k: context_json.get(k) # noqa: E731
8186
set_consume_checkpoint(event_type, arn, carrier_get, manual_checkpoint=False)
8287
except Exception as e:
8388
logger.debug(
8489
f"DSM:Failed to set consume checkpoint for {event_type} {arn}: {e}"
8590
)
8691

8792

88-
def _create_carrier_get(context_json):
89-
def carrier_get(key):
90-
return context_json.get(key)
91-
92-
return carrier_get
93-
94-
9593
def _convert_xray_trace_id(xray_trace_id):
9694
"""
9795
Convert X-Ray trace id (hex)'s last 63 bits to a Datadog trace id (int).

tests/test_tracing.py

Lines changed: 17 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@
4343
propagator,
4444
emit_telemetry_on_exception_outside_of_handler,
4545
_dsm_set_checkpoint,
46-
_create_carrier_get,
4746
extract_context_from_kinesis_event_and_set_dsm_checkpoint_if_enabled,
4847
extract_context_from_sqs_or_sns_event_or_context_and_set_dsm_ckpt_if_enabled,
4948
)
@@ -60,6 +59,7 @@
6059
fake_xray_header_value_root_decimal = "3995693151288333088"
6160

6261
event_samples = "tests/event_samples/"
62+
PROPAGATION_KEY_BASE_64 = "dd-pathway-ctx-base64"
6363

6464

6565
def with_trace_propagation_style(style):
@@ -2457,7 +2457,7 @@ def setUp(self):
24572457

24582458
@patch("datadog_lambda.config.Config.data_streams_enabled", False)
24592459
def test_dsm_set_checkpoint_data_streams_disabled(self):
2460-
context_json = {"dd-pathway-ctx-base64": "12345"}
2460+
context_json = {PROPAGATION_KEY_BASE_64: "12345"}
24612461
event_type = "sqs"
24622462
arn = "arn:aws:sqs:us-east-1:123456789012:test-queue"
24632463

@@ -2467,7 +2467,7 @@ def test_dsm_set_checkpoint_data_streams_disabled(self):
24672467

24682468
@patch("datadog_lambda.config.Config.data_streams_enabled", True)
24692469
def test_dsm_set_checkpoint_data_streams_enabled_complete_context(self):
2470-
context_json = {"dd-pathway-ctx-base64": "12345"}
2470+
context_json = {PROPAGATION_KEY_BASE_64: "12345"}
24712471
event_type = "sqs"
24722472
arn = "arn:aws:sqs:us-east-1:123456789012:test-queue"
24732473

@@ -2482,7 +2482,7 @@ def test_dsm_set_checkpoint_data_streams_enabled_complete_context(self):
24822482

24832483
@patch("datadog_lambda.config.Config.data_streams_enabled", True)
24842484
def test_dsm_set_checkpoint_exception_path(self):
2485-
context_json = {"dd-pathway-ctx-base64": "12345"}
2485+
context_json = {PROPAGATION_KEY_BASE_64: "12345"}
24862486
event_type = "sqs"
24872487
arn = "arn:aws:sqs:us-east-1:123456789012:test-queue"
24882488

@@ -2495,67 +2495,24 @@ def test_dsm_set_checkpoint_exception_path(self):
24952495
self.mock_logger.debug.assert_called_once()
24962496

24972497
@patch("ddtrace.data_streams.set_consume_checkpoint")
2498-
def test_dsm_set_checkpoint_non_dict_context_sqs(self, mock_checkpoint):
2498+
def test_dsm_set_checkpoint_non_dict_context(self, mock_checkpoint):
24992499
_dsm_set_checkpoint(
25002500
"not_a_dict", "sqs", "arn:aws:sqs:us-east-1:123456789012:test-queue"
25012501
)
25022502
mock_checkpoint.assert_not_called()
25032503

25042504
@patch("ddtrace.data_streams.set_consume_checkpoint")
2505-
def test_dsm_set_checkpoint_non_dict_context_sns_to_sqs(self, mock_checkpoint):
2506-
_dsm_set_checkpoint(
2507-
["not", "a", "dict"], "sqs", "arn:aws:sqs:us-east-1:123456789012:test"
2508-
)
2509-
mock_checkpoint.assert_not_called()
2510-
2511-
@patch("ddtrace.data_streams.set_consume_checkpoint")
2512-
def test_dsm_set_checkpoint_non_dict_context_kinesis(self, mock_checkpoint):
2513-
_dsm_set_checkpoint(
2514-
12345, "kinesis", "arn:aws:kinesis:us-east-1:123456789012:stream/test"
2515-
)
2516-
mock_checkpoint.assert_not_called()
2517-
2518-
@patch("ddtrace.data_streams.set_consume_checkpoint")
2519-
def test_dsm_set_checkpoint_non_dict_context_sns(self, mock_checkpoint):
2505+
def test_dsm_set_checkpoint_PROPAGATION_KEY_BASE_64_not_present(
2506+
self, mock_checkpoint
2507+
):
25202508
_dsm_set_checkpoint(
2521-
None, "sns", "arn:aws:sns:us-east-1:123456789012:test-topic"
2509+
{"not_a_dict": "not_a_dict"},
2510+
"sqs",
2511+
"arn:aws:sqs:us-east-1:123456789012:test-queue",
25222512
)
25232513
mock_checkpoint.assert_not_called()
25242514

25252515

2526-
class TestCreateCarrierGet(unittest.TestCase):
2527-
def test_create_carrier_get_with_valid_data(self):
2528-
context_json = {
2529-
"x-datadog-trace-id": "12345",
2530-
"x-datadog-parent-id": "67890",
2531-
"x-datadog-sampling-priority": "1",
2532-
}
2533-
2534-
carrier_get = _create_carrier_get(context_json)
2535-
2536-
self.assertTrue(callable(carrier_get))
2537-
self.assertEqual(carrier_get("x-datadog-trace-id"), "12345")
2538-
self.assertEqual(carrier_get("x-datadog-parent-id"), "67890")
2539-
self.assertEqual(carrier_get("x-datadog-sampling-priority"), "1")
2540-
2541-
def test_create_carrier_get_with_missing_key(self):
2542-
context_json = {"x-datadog-trace-id": "12345"}
2543-
2544-
carrier_get = _create_carrier_get(context_json)
2545-
2546-
self.assertTrue(callable(carrier_get))
2547-
self.assertEqual(carrier_get("x-datadog-trace-id"), "12345")
2548-
self.assertIsNone(carrier_get("x-datadog-parent-id"))
2549-
2550-
def test_create_carrier_get_with_empty_context(self):
2551-
context_json = {}
2552-
2553-
carrier_get = _create_carrier_get(context_json)
2554-
2555-
self.assertTrue(callable(carrier_get))
2556-
self.assertIsNone(carrier_get("any-key"))
2557-
2558-
25592516
class TestExtractContextFromSqsOrSnsEventWithDSMLogic(unittest.TestCase):
25602517
def setUp(self):
25612518
self.lambda_context = get_mock_context()
@@ -2565,7 +2522,7 @@ def setUp(self):
25652522
def test_sqs_event_with_datadog_message_attributes(
25662523
self, mock_extract, mock_dsm_set_checkpoint
25672524
):
2568-
dd_data = {"dd-pathway-ctx-base64": "12345"}
2525+
dd_data = {PROPAGATION_KEY_BASE_64: "12345"}
25692526
dd_json_data = json.dumps(dd_data)
25702527

25712528
event = {
@@ -2597,7 +2554,7 @@ def test_sqs_event_with_datadog_message_attributes(
25972554
def test_sqs_event_with_binary_datadog_message_attributes(
25982555
self, mock_extract, mock_dsm_set_checkpoint
25992556
):
2600-
dd_data = {"dd-pathway-ctx-base64": "12345"}
2557+
dd_data = {PROPAGATION_KEY_BASE_64: "12345"}
26012558
dd_json_data = json.dumps(dd_data)
26022559
encoded_data = base64.b64encode(dd_json_data.encode()).decode()
26032560

@@ -2630,7 +2587,7 @@ def test_sqs_event_with_binary_datadog_message_attributes(
26302587
def test_sns_event_with_datadog_message_attributes(
26312588
self, mock_extract, mock_dsm_set_checkpoint
26322589
):
2633-
dd_data = {"dd-pathway-ctx-base64": "12345"}
2590+
dd_data = {PROPAGATION_KEY_BASE_64: "12345"}
26342591
dd_json_data = json.dumps(dd_data)
26352592

26362593
event = {
@@ -2666,7 +2623,7 @@ def test_sqs_event_determines_is_sqs_true_when_event_source_arn_present(
26662623
self, mock_extract, mock_dsm_set_checkpoint
26672624
):
26682625
"""Test that is_sqs = True when eventSourceARN is present in first record"""
2669-
dd_data = {"dd-pathway-ctx-base64": "12345"}
2626+
dd_data = {PROPAGATION_KEY_BASE_64: "12345"}
26702627
dd_json_data = json.dumps(dd_data)
26712628

26722629
event = {
@@ -2699,7 +2656,7 @@ def test_sns_to_sqs_event_detection_and_processing(
26992656
self, mock_extract, mock_dsm_set_checkpoint
27002657
):
27012658
"""Test SNS->SQS case where SQS body contains SNS notification"""
2702-
dd_data = {"dd-pathway-ctx-base64": "12345"}
2659+
dd_data = {PROPAGATION_KEY_BASE_64: "12345"}
27032660
dd_json_data = json.dumps(dd_data)
27042661

27052662
sns_notification = {
@@ -2904,7 +2861,7 @@ def setUp(self):
29042861
def test_kinesis_event_with_datadog_data(
29052862
self, mock_extract, mock_dsm_set_checkpoint
29062863
):
2907-
dd_data = {"dd-pathway-ctx-base64": "12345"}
2864+
dd_data = {PROPAGATION_KEY_BASE_64: "12345"}
29082865
kinesis_data = {"_datadog": dd_data, "message": "test"}
29092866
kinesis_data_str = json.dumps(kinesis_data)
29102867
encoded_data = base64.b64encode(kinesis_data_str.encode()).decode()

0 commit comments

Comments
 (0)