Skip to content

Commit 2e3d70f

Browse files
add kinesis -> lambda support
1 parent 17eec07 commit 2e3d70f

File tree

2 files changed

+122
-2
lines changed

2 files changed

+122
-2
lines changed

datadog_lambda/dsm.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ def set_dsm_context(event, event_source):
1212
_dsm_set_sqs_context(event)
1313
elif event_source.equals(EventTypes.SNS):
1414
_dsm_set_sns_context(event)
15+
elif event_source.equals(EventTypes.KINESIS):
16+
_dsm_set_kinesis_context(event)
1517

1618

1719
def _dsm_set_sqs_context(event):
@@ -53,6 +55,24 @@ def _dsm_set_sns_context(event):
5355
set_consume_checkpoint("sns", arn, carrier_get)
5456

5557

58+
def _dsm_set_kinesis_context(event):
59+
from ddtrace.data_streams import set_consume_checkpoint
60+
61+
records = event.get("Records")
62+
if records is None:
63+
return
64+
65+
for record in records:
66+
arn = record.get("eventSourceARN", "")
67+
context_json = _get_dsm_context_from_lambda(record)
68+
if not context_json:
69+
logger.debug("DataStreams skipped lambda message: %r", record)
70+
return None
71+
72+
carrier_get = _create_carrier_get(context_json)
73+
set_consume_checkpoint("kinesis", arn, carrier_get)
74+
75+
5676
def _get_dsm_context_from_lambda(message):
5777
"""
5878
Lambda-specific message formats:

tests/test_dsm.py

Lines changed: 102 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
set_dsm_context,
88
_dsm_set_sqs_context,
99
_dsm_set_sns_context,
10+
_dsm_set_kinesis_context,
1011
_get_dsm_context_from_lambda,
1112
)
1213
from datadog_lambda.trigger import EventTypes, _EventSource
@@ -24,12 +25,14 @@ def setUp(self):
2425

2526
patcher = patch("datadog_lambda.dsm._get_dsm_context_from_lambda")
2627
self.mock_get_dsm_context_from_lambda = patcher.start()
28+
self.addCleanup(patcher.stop)
29+
2730
patcher = patch("datadog_lambda.dsm._dsm_set_sns_context")
2831
self.mock_dsm_set_sns_context = patcher.start()
2932
self.addCleanup(patcher.stop)
3033

31-
patcher = patch("ddtrace.internal.datastreams.data_streams_processor")
32-
self.mock_data_streams_processor = patcher.start()
34+
patcher = patch("datadog_lambda.dsm._dsm_set_kinesis_context")
35+
self.mock_dsm_set_kinesis_context = patcher.start()
3336
self.addCleanup(patcher.stop)
3437

3538
def test_non_sqs_event_source_does_nothing(self):
@@ -257,6 +260,103 @@ def test_sns_multiple_records_process_each_record(self):
257260
pathway_ctx = carrier_get_func("dd-pathway-ctx-base64")
258261
self.assertEqual(pathway_ctx, expected_contexts[i])
259262

263+
def test_kinesis_event_with_no_records_does_nothing(self):
264+
"""Test that events where Records is None don't trigger DSM processing"""
265+
events_with_no_records = [
266+
{},
267+
{"Records": None},
268+
{"someOtherField": "value"},
269+
]
270+
271+
for event in events_with_no_records:
272+
_dsm_set_kinesis_context(event)
273+
self.mock_set_consume_checkpoint.assert_not_called()
274+
275+
def test_kinesis_event_triggers_dsm_kinesis_context(self):
276+
"""Test that Kinesis event sources trigger the Kinesis-specific DSM context function"""
277+
kinesis_event = {
278+
"Records": [
279+
{
280+
"eventSource": "aws:kinesis",
281+
"eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream",
282+
"kinesis": {
283+
"data": "SGVsbG8gZnJvbSBLaW5lc2lzIQ==",
284+
"partitionKey": "partition-key",
285+
},
286+
}
287+
]
288+
}
289+
290+
event_source = _EventSource(EventTypes.KINESIS)
291+
set_dsm_context(kinesis_event, event_source)
292+
293+
self.mock_dsm_set_kinesis_context.assert_called_once_with(kinesis_event)
294+
295+
def test_kinesis_multiple_records_process_each_record(self):
296+
"""Test that each record in a Kinesis event gets processed individually"""
297+
multi_record_event = {
298+
"Records": [
299+
{
300+
"eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/stream1",
301+
"kinesis": {
302+
"data": base64.b64encode(
303+
json.dumps({"dd-pathway-ctx-base64": "context1"}).encode("utf-8")
304+
).decode("utf-8"),
305+
"partitionKey": "partition-1",
306+
},
307+
},
308+
{
309+
"eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/stream2",
310+
"kinesis": {
311+
"data": base64.b64encode(
312+
json.dumps({"dd-pathway-ctx-base64": "context2"}).encode("utf-8")
313+
).decode("utf-8"),
314+
"partitionKey": "partition-2",
315+
},
316+
},
317+
{
318+
"eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/stream3",
319+
"kinesis": {
320+
"data": base64.b64encode(
321+
json.dumps({"dd-pathway-ctx-base64": "context3"}).encode("utf-8")
322+
).decode("utf-8"),
323+
"partitionKey": "partition-3",
324+
},
325+
},
326+
]
327+
}
328+
329+
self.mock_get_dsm_context_from_lambda.side_effect = [
330+
{"dd-pathway-ctx-base64": "context1"},
331+
{"dd-pathway-ctx-base64": "context2"},
332+
{"dd-pathway-ctx-base64": "context3"},
333+
]
334+
335+
_dsm_set_kinesis_context(multi_record_event)
336+
337+
self.assertEqual(self.mock_set_consume_checkpoint.call_count, 3)
338+
339+
calls = self.mock_set_consume_checkpoint.call_args_list
340+
expected_arns = [
341+
"arn:aws:kinesis:us-east-1:123456789012:stream/stream1",
342+
"arn:aws:kinesis:us-east-1:123456789012:stream/stream2",
343+
"arn:aws:kinesis:us-east-1:123456789012:stream/stream3",
344+
]
345+
expected_contexts = ["context1", "context2", "context3"]
346+
347+
for i, call in enumerate(calls):
348+
args, kwargs = call
349+
service_type = args[0]
350+
arn = args[1]
351+
carrier_get_func = args[2]
352+
353+
self.assertEqual(service_type, "kinesis")
354+
355+
self.assertEqual(arn, expected_arns[i])
356+
357+
pathway_ctx = carrier_get_func("dd-pathway-ctx-base64")
358+
self.assertEqual(pathway_ctx, expected_contexts[i])
359+
260360

261361
class TestGetDSMContext(unittest.TestCase):
262362
def test_sqs_to_lambda_string_value_format(self):

0 commit comments

Comments
 (0)