|
3 | 3 | import shutil |
4 | 4 | import tempfile |
5 | 5 | import urllib.parse |
| 6 | +import uuid |
6 | 7 | from collections import defaultdict |
7 | 8 | from datetime import datetime |
8 | 9 | from pathlib import Path |
@@ -58,6 +59,7 @@ def __init__( |
58 | 59 | ): |
59 | 60 | self.staging_bucket_name = os.getenv("STAGING_STORE_BUCKET_NAME") |
60 | 61 | self.metadata_queue_url = os.getenv("METADATA_SQS_QUEUE_URL") |
| 62 | + self.expedite_queue_url = os.getenv("EXPEDITE_SQS_QUEUE_URL") |
61 | 63 | self.s3_service = S3Service() |
62 | 64 | self.sqs_service = SQSService() |
63 | 65 | self.dynamo_repository = BulkUploadDynamoRepository() |
@@ -261,9 +263,9 @@ def handle_expedite_event(self, event): |
261 | 263 | self.enforce_virus_scanner(s3_object_key) |
262 | 264 | self.check_file_status(s3_object_key) |
263 | 265 |
|
264 | | - sqs_metadata = [self.create_expedite_sqs_metadata(s3_object_key)] |
| 266 | + sqs_metadata = self.create_expedite_sqs_metadata(s3_object_key) |
265 | 267 |
|
266 | | - self.send_metadata_to_fifo_sqs(sqs_metadata) |
| 268 | + self.send_metadata_to_expedite_sqs(sqs_metadata) |
267 | 269 | logger.info("Successfully processed expedite event") |
268 | 270 | else: |
269 | 271 | failure_msg = f"Unexpected directory or file location received from EventBridge: {s3_object_key}" |
@@ -307,6 +309,20 @@ def send_metadata_to_fifo_sqs( |
307 | 309 | ) |
308 | 310 | logger.info("Sent bulk upload metadata to sqs queue") |
309 | 311 |
|
| 312 | + def send_metadata_to_expedite_sqs( |
| 313 | + self, staging_sqs_metadata: StagingSqsMetadata |
| 314 | + ) -> None: |
| 315 | + """Send validated metadata entries to SQS expedite queue.""" |
| 316 | + sqs_group_id = f"bulk_upload_{uuid.uuid4()}" |
| 317 | + nhs_number = staging_sqs_metadata.nhs_number |
| 318 | + logger.info(f"Sending metadata for patientId: {nhs_number}") |
| 319 | + self.sqs_service.send_message_with_nhs_number_attr_fifo( |
| 320 | + queue_url=self.expedite_queue_url, |
| 321 | + message_body=staging_sqs_metadata.model_dump_json(by_alias=True), |
| 322 | + nhs_number=nhs_number, |
| 323 | + group_id=sqs_group_id, |
| 324 | + ) |
| 325 | + |
310 | 326 | def copy_metadata_to_dated_folder(self): |
311 | 327 | """Copy processed metadata CSV into a dated archive folder in S3.""" |
312 | 328 | logger.info("Copying metadata CSV to dated folder") |
|
0 commit comments