@@ -2743,6 +2743,73 @@ def test_sqs_batch_processing(self):
27432743 carrier_get_2 = args_2 [2 ]
27442744 self .assertEqual (carrier_get_2 ("dd-pathway-ctx-base64" ), "record2" )
27452745
2746+ def test_sqs_batch_processing_with_invalid_records (self ):
2747+ dd_data_1 = {"dd-pathway-ctx-base64" : "valid_record" }
2748+ dd_json_data_1 = json .dumps (dd_data_1 )
2749+
2750+ dd_data_3 = {"dd-pathway-ctx-base64" : "another_valid_record" }
2751+ dd_json_data_3 = json .dumps (dd_data_3 )
2752+
2753+ event = {
2754+ "Records" : [
2755+ {
2756+ "eventSourceARN" : "arn:aws:sqs:us-east-1:123456789012:test-queue" ,
2757+ "messageAttributes" : {
2758+ "_datadog" : {
2759+ "dataType" : "String" ,
2760+ "stringValue" : dd_json_data_1 ,
2761+ }
2762+ },
2763+ "eventSource" : "aws:sqs" ,
2764+ },
2765+ {
2766+ "eventSourceARN" : "arn:aws:sqs:us-east-1:123456789012:test-queue" ,
2767+ "messageAttributes" : {
2768+ "_datadog" : {
2769+ "dataType" : "Binary" ,
2770+ # This will cause extraction to fail
2771+ "binaryValue" : "invalid-base64-data" ,
2772+ }
2773+ },
2774+ "eventSource" : "aws:sqs" ,
2775+ },
2776+ {
2777+ "eventSourceARN" : "arn:aws:sqs:us-east-1:123456789012:test-queue" ,
2778+ "messageAttributes" : {
2779+ "_datadog" : {
2780+ "dataType" : "String" ,
2781+ "stringValue" : dd_json_data_3 ,
2782+ }
2783+ },
2784+ "eventSource" : "aws:sqs" ,
2785+ },
2786+ ]
2787+ }
2788+
2789+ extract_context_from_sqs_or_sns_event_or_context (
2790+ event , self .lambda_context , parse_event_source (event )
2791+ )
2792+
2793+ self .assertEqual (self .mock_checkpoint .call_count , 3 )
2794+
2795+ args_1 , _ = self .mock_checkpoint .call_args_list [0 ]
2796+ self .assertEqual (args_1 [0 ], "sqs" )
2797+ self .assertEqual (args_1 [1 ], "arn:aws:sqs:us-east-1:123456789012:test-queue" )
2798+ carrier_get_1 = args_1 [2 ]
2799+ self .assertEqual (carrier_get_1 ("dd-pathway-ctx-base64" ), "valid_record" )
2800+
2801+ args_2 , _ = self .mock_checkpoint .call_args_list [1 ]
2802+ self .assertEqual (args_2 [0 ], "sqs" )
2803+ self .assertEqual (args_2 [1 ], "arn:aws:sqs:us-east-1:123456789012:test-queue" )
2804+ carrier_get_2 = args_2 [2 ]
2805+ self .assertEqual (carrier_get_2 ("dd-pathway-ctx-base64" ), None )
2806+
2807+ args_3 , _ = self .mock_checkpoint .call_args_list [2 ]
2808+ self .assertEqual (args_3 [0 ], "sqs" )
2809+ self .assertEqual (args_3 [1 ], "arn:aws:sqs:us-east-1:123456789012:test-queue" )
2810+ carrier_get_3 = args_3 [2 ]
2811+ self .assertEqual (carrier_get_3 ("dd-pathway-ctx-base64" ), "another_valid_record" )
2812+
27462813 def test_sqs_source_arn_not_found (self ):
27472814 event = {
27482815 "Records" : [
@@ -3019,53 +3086,6 @@ def test_sns_data_streams_disabled(self):
30193086
30203087 self .mock_checkpoint .assert_not_called ()
30213088
3022- def test_sns_batch_processing (self ):
3023- dd_data_1 = {"dd-pathway-ctx-base64" : "record1" }
3024- dd_data_2 = {"dd-pathway-ctx-base64" : "record2" }
3025- dd_json_data_1 = json .dumps (dd_data_1 )
3026- dd_json_data_2 = json .dumps (dd_data_2 )
3027-
3028- event = {
3029- "Records" : [
3030- {
3031- "Sns" : {
3032- "TopicArn" : "arn:aws:sns:us-east-1:123456789012:test-topic" ,
3033- "MessageAttributes" : {
3034- "_datadog" : {"Type" : "String" , "Value" : dd_json_data_1 }
3035- },
3036- },
3037- "eventSource" : "aws:sns" ,
3038- },
3039- {
3040- "Sns" : {
3041- "TopicArn" : "arn:aws:sns:us-east-1:123456789012:test-topic" ,
3042- "MessageAttributes" : {
3043- "_datadog" : {"Type" : "String" , "Value" : dd_json_data_2 }
3044- },
3045- },
3046- "eventSource" : "aws:sns" ,
3047- },
3048- ]
3049- }
3050-
3051- extract_context_from_sqs_or_sns_event_or_context (
3052- event , self .lambda_context , parse_event_source (event )
3053- )
3054-
3055- self .assertEqual (self .mock_checkpoint .call_count , 2 )
3056-
3057- args_1 , _ = self .mock_checkpoint .call_args_list [0 ]
3058- self .assertEqual (args_1 [0 ], "sns" )
3059- self .assertEqual (args_1 [1 ], "arn:aws:sns:us-east-1:123456789012:test-topic" )
3060- carrier_get_1 = args_1 [2 ]
3061- self .assertEqual (carrier_get_1 ("dd-pathway-ctx-base64" ), "record1" )
3062-
3063- args_2 , _ = self .mock_checkpoint .call_args_list [1 ]
3064- self .assertEqual (args_2 [0 ], "sns" )
3065- self .assertEqual (args_2 [1 ], "arn:aws:sns:us-east-1:123456789012:test-topic" )
3066- carrier_get_2 = args_2 [2 ]
3067- self .assertEqual (carrier_get_2 ("dd-pathway-ctx-base64" ), "record2" )
3068-
30693089 # SNS -> SQS TESTS
30703090
30713091 def test_sns_to_sqs_context_propagated_string_value (self ):
@@ -3362,6 +3382,83 @@ def test_sns_to_sqs_batch_processing(self):
33623382 carrier_get_2 = args_2 [2 ]
33633383 self .assertEqual (carrier_get_2 ("dd-pathway-ctx-base64" ), "record2" )
33643384
3385+ def test_sns_to_sqs_batch_processing_with_invalid_records (self ):
3386+ dd_data_1 = {"dd-pathway-ctx-base64" : "valid_sns_record" }
3387+ dd_json_data_1 = json .dumps (dd_data_1 )
3388+
3389+ sns_message_1 = {
3390+ "Type" : "Notification" ,
3391+ "TopicArn" : "arn:aws:sns:us-east-1:123456789012:test-topic" ,
3392+ "MessageAttributes" : {
3393+ "_datadog" : {"Type" : "String" , "Value" : dd_json_data_1 }
3394+ },
3395+ }
3396+
3397+ sns_message_2 = {
3398+ "Type" : "Notification" ,
3399+ "TopicArn" : "arn:aws:sns:us-east-1:123456789012:test-topic" ,
3400+ "MessageAttributes" : {
3401+ "_datadog" : {"Type" : "Binary" , "Value" : "invalid-base64-data" }
3402+ },
3403+ }
3404+
3405+ dd_data_3 = {"dd-pathway-ctx-base64" : "another_valid_sns_record" }
3406+ dd_json_data_3 = json .dumps (dd_data_3 )
3407+
3408+ sns_message_3 = {
3409+ "Type" : "Notification" ,
3410+ "TopicArn" : "arn:aws:sns:us-east-1:123456789012:test-topic" ,
3411+ "MessageAttributes" : {
3412+ "_datadog" : {"Type" : "String" , "Value" : dd_json_data_3 }
3413+ },
3414+ }
3415+
3416+ event = {
3417+ "Records" : [
3418+ {
3419+ "eventSourceARN" : "arn:aws:sqs:us-east-1:123456789012:test-queue" ,
3420+ "body" : json .dumps (sns_message_1 ),
3421+ "eventSource" : "aws:sqs" ,
3422+ },
3423+ {
3424+ "eventSourceARN" : "arn:aws:sqs:us-east-1:123456789012:test-queue" ,
3425+ "body" : json .dumps (sns_message_2 ),
3426+ "eventSource" : "aws:sqs" ,
3427+ },
3428+ {
3429+ "eventSourceARN" : "arn:aws:sqs:us-east-1:123456789012:test-queue" ,
3430+ "body" : json .dumps (sns_message_3 ),
3431+ "eventSource" : "aws:sqs" ,
3432+ },
3433+ ]
3434+ }
3435+
3436+ extract_context_from_sqs_or_sns_event_or_context (
3437+ event , self .lambda_context , parse_event_source (event )
3438+ )
3439+
3440+ self .assertEqual (self .mock_checkpoint .call_count , 3 )
3441+
3442+ args_1 , _ = self .mock_checkpoint .call_args_list [0 ]
3443+ self .assertEqual (args_1 [0 ], "sqs" )
3444+ self .assertEqual (args_1 [1 ], "arn:aws:sqs:us-east-1:123456789012:test-queue" )
3445+ carrier_get_1 = args_1 [2 ]
3446+ self .assertEqual (carrier_get_1 ("dd-pathway-ctx-base64" ), "valid_sns_record" )
3447+
3448+ args_2 , _ = self .mock_checkpoint .call_args_list [1 ]
3449+ self .assertEqual (args_2 [0 ], "sqs" )
3450+ self .assertEqual (args_2 [1 ], "arn:aws:sqs:us-east-1:123456789012:test-queue" )
3451+ carrier_get_2 = args_2 [2 ]
3452+ self .assertEqual (carrier_get_2 ("dd-pathway-ctx-base64" ), None )
3453+
3454+ args_3 , _ = self .mock_checkpoint .call_args_list [2 ]
3455+ self .assertEqual (args_3 [0 ], "sqs" )
3456+ self .assertEqual (args_3 [1 ], "arn:aws:sqs:us-east-1:123456789012:test-queue" )
3457+ carrier_get_3 = args_3 [2 ]
3458+ self .assertEqual (
3459+ carrier_get_3 ("dd-pathway-ctx-base64" ), "another_valid_sns_record"
3460+ )
3461+
33653462 def test_sns_to_sqs_source_arn_not_found (self ):
33663463 sns_notification = {
33673464 "Type" : "Notification" ,
@@ -3574,6 +3671,64 @@ def test_kinesis_batch_processing(self):
35743671 carrier_get_2 = args_2 [2 ]
35753672 self .assertEqual (carrier_get_2 ("dd-pathway-ctx-base64" ), "record2" )
35763673
3674+ def test_kinesis_batch_processing_with_invalid_records (self ):
3675+ dd_data_1 = {"dd-pathway-ctx-base64" : "valid_kinesis_record" }
3676+ kinesis_data_1 = {"_datadog" : dd_data_1 , "message" : "test1" }
3677+ encoded_data_1 = base64 .b64encode (json .dumps (kinesis_data_1 ).encode ()).decode ()
3678+
3679+ dd_data_3 = {"dd-pathway-ctx-base64" : "another_valid_kinesis_record" }
3680+ kinesis_data_3 = {"_datadog" : dd_data_3 , "message" : "test3" }
3681+ encoded_data_3 = base64 .b64encode (json .dumps (kinesis_data_3 ).encode ()).decode ()
3682+
3683+ event = {
3684+ "Records" : [
3685+ {
3686+ "eventSourceARN" : "arn:aws:kinesis:us-east-1:123456789012:stream/test-stream" ,
3687+ "kinesis" : {"data" : encoded_data_1 },
3688+ },
3689+ {
3690+ "eventSourceARN" : "arn:aws:kinesis:us-east-1:123456789012:stream/test-stream" ,
3691+ "kinesis" : {
3692+ "data" : "invalid-base64-data"
3693+ }, # This will cause extraction to fail
3694+ },
3695+ {
3696+ "eventSourceARN" : "arn:aws:kinesis:us-east-1:123456789012:stream/test-stream" ,
3697+ "kinesis" : {"data" : encoded_data_3 },
3698+ },
3699+ ]
3700+ }
3701+
3702+ extract_context_from_kinesis_event (event , self .lambda_context )
3703+
3704+ self .assertEqual (self .mock_checkpoint .call_count , 3 )
3705+
3706+ args_1 , _ = self .mock_checkpoint .call_args_list [0 ]
3707+ self .assertEqual (args_1 [0 ], "kinesis" )
3708+ self .assertEqual (
3709+ args_1 [1 ], "arn:aws:kinesis:us-east-1:123456789012:stream/test-stream"
3710+ )
3711+ carrier_get_1 = args_1 [2 ]
3712+ self .assertEqual (carrier_get_1 ("dd-pathway-ctx-base64" ), "valid_kinesis_record" )
3713+
3714+ args_2 , _ = self .mock_checkpoint .call_args_list [1 ]
3715+ self .assertEqual (args_2 [0 ], "kinesis" )
3716+ self .assertEqual (
3717+ args_2 [1 ], "arn:aws:kinesis:us-east-1:123456789012:stream/test-stream"
3718+ )
3719+ carrier_get_2 = args_2 [2 ]
3720+ self .assertEqual (carrier_get_2 ("dd-pathway-ctx-base64" ), None )
3721+
3722+ args_3 , _ = self .mock_checkpoint .call_args_list [2 ]
3723+ self .assertEqual (args_3 [0 ], "kinesis" )
3724+ self .assertEqual (
3725+ args_3 [1 ], "arn:aws:kinesis:us-east-1:123456789012:stream/test-stream"
3726+ )
3727+ carrier_get_3 = args_3 [2 ]
3728+ self .assertEqual (
3729+ carrier_get_3 ("dd-pathway-ctx-base64" ), "another_valid_kinesis_record"
3730+ )
3731+
35773732 def test_kinesis_source_arn_not_found (self ):
35783733 kinesis_data = {"message" : "test" }
35793734 kinesis_data_str = json .dumps (kinesis_data )
0 commit comments