|
5 | 5 | import base64 |
6 | 6 | import time |
7 | 7 | import logging |
| 8 | + |
| 9 | +from batch.batch_filename_to_events_mapper import BatchFilenameToEventsMapper |
8 | 10 | from fhir_batch_repository import create_table |
9 | 11 | from fhir_batch_controller import ImmunizationBatchController, make_batch_controller |
10 | 12 | from clients import sqs_client |
@@ -60,6 +62,7 @@ def forward_lambda_handler(event, _): |
60 | 62 | logger.info("Processing started") |
61 | 63 | table = create_table() |
62 | 64 | array_of_messages = [] |
| 65 | + filename_to_events_mapper = BatchFilenameToEventsMapper() |
63 | 66 | array_of_identifiers = [] |
64 | 67 | controller = make_batch_controller() |
65 | 68 |
|
@@ -101,23 +104,26 @@ def forward_lambda_handler(event, _): |
101 | 104 | array_of_identifiers.append(identifier) |
102 | 105 |
|
103 | 106 | imms_id = forward_request_to_dynamo(incoming_message_body, table, identifier_already_present, controller) |
104 | | - array_of_messages.append({**base_outgoing_message_body, "imms_id": imms_id}) |
| 107 | + filename_to_events_mapper.add_event({**base_outgoing_message_body, "imms_id": imms_id}) |
105 | 108 |
|
106 | 109 | except Exception as error: # pylint: disable = broad-exception-caught |
107 | | - array_of_messages.append( |
| 110 | + filename_to_events_mapper.add_event( |
108 | 111 | {**base_outgoing_message_body, "diagnostics": create_diagnostics_dictionary(error)} |
109 | 112 | ) |
110 | 113 | logger.error("Error processing message: %s", error) |
111 | 114 |
|
112 | 115 | # Send to SQS |
113 | | - sqs_message_body = json.dumps(array_of_messages) |
114 | | - message_len = len(sqs_message_body) |
115 | | - logger.info(f"total message length:{message_len}") |
116 | | - message_group_id = f"{file_key}_{created_at_formatted_string}" |
117 | | - if message_len < 256 * 1024: |
118 | | - sqs_client.send_message(QueueUrl=QUEUE_URL, MessageBody=sqs_message_body, MessageGroupId=message_group_id) |
119 | | - else: |
120 | | - logger.info("Message size exceeds 256 KB limit.Sending to sqs failed") |
| 116 | + for filename_key, events in filename_to_events_mapper.get_map().items(): |
| 117 | + sqs_message_body = json.dumps(events) |
| 118 | + message_len = len(sqs_message_body) |
| 119 | + logger.info(f"total message length:{message_len}") |
| 120 | + |
| 121 | + print(message_len) |
| 122 | + # TODO - assess likelihood of condition and adjust Kinesis batch accordingly |
| 123 | + if message_len < 256 * 1024: |
| 124 | + sqs_client.send_message(QueueUrl=QUEUE_URL, MessageBody=sqs_message_body, MessageGroupId=filename_key) |
| 125 | + else: |
| 126 | + logger.info("Message size exceeds 256 KB limit.Sending to sqs failed") |
121 | 127 |
|
122 | 128 |
|
123 | 129 | if __name__ == "__main__": |
|
0 commit comments