@@ -406,18 +406,15 @@ def extract_context_from_kinesis_event(event, lambda_context):
406406 Extract datadog trace context from a Kinesis Stream's base64 encoded data string
407407 Set a DSM checkpoint if DSM is enabled and the method for context propagation is supported.
408408 """
409- source_arn = ""
410409
411- context = None
412- for idx , record in enumerate ( event .get ("Records" , []) ):
410+ apm_context : Context = None
411+ for record in event .get ("Records" , []):
413412 dd_ctx = None
414413 try :
415414 source_arn = record .get ("eventSourceARN" , "" )
416415 kinesis = record .get ("kinesis" )
417416 if not kinesis :
418- if idx == 0 :
419- return extract_context_from_lambda_context (lambda_context )
420- continue
417+ return extract_context_from_lambda_context (lambda_context )
421418 data = kinesis .get ("data" )
422419 if data :
423420 import base64
@@ -427,14 +424,19 @@ def extract_context_from_kinesis_event(event, lambda_context):
427424 data_str = str_bytes .decode ("ascii" )
428425 data_obj = json .loads (data_str )
429426 dd_ctx = data_obj .get ("_datadog" )
430- if dd_ctx and idx == 0 :
431- context = propagator .extract (dd_ctx )
432- if not config .data_streams_enabled :
433- break
427+ if dd_ctx and apm_context is None :
428+ apm_context = propagator .extract (dd_ctx )
434429 except Exception as e :
435430 logger .debug ("The trace extractor returned with error %s" , e )
436- _dsm_set_checkpoint (dd_ctx , "kinesis" , source_arn )
437- return context if context else extract_context_from_lambda_context (lambda_context )
431+ if config .data_streams_enabled :
432+ _dsm_set_checkpoint (dd_ctx , "kinesis" , source_arn )
433+ if not config .data_streams_enabled :
434+ break
435+ return (
436+ apm_context
437+ if apm_context
438+ else extract_context_from_lambda_context (lambda_context )
439+ )
438440
439441
440442def _deterministic_sha256_hash (s : str , part : str ) -> int :
0 commit comments