import uuid
from collections import defaultdict
from datetime import datetime
- from typing import Iterable

import pydantic
from botocore.exceptions import ClientError
from services.bulk_upload_metadata_preprocessor_service import (
    MetadataPreprocessorService,
)
+ from services.metadata_mapping_validator_service import MetadataMappingValidatorService
from utils.audit_logging_setup import LoggingService
from utils.exceptions import (
    BulkUploadMetadataException,


class BulkUploadMetadataProcessorService:
-     def __init__(self, metadata_formatter_service: MetadataPreprocessorService):
-         self.s3_service = S3Service()
-         self.sqs_service = SQSService()
-         self.dynamo_repository = BulkUploadDynamoRepository()

+     def __init__(
+         self,
+         metadata_formatter_service: MetadataPreprocessorService,
+         metadata_heading_remap: dict,
+     ):
        self.staging_bucket_name = os.getenv("STAGING_STORE_BUCKET_NAME")
        self.metadata_queue_url = os.getenv("METADATA_SQS_QUEUE_URL")
+         self.s3_service = S3Service()
+         self.sqs_service = SQSService()
+         self.dynamo_repository = BulkUploadDynamoRepository()
+         self.metadata_heading_remap = metadata_heading_remap

        self.temp_download_dir = tempfile.mkdtemp()
-
        self.practice_directory = metadata_formatter_service.practice_directory
        self.file_key = (
            f"{metadata_formatter_service.practice_directory}/{METADATA_FILENAME}"
            if metadata_formatter_service.practice_directory
            else METADATA_FILENAME
        )
+
+         self.metadata_mapping_validator_service = MetadataMappingValidatorService()
+
        self.metadata_formatter_service = metadata_formatter_service

+     def download_metadata_from_s3(self) -> str:
+         local_file_path = f"{self.temp_download_dir}/{METADATA_FILENAME}"
+
+         logger.info(
+             f"Fetching {local_file_path} from bucket {self.staging_bucket_name}"
+         )
+
+         self.s3_service.download_file(
+             s3_bucket_name=self.staging_bucket_name,
+             file_key=self.file_key,
+             download_path=local_file_path,
+         )
+         return local_file_path
+
    def process_metadata(self):
        try:
            metadata_file = self.download_metadata_from_s3()
            staging_metadata_list = self.csv_to_sqs_metadata(metadata_file)
            logger.info("Finished parsing metadata")

            self.send_metadata_to_fifo_sqs(staging_metadata_list)
-             logger.info("Sent bulk upload metadata to sqs queue")
+             logger.info("Sent bulk upload metadata to SQS queue")

            self.copy_metadata_to_dated_folder()
-
            self.clear_temp_storage()

        except pydantic.ValidationError as e:
-             failure_msg = f"Failed to parse {METADATA_FILENAME} due to error: {str(e)}"
+             failure_msg = (
+                 f"Failed to parse {METADATA_FILENAME} due to validation error: {str(e)}"
+             )
            logger.error(failure_msg, {"Result": UNSUCCESSFUL})
            raise BulkUploadMetadataException(failure_msg)
+
        except KeyError as e:
            failure_msg = f"Failed due to missing key: {str(e)}"
            logger.error(failure_msg, {"Result": UNSUCCESSFUL})
            raise BulkUploadMetadataException(failure_msg)
+
        except ClientError as e:
            if "HeadObject" in str(e):
                failure_msg = f'No metadata file could be found with the name "{METADATA_FILENAME}"'
@@ -84,17 +108,6 @@ def process_metadata(self):
            logger.error(failure_msg, {"Result": UNSUCCESSFUL})
            raise BulkUploadMetadataException(failure_msg)

-     def download_metadata_from_s3(self) -> str:
-         logger.info(f"Fetching {METADATA_FILENAME} from bucket")
-
-         local_file_path = os.path.join(self.temp_download_dir, METADATA_FILENAME)
-         self.s3_service.download_file(
-             s3_bucket_name=self.staging_bucket_name,
-             file_key=self.file_key,
-             download_path=local_file_path,
-         )
-         return local_file_path
-
    def csv_to_sqs_metadata(self, csv_file_path: str) -> list[StagingSqsMetadata]:
        logger.info("Parsing bulk upload metadata")
        patients: defaultdict[tuple[str, str], list[BulkUploadQueueMetadata]] = (
@@ -103,22 +116,49 @@ def csv_to_sqs_metadata(self, csv_file_path: str) -> list[StagingSqsMetadata]:

        with open(
            csv_file_path, mode="r", encoding="utf-8-sig", errors="replace"
-         ) as csv_file_handler:
-             csv_reader: Iterable[dict] = csv.DictReader(csv_file_handler)
-             for row in csv_reader:
-                 self.process_metadata_row(row, patients)
+         ) as csv_file:
+             csv_reader = csv.DictReader(csv_file)
+             if csv_reader.fieldnames is None:
+                 raise BulkUploadMetadataException(
+                     f"{METADATA_FILENAME} is empty or missing headers."
+                 )
+
+             headers = [h.strip() for h in csv_reader.fieldnames]
+             records = list(csv_reader)
+
+         if not headers:
+             raise BulkUploadMetadataException(f"{METADATA_FILENAME} has no headers.")
+
+         validated_rows, rejected_rows, rejected_reasons = (
+             self.metadata_mapping_validator_service.validate_and_normalize_metadata(
+                 records, self.metadata_heading_remap
+             )
+         )
+         if rejected_reasons:
+             for reason in rejected_reasons:
+                 logger.warning(f"Rejected due to: {reason['REASON']}")

-         return [
-             StagingSqsMetadata(
-                 nhs_number=nhs_number,
-                 files=files,
+         logger.info(
+             f"There are {len(validated_rows)} valid rows, and {len(rejected_rows)} rejected rows"
+         )
+
+         if not validated_rows:
+             raise BulkUploadMetadataException(
+                 "No valid metadata rows found after alias validation."
            )
+
+         for row in validated_rows:
+             self.process_metadata_row(row, patients)
+
+         return [
+             StagingSqsMetadata(nhs_number=nhs_number, files=files)
            for (nhs_number, _), files in patients.items()
        ]

    def process_metadata_row(
        self, row: dict, patients: dict[tuple[str, str], list[BulkUploadQueueMetadata]]
    ) -> None:
+         """Validate individual file metadata and attach to patient group."""
        file_metadata = MetadataFile.model_validate(row)
        nhs_number, ods_code = self.extract_patient_info(file_metadata)

@@ -135,58 +175,53 @@ def process_metadata_row(
    def convert_to_sqs_metadata(
        file: MetadataFile, stored_file_name: str
    ) -> BulkUploadQueueMetadata:
+         """Convert a MetadataFile into BulkUploadQueueMetadata."""
        return BulkUploadQueueMetadata(
            **file.model_dump(), stored_file_name=stored_file_name
        )

-     def extract_patient_info(self, file_metadata: MetadataFile) -> tuple[str, str]:
-         nhs_number = file_metadata.nhs_number
-         ods_code = file_metadata.gp_practice_code
-         return nhs_number, ods_code
+     @staticmethod
+     def extract_patient_info(file_metadata: MetadataFile) -> tuple[str, str]:
+         """Extract key patient identifiers."""
+         return file_metadata.nhs_number, file_metadata.gp_practice_code

-     def validate_and_correct_filename(
-         self,
-         file_metadata: MetadataFile,
-     ) -> str:
+     def validate_and_correct_filename(self, file_metadata: MetadataFile) -> str:
+         """Validate and normalize file name."""
        try:
            validate_file_name(file_metadata.file_path.split("/")[-1])
-             valid_filepath = file_metadata.file_path
+             return file_metadata.file_path
        except LGInvalidFilesException:
-             valid_filepath = self.metadata_formatter_service.validate_record_filename(
+             return self.metadata_formatter_service.validate_record_filename(
                file_metadata.file_path
            )

-         return valid_filepath
-
    def handle_invalid_filename(
        self,
        file_metadata: MetadataFile,
        error: InvalidFileNameException,
        nhs_number: str,
    ) -> None:
+         """Handle invalid filenames by logging and storing failure in Dynamo."""
        logger.error(
            f"Failed to process {file_metadata.file_path} due to error: {error}"
        )
        failed_file = self.convert_to_sqs_metadata(
            file_metadata, file_metadata.file_path
        )
-         failed_entry = StagingSqsMetadata(
-             nhs_number=nhs_number,
-             files=[failed_file],
-         )
+         failed_entry = StagingSqsMetadata(nhs_number=nhs_number, files=[failed_file])
        self.dynamo_repository.write_report_upload_to_dynamo(
            failed_entry, UploadStatus.FAILED, str(error)
        )

    def send_metadata_to_fifo_sqs(
        self, staging_sqs_metadata_list: list[StagingSqsMetadata]
    ) -> None:
+         """Send validated metadata entries to SQS FIFO queue."""
        sqs_group_id = f"bulk_upload_{uuid.uuid4()}"

        for staging_sqs_metadata in staging_sqs_metadata_list:
            nhs_number = staging_sqs_metadata.nhs_number
            logger.info(f"Sending metadata for patientId: {nhs_number}")
-
            self.sqs_service.send_message_with_nhs_number_attr_fifo(
                queue_url=self.metadata_queue_url,
                message_body=staging_sqs_metadata.model_dump_json(by_alias=True),
@@ -195,19 +230,21 @@ def send_metadata_to_fifo_sqs(
            )

    def copy_metadata_to_dated_folder(self):
+         """Copy processed metadata CSV into a dated archive folder in S3."""
        logger.info("Copying metadata CSV to dated folder")
-
        current_datetime = datetime.now().strftime("%Y-%m-%d_%H-%M")
-
        self.s3_service.copy_across_bucket(
            self.staging_bucket_name,
            self.file_key,
            self.staging_bucket_name,
            f"metadata/{current_datetime}.csv",
        )
-
        self.s3_service.delete_object(self.staging_bucket_name, METADATA_FILENAME)

    def clear_temp_storage(self):
+         """Delete temporary working directory."""
        logger.info("Clearing temp storage directory")
-         shutil.rmtree(self.temp_download_dir)
+         try:
+             shutil.rmtree(self.temp_download_dir)
+         except FileNotFoundError:
+             pass
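
Below is a minimal usage sketch of the reworked constructor, assuming the service is wired up from a Lambda-style entry point. The processor's module path, the MetadataPreprocessorService constructor arguments, and the heading-remap keys are illustrative assumptions rather than part of this change; only the new metadata_heading_remap parameter and the process_metadata() call come from the diff.

# Hypothetical wiring of the refactored service (not part of this diff).
from services.bulk_upload_metadata_preprocessor_service import MetadataPreprocessorService
from services.bulk_upload_metadata_processor_service import (  # assumed module path
    BulkUploadMetadataProcessorService,
)

# Assumed: the preprocessor exposes the practice directory used to build the S3 file key.
preprocessor = MetadataPreprocessorService(practice_directory="ods-y12345")

# Assumed alias mapping: legacy CSV headings on the left, canonical headings on the right.
heading_remap = {"NHS Number": "NHS-NO", "GP Practice Code": "GP-PRACTICE-CODE"}

service = BulkUploadMetadataProcessorService(
    metadata_formatter_service=preprocessor,
    metadata_heading_remap=heading_remap,
)

# Requires STAGING_STORE_BUCKET_NAME and METADATA_SQS_QUEUE_URL in the environment;
# raises BulkUploadMetadataException on validation or S3 failures.
service.process_metadata()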