Skip to content

Commit 3da61e3

Browse files
new dsm lambda implementation
1 parent f41778c commit 3da61e3

File tree

1 file changed

+47
-3
lines changed

1 file changed

+47
-3
lines changed

datadog_lambda/tracing.py

Lines changed: 47 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,9 @@
5050

5151
set_tracer_provider(TracerProvider())
5252

53+
data_streams_enabled = (
54+
os.environ.get("DD_DATA_STREAMS_ENABLED", "false").lower() == "true"
55+
)
5356

5457
logger = logging.getLogger(__name__)
5558

@@ -67,6 +70,37 @@
6770
LOWER_64_BITS = "LOWER_64_BITS"
6871

6972

73+
def _extract_context(context_json, event_type, arn):
74+
from ddtrace.data_streams import set_consume_checkpoint
75+
76+
"""
77+
Extracts the context from a JSON carrier and optionally sets a consume checkpoint
78+
if the context is complete and data streams are enabled.
79+
"""
80+
context = propagator.extract(context_json)
81+
82+
if not _is_context_complete(context):
83+
return context
84+
85+
if not data_streams_enabled:
86+
return context
87+
try:
88+
carrier_get = _create_carrier_get(context_json)
89+
set_consume_checkpoint(event_type, arn, carrier_get, manual_checkpoint=False)
90+
except Exception as e:
91+
logger.debug(
92+
f"DSM:Failed to set consume checkpoint for {event_type} {arn}: {e}"
93+
)
94+
return context
95+
96+
97+
def _create_carrier_get(context_json):
98+
def carrier_get(key):
99+
return context_json.get(key)
100+
101+
return carrier_get
102+
103+
70104
def _convert_xray_trace_id(xray_trace_id):
71105
"""
72106
Convert X-Ray trace id (hex)'s last 63 bits to a Datadog trace id (int).
@@ -215,6 +249,8 @@ def extract_context_from_sqs_or_sns_event_or_context(event, lambda_context):
215249
216250
Falls back to lambda context if no trace data is found in the SQS message attributes.
217251
"""
252+
is_sqs = False
253+
arn = None
218254

219255
# EventBridge => SQS
220256
try:
@@ -226,6 +262,9 @@ def extract_context_from_sqs_or_sns_event_or_context(event, lambda_context):
226262

227263
try:
228264
first_record = event.get("Records")[0]
265+
arn = first_record.get("eventSourceARN", "")
266+
if arn:
267+
is_sqs = True
229268

230269
# logic to deal with SNS => SQS event
231270
if "body" in first_record:
@@ -272,8 +311,12 @@ def extract_context_from_sqs_or_sns_event_or_context(event, lambda_context):
272311
logger.debug(
273312
"Failed to extract Step Functions context from SQS/SNS event."
274313
)
275-
276-
return propagator.extract(dd_data)
314+
if is_sqs:
315+
return _extract_context(dd_data, "sqs", arn)
316+
else:
317+
return _extract_context(
318+
dd_data, "sns", sns_record.get(("TopicArn"))
319+
)
277320
else:
278321
# Handle case where trace context is injected into attributes.AWSTraceHeader
279322
# example: Root=1-654321ab-000000001234567890abcdef;Parent=0123456789abcdef;Sampled=1
@@ -360,6 +403,7 @@ def extract_context_from_kinesis_event(event, lambda_context):
360403
"""
361404
try:
362405
record = get_first_record(event)
406+
arn = record.get("eventSourceARN", "")
363407
kinesis = record.get("kinesis")
364408
if not kinesis:
365409
return extract_context_from_lambda_context(lambda_context)
@@ -373,7 +417,7 @@ def extract_context_from_kinesis_event(event, lambda_context):
373417
data_obj = json.loads(data_str)
374418
dd_ctx = data_obj.get("_datadog")
375419
if dd_ctx:
376-
return propagator.extract(dd_ctx)
420+
return _extract_context(dd_ctx, "kinesis", arn)
377421
except Exception as e:
378422
logger.debug("The trace extractor returned with error %s", e)
379423

0 commit comments

Comments
 (0)