Skip to content

Commit f689e4f

Browse files
refactors to remove checkpoint call from being inside extract functions
1 parent edf7c0a commit f689e4f

File tree

3 files changed

+65
-372
lines changed

3 files changed

+65
-372
lines changed

datadog_lambda/tracing.py

Lines changed: 57 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,6 @@ def extract_context_from_sqs_or_sns_event_or_context(event, lambda_context):
239239
240240
Falls back to lambda context if no trace data is found in the SQS message attributes.
241241
"""
242-
is_sqs = False
243242

244243
# EventBridge => SQS
245244
try:
@@ -250,62 +249,18 @@ def extract_context_from_sqs_or_sns_event_or_context(event, lambda_context):
250249
logger.debug("Failed extracting context as EventBridge to SQS.")
251250

252251
try:
253-
first_record = event.get("Records")[0]
254-
arn = first_record.get("eventSourceARN", "")
255-
if arn:
256-
is_sqs = True
257-
258-
# logic to deal with SNS => SQS event
259-
if "body" in first_record:
260-
body_str = first_record.get("body")
261-
try:
262-
body = json.loads(body_str)
263-
if body.get("Type", "") == "Notification" and "TopicArn" in body:
264-
logger.debug("Found SNS message inside SQS event")
265-
first_record = get_first_record(create_sns_event(body))
266-
except Exception:
267-
pass
268-
269-
msg_attributes = first_record.get("messageAttributes")
270-
if msg_attributes is None:
271-
sns_record = first_record.get("Sns") or {}
272-
if not is_sqs:
273-
arn = sns_record.get("TopicArn", "")
274-
msg_attributes = sns_record.get("MessageAttributes") or {}
275-
dd_payload = msg_attributes.get("_datadog")
276-
if dd_payload:
277-
# SQS uses dataType and binaryValue/stringValue
278-
# SNS uses Type and Value
279-
dd_json_data = None
280-
dd_json_data_type = dd_payload.get("Type") or dd_payload.get("dataType")
281-
if dd_json_data_type == "Binary":
282-
import base64
283-
284-
dd_json_data = dd_payload.get("binaryValue") or dd_payload.get("Value")
285-
if dd_json_data:
286-
dd_json_data = base64.b64decode(dd_json_data)
287-
elif dd_json_data_type == "String":
288-
dd_json_data = dd_payload.get("stringValue") or dd_payload.get("Value")
289-
else:
290-
logger.debug(
291-
"Datadog Lambda Python only supports extracting trace"
292-
"context from String or Binary SQS/SNS message attributes"
293-
)
294-
295-
if dd_json_data:
296-
dd_data = json.loads(dd_json_data)
297-
298-
if is_step_function_event(dd_data):
299-
try:
300-
return extract_context_from_step_functions(dd_data, None)
301-
except Exception:
302-
logger.debug(
303-
"Failed to extract Step Functions context from SQS/SNS event."
304-
)
305-
event_type = "sqs" if is_sqs else "sns"
306-
context = propagator.extract(dd_data)
307-
_set_data_streams_checkpoint(dd_data, event_type, arn)
308-
return context
252+
dd_json_data, _ = extract_dd_json_data_from_message_attributes(event)
253+
if dd_json_data:
254+
dd_data = json.loads(dd_json_data)
255+
256+
if is_step_function_event(dd_data):
257+
try:
258+
return extract_context_from_step_functions(dd_data, None)
259+
except Exception:
260+
logger.debug(
261+
"Failed to extract Step Functions context from SQS/SNS event."
262+
)
263+
return propagator.extract(dd_data)
309264
else:
310265
# Handle case where trace context is injected into attributes.AWSTraceHeader
311266
# example: Root=1-654321ab-000000001234567890abcdef;Parent=0123456789abcdef;Sampled=1
@@ -334,6 +289,51 @@ def extract_context_from_sqs_or_sns_event_or_context(event, lambda_context):
334289
return extract_context_from_lambda_context(lambda_context)
335290

336291

292+
def extract_dd_json_data_from_message_attributes(event):
293+
first_record = event.get("Records")[0]
294+
arn = first_record.get("eventSourceARN", "")
295+
is_sqs = bool(arn)
296+
297+
# logic to deal with SNS => SQS event
298+
if "body" in first_record:
299+
body_str = first_record.get("body")
300+
try:
301+
body = json.loads(body_str)
302+
if body.get("Type", "") == "Notification" and "TopicArn" in body:
303+
logger.debug("Found SNS message inside SQS event")
304+
first_record = get_first_record(create_sns_event(body))
305+
except Exception:
306+
pass
307+
308+
msg_attributes = first_record.get("messageAttributes")
309+
if msg_attributes is None:
310+
sns_record = first_record.get("Sns") or {}
311+
if not is_sqs:
312+
arn = sns_record.get("TopicArn", "")
313+
msg_attributes = sns_record.get("MessageAttributes") or {}
314+
dd_payload = msg_attributes.get("_datadog")
315+
if dd_payload:
316+
# SQS uses dataType and binaryValue/stringValue
317+
# SNS uses Type and Value
318+
dd_json_data = None
319+
dd_json_data_type = dd_payload.get("Type") or dd_payload.get("dataType")
320+
if dd_json_data_type == "Binary":
321+
import base64
322+
323+
dd_json_data = dd_payload.get("binaryValue") or dd_payload.get("Value")
324+
if dd_json_data:
325+
dd_json_data = base64.b64decode(dd_json_data)
326+
elif dd_json_data_type == "String":
327+
dd_json_data = dd_payload.get("stringValue") or dd_payload.get("Value")
328+
else:
329+
logger.debug(
330+
"Datadog Lambda Python only supports extracting trace"
331+
"context from String or Binary SQS/SNS message attributes"
332+
)
333+
return dd_json_data, arn
334+
return None, None
335+
336+
337337
def _extract_context_from_eventbridge_sqs_event(event):
338338
"""
339339
Extracts Datadog trace context from an SQS event triggered by

datadog_lambda/wrapper.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,14 @@ def _before(self, event, context):
234234
)
235235

236236
if config.trace_enabled:
237+
if config.data_streams_enabled:
238+
from datadog_lambda.tracing import extract_dd_json_data_from_message_attributes
239+
from ddtrace.data_streams import set_consume_checkpoint
240+
dd_json_data, arn = extract_dd_json_data_from_message_attributes(event)
241+
if dd_json_data:
242+
carrier_get = lambda k: dd_json_data.get(k)
243+
set_consume_checkpoint(event_source, arn, carrier_get)
244+
237245
set_dd_trace_py_root(trace_context_source, config.merge_xray_traces)
238246
if config.make_inferred_span:
239247
self.inferred_span = create_inferred_span(

0 commit comments

Comments
 (0)