@@ -563,6 +563,171 @@ def return_type_test(event, context):
563563 self .assertEqual (result , test_result )
564564 self .assertFalse (MockPrintExc .called )
565565
566+ @patch .dict (os .environ , {"DD_DATA_STREAMS_ENABLED" : "true" })
567+ def test_datadog_lambda_wrapper_dsm_sqs_context_pathway_verification (self ):
568+ with patch (
569+ "ddtrace.internal.datastreams.processor.get_connection"
570+ ) as mock_get_connection :
571+
572+ mock_conn = unittest .mock .MagicMock ()
573+ mock_response = unittest .mock .MagicMock ()
574+ mock_response .status = 200
575+ mock_conn .getresponse .return_value = mock_response
576+ mock_get_connection .return_value = mock_conn
577+
578+ def updated_get_datastreams_context (message ):
579+ """
580+ Updated version that handles the correct message formats
581+ """
582+ import base64
583+ import json
584+
585+ context_json = None
586+ message_body = message
587+ try :
588+ body = message .get ("Body" )
589+ if body :
590+ message_body = json .loads (body )
591+ except (ValueError , TypeError ):
592+ pass
593+
594+ message_attributes = message_body .get (
595+ "MessageAttributes"
596+ ) or message_body .get ("messageAttributes" )
597+ if not message_attributes :
598+ return None
599+
600+ if "_datadog" not in message_attributes :
601+ return None
602+
603+ datadog_attr = message_attributes ["_datadog" ]
604+
605+ if message_body .get ("Type" ) == "Notification" :
606+ if datadog_attr .get ("Type" ) == "Binary" :
607+ context_json = json .loads (
608+ base64 .b64decode (datadog_attr ["Value" ]).decode ()
609+ )
610+ elif "StringValue" in datadog_attr :
611+ context_json = json .loads (datadog_attr ["StringValue" ])
612+ elif "stringValue" in datadog_attr :
613+ context_json = json .loads (datadog_attr ["stringValue" ])
614+ elif "BinaryValue" in datadog_attr :
615+ context_json = json .loads (datadog_attr ["BinaryValue" ].decode ())
616+ else :
617+ print (f"DEBUG: Unhandled datadog_attr format: { datadog_attr } " )
618+
619+ return context_json
620+
621+ with patch (
622+ "ddtrace.internal.datastreams.botocore.get_datastreams_context" ,
623+ updated_get_datastreams_context ,
624+ ):
625+
626+ # Step 1: Create a message with some context in the message attributes
627+
628+ from ddtrace .internal .datastreams .processor import DataStreamsProcessor
629+
630+ processor_instance = DataStreamsProcessor ()
631+
632+ with patch (
633+ "ddtrace.internal.datastreams.processor.DataStreamsProcessor" ,
634+ return_value = processor_instance ,
635+ ):
636+
637+ parent_ctx = processor_instance .new_pathway ()
638+
639+ parent_ctx .set_checkpoint (
640+ ["direction:out" , "topic:upstream-topic" , "type:sqs" ],
641+ now_sec = 1640995200.0 ,
642+ payload_size = 512 ,
643+ )
644+ parent_hash = parent_ctx .hash
645+ encoded_parent_context = parent_ctx .encode_b64 ()
646+
647+ sqs_event = {
648+ "Records" : [
649+ {
650+ "eventSource" : "aws:sqs" ,
651+ "eventSourceARN" : "arn:aws:sqs:us-east-1:123456789012:test" ,
652+ "Body" : "test message body" ,
653+ "messageAttributes" : {
654+ "_datadog" : {
655+ "stringValue" : json .dumps (
656+ {
657+ "dd-pathway-ctx-base64" : encoded_parent_context
658+ }
659+ )
660+ }
661+ },
662+ }
663+ ]
664+ }
665+
666+ # Step 2: Call the handler
667+ @wrapper .datadog_lambda_wrapper
668+ def lambda_handler (event , context ):
669+ return {"statusCode" : 200 , "body" : "processed" }
670+
671+ result = lambda_handler (sqs_event , get_mock_context ())
672+ self .assertEqual (result ["statusCode" ], 200 )
673+
674+ # New context set after handler call
675+ current_ctx = processor_instance ._current_context .value
676+ self .assertIsNotNone (
677+ current_ctx ,
678+ "Data streams context should be set after processing SQS message" ,
679+ )
680+
681+ # Step 3: Check that hash in this context is the child of the hash you passed
682+ # Step 4: Check that the right checkpoint was produced during call to handler
683+
684+ found_sqs_checkpoint = False
685+ for bucket_time , bucket in processor_instance ._buckets .items ():
686+ for aggr_key , stats in bucket .pathway_stats .items ():
687+ edge_tags_str , hash_value , parent_hash_recorded = aggr_key
688+ edge_tags = edge_tags_str .split ("," )
689+
690+ if (
691+ "direction:in" in edge_tags
692+ and "topic:test" in edge_tags
693+ and "type:sqs" in edge_tags
694+ ):
695+ found_sqs_checkpoint = True
696+
697+ # EXPLICIT PARENT-CHILD HASH RELATIONSHIP TEST
698+ self .assertEqual (
699+ parent_hash_recorded ,
700+ parent_hash ,
701+ f"Parent hash must be preserved: "
702+ f"expected { parent_hash } , got { parent_hash_recorded } " ,
703+ )
704+ self .assertEqual (
705+ hash_value ,
706+ current_ctx .hash ,
707+ f"Child hash must match current context: "
708+ f"expected { current_ctx .hash } , got { hash_value } " ,
709+ )
710+ self .assertNotEqual (
711+ hash_value ,
712+ parent_hash_recorded ,
713+ f"Child hash ({ hash_value } ) must be different from "
714+ f"parent hash ({ parent_hash_recorded } ) - proves parent-child" ,
715+ )
716+ self .assertGreaterEqual (
717+ stats .payload_size .count ,
718+ 1 ,
719+ "Should have one payload size measurement" ,
720+ )
721+
722+ break
723+
724+ self .assertTrue (
725+ found_sqs_checkpoint ,
726+ "Should have found SQS consumption checkpoint in processor stats" ,
727+ )
728+
729+ processor_instance .shutdown (timeout = 0.1 )
730+
566731
567732class TestLambdaDecoratorSettings (unittest .TestCase ):
568733 def test_some_envs_should_depend_on_dd_tracing_enabled (self ):
0 commit comments