Skip to content

Commit ef52dc1

Browse files
fixes, kinesis support, and renaming some functions
1 parent 7cf8901 commit ef52dc1

File tree

3 files changed

+46
-29
lines changed

3 files changed

+46
-29
lines changed

datadog_lambda/tracing.py

Lines changed: 24 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@ def extract_context_from_sqs_or_sns_event_or_context(event, lambda_context):
224224
logger.debug("Failed extracting context as EventBridge to SQS.")
225225

226226
try:
227-
dd_json_data, _ = extract_dd_json_data_from_message_attributes(event)
227+
dd_json_data, _ = extract_dd_context_from_sqs_or_sns_event(event)
228228
if dd_json_data:
229229
dd_data = json.loads(dd_json_data)
230230

@@ -264,7 +264,7 @@ def extract_context_from_sqs_or_sns_event_or_context(event, lambda_context):
264264
return extract_context_from_lambda_context(lambda_context)
265265

266266

267-
def extract_dd_json_data_from_message_attributes(event):
267+
def extract_dd_context_from_sqs_or_sns_event(event):
268268
first_record = event.get("Records")[0]
269269
arn = first_record.get("eventSourceARN", "")
270270
is_sqs = bool(arn)
@@ -366,28 +366,34 @@ def extract_context_from_kinesis_event(event, lambda_context):
366366
Extract datadog trace context from a Kinesis Stream's base64 encoded data string
367367
"""
368368
try:
369-
record = get_first_record(event)
370-
kinesis = record.get("kinesis")
371-
if not kinesis:
372-
return extract_context_from_lambda_context(lambda_context)
373-
data = kinesis.get("data")
374-
if data:
375-
import base64
376-
377-
b64_bytes = data.encode("ascii")
378-
str_bytes = base64.b64decode(b64_bytes)
379-
data_str = str_bytes.decode("ascii")
380-
data_obj = json.loads(data_str)
381-
dd_ctx = data_obj.get("_datadog")
382-
if dd_ctx:
383-
context = propagator.extract(dd_ctx)
384-
return context
369+
dd_ctx, _ = extract_dd_context_from_kinesis_event(event, lambda_context)
370+
if dd_ctx:
371+
return propagator.extract(dd_ctx)
385372
except Exception as e:
386373
logger.debug("The trace extractor returned with error %s", e)
387374

388375
return extract_context_from_lambda_context(lambda_context)
389376

390377

378+
def extract_dd_context_from_kinesis_event(event, lambda_context):
379+
record = get_first_record(event)
380+
arn = record.get("eventSourceARN", "")
381+
kinesis = record.get("kinesis")
382+
if not kinesis:
383+
return extract_context_from_lambda_context(lambda_context)
384+
data = kinesis.get("data")
385+
if data:
386+
import base64
387+
388+
b64_bytes = data.encode("ascii")
389+
str_bytes = base64.b64decode(b64_bytes)
390+
data_str = str_bytes.decode("ascii")
391+
data_obj = json.loads(data_str)
392+
dd_ctx = data_obj.get("_datadog")
393+
return dd_ctx, arn
394+
return None, None
395+
396+
391397
def _deterministic_sha256_hash(s: str, part: str) -> int:
392398
import hashlib
393399

datadog_lambda/wrapper.py

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -240,20 +240,33 @@ def _before(self, event, context):
240240
event, context, event_source, config.decode_authorizer_context
241241
)
242242
if config.data_streams_enabled:
243+
from datadog_lambda.trigger import EventTypes
244+
243245
from datadog_lambda.tracing import (
244-
extract_dd_json_data_from_message_attributes,
246+
extract_dd_context_from_sqs_or_sns_event,
247+
extract_dd_context_from_kinesis_event,
245248
)
246249

250+
dd_json_data = None
251+
arn = None
252+
247253
try:
248-
(
249-
dd_json_data,
250-
arn,
251-
) = extract_dd_json_data_from_message_attributes(event)
254+
if event_source.equals(EventTypes.SQS) or event_source.equals(
255+
EventTypes.SNS
256+
):
257+
(
258+
dd_json_data,
259+
arn,
260+
) = extract_dd_context_from_sqs_or_sns_event(event)
261+
elif event_source.equals(EventTypes.KINESIS):
262+
dd_json_data, arn = extract_dd_context_from_kinesis_event(
263+
event, context
264+
)
252265
except Exception as e:
253266
logger.debug(f"Failed to extract DSM checkpoint: {e}")
254267

255268
if dd_json_data:
256-
set_dsm_checkpoint(dd_json_data, event_source, arn)
269+
set_dsm_checkpoint(dd_json_data, event_source.to_string(), arn)
257270

258271
self.span = create_function_execution_span(
259272
context=context,
@@ -363,7 +376,8 @@ def set_dsm_checkpoint(dd_json_data, event_source, arn):
363376
from ddtrace.data_streams import set_consume_checkpoint
364377

365378
try:
366-
dd_json_data = json.loads(dd_json_data)
379+
if type(dd_json_data) is not dict:
380+
dd_json_data = json.loads(dd_json_data)
367381
if "dd-pathway-ctx-base64" not in dd_json_data:
368382
return
369383

@@ -374,9 +388,7 @@ def carrier_get(key):
374388
return carrier_get
375389

376390
carrier_get = create_carrier_get(dd_json_data)
377-
set_consume_checkpoint(
378-
event_source.to_string(), arn, carrier_get, manual_checkpoint=False
379-
)
391+
set_consume_checkpoint(event_source, arn, carrier_get, manual_checkpoint=False)
380392
except Exception as e:
381393
logger.debug(f"Failed to set DSM checkpoint: {e}")
382394

tests/test_tracing.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import base64
21
import copy
32
import functools
43
import json

0 commit comments

Comments
 (0)