Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions lambdas/services/bulk_upload_metadata_processor_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,10 @@ def handle_invalid_filename(
ods_code: str,
failed_files: dict[tuple[str, str], list[BulkUploadQueueMetadata]],
) -> None:
"""Handle invalid filenames by logging, storing failure in Dynamo, and tracking for review."""
"""Handle invalid filenames by logging, storing failure in Dynamo, and tracking for review.
Files that do not exist on the staging bucket are marked as failed only —
they are never added to the review queue since they cannot be reviewed.
"""
logger.error(
f"Failed to process {file_metadata.file_path} due to error: {error}",
)
Expand All @@ -375,14 +378,21 @@ def handle_invalid_filename(
file_metadata.file_path,
)
failed_file.file_path = self.add_directory_path_to_file_path(file_metadata)
failed_files[(nhs_number, ods_code)].append(failed_file)

file_exists = self.s3_repo.file_exists_on_staging_bucket(failed_file.file_path)
if not file_exists:
logger.info(
f"File {failed_file.file_path} not found on staging bucket. Will not send to review.",
)
else:
failed_files[(nhs_number, ods_code)].append(failed_file)

failed_entry = StagingSqsMetadata(nhs_number=nhs_number, files=[failed_file])
self.dynamo_repository.write_report_upload_to_dynamo(
failed_entry,
UploadStatus.FAILED,
str(error),
sent_to_review=self.send_to_review_enabled,
sent_to_review=self.send_to_review_enabled and file_exists,
)

def send_failed_files_to_review_queue(
Expand Down
107 changes: 55 additions & 52 deletions lambdas/services/bulk_upload_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,27 @@ def handle_sqs_message(self, message: dict):
logger.error(e)
raise InvalidMessageException(str(e))

logger.info("SQS event is valid. Validating NHS number and file names")
logger.info(
"SQS event is valid. Checking files exist on staging bucket before further validation",
)
try:
self.prepare_file_paths_and_check_existence(staging_metadata)
except S3FileNotFoundException as e:
logger.info(e)
logger.info(
f"One or more of the files is not accessible from S3 bucket for patient {staging_metadata.nhs_number}",
)
logger.info("Will stop processing Lloyd George record for this patient")
self.dynamo_repository.write_report_upload_to_dynamo(
staging_metadata,
UploadStatus.FAILED,
"One or more of the files is not accessible from staging bucket",
patient_ods_code,
sent_to_review=False,
)
return

logger.info("File existence check passed. Validating NHS number and file names")

try:
validate_nhs_number(staging_metadata.nhs_number)
Expand All @@ -154,9 +174,6 @@ def handle_sqs_message(self, message: dict):
for file_metadata in staging_metadata.files:
file_names.append(os.path.basename(file_metadata.stored_file_name))
file_metadata.scan_date = validate_scan_date(file_metadata.scan_date)
file_metadata.file_path = self.strip_leading_slash(
file_metadata.file_path,
)

validate_lg_file_names(file_names, staging_metadata.nhs_number)

Expand Down Expand Up @@ -249,7 +266,6 @@ def handle_sqs_message(self, message: dict):
)

try:
self.resolve_source_file_path(staging_metadata)
self.bulk_upload_s3_repository.check_virus_result(
staging_metadata,
self.file_path_cache,
Expand Down Expand Up @@ -305,20 +321,6 @@ def handle_sqs_message(self, message: dict):
patient_ods_code,
)
return
except S3FileNotFoundException as e:
logger.info(e)
logger.info(
f"One or more of the files is not accessible from S3 bucket for patient {staging_metadata.nhs_number}",
)
logger.info("Will stop processing Lloyd George record for this patient")

self.dynamo_repository.write_report_upload_to_dynamo(
staging_metadata,
UploadStatus.FAILED,
"One or more of the files is not accessible from staging bucket",
patient_ods_code,
)
return

logger.info("File validation complete. Initialising transaction")

Expand Down Expand Up @@ -383,41 +385,42 @@ def handle_sqs_message(self, message: dict):
f"Message sent to stitching queue for patient {staging_metadata.nhs_number}",
)

def resolve_source_file_path(self, staging_metadata: StagingSqsMetadata):
sample_file_path = staging_metadata.files[0].file_path

if not contains_accent_char(sample_file_path):
logger.info("No accented character detected in file path.")
self.file_path_cache = {
file.file_path: file.file_path for file in staging_metadata.files
}
return

logger.info("Detected accented character in file path.")
logger.info("Will take special steps to handle file names.")

def prepare_file_paths_and_check_existence(
self,
staging_metadata: StagingSqsMetadata,
):
resolved_file_paths = {}
for file in staging_metadata.files:
file_path_in_metadata = file.file_path
file_path_in_nfc_form = convert_to_nfc_form(file_path_in_metadata)
file_path_in_nfd_form = convert_to_nfd_form(file_path_in_metadata)

