@@ -10,11 +10,11 @@
 
 import pydantic
 from botocore.exceptions import ClientError
+
 from enums.lloyd_george_pre_process_format import LloydGeorgePreProcessFormat
 from enums.upload_status import UploadStatus
 from enums.virus_scan_result import VirusScanResult
 from models.staging_metadata import (
-    METADATA_FILENAME,
     BulkUploadQueueMetadata,
     MetadataFile,
     StagingSqsMetadata,
@@ -53,10 +53,11 @@
 class BulkUploadMetadataProcessorService:
 
     def __init__(
-        self,
-        metadata_formatter_service: MetadataPreprocessorService,
-        metadata_heading_remap: dict,
-        fixed_values: dict = None,
+        self,
+        metadata_formatter_service: MetadataPreprocessorService,
+        metadata_heading_remap: dict,
+        input_file_location: str = "",
+        fixed_values: dict = None,
     ):
         self.staging_bucket_name = os.getenv("STAGING_STORE_BUCKET_NAME")
         self.metadata_queue_url = os.getenv("METADATA_SQS_QUEUE_URL")
@@ -71,29 +72,27 @@ def __init__(
         self.fixed_values = fixed_values or {}
 
         self.temp_download_dir = tempfile.mkdtemp()
-        self.practice_directory = metadata_formatter_service.practice_directory
-        self.file_key = (
-            f"{metadata_formatter_service.practice_directory}/{METADATA_FILENAME}"
-            if metadata_formatter_service.practice_directory
-            else METADATA_FILENAME
-        )
+        self.file_key = input_file_location
 
         self.metadata_mapping_validator_service = MetadataMappingValidatorService()
 
         self.metadata_formatter_service = metadata_formatter_service
 
     def download_metadata_from_s3(self) -> str:
-        local_file_path = f"{self.temp_download_dir}/{METADATA_FILENAME}"
+        local_file_path = f"{self.temp_download_dir}/{self.file_key.split('/')[-1]}"
 
         logger.info(
             f"Fetching {local_file_path} from bucket {self.staging_bucket_name}"
         )
 
-        self.s3_service.download_file(
-            s3_bucket_name=self.staging_bucket_name,
-            file_key=self.file_key,
-            download_path=local_file_path,
-        )
+        try:
+            self.s3_service.download_file(
+                s3_bucket_name=self.staging_bucket_name,
+                file_key=self.file_key,
+                download_path=local_file_path,
+            )
+        except ClientError:
+            raise BulkUploadMetadataException(f"Could not retrieve the following metadata file: {self.file_key}")
         return local_file_path
 
     def process_metadata(self):
@@ -109,7 +108,7 @@ def process_metadata(self):
 
         except pydantic.ValidationError as e:
             failure_msg = (
-                f"Failed to parse {METADATA_FILENAME} due to validation error: {str(e)}"
+                f"Failed to parse {self.file_key} due to validation error: {str(e)}"
             )
             logger.error(failure_msg, {"Result": UNSUCCESSFUL})
             raise BulkUploadMetadataException(failure_msg)
@@ -121,7 +120,7 @@ def process_metadata(self):
 
         except ClientError as e:
             if "HeadObject" in str(e):
-                failure_msg = f'No metadata file could be found with the name "{METADATA_FILENAME}"'
+                failure_msg = f'No metadata file could be found with the name "{self.file_key}"'
             else:
                 failure_msg = str(e)
             logger.error(failure_msg, {"Result": UNSUCCESSFUL})
@@ -134,26 +133,26 @@ def csv_to_sqs_metadata(self, csv_file_path: str) -> list[StagingSqsMetadata]:
         )
 
         with open(
-            csv_file_path, mode="r", encoding="utf-8-sig", errors="replace"
+            csv_file_path, mode="r", encoding="utf-8-sig", errors="replace"
         ) as csv_file:
             csv_reader = csv.DictReader(csv_file)
             if csv_reader.fieldnames is None:
                 raise BulkUploadMetadataException(
-                    f"{METADATA_FILENAME} is empty or missing headers."
+                    "Metadata file is empty or missing headers."
                 )
 
             headers = [h.strip() for h in csv_reader.fieldnames]
             records = list(csv_reader)
 
             if not headers:
-                raise BulkUploadMetadataException(f"{METADATA_FILENAME} has no headers.")
+                raise BulkUploadMetadataException(f"{self.file_key} has no headers.")
 
             validated_rows, rejected_rows, rejected_reasons = (
                 self.metadata_mapping_validator_service.validate_and_normalize_metadata(
                     records, self.fixed_values, self.metadata_heading_remap
                 )
             )
-            if rejected_reasons:
+            if rejected_reasons:
                 for reason in rejected_reasons:
                     logger.warning(f"Rejected due to: {reason['REASON']}")
 
@@ -175,7 +174,7 @@ def csv_to_sqs_metadata(self, csv_file_path: str) -> list[StagingSqsMetadata]:
         ]
 
     def process_metadata_row(
-        self, row: dict, patients: dict[tuple[str, str], list[BulkUploadQueueMetadata]]
+        self, row: dict, patients: dict[tuple[str, str], list[BulkUploadQueueMetadata]]
     ) -> None:
         """Validate individual file metadata and attach to patient group."""
         file_metadata = MetadataFile.model_validate(row)
