6565DD_TRACE_JAVA_TRACE_ID_PADDING = "00000000"
6666HIGHER_64_BITS = "HIGHER_64_BITS"
6767LOWER_64_BITS = "LOWER_64_BITS"
68- PROPAGATION_KEY_BASE_64 = "dd-pathway-ctx-base64"
68+ DSM_PROPAGATION_KEY_BASE_64 = "dd-pathway-ctx-base64"
6969
7070
7171def _dsm_set_checkpoint (context_json , event_type , arn ):
7272 if not config .data_streams_enabled :
7373 return
7474
75- if not isinstance (context_json , dict ):
76- return
77-
78- if context_json and PROPAGATION_KEY_BASE_64 not in context_json :
79- return
75+ if context_json is not None :
76+ if (
77+ not isinstance (context_json , dict )
78+ or DSM_PROPAGATION_KEY_BASE_64 not in context_json
79+ ):
80+ return
8081
8182 try :
8283 from ddtrace .data_streams import set_consume_checkpoint
8384
84- carrier_get = lambda k : context_json .get (k ) # noqa: E731
85+ carrier_get = lambda k : context_json and context_json .get (k ) # noqa: E731
8586 set_consume_checkpoint (event_type , arn , carrier_get , manual_checkpoint = False )
8687 except Exception as e :
8788 logger .debug (
@@ -224,9 +225,7 @@ def create_sns_event(message):
224225 }
225226
226227
227- def extract_context_from_sqs_or_sns_event_or_context_and_set_dsm_ckpt_if_enabled (
228- event , lambda_context
229- ):
228+ def extract_context_from_sqs_or_sns_event_or_context (event , lambda_context ):
230229 """
231230 Extract Datadog trace context from an SQS event.
232231
@@ -241,6 +240,7 @@ def extract_context_from_sqs_or_sns_event_or_context_and_set_dsm_ckpt_if_enabled
241240 Set a DSM checkpoint if DSM is enabled and the method for context propagation is supported.
242241 """
243242 is_sqs = False
243+ arn = ""
244244
245245 # EventBridge => SQS
246246 try :
@@ -329,12 +329,12 @@ def extract_context_from_sqs_or_sns_event_or_context_and_set_dsm_ckpt_if_enabled
329329 sampling_priority = float (x_ray_context ["sampled" ]),
330330 )
331331 # Still want to set a DSM checkpoint even if DSM context not propagated
332- _dsm_set_checkpoint ({} , "sqs" if is_sqs else "sns" , arn )
332+ _dsm_set_checkpoint (None , "sqs" if is_sqs else "sns" , arn )
333333 return extract_context_from_lambda_context (lambda_context )
334334 except Exception as e :
335335 logger .debug ("The trace extractor returned with error %s" , e )
336336 # Still want to set a DSM checkpoint even if DSM context not propagated
337- _dsm_set_checkpoint ({} , "sqs" if is_sqs else "sns" , arn )
337+ _dsm_set_checkpoint (None , "sqs" if is_sqs else "sns" , arn )
338338 return extract_context_from_lambda_context (lambda_context )
339339
340340
@@ -390,13 +390,12 @@ def extract_context_from_eventbridge_event(event, lambda_context):
390390 return extract_context_from_lambda_context (lambda_context )
391391
392392
393- def extract_context_from_kinesis_event_and_set_dsm_checkpoint_if_enabled (
394- event , lambda_context
395- ):
393+ def extract_context_from_kinesis_event (event , lambda_context ):
396394 """
397395 Extract datadog trace context from a Kinesis Stream's base64 encoded data string
398396 Set a DSM checkpoint if DSM is enabled and the method for context propagation is supported.
399397 """
398+ arn = ""
400399 try :
401400 record = get_first_record (event )
402401 arn = record .get ("eventSourceARN" , "" )
@@ -419,7 +418,7 @@ def extract_context_from_kinesis_event_and_set_dsm_checkpoint_if_enabled(
419418 except Exception as e :
420419 logger .debug ("The trace extractor returned with error %s" , e )
421420 # Still want to set a DSM checkpoint even if DSM context not propagated
422- _dsm_set_checkpoint ({} , "kinesis" , arn )
421+ _dsm_set_checkpoint (None , "kinesis" , arn )
423422 return extract_context_from_lambda_context (lambda_context )
424423
425424
@@ -636,15 +635,13 @@ def extract_dd_trace_context(
636635 event , lambda_context , event_source , decode_authorizer_context
637636 )
638637 elif event_source .equals (EventTypes .SNS ) or event_source .equals (EventTypes .SQS ):
639- context = extract_context_from_sqs_or_sns_event_or_context_and_set_dsm_ckpt_if_enabled (
638+ context = extract_context_from_sqs_or_sns_event_or_context (
640639 event , lambda_context
641640 )
642641 elif event_source .equals (EventTypes .EVENTBRIDGE ):
643642 context = extract_context_from_eventbridge_event (event , lambda_context )
644643 elif event_source .equals (EventTypes .KINESIS ):
645- context = extract_context_from_kinesis_event_and_set_dsm_checkpoint_if_enabled (
646- event , lambda_context
647- )
644+ context = extract_context_from_kinesis_event (event , lambda_context )
648645 elif event_source .equals (EventTypes .STEPFUNCTIONS ):
649646 context = extract_context_from_step_functions (event , lambda_context )
650647 else :
0 commit comments