Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
e93785a
[PRMP-540] - create sqs metadata for expedite files
MohammadIqbalAD-NHS Nov 5, 2025
4920d17
[PRMP-540] - Validate expedite event and file
MohammadIqbalAD-NHS Nov 19, 2025
17ca800
[PRMP-540] - Correct scan_date format
MohammadIqbalAD-NHS Nov 19, 2025
965245d
[PRMP-540] - Refactor handle_expedite_event()
MohammadIqbalAD-NHS Nov 19, 2025
e1e8365
[PRMP-540] - Refactor handle_expedite_event()
MohammadIqbalAD-NHS Nov 19, 2025
a61a3a6
[PRMP-540] - Fix unit tests
MohammadIqbalAD-NHS Nov 19, 2025
138540a
[PRMP-540] - Refactor handle_expedite_event()
MohammadIqbalAD-NHS Nov 19, 2025
7df91eb
[PRMP-540] - Raise exception if file not 1of1
MohammadIqbalAD-NHS Nov 20, 2025
cb198bf
Merge branch 'main' into PRMP-540
MohammadIqbalAD-NHS Nov 20, 2025
1f535a0
[PRMP-540] - Refactor
MohammadIqbalAD-NHS Nov 20, 2025
0b7bde3
[PRMP-540] - Fix unit tests and formatting
MohammadIqbalAD-NHS Nov 20, 2025
222b4b1
[PRMP-540] - Remove unused return value
MohammadIqbalAD-NHS Nov 20, 2025
cbcdd30
[PRMP-540] - Address SonarCloud suggestion
MohammadIqbalAD-NHS Nov 20, 2025
4aa85e6
[PRMP-540] - format and describe handle_expedite_event()
MohammadIqbalAD-NHS Nov 20, 2025
9ae243f
[PRMP-540] - Address PR comments
MohammadIqbalAD-NHS Nov 20, 2025
76debbf
[PRMP-540] - Format and rename variables
MohammadIqbalAD-NHS Nov 20, 2025
5bf16d6
[PRMP-540] - Address PR comment
MohammadIqbalAD-NHS Nov 21, 2025
f61e9e3
[PRMP-540] - Fix bug
MohammadIqbalAD-NHS Nov 21, 2025
ca40e80
Merge remote-tracking branch 'origin/main' into PRMP-540
MohammadIqbalAD-NHS Nov 27, 2025
c71ce42
[PRMP-540] - Fix unit test post-merge conflicts
MohammadIqbalAD-NHS Nov 27, 2025
3d729e7
[PRMP-540] - Fix unit tests
MohammadIqbalAD-NHS Nov 27, 2025
1e68139
[PRMP-540] - Fix test name
MohammadIqbalAD-NHS Nov 27, 2025
b69495b
[PRMP-540] - make format
MohammadIqbalAD-NHS Nov 27, 2025
87986a3
[PRMP-540] - Fix line too long error
MohammadIqbalAD-NHS Nov 27, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 68 additions & 24 deletions lambdas/services/bulk_upload_metadata_processor_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
import urllib.parse
from collections import defaultdict
from datetime import datetime
from pathlib import Path

import pydantic
from botocore.exceptions import ClientError

from enums.lloyd_george_pre_process_format import LloydGeorgePreProcessFormat
from enums.upload_status import UploadStatus
from enums.virus_scan_result import VirusScanResult
Expand Down Expand Up @@ -41,6 +41,7 @@
LGInvalidFilesException,
VirusScanFailedException,
)
from utils.filename_utils import extract_nhs_number_from_bulk_upload_file_name
from utils.lloyd_george_validator import validate_file_name
from utils.utilities import get_virus_scan_service

Expand Down Expand Up @@ -98,7 +99,6 @@ def process_metadata(self):
logger.info("Finished parsing metadata")

self.send_metadata_to_fifo_sqs(staging_metadata_list)
logger.info("Sent bulk upload metadata to SQS queue")

self.copy_metadata_to_dated_folder()
self.clear_temp_storage()
Expand Down Expand Up @@ -195,6 +195,21 @@ def convert_to_sqs_metadata(
**file.model_dump(), stored_file_name=stored_file_name
)

def create_expedite_sqs_metadata(self, key) -> StagingSqsMetadata:
    """Construct the SQS staging payload for one expedite file, keyed by NHS number.

    Validation of the key (including the 1of1 rule) is delegated to
    validate_expedite_file; the resulting fields populate a single
    BulkUploadQueueMetadata entry.
    """
    nhs_number, file_path, ods_code, scan_date = self.validate_expedite_file(key)
    queue_entry = BulkUploadQueueMetadata(
        file_path=file_path,
        stored_file_name=file_path,
        gp_practice_code=ods_code,
        scan_date=scan_date,
    )
    return StagingSqsMetadata(nhs_number=nhs_number, files=[queue_entry])

