Skip to content

Commit 47822fb

Browse files
committed
Refactored code
1 parent 9f6b8a1 commit 47822fb

File tree

3 files changed

+47
-10
lines changed

3 files changed

+47
-10
lines changed

backend/src/batch/__init__.py

Whitespace-only changes.
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
import copy
2+
3+
from models.errors import MessageNotSuccessfulError
4+
5+
6+
class BatchFilenameToEventsMapper:
7+
FILENAME_NOT_PRESENT_ERROR_MSG = "Filename data was not present"
8+
9+
def __init__(self):
10+
self._filename_to_events_map: dict[str, list[dict]] = {}
11+
12+
def add_event(self, event: dict) -> None:
13+
filename_key = self._make_key(event)
14+
15+
if filename_key not in self._filename_to_events_map:
16+
self._filename_to_events_map[filename_key] = [event]
17+
return
18+
19+
self._filename_to_events_map[filename_key].append(event)
20+
21+
def get_map(self) -> dict[str, list[dict]]:
22+
return copy.deepcopy(self._filename_to_events_map)
23+
24+
def _make_key(self, event: dict) -> str:
25+
file_key = event.get("file_key")
26+
created_at_string = event.get("created_at_formatted_string")
27+
28+
if not file_key or not created_at_string:
29+
raise MessageNotSuccessfulError(self.FILENAME_NOT_PRESENT_ERROR_MSG)
30+
31+
return f"{file_key}_{created_at_string}"

backend/src/forwarding_batch_lambda.py

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
import base64
66
import time
77
import logging
8+
9+
from batch.batch_filename_to_events_mapper import BatchFilenameToEventsMapper
810
from fhir_batch_repository import create_table
911
from fhir_batch_controller import ImmunizationBatchController, make_batch_controller
1012
from clients import sqs_client
@@ -60,6 +62,7 @@ def forward_lambda_handler(event, _):
6062
logger.info("Processing started")
6163
table = create_table()
6264
array_of_messages = []
65+
filename_to_events_mapper = BatchFilenameToEventsMapper()
6366
array_of_identifiers = []
6467
controller = make_batch_controller()
6568

@@ -101,23 +104,26 @@ def forward_lambda_handler(event, _):
101104
array_of_identifiers.append(identifier)
102105

103106
imms_id = forward_request_to_dynamo(incoming_message_body, table, identifier_already_present, controller)
104-
array_of_messages.append({**base_outgoing_message_body, "imms_id": imms_id})
107+
filename_to_events_mapper.add_event({**base_outgoing_message_body, "imms_id": imms_id})
105108

106109
except Exception as error: # pylint: disable = broad-exception-caught
107-
array_of_messages.append(
110+
filename_to_events_mapper.add_event(
108111
{**base_outgoing_message_body, "diagnostics": create_diagnostics_dictionary(error)}
109112
)
110113
logger.error("Error processing message: %s", error)
111114

112115
# Send to SQS
113-
sqs_message_body = json.dumps(array_of_messages)
114-
message_len = len(sqs_message_body)
115-
logger.info(f"total message length:{message_len}")
116-
message_group_id = f"{file_key}_{created_at_formatted_string}"
117-
if message_len < 256 * 1024:
118-
sqs_client.send_message(QueueUrl=QUEUE_URL, MessageBody=sqs_message_body, MessageGroupId=message_group_id)
119-
else:
120-
logger.info("Message size exceeds 256 KB limit.Sending to sqs failed")
116+
for filename_key, events in filename_to_events_mapper.get_map().items():
117+
sqs_message_body = json.dumps(events)
118+
message_len = len(sqs_message_body)
119+
logger.info(f"total message length:{message_len}")
120+
121+
print(message_len)
122+
# TODO - assess likelihood of condition and adjust Kinesis batch accordingly
123+
if message_len < 256 * 1024:
124+
sqs_client.send_message(QueueUrl=QUEUE_URL, MessageBody=sqs_message_body, MessageGroupId=filename_key)
125+
else:
126+
logger.info("Message size exceeds 256 KB limit.Sending to sqs failed")
121127

122128

123129
if __name__ == "__main__":

0 commit comments

Comments
 (0)