Skip to content

Commit 7cf8901

Browse files
moved set_checkpoint into wrapper code
1 parent 51d1e13 commit 7cf8901

File tree

1 file changed

+36
-19
lines changed

1 file changed

+36
-19
lines changed

datadog_lambda/wrapper.py

Lines changed: 36 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -234,32 +234,27 @@ def _before(self, event, context):
234234
)
235235

236236
if config.trace_enabled:
237+
set_dd_trace_py_root(trace_context_source, config.merge_xray_traces)
238+
if config.make_inferred_span:
239+
self.inferred_span = create_inferred_span(
240+
event, context, event_source, config.decode_authorizer_context
241+
)
237242
if config.data_streams_enabled:
238243
from datadog_lambda.tracing import (
239244
extract_dd_json_data_from_message_attributes,
240245
)
241-
from ddtrace.data_streams import set_consume_checkpoint
242246

243-
dd_json_data, arn = extract_dd_json_data_from_message_attributes(
244-
event
245-
)
246-
if dd_json_data:
247-
dd_json_data = json.loads(dd_json_data)
248-
249-
def create_carrier_get(dd_json_data):
250-
def carrier_get(key):
251-
return dd_json_data.get(key)
252-
253-
return carrier_get
247+
try:
248+
(
249+
dd_json_data,
250+
arn,
251+
) = extract_dd_json_data_from_message_attributes(event)
252+
except Exception as e:
253+
logger.debug(f"Failed to extract DSM checkpoint: {e}")
254254

255-
carrier_get = create_carrier_get(dd_json_data)
256-
set_consume_checkpoint(event_source, arn, carrier_get)
255+
if dd_json_data:
256+
set_dsm_checkpoint(dd_json_data, event_source, arn)
257257

258-
set_dd_trace_py_root(trace_context_source, config.merge_xray_traces)
259-
if config.make_inferred_span:
260-
self.inferred_span = create_inferred_span(
261-
event, context, event_source, config.decode_authorizer_context
262-
)
263258
self.span = create_function_execution_span(
264259
context=context,
265260
function_name=config.function_name,
@@ -364,4 +359,26 @@ def format_err_with_traceback(e):
364359
return f"Error {e}. Traceback: {tb}"
365360

366361

362+
def set_dsm_checkpoint(dd_json_data, event_source, arn):
363+
from ddtrace.data_streams import set_consume_checkpoint
364+
365+
try:
366+
dd_json_data = json.loads(dd_json_data)
367+
if "dd-pathway-ctx-base64" not in dd_json_data:
368+
return
369+
370+
def create_carrier_get(dd_json_data):
371+
def carrier_get(key):
372+
return dd_json_data.get(key)
373+
374+
return carrier_get
375+
376+
carrier_get = create_carrier_get(dd_json_data)
377+
set_consume_checkpoint(
378+
event_source.to_string(), arn, carrier_get, manual_checkpoint=False
379+
)
380+
except Exception as e:
381+
logger.debug(f"Failed to set DSM checkpoint: {e}")
382+
383+
367384
datadog_lambda_wrapper = _LambdaDecorator

0 commit comments

Comments
 (0)