77 set_dsm_context ,
88 _dsm_set_sqs_context ,
99 _dsm_set_sns_context ,
10+ _dsm_set_kinesis_context ,
1011 _get_dsm_context_from_lambda ,
1112)
1213from datadog_lambda .trigger import EventTypes , _EventSource
@@ -24,12 +25,14 @@ def setUp(self):
2425
2526 patcher = patch ("datadog_lambda.dsm._get_dsm_context_from_lambda" )
2627 self .mock_get_dsm_context_from_lambda = patcher .start ()
28+ self .addCleanup (patcher .stop )
29+
2730 patcher = patch ("datadog_lambda.dsm._dsm_set_sns_context" )
2831 self .mock_dsm_set_sns_context = patcher .start ()
2932 self .addCleanup (patcher .stop )
3033
31- patcher = patch ("ddtrace.internal.datastreams.data_streams_processor " )
32- self .mock_data_streams_processor = patcher .start ()
34+ patcher = patch ("datadog_lambda.dsm._dsm_set_kinesis_context " )
35+ self .mock_dsm_set_kinesis_context = patcher .start ()
3336 self .addCleanup (patcher .stop )
3437
3538 def test_non_sqs_event_source_does_nothing (self ):
@@ -266,6 +269,103 @@ def test_sns_multiple_records_process_each_record(self):
266269 pathway_ctx = carrier_get_func ("dd-pathway-ctx-base64" )
267270 self .assertEqual (pathway_ctx , expected_contexts [i ])
268271
272+ def test_kinesis_event_with_no_records_does_nothing (self ):
273+ """Test that events where Records is None don't trigger DSM processing"""
274+ events_with_no_records = [
275+ {},
276+ {"Records" : None },
277+ {"someOtherField" : "value" },
278+ ]
279+
280+ for event in events_with_no_records :
281+ _dsm_set_kinesis_context (event )
282+ self .mock_set_consume_checkpoint .assert_not_called ()
283+
284+ def test_kinesis_event_triggers_dsm_kinesis_context (self ):
285+ """Test that Kinesis event sources trigger the Kinesis-specific DSM context function"""
286+ kinesis_event = {
287+ "Records" : [
288+ {
289+ "eventSource" : "aws:kinesis" ,
290+ "eventSourceARN" : "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream" ,
291+ "kinesis" : {
292+ "data" : "SGVsbG8gZnJvbSBLaW5lc2lzIQ==" ,
293+ "partitionKey" : "partition-key" ,
294+ },
295+ }
296+ ]
297+ }
298+
299+ event_source = _EventSource (EventTypes .KINESIS )
300+ set_dsm_context (kinesis_event , event_source )
301+
302+ self .mock_dsm_set_kinesis_context .assert_called_once_with (kinesis_event )
303+
304+ def test_kinesis_multiple_records_process_each_record (self ):
305+ """Test that each record in a Kinesis event gets processed individually"""
306+ multi_record_event = {
307+ "Records" : [
308+ {
309+ "eventSourceARN" : "arn:aws:kinesis:us-east-1:123456789012:stream/stream1" ,
310+ "kinesis" : {
311+ "data" : base64 .b64encode (
312+ json .dumps ({"dd-pathway-ctx-base64" : "context1" }).encode ("utf-8" )
313+ ).decode ("utf-8" ),
314+ "partitionKey" : "partition-1" ,
315+ },
316+ },
317+ {
318+ "eventSourceARN" : "arn:aws:kinesis:us-east-1:123456789012:stream/stream2" ,
319+ "kinesis" : {
320+ "data" : base64 .b64encode (
321+ json .dumps ({"dd-pathway-ctx-base64" : "context2" }).encode ("utf-8" )
322+ ).decode ("utf-8" ),
323+ "partitionKey" : "partition-2" ,
324+ },
325+ },
326+ {
327+ "eventSourceARN" : "arn:aws:kinesis:us-east-1:123456789012:stream/stream3" ,
328+ "kinesis" : {
329+ "data" : base64 .b64encode (
330+ json .dumps ({"dd-pathway-ctx-base64" : "context3" }).encode ("utf-8" )
331+ ).decode ("utf-8" ),
332+ "partitionKey" : "partition-3" ,
333+ },
334+ },
335+ ]
336+ }
337+
338+ self .mock_get_dsm_context_from_lambda .side_effect = [
339+ {"dd-pathway-ctx-base64" : "context1" },
340+ {"dd-pathway-ctx-base64" : "context2" },
341+ {"dd-pathway-ctx-base64" : "context3" },
342+ ]
343+
344+ _dsm_set_kinesis_context (multi_record_event )
345+
346+ self .assertEqual (self .mock_set_consume_checkpoint .call_count , 3 )
347+
348+ calls = self .mock_set_consume_checkpoint .call_args_list
349+ expected_arns = [
350+ "arn:aws:kinesis:us-east-1:123456789012:stream/stream1" ,
351+ "arn:aws:kinesis:us-east-1:123456789012:stream/stream2" ,
352+ "arn:aws:kinesis:us-east-1:123456789012:stream/stream3" ,
353+ ]
354+ expected_contexts = ["context1" , "context2" , "context3" ]
355+
356+ for i , call in enumerate (calls ):
357+ args , kwargs = call
358+ service_type = args [0 ]
359+ arn = args [1 ]
360+ carrier_get_func = args [2 ]
361+
362+ self .assertEqual (service_type , "kinesis" )
363+
364+ self .assertEqual (arn , expected_arns [i ])
365+
366+ pathway_ctx = carrier_get_func ("dd-pathway-ctx-base64" )
367+ self .assertEqual (pathway_ctx , expected_contexts [i ])
368+
269369
270370class TestGetDSMContext (unittest .TestCase ):
271371 def test_sqs_to_lambda_string_value_format (self ):
0 commit comments