@staticmethod
def extract_patient_info(file_metadata: MetadataFile) -> tuple[str, str]:
"""Extract key patient identifiers."""
Expand All @@ -210,6 +225,55 @@ def validate_and_correct_filename(self, file_metadata: MetadataFile) -> str:
file_metadata.file_path
)

def validate_expedite_file(self, s3_object_key: str):
    """Validate an expedite S3 object key and derive its metadata fields.

    Rejects any file that is not a single-part (1of1) document, then returns
    the tuple (nhs_number, validated_file_name, ods_code, scan_date).

    Raises:
        BulkUploadMetadataException: if the file name does not start with "1of1".
    """
    base_name = os.path.basename(s3_object_key)

    # Guard: only single-document uploads are accepted on the expedite path.
    if not base_name.startswith("1of1"):
        failure_msg = (
            "Failed processing expedite event due to file not being a 1of1"
        )
        logger.error(failure_msg)
        raise BulkUploadMetadataException(failure_msg)

    # NHS number comes from the file name; the ODS code is the parent folder name.
    patient_nhs_number = extract_nhs_number_from_bulk_upload_file_name(base_name)[0]
    validated_file_name = self.metadata_formatter_service.validate_record_filename(
        s3_object_key
    )
    practice_ods_code = Path(s3_object_key).parent.name
    # NOTE(review): naive local time is used for the scan date — confirm TZ expectations.
    scan_date = datetime.now().strftime("%Y-%m-%d")
    return patient_nhs_number, validated_file_name, practice_ods_code, scan_date

def handle_expedite_event(self, event):
    """Process an S3 EventBridge expedite upload.

    Enforces a virus scan, checks the file's scan status, builds a
    single-file SQS metadata payload (which also enforces the 1of1 naming
    rule) and sends it to the FIFO queue.

    Args:
        event: EventBridge event dict; the S3 object key is read from
            event["detail"]["object"]["key"].

    Raises:
        BulkUploadMetadataException: if the event is missing the expected
            keys, or the object key is outside the expedite/ folder.
    """
    # Narrow try: only the event-shape lookup can legitimately raise KeyError.
    # A broader try would mislabel KeyErrors from downstream processing as
    # "missing key" failures.
    try:
        unparsed_s3_object_key = event["detail"]["object"]["key"]
        # S3 event keys arrive URL-encoded (spaces as '+').
        s3_object_key = urllib.parse.unquote_plus(
            unparsed_s3_object_key, encoding="utf-8"
        )
    except KeyError as e:
        failure_msg = f"Failed due to missing key: {str(e)}"
        logger.error(failure_msg)
        # Chain the cause so the original KeyError is preserved in tracebacks.
        raise BulkUploadMetadataException(failure_msg) from e

    if not s3_object_key.startswith("expedite/"):
        failure_msg = f"Unexpected directory or file location received from EventBridge: {s3_object_key}"
        logger.error(failure_msg)
        raise BulkUploadMetadataException(failure_msg)

    logger.info("Processing file from expedite folder")

    self.enforce_virus_scanner(s3_object_key)
    self.check_file_status(s3_object_key)

    sqs_metadata = [self.create_expedite_sqs_metadata(s3_object_key)]

    self.send_metadata_to_fifo_sqs(sqs_metadata)
    logger.info("Successfully processed expedite event")

