22
33import json
44from io import StringIO , BytesIO
5- from typing import Union
5+ from typing import Union , Optional
66from botocore .exceptions import ClientError
77from constants import ACK_HEADERS , SOURCE_BUCKET_NAME , ACK_BUCKET_NAME , FILE_NAME_PROC_LAMBDA_NAME
88from audit_table import change_audit_table_status_to_processed , get_next_queued_file_details
99from clients import s3_client , logger , lambda_client
1010from utils_for_ack_lambda import get_row_count
11+ from logging_decorators import upload_ack_file_logging_decorator
1112
1213
1314def create_ack_data (
@@ -65,15 +66,17 @@ def obtain_current_ack_content(temp_ack_file_key: str) -> StringIO:
6566 return accumulated_csv_content
6667
6768
69+ @upload_ack_file_logging_decorator
6870def upload_ack_file (
6971 temp_ack_file_key : str ,
7072 message_id : str ,
71- supplier_queue : str ,
73+ supplier : str ,
74+ vaccine_type : str ,
7275 accumulated_csv_content : StringIO ,
7376 ack_data_rows : list ,
7477 archive_ack_file_key : str ,
7578 file_key : str ,
76- ) -> None :
79+ ) -> Optional [ dict ] :
7780 """Adds the data row to the uploaded ack file"""
7881 for row in ack_data_rows :
7982 data_row_str = [str (item ) for item in row .values ()]
@@ -91,17 +94,30 @@ def upload_ack_file(
9194
9295 # Update the audit table and invoke the filename lambda with next file in the queue (if one exists)
9396 change_audit_table_status_to_processed (file_key , message_id )
97+ supplier_queue = f"{ supplier } _{ vaccine_type } "
9498 next_queued_file_details = get_next_queued_file_details (supplier_queue )
9599 if next_queued_file_details :
96100 invoke_filename_lambda (next_queued_file_details ["filename" ], next_queued_file_details ["message_id" ])
97101
102+ # Ingestion of this file is complete
103+ result = {
104+ "message_id" : message_id ,
105+ "file_key" : file_key ,
106+ "supplier" : supplier ,
107+ "vaccine_type" : vaccine_type ,
108+ "row_count" : row_count_source ,
109+ }
110+ else :
111+ result = None
98112 logger .info ("Ack file updated to %s: %s" , ACK_BUCKET_NAME , archive_ack_file_key )
113+ return result
99114
100115
101116def update_ack_file (
102117 file_key : str ,
103118 message_id : str ,
104- supplier_queue : str ,
119+ supplier : str ,
120+ vaccine_type : str ,
105121 created_at_formatted_string : str ,
106122 ack_data_rows : list ,
107123) -> None :
@@ -113,7 +129,8 @@ def update_ack_file(
113129 upload_ack_file (
114130 temp_ack_file_key ,
115131 message_id ,
116- supplier_queue ,
132+ supplier ,
133+ vaccine_type ,
117134 accumulated_csv_content ,
118135 ack_data_rows ,
119136 archive_ack_file_key ,
0 commit comments