Skip to content

Commit 94306d2

Browse files
dlzhry2nhsmfjarvis
authored and committed
VED-734 Handle multiple file events in lambda forwarder (#758)
1 parent 802e3d7 commit 94306d2

File tree

7 files changed

+216
-24
lines changed

7 files changed

+216
-24
lines changed

.github/workflows/sonarcloud.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ jobs:
6060
continue-on-error: true
6161
run: |
6262
poetry install
63-
poetry run coverage run -m unittest discover -s "./tests" -p "*batch*.py" || echo "recordforwarder tests failed" >> ../failed_tests.txt
63+
poetry run coverage run -m unittest discover -p "*batch*.py" || echo "recordforwarder tests failed" >> ../failed_tests.txt
6464
poetry run coverage xml -o ../recordforwarder-coverage.xml
6565
6666
- name: Run unittest with coverage-ack-lambda

backend/src/batch/__init__.py

Whitespace-only changes.
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
import copy
2+
3+
from models.errors import MessageNotSuccessfulError
4+
5+
6+
class BatchFilenameToEventsMapper:
    """Groups forwarder events by the parent batch file they originated from.

    Each event is keyed by "<file_key>_<created_at_formatted_string>" so that
    all events belonging to the same source CSV file can later be sent
    together (e.g. as a single SQS message group).
    """

    FILENAME_NOT_PRESENT_ERROR_MSG = "Filename data was not present"

    def __init__(self):
        # Maps "<file_key>_<created_at>" -> list of events, in insertion order.
        self._filename_to_events_map: dict[str, list[dict]] = {}

    def add_event(self, event: dict) -> None:
        """Append *event* to the group for its source file, creating the group
        on first sight of the key.

        Raises:
            MessageNotSuccessfulError: if the event is missing (or has an
                empty) 'file_key' or 'created_at_formatted_string'.
        """
        # setdefault replaces the explicit membership check / early return:
        # it creates the list on first use and appends in one expression.
        self._filename_to_events_map.setdefault(self._make_key(event), []).append(event)

    def get_map(self) -> dict[str, list[dict]]:
        """Return a deep copy of the map so callers cannot mutate internal state."""
        return copy.deepcopy(self._filename_to_events_map)

    def _make_key(self, event: dict) -> str:
        """Build the grouping key; raise if the filename fields are absent or empty."""
        file_key = event.get("file_key")
        created_at_string = event.get("created_at_formatted_string")

        if not file_key or not created_at_string:
            raise MessageNotSuccessfulError(self.FILENAME_NOT_PRESENT_ERROR_MSG)

        return f"{file_key}_{created_at_string}"

backend/src/forwarding_batch_lambda.py

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
import base64
66
import time
77
import logging
8+
9+
from batch.batch_filename_to_events_mapper import BatchFilenameToEventsMapper
810
from fhir_batch_repository import create_table
911
from fhir_batch_controller import ImmunizationBatchController, make_batch_controller
1012
from clients import sqs_client
@@ -59,7 +61,7 @@ def forward_lambda_handler(event, _):
5961
"""Forward each row to the Imms API"""
6062
logger.info("Processing started")
6163
table = create_table()
62-
array_of_messages = []
64+
filename_to_events_mapper = BatchFilenameToEventsMapper()
6365
array_of_identifiers = []
6466
controller = make_batch_controller()
6567

@@ -101,23 +103,20 @@ def forward_lambda_handler(event, _):
101103
array_of_identifiers.append(identifier)
102104

103105
imms_id = forward_request_to_dynamo(incoming_message_body, table, identifier_already_present, controller)
104-
array_of_messages.append({**base_outgoing_message_body, "imms_id": imms_id})
106+
filename_to_events_mapper.add_event({**base_outgoing_message_body, "imms_id": imms_id})
105107

106108
except Exception as error: # pylint: disable = broad-exception-caught
107-
array_of_messages.append(
109+
filename_to_events_mapper.add_event(
108110
{**base_outgoing_message_body, "diagnostics": create_diagnostics_dictionary(error)}
109111
)
110112
logger.error("Error processing message: %s", error)
111113

112114
# Send to SQS
113-
sqs_message_body = json.dumps(array_of_messages)
114-
message_len = len(sqs_message_body)
115-
logger.info(f"total message length:{message_len}")
116-
message_group_id = f"{file_key}_{created_at_formatted_string}"
117-
if message_len < 256 * 1024:
118-
sqs_client.send_message(QueueUrl=QUEUE_URL, MessageBody=sqs_message_body, MessageGroupId=message_group_id)
119-
else:
120-
logger.info("Message size exceeds 256 KB limit.Sending to sqs failed")
115+
for filename_key, events in filename_to_events_mapper.get_map().items():
116+
sqs_message_body = json.dumps(events)
117+
logger.info(f"total message length:{len(sqs_message_body)}")
118+
119+
sqs_client.send_message(QueueUrl=QUEUE_URL, MessageBody=sqs_message_body, MessageGroupId=filename_key)
121120

122121

123122
if __name__ == "__main__":

backend/tests/batch/__init__.py

Whitespace-only changes.
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
import unittest
2+
3+
from batch.batch_filename_to_events_mapper import BatchFilenameToEventsMapper
4+
from models.errors import MessageNotSuccessfulError
5+
6+
# Fixture events for the mapper tests.
# The two "supplier one" events share the same file_key and
# created_at_formatted_string, so the mapper should group them under a single
# key; the "supplier two" event has a different file_key/timestamp and should
# land under its own key.
MOCK_SUPPLIER_ONE_RSV_EVENT_ONE = {
    "row_id": "row-1",
    "file_key": "supplier_one_rsv_file_key",
    "created_at_formatted_string": "2025-01-24T12:00:00Z",
    "supplier": "supplier_one",
    "vax_type": "RSV",
    "local_id": "local-1",
    "operation_requested": "CREATE"
}

MOCK_SUPPLIER_ONE_RSV_EVENT_TWO = {
    "row_id": "row-2",
    "file_key": "supplier_one_rsv_file_key",
    "created_at_formatted_string": "2025-01-24T12:00:00Z",
    "supplier": "supplier_one",
    "vax_type": "RSV",
    "local_id": "local-2",
    "operation_requested": "UPDATE"
}

MOCK_SUPPLIER_TWO_COVID_EVENT_ONE = {
    "row_id": "row-1",
    "file_key": "supplier_two_covid19_file_key",
    "created_at_formatted_string": "2025-01-24T14:00:00Z",
    "supplier": "supplier_two",
    "vax_type": "COVID-19",
    "local_id": "local-1",
    "operation_requested": "CREATE"
}
35+
36+
37+
class TestBatchFilenameToEventsMapper(unittest.TestCase):
    """Unit tests for BatchFilenameToEventsMapper."""

    # Expected grouping keys: "<file_key>_<created_at_formatted_string>".
    expected_key_supplier_one = "supplier_one_rsv_file_key_2025-01-24T12:00:00Z"
    expected_key_supplier_two = "supplier_two_covid19_file_key_2025-01-24T14:00:00Z"

    def setUp(self) -> None:
        self.batch_filename_to_events_mapper = BatchFilenameToEventsMapper()

    def test_add_event_creates_new_key(self):
        """The first event for a file creates a new key holding a one-element list."""
        self.batch_filename_to_events_mapper.add_event(MOCK_SUPPLIER_ONE_RSV_EVENT_ONE)

        result = self.batch_filename_to_events_mapper.get_map()

        self.assertIn(self.expected_key_supplier_one, result)
        self.assertEqual(result[self.expected_key_supplier_one], [MOCK_SUPPLIER_ONE_RSV_EVENT_ONE])

    def test_add_event_appends_to_existing_key(self):
        """A second event for the same file is appended, preserving order."""
        self.batch_filename_to_events_mapper.add_event(MOCK_SUPPLIER_ONE_RSV_EVENT_ONE)
        self.batch_filename_to_events_mapper.add_event(MOCK_SUPPLIER_ONE_RSV_EVENT_TWO)

        result = self.batch_filename_to_events_mapper.get_map()

        self.assertIn(self.expected_key_supplier_one, result)
        self.assertEqual(result[self.expected_key_supplier_one], [
            MOCK_SUPPLIER_ONE_RSV_EVENT_ONE,
            MOCK_SUPPLIER_ONE_RSV_EVENT_TWO
        ])

    def test_mapper_handles_events_from_multiple_files(self):
        """Events from different source files end up under distinct keys."""
        self.batch_filename_to_events_mapper.add_event(MOCK_SUPPLIER_ONE_RSV_EVENT_ONE)
        self.batch_filename_to_events_mapper.add_event(MOCK_SUPPLIER_ONE_RSV_EVENT_TWO)
        self.batch_filename_to_events_mapper.add_event(MOCK_SUPPLIER_TWO_COVID_EVENT_ONE)

        result = self.batch_filename_to_events_mapper.get_map()

        self.assertEqual(len(result.keys()), 2)
        self.assertIn(self.expected_key_supplier_one, result)
        self.assertEqual(result[self.expected_key_supplier_one], [
            MOCK_SUPPLIER_ONE_RSV_EVENT_ONE,
            MOCK_SUPPLIER_ONE_RSV_EVENT_TWO
        ])
        self.assertIn(self.expected_key_supplier_two, result)
        self.assertEqual(result[self.expected_key_supplier_two], [MOCK_SUPPLIER_TWO_COVID_EVENT_ONE])

    def test_event_with_missing_filename_data_raises_error(self):
        """Events without both filename fields are rejected with a clear error."""
        incomplete_event = {
            "file_key": "file1"
            # Missing 'created_at_formatted_string'
        }
        with self.assertRaises(MessageNotSuccessfulError) as error:
            self.batch_filename_to_events_mapper.add_event(incomplete_event)

        self.assertEqual(error.exception.message, "Filename data was not present")

    def test_get_map_returns_a_copy_instead_of_exact_references(self):
        """get_map must return a deep copy: mutating the returned structure
        must not leak back into the mapper's internal state."""
        self.batch_filename_to_events_mapper.add_event(MOCK_SUPPLIER_ONE_RSV_EVENT_ONE)

        result = self.batch_filename_to_events_mapper.get_map()

        # Top-level container is a different object (assertIsNot is the
        # idiomatic form of the previous id() comparison)...
        self.assertIsNot(result, self.batch_filename_to_events_mapper._filename_to_events_map)
        # ...and the nested lists are copies too: a shallow copy would share
        # them, so mutating the returned list would corrupt internal state.
        result[self.expected_key_supplier_one].append({"mutated": True})
        self.assertEqual(
            self.batch_filename_to_events_mapper.get_map()[self.expected_key_supplier_one],
            [MOCK_SUPPLIER_ONE_RSV_EVENT_ONE],
        )


if __name__ == "__main__":
    unittest.main()

backend/tests/test_forwarding_batch_lambda.py

Lines changed: 75 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
import unittest
22
import os
3+
from typing import Optional
34
from unittest import TestCase
4-
from unittest.mock import patch, MagicMock
5+
from unittest.mock import patch, MagicMock, call, ANY
56
from boto3 import resource as boto3_resource
67
from moto import mock_aws
78
from models.errors import (
@@ -19,7 +20,10 @@
1920
from utils.test_utils_for_batch import ForwarderValues, MockFhirImmsResources
2021

2122
with patch.dict("os.environ", ForwarderValues.MOCK_ENVIRONMENT_DICT):
22-
from forwarding_batch_lambda import forward_lambda_handler, create_diagnostics_dictionary, forward_request_to_dynamo
23+
from forwarding_batch_lambda import forward_lambda_handler, create_diagnostics_dictionary, forward_request_to_dynamo, \
24+
QUEUE_URL
25+
26+
2327
@mock_aws
2428
@patch.dict(os.environ, ForwarderValues.MOCK_ENVIRONMENT_DICT)
2529
class TestForwardLambdaHandler(TestCase):
@@ -99,13 +103,14 @@ def generate_details_from_processing(
99103

100104
@staticmethod
101105
def generate_input(
102-
row_id,
103-
identifier_value=None,
104-
file_key="test_file_key",
105-
created_at_formatted_string="2025-01-24T12:00:00Z",
106-
include_fhir_json=True,
107-
operation_requested="create",
108-
diagnostics=None,
106+
row_id: int,
107+
identifier_value: Optional[str] = None,
108+
file_key: str = "test_file_key",
109+
created_at_formatted_string: str = "2025-01-24T12:00:00Z",
110+
include_fhir_json: bool = True,
111+
operation_requested: str = "create",
112+
diagnostics: dict = None,
113+
supplier: str = "test_supplier"
109114
):
110115
"""Generates input rows for test_cases."""
111116
details_from_processing = TestForwardLambdaHandler.generate_details_from_processing(
@@ -118,7 +123,7 @@ def generate_input(
118123
"row_id": f"row-{row_id}",
119124
"file_key": file_key,
120125
"created_at_formatted_string": created_at_formatted_string,
121-
"supplier": "test_supplier",
126+
"supplier": supplier,
122127
"vax_type": "RSV",
123128
**details_from_processing,
124129
}
@@ -403,13 +408,71 @@ def test_forward_lambda_handler_multiple_scenarios(self, mock_send_message):
403408
mock_send_message.reset_mock()
404409
event = self.generate_event(test_cases)
405410

406-
407411
self.mock_redis_client.hget.return_value = "RSV"
408412
forward_lambda_handler(event, {})
409413

410414
self.assert_dynamo_item(table_item)
411415
self.assert_values_in_sqs_messages(mock_send_message, test_cases)
412416

417+
@patch("forwarding_batch_lambda.sqs_client.send_message")
418+
def test_forward_lambda_handler_groups_and_sends_events_by_filename(self, mock_send_message):
419+
"""VED-734 - each batch handled by the Lambda may have events relating to different parent CSV files. This
420+
test ensures events are grouped accordingly and sent with the correct SQS"""
421+
mock_records = [
422+
{
423+
"input": self.generate_input(
424+
row_id=1,
425+
identifier_value="supplier_1_system/54321",
426+
operation_requested="CREATE",
427+
file_key="supplier_1_rsv_test_file",
428+
supplier="supplier_1"
429+
)
430+
},
431+
{
432+
"input": self.generate_input(
433+
row_id=2,
434+
identifier_value="supplier_2_system/12345",
435+
operation_requested="CREATE",
436+
file_key="supplier_2_rsv_test_file",
437+
supplier="supplier_2"
438+
)
439+
}
440+
]
441+
mock_kinesis_event = self.generate_event(mock_records)
442+
self.mock_redis_client.hget.return_value = "RSV"
443+
444+
forward_lambda_handler(mock_kinesis_event, {})
445+
446+
sqs_calls = mock_send_message.call_args_list
447+
_, first_call_kwargs = sqs_calls[0]
448+
_, second_call_kwargs = sqs_calls[1]
449+
450+
# Separate calls are made for each of the respective groups
451+
self.assertEqual(len(sqs_calls), 2)
452+
self.assertEqual(first_call_kwargs["MessageGroupId"], "supplier_1_rsv_test_file_2025-01-24T12:00:00Z")
453+
self.assertEqual(second_call_kwargs["MessageGroupId"], "supplier_2_rsv_test_file_2025-01-24T12:00:00Z")
454+
455+
self.assertDictEqual(json.loads(first_call_kwargs["MessageBody"])[0], {
456+
"created_at_formatted_string": "2025-01-24T12:00:00Z",
457+
"file_key": "supplier_1_rsv_test_file",
458+
"imms_id": ANY,
459+
"local_id": "local-1",
460+
"operation_requested": "CREATE",
461+
"row_id": "row-1",
462+
"supplier": "supplier_1",
463+
"vaccine_type": "RSV"
464+
})
465+
self.assertDictEqual(json.loads(second_call_kwargs["MessageBody"])[0], {
466+
"created_at_formatted_string": "2025-01-24T12:00:00Z",
467+
"file_key": "supplier_2_rsv_test_file",
468+
"imms_id": ANY,
469+
"local_id": "local-2",
470+
"operation_requested": "CREATE",
471+
"row_id": "row-2",
472+
"supplier": "supplier_2",
473+
"vaccine_type": "RSV"
474+
})
475+
413476
@patch("forwarding_batch_lambda.sqs_client.send_message")
414477
def test_forward_lambda_handler_update_scenarios(self, mock_send_message):
415478
"""Test forward lambda handler with multiple rows in the event with update and delete operations,
@@ -584,7 +647,7 @@ def test_create_diagnostics_dictionary(self):
584647
@patch("forwarding_batch_lambda.forward_request_to_dynamo")
585648
@patch("forwarding_batch_lambda.create_table")
586649
@patch("forwarding_batch_lambda.make_batch_controller")
587-
def test_forward_request_to_dyanamo(
650+
def test_forward_request_to_dynamo(
588651
self, mock_make_controller, mock_create_table, mock_forward_request_to_dynamo, mock_send_message
589652
):
590653
"""Test forward lambda handler to assert dynamo db is called,

0 commit comments

Comments
 (0)