def handle_invalid_filename(
self,
file_metadata: MetadataFile,
Expand Down Expand Up @@ -241,6 +305,7 @@ def send_metadata_to_fifo_sqs(
nhs_number=nhs_number,
group_id=f"bulk_upload_{nhs_number}",
)
logger.info("Sent bulk upload metadata to sqs queue")

def copy_metadata_to_dated_folder(self):
"""Copy processed metadata CSV into a dated archive folder in S3."""
Expand Down Expand Up @@ -277,7 +342,7 @@ def enforce_virus_scanner(self, file_key: str):

try:
result = self.s3_repo.check_file_tag_status_on_staging_bucket(file_key)
if(result != ""):
if result != "":
logger.info("The file has been scanned before")
return
logger.info(f"Virus scan tag missing for {file_key}.")
Expand All @@ -293,27 +358,6 @@ def enforce_virus_scanner(self, file_key: str):
else:
raise

def handle_expedite_event(self, event):
    """Handle an EventBridge notification for a file landed in the expedite/ folder.

    Extracts and URL-decodes the S3 object key, then enforces a virus scan
    and checks the file's scan status. Keys outside expedite/ are rejected.

    Args:
        event: EventBridge event dict; reads event["detail"]["object"]["key"].

    Raises:
        BulkUploadMetadataException: for keys outside the expedite/ folder,
            or when the event is missing the expected keys.
    """
    try:
        key_string = event["detail"]["object"]["key"]
        # S3 event keys arrive URL-encoded (spaces as '+').
        key = urllib.parse.unquote_plus(key_string, encoding="utf-8")

        if key.startswith("expedite/"):
            logger.info("Processing file from expedite folder")

            self.enforce_virus_scanner(key)
            self.check_file_status(key)

            return  # To be added upon by ticket PRMP-540
        else:
            failure_msg = f"Unexpected directory or file location received from EventBridge: {key_string}"
            logger.error(failure_msg)
            raise BulkUploadMetadataException(failure_msg)
    except KeyError as e:
        failure_msg = f"Failed due to missing key: {str(e)}"
        logger.error(failure_msg)
        raise BulkUploadMetadataException(failure_msg)


def get_formatter_service(raw_pre_format_type):
try:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,32 +59,25 @@ def test_metadata_processor_lambda_handler_s3_event_triggers_expedite(
mock_metadata_service.process_metadata.assert_not_called()


def test_s3_event_with_expedite_key_processes(
def test_s3_event_with_non_expedite_key_is_rejected(
set_env, context, mock_metadata_service, caplog
):
event = eventbridge_event_with_s3_key(
"expedite%2F1of1_Lloyd_George_Record_[John Michael SMITH]_[1234567890]_[15-05-1990].pdf"
)
key_string = "uploads/1of1_Lloyd_George_Record_[John Michael SMITH]_[1234567890]_[15-05-1990].pdf"
event = eventbridge_event_with_s3_key(key_string)

with caplog.at_level("INFO"):
lambda_handler(event, context)

assert any(
"Handling EventBridge event from S3" in r.message for r in caplog.records
)

mock_metadata_service.handle_expedite_event.assert_called_once_with(event)
mock_metadata_service.process_metadata.assert_not_called()


def test_s3_event_with_non_expedite_key_is_rejected(
set_env, context, mock_metadata_service, caplog
):
key_string = "uploads/1of1_Lloyd_George_Record_[John Michael SMITH]_[1234567890]_[15-05-1990].pdf"
event = eventbridge_event_with_s3_key(key_string)
def test_s3_event_with_expedite_key_processes(set_env, context, mock_metadata_service):
event = eventbridge_event_with_s3_key(
"expedite%2F1of1_Lloyd_George_Record_[John Michael SMITH]_[1234567890]_[15-05-1990].pdf"
)

with caplog.at_level("INFO"):
lambda_handler(event, context)
lambda_handler(event, context)

mock_metadata_service.handle_expedite_event.assert_called_once_with(event)
mock_metadata_service.process_metadata.assert_not_called()
mock_metadata_service.handle_expedite_event.assert_called_once_with(event)
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@
import pytest
from botocore.exceptions import ClientError
from enums.upload_status import UploadStatus
from freezegun import freeze_time

from enums.virus_scan_result import VirusScanResult
from freezegun import freeze_time
from models.staging_metadata import (
METADATA_FILENAME,
BulkUploadQueueMetadata,
Expand All @@ -35,7 +34,7 @@
BulkUploadMetadataException,
InvalidFileNameException,
LGInvalidFilesException,
VirusScanNoResultException, VirusScanFailedException,
VirusScanFailedException,
)

METADATA_FILE_DIR = "tests/unit/helpers/data/bulk_upload"
Expand Down Expand Up @@ -616,6 +615,93 @@ def test_validate_and_correct_filename_sad_path(
assert result == "corrected/path/file_corrected.pdf"


@freeze_time("2025-02-03T10:00:00")
def test_create_expedite_sqs_metadata_builds_expected_structure(test_service):
    """The payload holds one file entry keyed by the NHS number from the file name."""
    practice_code = "A12345"
    object_key = f"expedite/{practice_code}/1of1_1234567890_record.pdf"

    metadata = test_service.create_expedite_sqs_metadata(object_key)

    assert metadata.nhs_number == "1234567890"
    assert len(metadata.files) == 1
    file_entry = metadata.files[0]
    assert file_entry.file_path == object_key
    assert file_entry.stored_file_name == object_key
    assert file_entry.gp_practice_code == practice_code
    assert file_entry.scan_date == "2025-02-03"


@freeze_time("2025-02-03T10:00:00")
def test_handle_expedite_event_happy_path_sends_sqs(test_service, mocker):
    """A valid expedite event sends exactly one SQS payload with the file's details."""
    practice_code = "A12345"
    object_key = f"expedite/{practice_code}/1of1_1234567890_record.pdf"
    event = {"detail": {"object": {"key": object_key}}}

    mocker.patch.object(BulkUploadMetadataProcessorService, "enforce_virus_scanner")
    mocker.patch.object(BulkUploadMetadataProcessorService, "check_file_status")
    mocked_send = mocker.patch.object(
        BulkUploadMetadataProcessorService, "send_metadata_to_fifo_sqs"
    )

    test_service.handle_expedite_event(event)

    mocked_send.assert_called_once()
    positional_args, _ = mocked_send.call_args
    assert len(positional_args) == 1
    payload = positional_args[0][0]
    assert payload.nhs_number == "1234567890"
    assert len(payload.files) == 1
    sent_file = payload.files[0]
    assert sent_file.file_path == object_key
    assert sent_file.stored_file_name == object_key
    assert sent_file.gp_practice_code == practice_code
    assert sent_file.scan_date == "2025-02-03"


def test_handle_expedite_event_invalid_directory_raises(test_service, mocker):
    """Keys outside the expedite/ folder are rejected and nothing is queued."""
    mocked_send = mocker.patch.object(
        BulkUploadMetadataProcessorService, "send_metadata_to_fifo_sqs"
    )
    event = {"detail": {"object": {"key": "notexpedite/A12345/1234567890_record.pdf"}}}

    with pytest.raises(BulkUploadMetadataException) as exc:
        test_service.handle_expedite_event(event)

    assert "Unexpected directory or file location" in str(exc.value)
    mocked_send.assert_not_called()


def test_handle_expedite_event_missing_key_raises(test_service, mocker):
    """An event lacking the object key raises and sends nothing to SQS."""
    mocked_send = mocker.patch.object(
        BulkUploadMetadataProcessorService, "send_metadata_to_fifo_sqs"
    )

    with pytest.raises(BulkUploadMetadataException) as exc:
        test_service.handle_expedite_event({"detail": {}})

    assert "Failed due to missing key" in str(exc.value)
    mocked_send.assert_not_called()


def test_handle_expedite_event_rejects_non_1of1(test_service, mocker):
    """A multi-part file name (e.g. 2of3) aborts processing before anything is queued."""
    mocker.patch.object(BulkUploadMetadataProcessorService, "enforce_virus_scanner")
    mocker.patch.object(BulkUploadMetadataProcessorService, "check_file_status")
    mocked_send = mocker.patch.object(
        BulkUploadMetadataProcessorService, "send_metadata_to_fifo_sqs"
    )
    event = {"detail": {"object": {"key": "expedite/A12345/2of3_1234567890_record.pdf"}}}

    with pytest.raises(BulkUploadMetadataException) as exc:
        test_service.handle_expedite_event(event)

    assert "not being a 1of1" in str(exc.value)
    mocked_send.assert_not_called()


@pytest.fixture
def mock_csv_content():
header = "FILEPATH,PAGE COUNT,GP-PRACTICE-CODE,NHS-NO,SECTION,SUB-SECTION,SCAN-DATE,SCAN-ID,USER-ID,UPLOAD"
Expand Down Expand Up @@ -864,10 +950,34 @@ def test_no_remapping_logic(
]


@freeze_time("2025-02-03T10:00:00")
def test_validate_expedite_file_happy_path_returns_expected_tuple(test_service):
    """validate_expedite_file returns (nhs_number, file_name, ods_code, scan_date)."""
    practice_code = "A12345"
    object_key = f"expedite/{practice_code}/1of1_1234567890_record.pdf"

    result = test_service.validate_expedite_file(object_key)

    assert result == ("1234567890", object_key, practice_code, "2025-02-03")


def test_validate_expedite_file_rejects_non_1of1(test_service):
    """File names that do not start with 1of1 raise BulkUploadMetadataException."""
    with pytest.raises(BulkUploadMetadataException):
        test_service.validate_expedite_file(
            "expedite/A12345/2of3_1234567890_record.pdf"
        )


def test_handle_expedite_event_calls_enforce_for_expedite_key(mocker, test_service):
encoded_key = urllib.parse.quote_plus("expedite/folder/some file.pdf")
event = {"detail": {"object": {"key": encoded_key}}}

mocker.patch.object(
BulkUploadMetadataProcessorService, "create_expedite_sqs_metadata"
)
mocked_enforce = mocker.patch.object(test_service, "enforce_virus_scanner")
mocked_check_status = mocker.patch.object(test_service, "check_file_status")

Expand Down Expand Up @@ -1011,6 +1121,7 @@ def test_enforce_virus_scanner_re_raises_unexpected_client_error(mocker, test_se

mock_scan.assert_not_called()


def test_check_file_status_clean_does_nothing(mocker, test_service, caplog):
file_key = "expedite/folder/file.pdf"
mock_check = mocker.patch.object(
Expand Down Expand Up @@ -1043,4 +1154,4 @@ def test_check_file_status_logs_issue_when_not_clean(mocker, test_service, caplo
assert any(
f"Found an issue with the file {file_key}." in record.msg
for record in caplog.records
)
)
Loading