Skip to content

Commit 9dca396

Browse files
some fixes
1 parent 0b01978 commit 9dca396

File tree

2 files changed

+41
-27
lines changed

2 files changed

+41
-27
lines changed

datadog_lambda/tracing.py

Lines changed: 27 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -65,20 +65,12 @@
6565
DD_TRACE_JAVA_TRACE_ID_PADDING = "00000000"
6666
HIGHER_64_BITS = "HIGHER_64_BITS"
6767
LOWER_64_BITS = "LOWER_64_BITS"
68-
DSM_PROPAGATION_KEY_BASE_64 = "dd-pathway-ctx-base64"
6968

7069

7170
def _dsm_set_checkpoint(context_json, event_type, arn):
7271
if not config.data_streams_enabled:
7372
return
7473

75-
if context_json is not None:
76-
if (
77-
not isinstance(context_json, dict)
78-
or DSM_PROPAGATION_KEY_BASE_64 not in context_json
79-
):
80-
return
81-
8274
try:
8375
from ddtrace.data_streams import set_consume_checkpoint
8476

@@ -239,8 +231,8 @@ def extract_context_from_sqs_or_sns_event_or_context(event, lambda_context):
239231
Falls back to lambda context if no trace data is found in the SQS message attributes.
240232
Set a DSM checkpoint if DSM is enabled and the method for context propagation is supported.
241233
"""
242-
is_sqs = False
243234
arn = ""
235+
event_source = parse_event_source(event)
244236

245237
# EventBridge => SQS
246238
try:
@@ -253,8 +245,6 @@ def extract_context_from_sqs_or_sns_event_or_context(event, lambda_context):
253245
try:
254246
first_record = event.get("Records")[0]
255247
arn = first_record.get("eventSourceARN", "")
256-
if arn:
257-
is_sqs = True
258248

259249
# logic to deal with SNS => SQS event
260250
if "body" in first_record:
@@ -270,7 +260,8 @@ def extract_context_from_sqs_or_sns_event_or_context(event, lambda_context):
270260
msg_attributes = first_record.get("messageAttributes")
271261
if msg_attributes is None:
272262
sns_record = first_record.get("Sns") or {}
273-
if not is_sqs:
263+
# SNS->SQS event would extract SNS arn without this check
264+
if event_source.equals(EventTypes.SNS):
274265
arn = sns_record.get("TopicArn", "")
275266
msg_attributes = sns_record.get("MessageAttributes") or {}
276267
dd_payload = msg_attributes.get("_datadog")
@@ -304,7 +295,14 @@ def extract_context_from_sqs_or_sns_event_or_context(event, lambda_context):
304295
"Failed to extract Step Functions context from SQS/SNS event."
305296
)
306297
context = propagator.extract(dd_data)
307-
_dsm_set_checkpoint(dd_data, "sqs" if is_sqs else "sns", arn)
298+
# Do not want to set checkpoint with "" arn
299+
if arn:
300+
_dsm_set_checkpoint(
301+
dd_data,
302+
# In this function only recieves SQS and SNS events, if not SQS must be SNS
303+
"sqs" if event_source.equals(EventTypes.SQS) else "sns",
304+
arn,
305+
)
308306
return context
309307
else:
310308
# Handle case where trace context is injected into attributes.AWSTraceHeader
@@ -329,12 +327,23 @@ def extract_context_from_sqs_or_sns_event_or_context(event, lambda_context):
329327
sampling_priority=float(x_ray_context["sampled"]),
330328
)
331329
# Still want to set a DSM checkpoint even if DSM context not propagated
332-
_dsm_set_checkpoint(None, "sqs" if is_sqs else "sns", arn)
330+
# In this function only recieves SQS and SNS events, if not SQS must be SNS
331+
# Do not want to set checkpoint with "" arn
332+
if arn:
333+
_dsm_set_checkpoint(
334+
None, "sqs" if event_source.equals(EventTypes.SQS) else "sns", arn
335+
)
333336
return extract_context_from_lambda_context(lambda_context)
334337
except Exception as e:
335338
logger.debug("The trace extractor returned with error %s", e)
336339
# Still want to set a DSM checkpoint even if DSM context not propagated
337-
_dsm_set_checkpoint(None, "sqs" if is_sqs else "sns", arn)
340+
# Do not want to set checkpoint with "" arn
341+
if arn:
342+
_dsm_set_checkpoint(
343+
None,
344+
"sqs" if event_source.equals(EventTypes.SQS) else "sns",
345+
arn,
346+
)
338347
return extract_context_from_lambda_context(lambda_context)
339348

340349

@@ -418,7 +427,9 @@ def extract_context_from_kinesis_event(event, lambda_context):
418427
except Exception as e:
419428
logger.debug("The trace extractor returned with error %s", e)
420429
# Still want to set a DSM checkpoint even if DSM context not propagated
421-
_dsm_set_checkpoint(None, "kinesis", arn)
430+
# Do not want to set checkpoint with "" arn
431+
if arn:
432+
_dsm_set_checkpoint(None, "kinesis", arn)
422433
return extract_context_from_lambda_context(lambda_context)
423434

424435

tests/test_tracing.py

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2494,13 +2494,6 @@ def test_dsm_set_checkpoint_exception_path(self):
24942494
self.mock_checkpoint.assert_called_once()
24952495
self.mock_logger.debug.assert_called_once()
24962496

2497-
@patch("ddtrace.data_streams.set_consume_checkpoint")
2498-
def test_dsm_set_checkpoint_non_dict_context(self, mock_checkpoint):
2499-
_dsm_set_checkpoint(
2500-
"not_a_dict", "sqs", "arn:aws:sqs:us-east-1:123456789012:test-queue"
2501-
)
2502-
mock_checkpoint.assert_not_called()
2503-
25042497
@patch("ddtrace.data_streams.set_consume_checkpoint")
25052498
def test_dsm_set_checkpoint_DSM_PROPAGATION_KEY_BASE_64_not_present(
25062499
self, mock_checkpoint
@@ -2532,6 +2525,7 @@ def test_sqs_event_with_datadog_message_attributes(
25322525
"messageAttributes": {
25332526
"_datadog": {"dataType": "String", "stringValue": dd_json_data}
25342527
},
2528+
"eventSource": "aws:sqs",
25352529
}
25362530
]
25372531
}
@@ -2565,6 +2559,7 @@ def test_sqs_event_with_binary_datadog_message_attributes(
25652559
"messageAttributes": {
25662560
"_datadog": {"dataType": "Binary", "binaryValue": encoded_data}
25672561
},
2562+
"eventSource": "aws:sqs",
25682563
}
25692564
]
25702565
}
@@ -2600,6 +2595,7 @@ def test_sns_event_with_datadog_message_attributes(
26002595
"_datadog": {"Type": "String", "Value": dd_json_data}
26012596
},
26022597
},
2598+
"eventSource": "aws:sns",
26032599
}
26042600
]
26052601
}
@@ -2633,6 +2629,7 @@ def test_sqs_event_determines_is_sqs_true_when_event_source_arn_present(
26332629
"messageAttributes": {
26342630
"_datadog": {"dataType": "String", "stringValue": dd_json_data}
26352631
},
2632+
"eventSource": "aws:sqs",
26362633
}
26372634
]
26382635
}
@@ -2674,6 +2671,7 @@ def test_sns_to_sqs_event_detection_and_processing(
26742671
"eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:test-queue",
26752672
"body": json.dumps(sns_notification),
26762673
"messageAttributes": {},
2674+
"eventSource": "aws:sqs",
26772675
}
26782676
]
26792677
}
@@ -2701,6 +2699,7 @@ def test_sqs_event_without_datadog_message_attributes(
27012699
{
27022700
"eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:test-queue",
27032701
"messageAttributes": {},
2702+
"eventSource": "aws:sqs",
27042703
}
27052704
]
27062705
}
@@ -2732,6 +2731,7 @@ def test_sqs_event_with_malformed_datadog_message_attributes(
27322731
"messageAttributes": {
27332732
"_datadog": {"dataType": "String", "stringValue": dd_json_data}
27342733
},
2734+
"eventSource": "aws:sqs",
27352735
}
27362736
]
27372737
}
@@ -2760,7 +2760,8 @@ def test_sns_event_without_datadog_message_attributes(
27602760
"Sns": {
27612761
"TopicArn": "arn:aws:sns:us-east-1:123456789012:test-topic",
27622762
"MessageAttributes": {},
2763-
}
2763+
},
2764+
"eventSource": "aws:sns",
27642765
}
27652766
]
27662767
}
@@ -2793,7 +2794,8 @@ def test_sns_event_with_malformed_datadog_message_attributes(
27932794
"MessageAttributes": {
27942795
"_datadog": {"Type": "String", "Value": dd_json_data}
27952796
},
2796-
}
2797+
},
2798+
"eventSource": "aws:sns",
27972799
}
27982800
]
27992801
}
@@ -2834,6 +2836,7 @@ def test_sns_to_sqs_event_with_malformed_datadog_message_attributes(
28342836
"eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:test-queue",
28352837
"body": json.dumps(sns_notification),
28362838
"messageAttributes": {},
2839+
"eventSource": "aws:sqs",
28372840
}
28382841
]
28392842
}
@@ -2866,7 +2869,7 @@ def test_sqs_sns_event_with_exception_accessing_first_record(
28662869
event, self.lambda_context
28672870
)
28682871

2869-
mock_dsm_set_checkpoint.assert_called_once_with(None, "sns", "")
2872+
mock_dsm_set_checkpoint.assert_not_called()
28702873
mock_extract_from_lambda_context.assert_called_once_with(self.lambda_context)
28712874
self.assertEqual(result, mock_context)
28722875

@@ -2982,6 +2985,6 @@ def test_kinesis_event_with_exception_accessing_first_record(
29822985

29832986
# Verify that _dsm_set_checkpoint is called with empty string for arn
29842987
# even when exception occurs
2985-
mock_dsm_set_checkpoint.assert_called_once_with(None, "kinesis", "")
2988+
mock_dsm_set_checkpoint.assert_not_called()
29862989
mock_extract_from_lambda_context.assert_called_once_with(self.lambda_context)
29872990
self.assertEqual(result, mock_context)

0 commit comments

Comments
 (0)