diff --git a/lambdas/services/bulk_upload_metadata_processor_service.py b/lambdas/services/bulk_upload_metadata_processor_service.py index b20b12d33..8aae45b8a 100644 --- a/lambdas/services/bulk_upload_metadata_processor_service.py +++ b/lambdas/services/bulk_upload_metadata_processor_service.py @@ -5,10 +5,10 @@ import urllib.parse from collections import defaultdict from datetime import datetime +from pathlib import Path import pydantic from botocore.exceptions import ClientError - from enums.lloyd_george_pre_process_format import LloydGeorgePreProcessFormat from enums.upload_status import UploadStatus from enums.virus_scan_result import VirusScanResult @@ -41,6 +41,7 @@ LGInvalidFilesException, VirusScanFailedException, ) +from utils.filename_utils import extract_nhs_number_from_bulk_upload_file_name from utils.lloyd_george_validator import validate_file_name from utils.utilities import get_virus_scan_service @@ -98,7 +99,6 @@ def process_metadata(self): logger.info("Finished parsing metadata") self.send_metadata_to_fifo_sqs(staging_metadata_list) - logger.info("Sent bulk upload metadata to SQS queue") self.copy_metadata_to_dated_folder() self.clear_temp_storage() @@ -195,6 +195,21 @@ def convert_to_sqs_metadata( **file.model_dump(), stored_file_name=stored_file_name ) + def create_expedite_sqs_metadata(self, key) -> StagingSqsMetadata: + """Build a single-patient SQS metadata payload for an expedite upload.""" + nhs_number, file_path, ods_code, scan_date = self.validate_expedite_file(key) + return StagingSqsMetadata( + nhs_number=nhs_number, + files=[ + BulkUploadQueueMetadata( + file_path=file_path, + stored_file_name=file_path, + gp_practice_code=ods_code, + scan_date=scan_date, + ) + ], + ) + @staticmethod def extract_patient_info(file_metadata: MetadataFile) -> tuple[str, str]: """Extract key patient identifiers.""" @@ -210,6 +225,55 @@ def validate_and_correct_filename(self, file_metadata: MetadataFile) -> str: file_metadata.file_path ) + def 
validate_expedite_file(self, s3_object_key: str): + """Validate and extract fields from an expedite S3 key. + This ensures the file represents a single document (1of1) and derives + the key fields required to build SQS metadata.""" + file_path = os.path.basename(s3_object_key) + + if not file_path.startswith("1of1"): + failure_msg = ( + "Failed processing expedite event due to file not being a 1of1" + ) + logger.error(failure_msg) + raise BulkUploadMetadataException(failure_msg) + + nhs_number = extract_nhs_number_from_bulk_upload_file_name(file_path)[0] + file_name = self.metadata_formatter_service.validate_record_filename( + s3_object_key + ) + ods_code = Path(s3_object_key).parent.name + scan_date = datetime.now().strftime("%Y-%m-%d") + return nhs_number, file_name, ods_code, scan_date + + def handle_expedite_event(self, event): + """Process S3 EventBridge expedite uploads: enforce virus scan, ensure 1of1, extract identifiers + and send metadata to SQS.""" + try: + unparsed_s3_object_key = event["detail"]["object"]["key"] + s3_object_key = urllib.parse.unquote_plus( + unparsed_s3_object_key, encoding="utf-8" + ) + + if s3_object_key.startswith("expedite/"): + logger.info("Processing file from expedite folder") + + self.enforce_virus_scanner(s3_object_key) + self.check_file_status(s3_object_key) + + sqs_metadata = [self.create_expedite_sqs_metadata(s3_object_key)] + + self.send_metadata_to_fifo_sqs(sqs_metadata) + logger.info("Successfully processed expedite event") + else: + failure_msg = f"Unexpected directory or file location received from EventBridge: {s3_object_key}" + logger.error(failure_msg) + raise BulkUploadMetadataException(failure_msg) + except KeyError as e: + failure_msg = f"Failed due to missing key: {str(e)}" + logger.error(failure_msg) + raise BulkUploadMetadataException(failure_msg) + def handle_invalid_filename( self, file_metadata: MetadataFile, @@ -241,6 +305,7 @@ def send_metadata_to_fifo_sqs( nhs_number=nhs_number, 
group_id=f"bulk_upload_{nhs_number}", ) + logger.info("Sent bulk upload metadata to SQS queue") def copy_metadata_to_dated_folder(self): """Copy processed metadata CSV into a dated archive folder in S3.""" @@ -277,7 +342,7 @@ def enforce_virus_scanner(self, file_key: str): try: result = self.s3_repo.check_file_tag_status_on_staging_bucket(file_key) - if(result != ""): + if result != "": logger.info("The file has been scanned before") return logger.info(f"Virus scan tag missing for {file_key}.") @@ -293,27 +358,6 @@ else: raise - def handle_expedite_event(self, event): - try: - key_string = event["detail"]["object"]["key"] - key = urllib.parse.unquote_plus(key_string, encoding="utf-8") - - if key.startswith("expedite/"): - logger.info("Processing file from expedite folder") - - self.enforce_virus_scanner(key) - self.check_file_status(key) - - return # To be added upon by ticket PRMP-540 - else: - failure_msg = f"Unexpected directory or file location received from EventBridge: {key_string}" - logger.error(failure_msg) - raise BulkUploadMetadataException(failure_msg) - except KeyError as e: - failure_msg = f"Failed due to missing key: {str(e)}" - logger.error(failure_msg) - raise BulkUploadMetadataException(failure_msg) - def get_formatter_service(raw_pre_format_type): try: diff --git a/lambdas/tests/unit/handlers/test_bulk_upload_metadata_processor_handler.py b/lambdas/tests/unit/handlers/test_bulk_upload_metadata_processor_handler.py index fe69f1633..913c51166 100644 --- a/lambdas/tests/unit/handlers/test_bulk_upload_metadata_processor_handler.py +++ b/lambdas/tests/unit/handlers/test_bulk_upload_metadata_processor_handler.py @@ -59,32 +59,25 @@ def test_metadata_processor_lambda_handler_s3_event_triggers_expedite( set_env, context, mock_metadata_service, 
caplog ): - event = eventbridge_event_with_s3_key( - "expedite%2F1of1_Lloyd_George_Record_[John Michael SMITH]_[1234567890]_[15-05-1990].pdf" - ) + key_string = "uploads/1of1_Lloyd_George_Record_[John Michael SMITH]_[1234567890]_[15-05-1990].pdf" + event = eventbridge_event_with_s3_key(key_string) with caplog.at_level("INFO"): lambda_handler(event, context) - assert any( - "Handling EventBridge event from S3" in r.message for r in caplog.records - ) - mock_metadata_service.handle_expedite_event.assert_called_once_with(event) mock_metadata_service.process_metadata.assert_not_called() -def test_s3_event_with_non_expedite_key_is_rejected( - set_env, context, mock_metadata_service, caplog -): - key_string = "uploads/1of1_Lloyd_George_Record_[John Michael SMITH]_[1234567890]_[15-05-1990].pdf" - event = eventbridge_event_with_s3_key(key_string) +def test_s3_event_with_expedite_key_processes(set_env, context, mock_metadata_service): + event = eventbridge_event_with_s3_key( + "expedite%2F1of1_Lloyd_George_Record_[John Michael SMITH]_[1234567890]_[15-05-1990].pdf" + ) - with caplog.at_level("INFO"): - lambda_handler(event, context) + lambda_handler(event, context) - mock_metadata_service.handle_expedite_event.assert_called_once_with(event) mock_metadata_service.process_metadata.assert_not_called() + mock_metadata_service.handle_expedite_event.assert_called_once_with(event) diff --git a/lambdas/tests/unit/services/test_bulk_upload_metadata_processor_service.py b/lambdas/tests/unit/services/test_bulk_upload_metadata_processor_service.py index d9bf00824..7ea7f50d1 100644 --- a/lambdas/tests/unit/services/test_bulk_upload_metadata_processor_service.py +++ b/lambdas/tests/unit/services/test_bulk_upload_metadata_processor_service.py @@ -8,9 +8,8 @@ import pytest from botocore.exceptions import ClientError from enums.upload_status import UploadStatus -from freezegun import freeze_time - from enums.virus_scan_result import VirusScanResult +from freezegun import freeze_time from 
models.staging_metadata import ( METADATA_FILENAME, BulkUploadQueueMetadata, @@ -35,7 +34,7 @@ BulkUploadMetadataException, InvalidFileNameException, LGInvalidFilesException, - VirusScanNoResultException, VirusScanFailedException, + VirusScanFailedException, ) METADATA_FILE_DIR = "tests/unit/helpers/data/bulk_upload" @@ -616,6 +615,93 @@ def test_validate_and_correct_filename_sad_path( assert result == "corrected/path/file_corrected.pdf" +@freeze_time("2025-02-03T10:00:00") +def test_create_expedite_sqs_metadata_builds_expected_structure(test_service): + ods_code = "A12345" + key = f"expedite/{ods_code}/1of1_1234567890_record.pdf" + + result = test_service.create_expedite_sqs_metadata(key) + + assert result.nhs_number == "1234567890" + assert len(result.files) == 1 + item = result.files[0] + assert item.file_path == key + assert item.stored_file_name == key + assert item.gp_practice_code == ods_code + assert item.scan_date == "2025-02-03" + + +@freeze_time("2025-02-03T10:00:00") +def test_handle_expedite_event_happy_path_sends_sqs(test_service, mocker): + ods = "A12345" + key = f"expedite/{ods}/1of1_1234567890_record.pdf" + event = {"detail": {"object": {"key": key}}} + + mocker.patch.object(BulkUploadMetadataProcessorService, "enforce_virus_scanner") + mocker.patch.object(BulkUploadMetadataProcessorService, "check_file_status") + mocked_send = mocker.patch.object( + BulkUploadMetadataProcessorService, "send_metadata_to_fifo_sqs" + ) + + test_service.handle_expedite_event(event) + + mocked_send.assert_called_once() + args, _ = mocked_send.call_args + assert len(args) == 1 + sqs_payload_list = args[0] + sqs_payload = sqs_payload_list[0] + assert sqs_payload.nhs_number == "1234567890" + assert len(sqs_payload.files) == 1 + file_item = sqs_payload.files[0] + assert file_item.file_path == key + assert file_item.stored_file_name == key + assert file_item.gp_practice_code == ods + assert file_item.scan_date == "2025-02-03" + + +def 
test_handle_expedite_event_invalid_directory_raises(test_service, mocker): + mocked_send = mocker.patch.object( + BulkUploadMetadataProcessorService, "send_metadata_to_fifo_sqs" + ) + bad_key = "notexpedite/A12345/1234567890_record.pdf" + event = {"detail": {"object": {"key": bad_key}}} + + with pytest.raises(BulkUploadMetadataException) as exc: + test_service.handle_expedite_event(event) + + assert "Unexpected directory or file location" in str(exc.value) + mocked_send.assert_not_called() + + +def test_handle_expedite_event_missing_key_raises(test_service, mocker): + mocked_send = mocker.patch.object( + BulkUploadMetadataProcessorService, "send_metadata_to_fifo_sqs" + ) + event = {"detail": {}} + + with pytest.raises(BulkUploadMetadataException) as exc: + test_service.handle_expedite_event(event) + + assert "Failed due to missing key" in str(exc.value) + mocked_send.assert_not_called() + + +def test_handle_expedite_event_rejects_non_1of1(test_service, mocker): + mocker.patch.object(BulkUploadMetadataProcessorService, "enforce_virus_scanner") + mocker.patch.object(BulkUploadMetadataProcessorService, "check_file_status") + mocked_send = mocker.patch.object( + BulkUploadMetadataProcessorService, "send_metadata_to_fifo_sqs" + ) + key = "expedite/A12345/2of3_1234567890_record.pdf" + event = {"detail": {"object": {"key": key}}} + + with pytest.raises(BulkUploadMetadataException) as exc: + test_service.handle_expedite_event(event) + + assert "not being a 1of1" in str(exc.value) + mocked_send.assert_not_called() + + @pytest.fixture def mock_csv_content(): header = "FILEPATH,PAGE COUNT,GP-PRACTICE-CODE,NHS-NO,SECTION,SUB-SECTION,SCAN-DATE,SCAN-ID,USER-ID,UPLOAD" @@ -864,10 +950,34 @@ def test_no_remapping_logic( ] +@freeze_time("2025-02-03T10:00:00") +def test_validate_expedite_file_happy_path_returns_expected_tuple(test_service): + ods_code = "A12345" + key = f"expedite/{ods_code}/1of1_1234567890_record.pdf" + + nhs_number, file_name, extracted_ods, scan_date = ( + 
test_service.validate_expedite_file(key) + ) + + assert nhs_number == "1234567890" + assert file_name == key + assert extracted_ods == ods_code + assert scan_date == "2025-02-03" + + +def test_validate_expedite_file_rejects_non_1of1(test_service): + key = "expedite/A12345/2of3_1234567890_record.pdf" + with pytest.raises(BulkUploadMetadataException): + test_service.validate_expedite_file(key) + + def test_handle_expedite_event_calls_enforce_for_expedite_key(mocker, test_service): encoded_key = urllib.parse.quote_plus("expedite/folder/some file.pdf") event = {"detail": {"object": {"key": encoded_key}}} + mocker.patch.object( + BulkUploadMetadataProcessorService, "create_expedite_sqs_metadata" + ) mocked_enforce = mocker.patch.object(test_service, "enforce_virus_scanner") mocked_check_status = mocker.patch.object(test_service, "check_file_status") @@ -1011,6 +1121,7 @@ def test_enforce_virus_scanner_re_raises_unexpected_client_error(mocker, test_se mock_scan.assert_not_called() + def test_check_file_status_clean_does_nothing(mocker, test_service, caplog): file_key = "expedite/folder/file.pdf" mock_check = mocker.patch.object( @@ -1043,4 +1154,4 @@ def test_check_file_status_logs_issue_when_not_clean(mocker, test_service, caplo assert any( f"Found an issue with the file {file_key}." in record.msg for record in caplog.records - ) \ No newline at end of file + )