@@ -192,20 +191,18 @@ def process_metadata_row(
             return
 
         sqs_metadata = self.convert_to_sqs_metadata(file_metadata, correct_file_name)
-        sqs_metadata.file_path = (self.practice_directory.rstrip("/")
-                                  + "/" +
-                                  sqs_metadata.file_path.lstrip("/"))
+        sqs_metadata.file_path = self.file_key.rsplit("/", 1)[0] + "/" + sqs_metadata.file_path.lstrip("/")
         patients[(nhs_number, ods_code)].append(sqs_metadata)
 
     def apply_fixed_values(self, file_metadata: MetadataFile) -> MetadataFile:
-
+
         metadata_dict = file_metadata.model_dump(by_alias=True)
 
         for field_name, fixed_value in self.fixed_values.items():
             metadata_dict[field_name] = fixed_value
             logger.info(
                 f"Applied fixed value for field '{field_name}': '{fixed_value}'")
-
+
         return MetadataFile.model_validate(metadata_dict)
 
     @staticmethod
@@ -298,10 +295,10 @@ def handle_expedite_event(self, event):
             raise BulkUploadMetadataException(failure_msg)
 
     def handle_invalid_filename(
-        self,
-        file_metadata: MetadataFile,
-        error: InvalidFileNameException,
-        nhs_number: str,
+        self,
+        file_metadata: MetadataFile,
+        error: InvalidFileNameException,
+        nhs_number: str,
     ) -> None:
         """Handle invalid filenames by logging and storing failure in Dynamo."""
         logger.error(
@@ -316,7 +313,7 @@ def handle_invalid_filename(
         )
 
     def send_metadata_to_fifo_sqs(
-        self, staging_sqs_metadata_list: list[StagingSqsMetadata]
+        self, staging_sqs_metadata_list: list[StagingSqsMetadata]
     ) -> None:
         """Send validated metadata entries to SQS FIFO queue."""
         for staging_sqs_metadata in staging_sqs_metadata_list:
@@ -331,7 +328,7 @@ def send_metadata_to_fifo_sqs(
         logger.info("Sent bulk upload metadata to sqs queue")
 
     def send_metadata_to_expedite_sqs(
-        self, staging_sqs_metadata: StagingSqsMetadata
+        self, staging_sqs_metadata: StagingSqsMetadata
     ) -> None:
         """Send validated metadata entries to SQS expedite queue."""
         sqs_group_id = f"bulk_upload_{uuid.uuid4()}"
@@ -354,7 +351,7 @@ def copy_metadata_to_dated_folder(self):
             self.staging_bucket_name,
             f"metadata/{current_datetime}.csv",
         )
-        self.s3_service.delete_object(self.staging_bucket_name, METADATA_FILENAME)
+        self.s3_service.delete_object(self.staging_bucket_name, self.file_key)
 
     def clear_temp_storage(self):
         """Delete temporary working directory."""