1- import json
21import logging
3-
2+ import json
43from datadog_lambda .trigger import EventTypes
54
65logger = logging .getLogger (__name__ )
@@ -11,33 +10,8 @@ def set_dsm_context(event, event_source):
1110 _dsm_set_sqs_context (event )
1211
1312
14- def _dsm_set_context_helper (service_type , arn , payload_size , context_json ):
15- """
16- Common helper function for setting DSM context.
17-
18- Args:
19- service_type: The service type string (example: sqs', 'sns')
20- arn: ARN from the record
21- payload_size: payload size of the record
22- context_json: Datadog context for the record
23- """
24- from ddtrace .internal .datastreams import data_streams_processor
25- from ddtrace .internal .datastreams .processor import DsmPathwayCodec
26-
27- processor = data_streams_processor ()
28-
29- try :
30- ctx = DsmPathwayCodec .decode (context_json , processor )
31- ctx .set_checkpoint (
32- ["direction:in" , f"topic:{ arn } " , f"type:{ service_type } " ],
33- payload_size = payload_size ,
34- )
35- except Exception as e :
36- logger .error (f"Unable to set dsm context: { e } " )
37-
38-
3913def _dsm_set_sqs_context (event ):
40- from ddtrace .internal . datastreams . botocore import calculate_sqs_payload_size
14+ from ddtrace .data_streams import set_consume_checkpoint
4115
4216 records = event .get ("Records" )
4317 if records is None :
@@ -46,9 +20,14 @@ def _dsm_set_sqs_context(event):
4620 for record in records :
4721 arn = record .get ("eventSourceARN" , "" )
4822 context_json = _get_dsm_context_from_lambda (record )
49- payload_size = calculate_sqs_payload_size (record , context_json )
23+ if not context_json :
24+ logger .debug ("DataStreams skipped lambda message: %r" , record )
25+ return None
26+
27+ def carrier_get (key ):
28+ return context_json .get (key )
5029
51- _dsm_set_context_helper ("sqs" , arn , payload_size , context_json )
30+ set_consume_checkpoint ("sqs" , arn , carrier_get )
5231
5332
5433def _get_dsm_context_from_lambda (message ):
0 commit comments