22
33from botocore .exceptions import ClientError
44from io import StringIO , BytesIO
5- from typing import Optional
65from audit_table import change_audit_table_status_to_processed
76from common .clients import get_s3_client , logger
8- from constants import ACK_HEADERS , get_source_bucket_name , get_ack_bucket_name
9- from logging_decorators import upload_ack_file_logging_decorator
10- from utils_for_ack_lambda import get_row_count
7+ from constants import (
8+ ACK_HEADERS ,
9+ get_source_bucket_name ,
10+ get_ack_bucket_name ,
11+ COMPLETED_ACK_DIR ,
12+ TEMP_ACK_DIR ,
13+ BATCH_FILE_PROCESSING_DIR ,
14+ BATCH_FILE_ARCHIVE_DIR ,
15+ )
16+ from logging_decorators import complete_batch_file_process_logging_decorator
1117
1218
1319def create_ack_data (
@@ -45,6 +51,35 @@ def create_ack_data(
4551 }
4652
4753
@complete_batch_file_process_logging_decorator
def complete_batch_file_process(
    message_id: str,
    supplier: str,
    vaccine_type: str,
    created_at_formatted_string: str,
    file_key: str,
    total_ack_rows_processed: int,
) -> dict:
    """Mark the batch file as fully processed.

    Moves the accumulated ack file from the temp ack directory to the completed
    ack directory, moves the original batch file from the processing directory
    to the archive directory, then updates the audit table status to processed.

    Returns a summary dict (message_id, file_key, supplier, vaccine_type,
    row_count) consumed by the logging decorator.
    """
    # str.replace already returns the new filename; the original wrapped it in a
    # redundant f-string (f"{...}") which is a no-op.
    ack_filename = file_key.replace(".csv", f"_BusAck_{created_at_formatted_string}.csv")

    # Relocate the ack file to its final (completed) destination.
    move_file(get_ack_bucket_name(), f"{TEMP_ACK_DIR}/{ack_filename}", f"{COMPLETED_ACK_DIR}/{ack_filename}")
    # Archive the original batch file so it is not picked up for processing again.
    move_file(
        get_source_bucket_name(), f"{BATCH_FILE_PROCESSING_DIR}/{file_key}", f"{BATCH_FILE_ARCHIVE_DIR}/{file_key}"
    )

    change_audit_table_status_to_processed(file_key, message_id)

    return {
        "message_id": message_id,
        "file_key": file_key,
        "supplier": supplier,
        "vaccine_type": vaccine_type,
        "row_count": total_ack_rows_processed,
    }
81+
82+
4883def obtain_current_ack_content (temp_ack_file_key : str ) -> StringIO :
4984 """Returns the current ack file content if the file exists, or else initialises the content with the ack headers."""
5085 try :
@@ -65,76 +100,27 @@ def obtain_current_ack_content(temp_ack_file_key: str) -> StringIO:
65100 return accumulated_csv_content
66101
67102
def update_ack_file(
    file_key: str,
    created_at_formatted_string: str,
    ack_data_rows: list,
) -> None:
    """Append the given ack data rows to the temp ack file for this batch file.

    Obtains the current accumulated ack content (initialised with the ack
    headers if the file does not yet exist), appends each row as a
    pipe-delimited line, and re-uploads the result to the temp ack location in
    the ack bucket.

    :param file_key: key of the original batch file (a .csv name)
    :param created_at_formatted_string: timestamp suffix used in the ack filename
    :param ack_data_rows: list of dicts; each dict's values form one ack row
    """
    # str.replace already returns the new filename; no f-string wrapper needed.
    ack_filename = file_key.replace(".csv", f"_BusAck_{created_at_formatted_string}.csv")
    temp_ack_file_key = f"{TEMP_ACK_DIR}/{ack_filename}"
    accumulated_csv_content = obtain_current_ack_content(temp_ack_file_key)

    for row in ack_data_rows:
        data_row_str = [str(item) for item in row.values()]
        # Pipe-delimit the row and strip stray spaces adjacent to delimiters.
        cleaned_row = "|".join(data_row_str).replace(" |", "|").replace("| ", "|").strip()
        accumulated_csv_content.write(cleaned_row + "\n")

    csv_file_like_object = BytesIO(accumulated_csv_content.getvalue().encode("utf-8"))
    ack_bucket_name = get_ack_bucket_name()

    get_s3_client().upload_fileobj(csv_file_like_object, ack_bucket_name, temp_ack_file_key)
    # Bug fix: log the key that was actually written (the temp ack key). The
    # previous code logged the completed-dir key, which this function never
    # touches (it also left archive_ack_file_key as a dead local).
    logger.info("Ack file updated to %s: %s", ack_bucket_name, temp_ack_file_key)
138124
139125
140126def move_file (bucket_name : str , source_file_key : str , destination_file_key : str ) -> None :
0 commit comments