@@ -71,6 +71,9 @@ def _dsm_set_checkpoint(context_json, event_type, arn):
7171 if not config .data_streams_enabled :
7272 return
7373
74+ if not arn :
75+ return
76+
7477 try :
7578 from ddtrace .data_streams import set_consume_checkpoint
7679
@@ -234,6 +237,7 @@ def extract_context_from_sqs_or_sns_event_or_context(
234237 Set a DSM checkpoint if DSM is enabled and the method for context propagation is supported.
235238 """
236239 source_arn = ""
240+ event_type = "sqs" if event_source .equals (EventTypes .SQS ) else "sns"
237241
238242 # EventBridge => SQS
239243 try :
@@ -296,14 +300,11 @@ def extract_context_from_sqs_or_sns_event_or_context(
296300 "Failed to extract Step Functions context from SQS/SNS event."
297301 )
298302 context = propagator .extract (dd_data )
299- # Do not want to set checkpoint with "" arn
300- if source_arn :
301- _dsm_set_checkpoint (
302- dd_data ,
303- # In this function only recieves SQS and SNS events, if not SQS must be SNS
304- "sqs" if event_source .equals (EventTypes .SQS ) else "sns" ,
305- source_arn ,
306- )
303+ _dsm_set_checkpoint (
304+ dd_data ,
305+ event_type ,
306+ source_arn ,
307+ )
307308 return context
308309 else :
309310 # Handle case where trace context is injected into attributes.AWSTraceHeader
@@ -328,25 +329,20 @@ def extract_context_from_sqs_or_sns_event_or_context(
328329 sampling_priority = float (x_ray_context ["sampled" ]),
329330 )
330331 # Still want to set a DSM checkpoint even if DSM context not propagated
331- # In this function only recieves SQS and SNS events, if not SQS must be SNS
332- # Do not want to set checkpoint with "" arn
333- if source_arn :
334- _dsm_set_checkpoint (
335- None ,
336- "sqs" if event_source .equals (EventTypes .SQS ) else "sns" ,
337- source_arn ,
338- )
332+ _dsm_set_checkpoint (
333+ None ,
334+ event_type ,
335+ source_arn ,
336+ )
339337 return extract_context_from_lambda_context (lambda_context )
340338 except Exception as e :
341339 logger .debug ("The trace extractor returned with error %s" , e )
342340 # Still want to set a DSM checkpoint even if DSM context not propagated
343- # Do not want to set checkpoint with "" arn
344- if source_arn :
345- _dsm_set_checkpoint (
346- None ,
347- "sqs" if event_source .equals (EventTypes .SQS ) else "sns" ,
348- source_arn ,
349- )
341+ _dsm_set_checkpoint (
342+ None ,
343+ event_type ,
344+ source_arn ,
345+ )
350346 return extract_context_from_lambda_context (lambda_context )
351347
352348
@@ -425,9 +421,7 @@ def extract_context_from_kinesis_event(event, lambda_context):
425421 dd_ctx = data_obj .get ("_datadog" )
426422 if dd_ctx :
427423 context = propagator .extract (dd_ctx )
428- # Do not want to set checkpoint with "" arn
429- if source_arn :
430- _dsm_set_checkpoint (dd_ctx , "kinesis" , source_arn )
424+ _dsm_set_checkpoint (dd_ctx , "kinesis" , source_arn )
431425 return context
432426 except Exception as e :
433427 logger .debug ("The trace extractor returned with error %s" , e )
0 commit comments