if self.bulk_upload_s3_repository.file_exists_on_staging_bucket(
file_path_in_nfc_form,
):
resolved_file_paths[file_path_in_metadata] = file_path_in_nfc_form
elif self.bulk_upload_s3_repository.file_exists_on_staging_bucket(
file_path_in_nfd_form,
):
resolved_file_paths[file_path_in_metadata] = file_path_in_nfd_form
for file_metadata in staging_metadata.files:
file_metadata.file_path = self.strip_leading_slash(file_metadata.file_path)
file_path = file_metadata.file_path

if contains_accent_char(file_path):
nfc_path = convert_to_nfc_form(file_path)
nfd_path = convert_to_nfd_form(file_path)
if self.bulk_upload_s3_repository.file_exists_on_staging_bucket(
nfc_path,
):
resolved_file_paths[file_path] = nfc_path
elif self.bulk_upload_s3_repository.file_exists_on_staging_bucket(
nfd_path,
):
resolved_file_paths[file_path] = nfd_path
else:
logger.info(
"No file matching the provided file path was found on S3 bucket",
)
logger.info("Please check whether files are named correctly")
raise S3FileNotFoundException(
f"Failed to access file {file_path}",
)
else:
logger.info(
"No file matching the provided file path was found on S3 bucket",
)
logger.info("Please check whether files are named correctly")
raise S3FileNotFoundException(
f"Failed to access file {sample_file_path}",
)
if not self.bulk_upload_s3_repository.file_exists_on_staging_bucket(
file_path,
):
raise S3FileNotFoundException(
f"Failed to access file {file_path}",
)
resolved_file_paths[file_path] = file_path

self.file_path_cache = resolved_file_paths

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,8 @@ def test_handle_invalid_filename_writes_failed_entry_to_dynamo(
failed_files = defaultdict(list)
error = InvalidFileNameException("Invalid filename format")

test_service.s3_repo.file_exists_on_staging_bucket.return_value = True

mock_staging_metadata = mocker.patch(
"services.bulk_upload_metadata_processor_service.StagingSqsMetadata",
)
Expand Down Expand Up @@ -636,6 +638,9 @@ def test_handle_invalid_filename_sets_sent_to_review_true_when_review_enabled(
failed_files = defaultdict(list)
error = InvalidFileNameException("Invalid filename format")

test_service_with_review_enabled.s3_repo.file_exists_on_staging_bucket.return_value = (
True
)
mock_write = mocker.patch.object(
test_service_with_review_enabled.dynamo_repository,
"write_report_upload_to_dynamo",
Expand All @@ -656,6 +661,43 @@ def test_handle_invalid_filename_sets_sent_to_review_true_when_review_enabled(
str(error),
sent_to_review=True,
)
assert (nhs_number, ods_code) in failed_files


def test_handle_invalid_filename_does_not_send_to_review_when_file_missing(
    mocker,
    test_service_with_review_enabled,
    base_metadata_file,
):
    """An invalid filename whose file is absent from staging is failed but never reviewed.

    Even with review enabled on the service, a file that does not exist on the
    staging bucket must be written to Dynamo as FAILED with sent_to_review=False
    and must NOT be tracked in failed_files for the review queue.
    """
    nhs_number = "1234567890"
    ods_code = "Y12345"
    failed_files = defaultdict(list)
    error = InvalidFileNameException("Invalid filename format")

    # Simulate the file being missing from the staging bucket.
    test_service_with_review_enabled.s3_repo.file_exists_on_staging_bucket.return_value = (
        False
    )
    # Spy on the Dynamo write so we can assert on the sent_to_review flag.
    mock_write = mocker.patch.object(
        test_service_with_review_enabled.dynamo_repository,
        "write_report_upload_to_dynamo",
    )
    mocker.patch("services.bulk_upload_metadata_processor_service.StagingSqsMetadata")

    test_service_with_review_enabled.handle_invalid_filename(
        base_metadata_file,
        error,
        nhs_number,
        ods_code,
        failed_files,
    )

    # The failure is still recorded, but explicitly flagged as not sent to review.
    mock_write.assert_called_once_with(
        mocker.ANY,
        UploadStatus.FAILED,
        str(error),
        sent_to_review=False,
    )
    # Missing files are never queued for review.
    assert (nhs_number, ods_code) not in failed_files


def test_csv_to_sqs_metadata_sends_failed_files_to_review_queue_when_enabled(
Expand All @@ -676,6 +718,9 @@ def test_csv_to_sqs_metadata_sends_failed_files_to_review_queue_when_enabled(
"validate_and_correct_filename",
side_effect=InvalidFileNameException("invalid"),
)
test_service_with_review_enabled.s3_repo.file_exists_on_staging_bucket.return_value = (
True
)

result = test_service_with_review_enabled.csv_to_sqs_metadata(MOCK_METADATA_CSV)

Expand Down
Loading
Loading