Skip to content

Commit b5af84f

Browse files
remove unneccessary checks, still set checkpoints even when dsm context fails to be propagated
1 parent d8a4379 commit b5af84f

File tree

2 files changed

+262
-16
lines changed

2 files changed

+262
-16
lines changed

datadog_lambda/tracing.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,18 +69,17 @@
6969

7070
def _dsm_set_checkpoint(context_json, event_type, arn):
7171
from ddtrace.data_streams import set_consume_checkpoint
72-
from ddtrace.data_streams import PROPAGATION_KEY_BASE_64
7372

7473
"""
7574
Extracts the context from a JSON carrier and optionally sets a dsm consume checkpoint
7675
if the context is complete and data streams are enabled.
7776
"""
78-
79-
if PROPAGATION_KEY_BASE_64 not in context_json:
77+
if not isinstance(context_json, dict):
8078
return
8179

8280
if not config.data_streams_enabled:
8381
return
82+
8483
try:
8584
carrier_get = _create_carrier_get(context_json)
8685
set_consume_checkpoint(event_type, arn, carrier_get, manual_checkpoint=False)
@@ -336,9 +335,13 @@ def extract_context_from_sqs_or_sns_event_or_context_and_set_dsm_ckpt_if_enabled
336335
span_id=int(x_ray_context["parent_id"], 16),
337336
sampling_priority=float(x_ray_context["sampled"]),
338337
)
338+
# Still want to set a DSM checkpoint even if DSM context not propagated
339+
_dsm_set_checkpoint({}, "sqs" if is_sqs else "sns", arn)
339340
return extract_context_from_lambda_context(lambda_context)
340341
except Exception as e:
341342
logger.debug("The trace extractor returned with error %s", e)
343+
# Still want to set a DSM checkpoint even if DSM context not propagated
344+
_dsm_set_checkpoint({}, "sqs" if is_sqs else "sns", arn)
342345
return extract_context_from_lambda_context(lambda_context)
343346

344347

