|
79 | 79 | DD_REQUESTS_SERVICE_NAME = "DD_REQUESTS_SERVICE_NAME" |
80 | 80 | DD_SERVICE = "DD_SERVICE" |
81 | 81 | DD_ENV = "DD_ENV" |
| 82 | +DD_DATA_STREAMS_ENABLED = "DD_DATA_STREAMS_ENABLED" |
82 | 83 |
|
83 | 84 |
|
84 | 85 | def get_env_as_int(env_key, default_value: int) -> int: |
@@ -190,6 +191,9 @@ def __init__(self, func): |
190 | 191 | self.min_cold_start_trace_duration = get_env_as_int( |
191 | 192 | DD_MIN_COLD_START_DURATION, 3 |
192 | 193 | ) |
| 194 | + self.data_streams_enabled = ( |
| 195 | + os.environ.get(DD_DATA_STREAMS_ENABLED, "false").lower() == "true" |
| 196 | + ) |
193 | 197 | self.local_testing_mode = os.environ.get( |
194 | 198 | DD_LOCAL_TEST, "false" |
195 | 199 | ).lower() in ("true", "1") |
@@ -287,6 +291,41 @@ def _inject_authorizer_span_headers(self, request_id): |
287 | 291 | self.response["context"]["_datadog"] = datadog_data |
288 | 292 |
|
289 | 293 | def _before(self, event, context): |
| 294 | + |
| 295 | + from ddtrace.internal.datastreams.processor import ( |
| 296 | + DataStreamsProcessor as processor, |
| 297 | + DsmPathwayCodec, |
| 298 | + ) |
| 299 | + from ddtrace.internal.datastreams.botocore import ( |
| 300 | + get_datastreams_context, |
| 301 | + calculate_sqs_payload_size, |
| 302 | + ) |
| 303 | + |
| 304 | + def _dsm_set_sqs_context(record): |
| 305 | + try: |
| 306 | + queue_arn = record.get("eventSourceARN", "") |
| 307 | + |
| 308 | + contextjson = get_datastreams_context(record) |
| 309 | + payload_size = calculate_sqs_payload_size(record) |
| 310 | + |
| 311 | + ctx = DsmPathwayCodec.decode(contextjson, processor()) |
| 312 | + ctx.set_checkpoint( |
| 313 | + ["direction:in", "queue:arn:" + queue_arn, "type:sqs"], |
| 314 | + payload_size=payload_size, |
| 315 | + ) |
| 316 | + |
| 317 | + except Exception as e: |
| 318 | + logger.error(format_err_with_traceback(e)) |
| 319 | + |
| 320 | + if self.data_streams_enabled: |
| 321 | + if isinstance(event, dict) and "Records" in event and event["Records"]: |
| 322 | + sqs_records = [ |
| 323 | + r for r in event["Records"] if r.get("eventSource") == "aws:sqs" |
| 324 | + ] |
| 325 | + if sqs_records: |
| 326 | + for record in sqs_records: |
| 327 | + _dsm_set_sqs_context(record) |
| 328 | + |
290 | 329 | try: |
291 | 330 | self.response = None |
292 | 331 | set_cold_start(init_timestamp_ns) |
|
0 commit comments