Skip to content

Commit 97ce052

Browse files
[PRMP-189] Update BulkUploadMetadataProcessor lambda to add stored_file_name to the bulk upload queue metadata (#807)
1 parent caeddc0 commit 97ce052

14 files changed

+375
-188
lines changed

lambdas/models/staging_metadata.py

Lines changed: 28 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,4 @@
1-
from typing import Optional
2-
3-
from pydantic import BaseModel, ConfigDict, Field, ValidationInfo, field_validator
4-
from pydantic_core import PydanticCustomError
1+
from pydantic import BaseModel, ConfigDict, Field, field_validator
52

63
METADATA_FILENAME = "metadata.csv"
74
NHS_NUMBER_FIELD_NAME = "NHS-NO"
@@ -13,44 +10,38 @@ def to_upper_case_with_hyphen(field_name: str) -> str:
1310
return field_name.upper().replace("_", "-")
1411

1512

16-
class MetadataFile(BaseModel):
13+
class MetadataBase(BaseModel):
1714
model_config = ConfigDict(
18-
alias_generator=to_upper_case_with_hyphen, validate_by_name=True
15+
validate_by_name=True,
16+
populate_by_name=True,
1917
)
2018

21-
file_path: str = Field(alias="FILEPATH")
22-
page_count: str = Field(alias="PAGE COUNT")
23-
nhs_number: Optional[str] = Field(
24-
alias=NHS_NUMBER_FIELD_NAME, exclude=True, default=None
25-
)
26-
gp_practice_code: str
27-
section: str
28-
sub_section: Optional[str]
19+
file_path: str
20+
gp_practice_code: str = Field(min_length=1)
2921
scan_date: str
30-
scan_id: str
31-
user_id: str
32-
upload: str
3322

34-
@field_validator("gp_practice_code")
35-
@classmethod
36-
def ensure_gp_practice_code_non_empty(
37-
cls, gp_practice_code: str, info: ValidationInfo
38-
) -> str:
39-
if not gp_practice_code:
40-
patient_nhs_number = info.data.get("nhs_number", "")
41-
raise PydanticCustomError(
42-
"MissingGPPracticeCode",
43-
"missing GP-PRACTICE-CODE for patient {patient_nhs_number}",
44-
{"patient_nhs_number": patient_nhs_number},
45-
)
46-
return gp_practice_code
47-
48-
49-
class StagingMetadata(BaseModel):
50-
model_config = ConfigDict(validate_by_name=True)
51-
52-
nhs_number: str = Field(default=NHS_NUMBER_PLACEHOLDER, alias=NHS_NUMBER_FIELD_NAME)
53-
files: list[MetadataFile]
23+
24+
class BulkUploadQueueMetadata(MetadataBase):
25+
stored_file_name: str
26+
27+
28+
class MetadataFile(MetadataBase):
29+
model_config = ConfigDict(
30+
alias_generator=to_upper_case_with_hyphen,
31+
)
32+
nhs_number: str = Field(alias=NHS_NUMBER_FIELD_NAME)
33+
file_path: str = Field(alias="FILEPATH")
34+
page_count: str = Field(default=None, alias="PAGE COUNT")
35+
section: str = None
36+
sub_section: str = None
37+
scan_id: str = None
38+
user_id: str = None
39+
upload: str = None
40+
41+
42+
class StagingSqsMetadata(BaseModel):
43+
nhs_number: str
44+
files: list[BulkUploadQueueMetadata]
5445
retries: int = 0
5546

5647
@field_validator("nhs_number")

lambdas/repositories/bulk_upload/bulk_upload_dynamo_repository.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from enums.upload_status import UploadStatus
55
from models.document_reference import DocumentReference
66
from models.report.bulk_upload_report import BulkUploadReport
7-
from models.staging_metadata import StagingMetadata
7+
from models.staging_metadata import StagingSqsMetadata
88
from services.base.dynamo_service import DynamoDBService
99
from utils.audit_logging_setup import LoggingService
1010

@@ -29,7 +29,7 @@ def create_record_in_lg_dynamo_table(self, document_reference: DocumentReference
2929

3030
def write_report_upload_to_dynamo(
3131
self,
32-
staging_metadata: StagingMetadata,
32+
staging_metadata: StagingSqsMetadata,
3333
upload_status: UploadStatus,
3434
reason: str = None,
3535
pds_ods_code: str = "",

lambdas/repositories/bulk_upload/bulk_upload_s3_repository.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from botocore.exceptions import ClientError
44
from enums.virus_scan_result import SCAN_RESULT_TAG_KEY, VirusScanResult
55
from models.document_reference import DocumentReference
6-
from models.staging_metadata import StagingMetadata
6+
from models.staging_metadata import StagingSqsMetadata
77
from services.base.s3_service import S3Service
88
from utils.audit_logging_setup import LoggingService
99
from utils.exceptions import (
@@ -28,7 +28,7 @@ def __init__(self):
2828
self.dest_bucket_files_in_transaction = []
2929

3030
def check_virus_result(
31-
self, staging_metadata: StagingMetadata, file_path_cache: dict
31+
self, staging_metadata: StagingSqsMetadata, file_path_cache: dict
3232
):
3333
for file_metadata in staging_metadata.files:
3434
file_path = file_metadata.file_path

lambdas/repositories/bulk_upload/bulk_upload_sqs_repository.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import uuid
33

44
from models.sqs.pdf_stitching_sqs_message import PdfStitchingSqsMessage
5-
from models.staging_metadata import StagingMetadata
5+
from models.staging_metadata import StagingSqsMetadata
66
from services.base.sqs_service import SQSService
77
from utils.audit_logging_setup import LoggingService
88
from utils.request_context import request_context
@@ -16,7 +16,7 @@ def __init__(self):
1616
self.invalid_queue_url = os.environ["INVALID_SQS_QUEUE_URL"]
1717
self.metadata_queue_url = os.environ["METADATA_SQS_QUEUE_URL"]
1818

19-
def put_staging_metadata_back_to_queue(self, staging_metadata: StagingMetadata):
19+
def put_staging_metadata_back_to_queue(self, staging_metadata: StagingSqsMetadata):
2020
request_context.patient_nhs_no = staging_metadata.nhs_number
2121
setattr(staging_metadata, "retries", (staging_metadata.retries + 1))
2222
_logger.info("Returning message to sqs queue...")

lambdas/services/bulk_upload_metadata_processor_service.py

Lines changed: 49 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,19 @@
33
import shutil
44
import tempfile
55
import uuid
6+
from collections import defaultdict
67
from datetime import datetime
78
from typing import Iterable
89

910
import pydantic
1011
from botocore.exceptions import ClientError
1112
from enums.upload_status import UploadStatus
12-
from models.staging_metadata import METADATA_FILENAME, MetadataFile, StagingMetadata
13+
from models.staging_metadata import (
14+
METADATA_FILENAME,
15+
BulkUploadQueueMetadata,
16+
MetadataFile,
17+
StagingSqsMetadata,
18+
)
1319
from repositories.bulk_upload.bulk_upload_dynamo_repository import (
1420
BulkUploadDynamoRepository,
1521
)
@@ -41,7 +47,6 @@ def __init__(self, metadata_formatter_service: MetadataPreprocessorService):
4147

4248
self.temp_download_dir = tempfile.mkdtemp()
4349

44-
self.corrections = {}
4550
self.practice_directory = metadata_formatter_service.practice_directory
4651
self.file_key = (
4752
f"{metadata_formatter_service.practice_directory}/{METADATA_FILENAME}"
@@ -53,7 +58,7 @@ def __init__(self, metadata_formatter_service: MetadataPreprocessorService):
5358
def process_metadata(self):
5459
try:
5560
metadata_file = self.download_metadata_from_s3()
56-
staging_metadata_list = self.csv_to_staging_metadata(metadata_file)
61+
staging_metadata_list = self.csv_to_sqs_metadata(metadata_file)
5762
logger.info("Finished parsing metadata")
5863

5964
self.send_metadata_to_fifo_sqs(staging_metadata_list)
@@ -90,50 +95,59 @@ def download_metadata_from_s3(self) -> str:
9095
)
9196
return local_file_path
9297

93-
def csv_to_staging_metadata(self, csv_file_path: str) -> list[StagingMetadata]:
98+
def csv_to_sqs_metadata(self, csv_file_path: str) -> list[StagingSqsMetadata]:
9499
logger.info("Parsing bulk upload metadata")
95-
patients = {}
100+
patients: defaultdict[tuple[str, str], list[BulkUploadQueueMetadata]] = (
101+
defaultdict(list)
102+
)
103+
96104
with open(
97105
csv_file_path, mode="r", encoding="utf-8-sig", errors="replace"
98106
) as csv_file_handler:
99107
csv_reader: Iterable[dict] = csv.DictReader(csv_file_handler)
100108
for row in csv_reader:
101109
self.process_metadata_row(row, patients)
110+
102111
return [
103-
StagingMetadata(
104-
nhs_number=key[0],
105-
files=value,
112+
StagingSqsMetadata(
113+
nhs_number=nhs_number,
114+
files=files,
106115
)
107-
for (key, value) in patients.items()
116+
for (nhs_number, _), files in patients.items()
108117
]
109118

110-
def process_metadata_row(self, row: dict, patients: dict) -> None:
119+
def process_metadata_row(
120+
self, row: dict, patients: dict[tuple[str, str], list[BulkUploadQueueMetadata]]
121+
) -> None:
111122
file_metadata = MetadataFile.model_validate(row)
112123
nhs_number, ods_code = self.extract_patient_info(file_metadata)
113-
patient_record_key = (nhs_number, ods_code)
114-
115-
if patient_record_key not in patients:
116-
patients[patient_record_key] = [file_metadata]
117-
else:
118-
patients[patient_record_key].append(file_metadata)
119124

120125
try:
121-
self.validate_correct_filename(file_metadata)
126+
correct_file_name = self.validate_and_correct_filename(file_metadata)
122127
except InvalidFileNameException as error:
123-
self.handle_invalid_filename(
124-
file_metadata, error, patient_record_key, patients
125-
)
126-
patients.pop(patient_record_key)
128+
self.handle_invalid_filename(file_metadata, error, nhs_number)
129+
return
130+
131+
sqs_metadata = self.convert_to_sqs_metadata(file_metadata, correct_file_name)
132+
patients[(nhs_number, ods_code)].append(sqs_metadata)
133+
134+
@staticmethod
135+
def convert_to_sqs_metadata(
136+
file: MetadataFile, stored_file_name: str
137+
) -> BulkUploadQueueMetadata:
138+
return BulkUploadQueueMetadata(
139+
**file.model_dump(), stored_file_name=stored_file_name
140+
)
127141

128142
def extract_patient_info(self, file_metadata: MetadataFile) -> tuple[str, str]:
129143
nhs_number = file_metadata.nhs_number
130144
ods_code = file_metadata.gp_practice_code
131145
return nhs_number, ods_code
132146

133-
def validate_correct_filename(
147+
def validate_and_correct_filename(
134148
self,
135149
file_metadata: MetadataFile,
136-
) -> None:
150+
) -> str:
137151
try:
138152
validate_file_name(file_metadata.file_path.split("/")[-1])
139153
valid_filepath = file_metadata.file_path
@@ -142,39 +156,40 @@ def validate_correct_filename(
142156
file_metadata.file_path
143157
)
144158

145-
if valid_filepath:
146-
self.corrections[file_metadata.file_path] = valid_filepath
159+
return valid_filepath
147160

148161
def handle_invalid_filename(
149162
self,
150163
file_metadata: MetadataFile,
151164
error: InvalidFileNameException,
152-
key: tuple[str, str],
153-
patients: dict[tuple[str, str], list[MetadataFile]],
165+
nhs_number: str,
154166
) -> None:
155167
logger.error(
156168
f"Failed to process {file_metadata.file_path} due to error: {error}"
157169
)
158-
failed_entry = StagingMetadata(
159-
nhs_number=key[0],
160-
files=patients[key],
170+
failed_file = self.convert_to_sqs_metadata(
171+
file_metadata, file_metadata.file_path
172+
)
173+
failed_entry = StagingSqsMetadata(
174+
nhs_number=nhs_number,
175+
files=[failed_file],
161176
)
162177
self.dynamo_repository.write_report_upload_to_dynamo(
163178
failed_entry, UploadStatus.FAILED, str(error)
164179
)
165180

166181
def send_metadata_to_fifo_sqs(
167-
self, staging_metadata_list: list[StagingMetadata]
182+
self, staging_sqs_metadata_list: list[StagingSqsMetadata]
168183
) -> None:
169184
sqs_group_id = f"bulk_upload_{uuid.uuid4()}"
170185

171-
for staging_metadata in staging_metadata_list:
172-
nhs_number = staging_metadata.nhs_number
186+
for staging_sqs_metadata in staging_sqs_metadata_list:
187+
nhs_number = staging_sqs_metadata.nhs_number
173188
logger.info(f"Sending metadata for patientId: {nhs_number}")
174189

175190
self.sqs_service.send_message_with_nhs_number_attr_fifo(
176191
queue_url=self.metadata_queue_url,
177-
message_body=staging_metadata.model_dump_json(by_alias=True),
192+
message_body=staging_sqs_metadata.model_dump_json(by_alias=True),
178193
nhs_number=nhs_number,
179194
group_id=sqs_group_id,
180195
)

0 commit comments

Comments (0)