6666LOWER_64_BITS = "LOWER_64_BITS"
6767
6868
69- def _set_data_streams_checkpoint (context_json , event_type , arn ):
70- if not config .data_streams_enabled :
71- return
72-
73- if "dd-pathway-ctx-base64" not in context_json :
74- return
75-
76- try :
77- from ddtrace .data_streams import set_consume_checkpoint
78-
79- carrier_get = _create_carrier_get (context_json )
80- set_consume_checkpoint (event_type , arn , carrier_get , manual_checkpoint = False )
81- except Exception as e :
82- logger .debug (
83- f"DSM:Failed to set consume checkpoint for { event_type } { arn } : { e } "
84- )
85-
86-
87- def _create_carrier_get (context_json ):
88- def carrier_get (key ):
89- return context_json .get (key )
90-
91- return carrier_get
92-
93-
9469def _convert_xray_trace_id (xray_trace_id ):
9570 """
9671 Convert X-Ray trace id (hex)'s last 63 bits to a Datadog trace id (int).
@@ -392,7 +367,6 @@ def extract_context_from_kinesis_event(event, lambda_context):
392367 """
393368 try :
394369 record = get_first_record (event )
395- arn = record .get ("eventSourceARN" , "" )
396370 kinesis = record .get ("kinesis" )
397371 if not kinesis :
398372 return extract_context_from_lambda_context (lambda_context )
@@ -407,7 +381,6 @@ def extract_context_from_kinesis_event(event, lambda_context):
407381 dd_ctx = data_obj .get ("_datadog" )
408382 if dd_ctx :
409383 context = propagator .extract (dd_ctx )
410- _set_data_streams_checkpoint (dd_ctx , "kinesis" , arn )
411384 return context
412385 except Exception as e :
413386 logger .debug ("The trace extractor returned with error %s" , e )
0 commit comments