@@ -422,7 +425,8 @@ def extract_context_from_kinesis_event_and_set_dsm_checkpoint_if_enabled(
422425
return context
423426
except Exception as e:
424427
logger.debug("The trace extractor returned with error %s", e)
425-
428+
# Still want to set a DSM checkpoint even if DSM context not propagated
429+
_dsm_set_checkpoint({}, "kinesis", arn)
426430
return extract_context_from_lambda_context(lambda_context)
427431

428432

tests/test_tracing.py

Lines changed: 254 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2480,16 +2480,6 @@ def test_dsm_set_checkpoint_data_streams_enabled_complete_context(self):
24802480
self.assertTrue(callable(args[2]))
24812481
self.assertEqual(kwargs["manual_checkpoint"], False)
24822482

2483-
@patch("datadog_lambda.config.Config.data_streams_enabled", True)
2484-
def test_dsm_set_checkpoint_data_streams_enabled_invalid_context(self):
2485-
context_json = {"something-malformed": "12345"}
2486-
event_type = "sqs"
2487-
arn = "arn:aws:sqs:us-east-1:123456789012:test-queue"
2488-
2489-
_dsm_set_checkpoint(context_json, event_type, arn)
2490-
2491-
self.mock_checkpoint.assert_not_called()
2492-
24932483
@patch("datadog_lambda.config.Config.data_streams_enabled", True)
24942484
def test_dsm_set_checkpoint_exception_path(self):
24952485
context_json = {"dd-pathway-ctx-base64": "12345"}
@@ -2504,6 +2494,34 @@ def test_dsm_set_checkpoint_exception_path(self):
25042494
self.mock_checkpoint.assert_called_once()
25052495
self.mock_logger.debug.assert_called_once()
25062496

2497+
@patch("ddtrace.data_streams.set_consume_checkpoint")
2498+
def test_dsm_set_checkpoint_non_dict_context_sqs(self, mock_checkpoint):
2499+
_dsm_set_checkpoint(
2500+
"not_a_dict", "sqs", "arn:aws:sqs:us-east-1:123456789012:test-queue"
2501+
)
2502+
mock_checkpoint.assert_not_called()
2503+
2504+
@patch("ddtrace.data_streams.set_consume_checkpoint")
2505+
def test_dsm_set_checkpoint_non_dict_context_sns_to_sqs(self, mock_checkpoint):
2506+
_dsm_set_checkpoint(
2507+
["not", "a", "dict"], "sqs", "arn:aws:sqs:us-east-1:123456789012:test"
2508+
)
2509+
mock_checkpoint.assert_not_called()
2510+
2511+
@patch("ddtrace.data_streams.set_consume_checkpoint")
2512+
def test_dsm_set_checkpoint_non_dict_context_kinesis(self, mock_checkpoint):
2513+
_dsm_set_checkpoint(
2514+
12345, "kinesis", "arn:aws:kinesis:us-east-1:123456789012:stream/test"
2515+
)
2516+
mock_checkpoint.assert_not_called()
2517+
2518+
@patch("ddtrace.data_streams.set_consume_checkpoint")
2519+
def test_dsm_set_checkpoint_non_dict_context_sns(self, mock_checkpoint):
2520+
_dsm_set_checkpoint(
2521+
None, "sns", "arn:aws:sns:us-east-1:123456789012:test-topic"
2522+
)
2523+
mock_checkpoint.assert_not_called()
2524+
25072525

25082526
class TestCreateCarrierGet(unittest.TestCase):
25092527
def test_create_carrier_get_with_valid_data(self):
@@ -2538,7 +2556,7 @@ def test_create_carrier_get_with_empty_context(self):
25382556
self.assertIsNone(carrier_get("any-key"))
25392557

25402558

2541-
class TestExtractContextFromSqsOrSnsEvent(unittest.TestCase):
2559+
class TestExtractContextFromSqsOrSnsEventWithDSMLogic(unittest.TestCase):
25422560
def setUp(self):
25432561
self.lambda_context = get_mock_context()
25442562

@@ -2716,8 +2734,168 @@ def test_sns_to_sqs_event_detection_and_processing(
27162734
)
27172735
self.assertEqual(result, mock_context)
27182736

2737+
@patch("datadog_lambda.tracing.extract_context_from_lambda_context")
2738+
@patch("datadog_lambda.tracing._dsm_set_checkpoint")
2739+
def test_sqs_event_without_datadog_message_attributes(
2740+
self, mock_dsm_set_checkpoint, mock_extract_from_lambda_context
2741+
):
2742+
event = {
2743+
"Records": [
2744+
{
2745+
"eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:test-queue",
2746+
"messageAttributes": {},
2747+
}
2748+
]
2749+
}
2750+
2751+
mock_context = Context(trace_id=123, span_id=456)
2752+
mock_extract_from_lambda_context.return_value = mock_context
2753+
2754+
result = extract_context_from_sqs_or_sns_event_or_context_and_set_dsm_ckpt_if_enabled(
2755+
event, self.lambda_context
2756+
)
2757+
2758+
mock_dsm_set_checkpoint.assert_called_once_with(
2759+
{}, "sqs", "arn:aws:sqs:us-east-1:123456789012:test-queue"
2760+
)
2761+
mock_extract_from_lambda_context.assert_called_once_with(self.lambda_context)
2762+
self.assertEqual(result, mock_context)
2763+
2764+
@patch("datadog_lambda.tracing.extract_context_from_lambda_context")
2765+
@patch("datadog_lambda.tracing._dsm_set_checkpoint")
2766+
def test_sqs_event_with_malformed_datadog_message_attributes(
2767+
self, mock_dsm_set_checkpoint, mock_extract_from_lambda_context
2768+
):
2769+
dd_json_data = "not-json"
2770+
2771+
event = {
2772+
"Records": [
2773+
{
2774+
"eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:test-queue",
2775+
"messageAttributes": {
2776+
"_datadog": {"dataType": "String", "stringValue": dd_json_data}
2777+
},
2778+
}
2779+
]
2780+
}
2781+
2782+
mock_context = Context(trace_id=123, span_id=456)
2783+
mock_extract_from_lambda_context.return_value = mock_context
2784+
2785+
result = extract_context_from_sqs_or_sns_event_or_context_and_set_dsm_ckpt_if_enabled(
2786+
event, self.lambda_context
2787+
)
2788+
2789+
mock_dsm_set_checkpoint.assert_called_once_with(
2790+
{}, "sqs", "arn:aws:sqs:us-east-1:123456789012:test-queue"
2791+
)
2792+
mock_extract_from_lambda_context.assert_called_once_with(self.lambda_context)
2793+
self.assertEqual(result, mock_context)
2794+
2795+
@patch("datadog_lambda.tracing.extract_context_from_lambda_context")
2796+
@patch("datadog_lambda.tracing._dsm_set_checkpoint")
2797+
def test_sns_event_without_datadog_message_attributes(
2798+
self, mock_dsm_set_checkpoint, mock_extract_from_lambda_context
2799+
):
2800+
event = {
2801+
"Records": [
2802+
{
2803+
"Sns": {
2804+
"TopicArn": "arn:aws:sns:us-east-1:123456789012:test-topic",
2805+
"MessageAttributes": {},
2806+
}
2807+
}
2808+
]
2809+
}
2810+
2811+
mock_context = Context(trace_id=123, span_id=456)
2812+
mock_extract_from_lambda_context.return_value = mock_context
2813+
2814+
result = extract_context_from_sqs_or_sns_event_or_context_and_set_dsm_ckpt_if_enabled(
2815+
event, self.lambda_context
2816+
)
2817+
2818+
mock_dsm_set_checkpoint.assert_called_once_with(
2819+
{}, "sns", "arn:aws:sns:us-east-1:123456789012:test-topic"
2820+
)
2821+
mock_extract_from_lambda_context.assert_called_once_with(self.lambda_context)
2822+
self.assertEqual(result, mock_context)
2823+
2824+
@patch("datadog_lambda.tracing.extract_context_from_lambda_context")
2825+
@patch("datadog_lambda.tracing._dsm_set_checkpoint")
2826+
def test_sns_event_with_malformed_datadog_message_attributes(
2827+
self, mock_dsm_set_checkpoint, mock_extract_from_lambda_context
2828+
):
2829+
dd_json_data = "not-json"
2830+
2831+
event = {
2832+
"Records": [
2833+
{
2834+
"Sns": {
2835+
"TopicArn": "arn:aws:sns:us-east-1:123456789012:test-topic",
2836+
"MessageAttributes": {
2837+
"_datadog": {"Type": "String", "Value": dd_json_data}
2838+
},
2839+
}
2840+
}
2841+
]
2842+
}
2843+
2844+
mock_context = Context(trace_id=123, span_id=456)
2845+
mock_extract_from_lambda_context.return_value = mock_context
2846+
2847+
result = extract_context_from_sqs_or_sns_event_or_context_and_set_dsm_ckpt_if_enabled(
2848+
event, self.lambda_context
2849+
)
2850+
2851+
mock_dsm_set_checkpoint.assert_called_once_with(
2852+
{}, "sns", "arn:aws:sns:us-east-1:123456789012:test-topic"
2853+
)
2854+
mock_extract_from_lambda_context.assert_called_once_with(self.lambda_context)
2855+
self.assertEqual(result, mock_context)
2856+
2857+
@patch("datadog_lambda.tracing.extract_context_from_lambda_context")
2858+
@patch("datadog_lambda.tracing._dsm_set_checkpoint")
2859+
def test_sns_to_sqs_event_with_malformed_datadog_message_attributes(
2860+
self, mock_dsm_set_checkpoint, mock_extract_from_lambda_context
2861+
):
2862+
"""Test SNS->SQS case where SQS body contains SNS notification with malformed datadog ctx"""
2863+
dd_json_data = "not-json"
2864+
2865+
sns_notification = {
2866+
"Type": "Notification",
2867+
"TopicArn": "arn:aws:sns:us-east-1:123456789012:test-topic",
2868+
"MessageAttributes": {
2869+
"_datadog": {"Type": "String", "Value": dd_json_data}
2870+
},
2871+
"Message": "test message",
2872+
}
2873+
2874+
event = {
2875+
"Records": [
2876+
{
2877+
"eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:test-queue",
2878+
"body": json.dumps(sns_notification),
2879+
"messageAttributes": {},
2880+
}
2881+
]
2882+
}
2883+
2884+
mock_context = Context(trace_id=123, span_id=456)
2885+
mock_extract_from_lambda_context.return_value = mock_context
2886+
2887+
result = extract_context_from_sqs_or_sns_event_or_context_and_set_dsm_ckpt_if_enabled(
2888+
event, self.lambda_context
2889+
)
2890+
2891+
mock_dsm_set_checkpoint.assert_called_once_with(
2892+
{}, "sqs", "arn:aws:sqs:us-east-1:123456789012:test-queue"
2893+
)
2894+
mock_extract_from_lambda_context.assert_called_once_with(self.lambda_context)
2895+
self.assertEqual(result, mock_context)
2896+
27192897

2720-
class TestExtractContextFromKinesisEvent(unittest.TestCase):
2898+
class TestExtractContextFromKinesisEventWithDSMLogic(unittest.TestCase):
27212899
def setUp(self):
27222900
self.lambda_context = get_mock_context()
27232901

@@ -2754,3 +2932,67 @@ def test_kinesis_event_with_datadog_data(
27542932
"arn:aws:kinesis:us-east-1:123456789012:stream/test-stream",
27552933
)
27562934
self.assertEqual(result, mock_context)
2935+
2936+
@patch("datadog_lambda.tracing.extract_context_from_lambda_context")
2937+
@patch("datadog_lambda.tracing._dsm_set_checkpoint")
2938+
def test_kinesis_event_without_datadog_data(
2939+
self, mock_dsm_set_checkpoint, mock_extract_from_lambda_context
2940+
):
2941+
kinesis_data = {"message": "test"}
2942+
kinesis_data_str = json.dumps(kinesis_data)
2943+
encoded_data = base64.b64encode(kinesis_data_str.encode()).decode()
2944+
2945+
event = {
2946+
"Records": [
2947+
{
2948+
"eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/test-stream",
2949+
"kinesis": {"data": encoded_data},
2950+
}
2951+
]
2952+
}
2953+
2954+
mock_context = Context(trace_id=123, span_id=456)
2955+
mock_extract_from_lambda_context.return_value = mock_context
2956+
2957+
result = extract_context_from_kinesis_event_and_set_dsm_checkpoint_if_enabled(
2958+
event, self.lambda_context
2959+
)
2960+
2961+
mock_dsm_set_checkpoint.assert_called_once_with(
2962+
{},
2963+
"kinesis",
2964+
"arn:aws:kinesis:us-east-1:123456789012:stream/test-stream",
2965+
)
2966+
mock_extract_from_lambda_context.assert_called_once_with(self.lambda_context)
2967+
self.assertEqual(result, mock_context)
2968+
2969+
@patch("datadog_lambda.tracing.extract_context_from_lambda_context")
2970+
@patch("datadog_lambda.tracing._dsm_set_checkpoint")
2971+
def test_kinesis_event_with_malformed_data(
2972+
self, mock_dsm_set_checkpoint, mock_extract_from_lambda_context
2973+
):
2974+
encoded_data = "not-base64-or-json"
2975+
2976+
event = {
2977+
"Records": [
2978+
{
2979+
"eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/test-stream",
2980+
"kinesis": {"data": encoded_data},
2981+
}
2982+
]
2983+
}
2984+
2985+
mock_context = Context(trace_id=123, span_id=456)
2986+
mock_extract_from_lambda_context.return_value = mock_context
2987+
2988+
result = extract_context_from_kinesis_event_and_set_dsm_checkpoint_if_enabled(
2989+
event, self.lambda_context
2990+
)
2991+
2992+
mock_dsm_set_checkpoint.assert_called_once_with(
2993+
{},
2994+
"kinesis",
2995+
"arn:aws:kinesis:us-east-1:123456789012:stream/test-stream",
2996+
)
2997+
mock_extract_from_lambda_context.assert_called_once_with(self.lambda_context)
2998+
self.assertEqual(result, mock_context)

0 commit comments

Comments
 (0)