diff --git a/lambdas/services/bulk_upload_metadata_processor_service.py b/lambdas/services/bulk_upload_metadata_processor_service.py index 67385bce1..c74d847d0 100644 --- a/lambdas/services/bulk_upload_metadata_processor_service.py +++ b/lambdas/services/bulk_upload_metadata_processor_service.py @@ -366,7 +366,10 @@ def handle_invalid_filename( ods_code: str, failed_files: dict[tuple[str, str], list[BulkUploadQueueMetadata]], ) -> None: - """Handle invalid filenames by logging, storing failure in Dynamo, and tracking for review.""" + """Handle invalid filenames by logging, storing failure in Dynamo, and tracking for review. + Files that do not exist on the staging bucket are marked as failed only — + they are never added to the review queue since they cannot be reviewed. + """ logger.error( f"Failed to process {file_metadata.file_path} due to error: {error}", ) @@ -375,14 +378,21 @@ def handle_invalid_filename( file_metadata.file_path, ) failed_file.file_path = self.add_directory_path_to_file_path(file_metadata) - failed_files[(nhs_number, ods_code)].append(failed_file) + + file_exists = self.s3_repo.file_exists_on_staging_bucket(failed_file.file_path) + if not file_exists: + logger.info( + f"File {failed_file.file_path} not found on staging bucket. Will not send to review.", + ) + else: + failed_files[(nhs_number, ods_code)].append(failed_file) failed_entry = StagingSqsMetadata(nhs_number=nhs_number, files=[failed_file]) self.dynamo_repository.write_report_upload_to_dynamo( failed_entry, UploadStatus.FAILED, str(error), - sent_to_review=self.send_to_review_enabled, + sent_to_review=self.send_to_review_enabled and file_exists, ) def send_failed_files_to_review_queue( diff --git a/lambdas/services/bulk_upload_service.py b/lambdas/services/bulk_upload_service.py index fde314d8f..8f97b7953 100644 --- a/lambdas/services/bulk_upload_service.py +++ b/lambdas/services/bulk_upload_service.py @@ -138,7 +138,27 @@ def handle_sqs_message(self, message: dict): logger.error(e) raise InvalidMessageException(str(e)) - logger.info("SQS event is valid. Validating NHS number and file names") + logger.info( + "SQS event is valid. Checking files exist on staging bucket before further validation", + ) + try: + self.prepare_file_paths_and_check_existence(staging_metadata) + except S3FileNotFoundException as e: + logger.info(e) + logger.info( + f"One or more of the files is not accessible from S3 bucket for patient {staging_metadata.nhs_number}", + ) + logger.info("Will stop processing Lloyd George record for this patient") + self.dynamo_repository.write_report_upload_to_dynamo( + staging_metadata, + UploadStatus.FAILED, + "One or more of the files is not accessible from staging bucket", + patient_ods_code, + sent_to_review=False, + ) + return + + logger.info("File existence check passed. Validating NHS number and file names") try: validate_nhs_number(staging_metadata.nhs_number) @@ -154,9 +174,6 @@ def handle_sqs_message(self, message: dict): for file_metadata in staging_metadata.files: file_names.append(os.path.basename(file_metadata.stored_file_name)) file_metadata.scan_date = validate_scan_date(file_metadata.scan_date) - file_metadata.file_path = self.strip_leading_slash( - file_metadata.file_path, - ) validate_lg_file_names(file_names, staging_metadata.nhs_number) @@ -249,7 +266,6 @@ def handle_sqs_message(self, message: dict): ) try: - self.resolve_source_file_path(staging_metadata) self.bulk_upload_s3_repository.check_virus_result( staging_metadata, self.file_path_cache, @@ -305,20 +321,6 @@ def handle_sqs_message(self, message: dict): patient_ods_code, ) return - except S3FileNotFoundException as e: - logger.info(e) - logger.info( - f"One or more of the files is not accessible from S3 bucket for patient {staging_metadata.nhs_number}", - ) - logger.info("Will stop processing Lloyd George record for this patient") - - self.dynamo_repository.write_report_upload_to_dynamo( - staging_metadata, - UploadStatus.FAILED, - "One or more of the files is not accessible from staging bucket", - patient_ods_code, - ) - return logger.info("File validation complete. Initialising transaction") @@ -383,41 +385,42 @@ def handle_sqs_message(self, message: dict): f"Message sent to stitching queue for patient {staging_metadata.nhs_number}", ) - def resolve_source_file_path(self, staging_metadata: StagingSqsMetadata): - sample_file_path = staging_metadata.files[0].file_path - - if not contains_accent_char(sample_file_path): - logger.info("No accented character detected in file path.") - self.file_path_cache = { - file.file_path: file.file_path for file in staging_metadata.files - } - return - - logger.info("Detected accented character in file path.") - logger.info("Will take special steps to handle file names.") - + def prepare_file_paths_and_check_existence( + self, + staging_metadata: StagingSqsMetadata, + ): resolved_file_paths = {} - for file in staging_metadata.files: - file_path_in_metadata = file.file_path - file_path_in_nfc_form = convert_to_nfc_form(file_path_in_metadata) - file_path_in_nfd_form = convert_to_nfd_form(file_path_in_metadata) - - if self.bulk_upload_s3_repository.file_exists_on_staging_bucket( - file_path_in_nfc_form, - ): - resolved_file_paths[file_path_in_metadata] = file_path_in_nfc_form - elif self.bulk_upload_s3_repository.file_exists_on_staging_bucket( - file_path_in_nfd_form, - ): - resolved_file_paths[file_path_in_metadata] = file_path_in_nfd_form + for file_metadata in staging_metadata.files: + file_metadata.file_path = self.strip_leading_slash(file_metadata.file_path) + file_path = file_metadata.file_path + + if contains_accent_char(file_path): + nfc_path = convert_to_nfc_form(file_path) + nfd_path = convert_to_nfd_form(file_path) + if self.bulk_upload_s3_repository.file_exists_on_staging_bucket( + nfc_path, + ): + resolved_file_paths[file_path] = nfc_path + elif self.bulk_upload_s3_repository.file_exists_on_staging_bucket( + nfd_path, + ): + resolved_file_paths[file_path] = nfd_path + else: + logger.info( + "No file matching the provided file path was found on S3 bucket", + ) + logger.info("Please check whether files are named correctly") + raise S3FileNotFoundException( + f"Failed to access file {file_path}", + ) else: - logger.info( - "No file matching the provided file path was found on S3 bucket", - ) - logger.info("Please check whether files are named correctly") - raise S3FileNotFoundException( - f"Failed to access file {sample_file_path}", - ) + if not self.bulk_upload_s3_repository.file_exists_on_staging_bucket( + file_path, + ): + raise S3FileNotFoundException( + f"Failed to access file {file_path}", + ) + resolved_file_paths[file_path] = file_path self.file_path_cache = resolved_file_paths diff --git a/lambdas/tests/unit/services/test_bulk_upload_metadata_processor_service.py b/lambdas/tests/unit/services/test_bulk_upload_metadata_processor_service.py index f22372a3f..a4fa7127d 100644 --- a/lambdas/tests/unit/services/test_bulk_upload_metadata_processor_service.py +++ b/lambdas/tests/unit/services/test_bulk_upload_metadata_processor_service.py @@ -585,6 +585,8 @@ def test_handle_invalid_filename_writes_failed_entry_to_dynamo( failed_files = defaultdict(list) error = InvalidFileNameException("Invalid filename format") + test_service.s3_repo.file_exists_on_staging_bucket.return_value = True + mock_staging_metadata = mocker.patch( "services.bulk_upload_metadata_processor_service.StagingSqsMetadata", ) @@ -636,6 +638,9 @@ def test_handle_invalid_filename_sets_sent_to_review_true_when_review_enabled( failed_files = defaultdict(list) error = InvalidFileNameException("Invalid filename format") + test_service_with_review_enabled.s3_repo.file_exists_on_staging_bucket.return_value = ( + True + ) mock_write = mocker.patch.object( test_service_with_review_enabled.dynamo_repository, "write_report_upload_to_dynamo", @@ -656,6 +661,43 @@ def test_handle_invalid_filename_sets_sent_to_review_true_when_review_enabled( str(error), sent_to_review=True, ) + assert (nhs_number, ods_code) in failed_files + + +def test_handle_invalid_filename_does_not_send_to_review_when_file_missing( + mocker, + test_service_with_review_enabled, + base_metadata_file, +): + nhs_number = "1234567890" + ods_code = "Y12345" + failed_files = defaultdict(list) + error = InvalidFileNameException("Invalid filename format") + + test_service_with_review_enabled.s3_repo.file_exists_on_staging_bucket.return_value = ( + False + ) + mock_write = mocker.patch.object( + test_service_with_review_enabled.dynamo_repository, + "write_report_upload_to_dynamo", + ) + mocker.patch("services.bulk_upload_metadata_processor_service.StagingSqsMetadata") + + test_service_with_review_enabled.handle_invalid_filename( + base_metadata_file, + error, + nhs_number, + ods_code, + failed_files, + ) + + mock_write.assert_called_once_with( + mocker.ANY, + UploadStatus.FAILED, + str(error), + sent_to_review=False, + ) + assert (nhs_number, ods_code) not in failed_files def test_csv_to_sqs_metadata_sends_failed_files_to_review_queue_when_enabled( @@ -676,6 +718,9 @@ def test_csv_to_sqs_metadata_sends_failed_files_to_review_queue_when_enabled( "validate_and_correct_filename", side_effect=InvalidFileNameException("invalid"), ) + test_service_with_review_enabled.s3_repo.file_exists_on_staging_bucket.return_value = ( + True + ) result = test_service_with_review_enabled.csv_to_sqs_metadata(MOCK_METADATA_CSV) diff --git a/lambdas/tests/unit/services/test_bulk_upload_service.py b/lambdas/tests/unit/services/test_bulk_upload_service.py index 9476dfc52..0ff2196e3 100644 --- a/lambdas/tests/unit/services/test_bulk_upload_service.py +++ b/lambdas/tests/unit/services/test_bulk_upload_service.py @@ -511,35 +511,35 @@ def test_handle_sqs_message_report_failure_when_document_is_infected( repo_under_test.sqs_repository.send_message_to_pdf_stitching_queue.assert_not_called() -def test_handle_sqs_message_report_failure_when_document_not_exist( - repo_under_test, +def test_handle_sqs_message_fails_before_nhs_validation_when_file_missing( + repo_with_review_enabled, set_env, mocker, - mock_uuid, - mock_validate_files, - mock_check_virus_result, - mock_pds_service, - mock_pds_validation_strict, - mock_ods_validation, ): TEST_STAGING_METADATA.retries = 0 - repo_under_test.bulk_upload_s3_repository.check_virus_result.side_effect = ( - S3FileNotFoundException + repo_with_review_enabled.bulk_upload_s3_repository.file_exists_on_staging_bucket.return_value = ( + False ) - mock_report_upload_failure = mocker.patch.object( - repo_under_test.dynamo_repository, + mock_pds = mocker.patch( + "services.bulk_upload_service.getting_patient_info_from_pds", + ) + mock_report_failure = mocker.patch.object( + repo_with_review_enabled.dynamo_repository, "write_report_upload_to_dynamo", ) - repo_under_test.handle_sqs_message(message=TEST_SQS_MESSAGE) + repo_with_review_enabled.handle_sqs_message(message=TEST_SQS_MESSAGE) - mock_report_upload_failure.assert_called_with( + mock_pds.assert_not_called() + mock_report_failure.assert_called_with( TEST_STAGING_METADATA, UploadStatus.FAILED, "One or more of the files is not accessible from staging bucket", - "Y12345", + "", + sent_to_review=False, ) - repo_under_test.sqs_repository.send_message_to_pdf_stitching_queue.assert_not_called() + repo_with_review_enabled.sqs_repository.send_message_to_pdf_stitching_queue.assert_not_called() + repo_with_review_enabled.sqs_repository.send_message_to_review_queue.assert_not_called() def test_handle_sqs_message_calls_report_upload_successful_when_patient_is_formally_deceased( @@ -855,19 +855,38 @@ def test_reports_failure_when_max_retries_reached( repo_under_test.dynamo_repository.write_report_upload_to_dynamo.assert_called() -def test_resolve_source_file_path_when_filenames_dont_have_accented_chars( +def test_prepare_file_paths_and_check_existence_when_filenames_dont_have_accented_chars( set_env, repo_under_test, ): + repo_under_test.bulk_upload_s3_repository.file_exists_on_staging_bucket.return_value = ( + True + ) expected = { file.file_path: file.file_path.lstrip("/") for file in TEST_STAGING_METADATA.files } - repo_under_test.resolve_source_file_path(TEST_STAGING_METADATA) + repo_under_test.prepare_file_paths_and_check_existence(TEST_STAGING_METADATA) actual = repo_under_test.file_path_cache assert actual == expected + assert ( + repo_under_test.bulk_upload_s3_repository.file_exists_on_staging_bucket.call_count + == len(TEST_STAGING_METADATA.files) + ) + + +def test_prepare_file_paths_and_check_existence_raises_S3FileNotFoundException_for_missing_non_accented_file( + set_env, + repo_under_test, +): + repo_under_test.bulk_upload_s3_repository.file_exists_on_staging_bucket.return_value = ( + False + ) + + with pytest.raises(S3FileNotFoundException): + repo_under_test.prepare_file_paths_and_check_existence(TEST_STAGING_METADATA) @pytest.mark.parametrize( @@ -880,7 +899,7 @@ def test_resolve_source_file_path_when_filenames_dont_have_accented_chars( ], ids=["NFC --> NFC", "NFC --> NFD", "NFD --> NFC", "NFD --> NFD"], ) -def test_resolve_source_file_path_when_filenames_have_accented_chars( +def test_prepare_file_paths_and_check_existence_when_filenames_have_accented_chars( set_env, mocker, patient_name_on_s3, @@ -901,13 +920,13 @@ def test_resolve_source_file_path_when_filenames_have_accented_chars( test_staging_metadata = build_test_staging_metadata_from_patient_name( patient_name_in_metadata_file, ) - repo_under_test.resolve_source_file_path(test_staging_metadata) + repo_under_test.prepare_file_paths_and_check_existence(test_staging_metadata) actual = repo_under_test.file_path_cache assert actual == expected_cache -def test_resolves_source_file_path_raise_S3FileNotFoundException_if_filename_cant_match( +def test_prepare_file_paths_and_check_existence_raises_S3FileNotFoundException_if_accented_filename_cant_match( set_env, mocker, repo_under_test, @@ -924,7 +943,7 @@ def test_resolves_source_file_path_raise_S3FileNotFoundException_if_filename_can ) with pytest.raises(S3FileNotFoundException): - repo_under_test.resolve_source_file_path(test_staging_metadata) + repo_under_test.prepare_file_paths_and_check_existence(test_staging_metadata) def test_create_lg_records_and_copy_files(set_env, mocker, mock_uuid, repo_under_test): @@ -935,8 +954,11 @@ def test_create_lg_records_and_copy_files(set_env, mocker, mock_uuid, repo_under repo_under_test.bulk_upload_s3_repository.copy_to_lg_bucket = mocker.MagicMock( return_value=MOCK_COPY_OBJECT_RESPONSE, ) + repo_under_test.bulk_upload_s3_repository.file_exists_on_staging_bucket.return_value = ( + True + ) TEST_STAGING_METADATA.retries = 0 - repo_under_test.resolve_source_file_path(TEST_STAGING_METADATA) + repo_under_test.prepare_file_paths_and_check_existence(TEST_STAGING_METADATA) repo_under_test.create_lg_records_and_copy_files( TEST_STAGING_METADATA, @@ -1343,8 +1365,8 @@ def test_handle_sqs_message_report_failure_when_pdf_integrity_check_file_not_fou repo_under_test.bulk_upload_s3_repository, "remove_ingested_file_from_source_bucket", ) - repo_under_test.bulk_upload_s3_repository.check_pdf_integrity.side_effect = ( - S3FileNotFoundException("Failed to access file") + repo_under_test.bulk_upload_s3_repository.file_exists_on_staging_bucket.side_effect = S3FileNotFoundException( + "Failed to access file", ) repo_under_test.handle_sqs_message(message=TEST_SQS_MESSAGE) @@ -1353,7 +1375,8 @@ def test_handle_sqs_message_report_failure_when_pdf_integrity_check_file_not_fou TEST_STAGING_METADATA, UploadStatus.FAILED, "One or more of the files is not accessible from staging bucket", - "Y12345", + "", + sent_to_review=False, ) mock_create_lg_records_and_copy_files.assert_not_called() mock_remove_ingested_file_from_source_bucket.assert_not_called() @@ -1503,7 +1526,7 @@ def test_does_not_send_to_review_queue_when_s3_file_not_found( ): mocker.patch.object( repo_with_review_enabled.bulk_upload_s3_repository, - "check_virus_result", + "file_exists_on_staging_bucket", side_effect=S3FileNotFoundException("File not found"), )