 
 import json
 from io import StringIO, BytesIO
-from typing import Union
+from typing import Union, Optional
 from botocore.exceptions import ClientError
-from constants import ACK_HEADERS, SOURCE_BUCKET_NAME, ACK_BUCKET_NAME, FILE_NAME_PROC_LAMBDA_NAME
+from constants import ACK_HEADERS, get_source_bucket_name, get_ack_bucket_name, FILE_NAME_PROC_LAMBDA_NAME
 from audit_table import change_audit_table_status_to_processed, get_next_queued_file_details
-from clients import s3_client, logger, lambda_client
+from clients import get_s3_client, logger, lambda_client
 from utils_for_ack_lambda import get_row_count
+from logging_decorators import upload_ack_file_logging_decorator
 
 
 def create_ack_data(
@@ -49,7 +50,7 @@ def obtain_current_ack_content(temp_ack_file_key: str) -> StringIO:
     """Returns the current ack file content if the file exists, or else initialises the content with the ack headers."""
     try:
         # If ack file exists in S3 download the contents
-        existing_ack_file = s3_client.get_object(Bucket=ACK_BUCKET_NAME, Key=temp_ack_file_key)
+        existing_ack_file = get_s3_client().get_object(Bucket=get_ack_bucket_name(), Key=temp_ack_file_key)
         existing_content = existing_ack_file["Body"].read().decode("utf-8")
     except ClientError as error:
         # If ack file does not exist in S3 create a new file containing the headers only
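This change swaps module-level constants and a shared `s3_client` for getter functions (`get_source_bucket_name`, `get_ack_bucket_name`, `get_s3_client`). The `constants` and `clients` modules are not part of this diff, so the sketch below is only an assumption about what call-time getters like these might look like, with bucket names read from environment variables and the client created lazily and cached:

```python
# Hypothetical sketch of the getters imported above; the real constants/clients
# modules are not shown in this diff, so names and behaviour are assumptions.
import os
from functools import lru_cache

import boto3


def get_ack_bucket_name() -> str:
    # Resolved at call time, so tests can override the environment variable
    return os.environ["ACK_BUCKET_NAME"]


def get_source_bucket_name() -> str:
    return os.environ["SOURCE_BUCKET_NAME"]


@lru_cache(maxsize=None)
def get_s3_client():
    # Created on first use and cached, rather than at import time
    return boto3.client("s3")
```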
@@ -65,43 +66,61 @@ def obtain_current_ack_content(temp_ack_file_key: str) -> StringIO:
     return accumulated_csv_content
 
 
+@upload_ack_file_logging_decorator
 def upload_ack_file(
     temp_ack_file_key: str,
     message_id: str,
-    supplier_queue: str,
+    supplier: str,
+    vaccine_type: str,
     accumulated_csv_content: StringIO,
     ack_data_rows: list,
     archive_ack_file_key: str,
     file_key: str,
-) -> None:
+) -> Optional[dict]:
     """Adds the data row to the uploaded ack file"""
     for row in ack_data_rows:
         data_row_str = [str(item) for item in row.values()]
         cleaned_row = "|".join(data_row_str).replace(" |", "|").replace("| ", "|").strip()
         accumulated_csv_content.write(cleaned_row + "\n")
     csv_file_like_object = BytesIO(accumulated_csv_content.getvalue().encode("utf-8"))
-    s3_client.upload_fileobj(csv_file_like_object, ACK_BUCKET_NAME, temp_ack_file_key)
 
-    row_count_source = get_row_count(SOURCE_BUCKET_NAME, f"processing/{file_key}")
-    row_count_destination = get_row_count(ACK_BUCKET_NAME, temp_ack_file_key)
+    ack_bucket_name = get_ack_bucket_name()
+    source_bucket_name = get_source_bucket_name()
+
+    get_s3_client().upload_fileobj(csv_file_like_object, ack_bucket_name, temp_ack_file_key)
+
+    row_count_source = get_row_count(source_bucket_name, f"processing/{file_key}")
+    row_count_destination = get_row_count(ack_bucket_name, temp_ack_file_key)
     # TODO: Should we check for > and if so what handling is required
     if row_count_destination == row_count_source:
-        move_file(ACK_BUCKET_NAME, temp_ack_file_key, archive_ack_file_key)
-        move_file(SOURCE_BUCKET_NAME, f"processing/{file_key}", f"archive/{file_key}")
+        move_file(ack_bucket_name, temp_ack_file_key, archive_ack_file_key)
+        move_file(source_bucket_name, f"processing/{file_key}", f"archive/{file_key}")
 
         # Update the audit table and invoke the filename lambda with next file in the queue (if one exists)
         change_audit_table_status_to_processed(file_key, message_id)
+        supplier_queue = f"{supplier}_{vaccine_type}"
         next_queued_file_details = get_next_queued_file_details(supplier_queue)
         if next_queued_file_details:
             invoke_filename_lambda(next_queued_file_details["filename"], next_queued_file_details["message_id"])
-
-    logger.info("Ack file updated to %s: %s", ACK_BUCKET_NAME, archive_ack_file_key)
+        # Ingestion of this file is complete
+        result = {
+            "message_id": message_id,
+            "file_key": file_key,
+            "supplier": supplier,
+            "vaccine_type": vaccine_type,
+            "row_count": row_count_source - 1,
+        }
+    else:
+        result = None
+    logger.info("Ack file updated to %s: %s", ack_bucket_name, archive_ack_file_key)
+    return result
 
 
 def update_ack_file(
     file_key: str,
     message_id: str,
-    supplier_queue: str,
+    supplier: str,
+    vaccine_type: str,
     created_at_formatted_string: str,
     ack_data_rows: list,
 ) -> None:
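`upload_ack_file` now returns a summary dict when ingestion of the file completes (or `None` otherwise) and is wrapped in `upload_ack_file_logging_decorator`. The `logging_decorators` module is not included in this diff, so the following is only a plausible sketch of how such a decorator might consume the returned summary:

```python
# Hypothetical sketch of upload_ack_file_logging_decorator; the real
# logging_decorators module is not part of this diff, so the details are assumed.
import functools
import json
import time
from typing import Optional

from clients import logger


def upload_ack_file_logging_decorator(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs) -> Optional[dict]:
        start = time.time()
        result = func(*args, **kwargs)
        # Only emit the completion log once the file has been fully ingested
        if result is not None:
            logger.info(
                "upload_ack_file completed: %s",
                json.dumps({**result, "time_taken_s": round(time.time() - start, 3)}),
            )
        return result

    return wrapper
```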
@@ -113,7 +132,8 @@ def update_ack_file(
     upload_ack_file(
         temp_ack_file_key,
         message_id,
-        supplier_queue,
+        supplier,
+        vaccine_type,
         accumulated_csv_content,
         ack_data_rows,
         archive_ack_file_key,
@@ -123,6 +143,7 @@ def update_ack_file(
 
 def move_file(bucket_name: str, source_file_key: str, destination_file_key: str) -> None:
     """Moves a file from one location to another within a single S3 bucket by copying and then deleting the file."""
+    s3_client = get_s3_client()
     s3_client.copy_object(
         Bucket=bucket_name, CopySource={"Bucket": bucket_name, "Key": source_file_key}, Key=destination_file_key
     )
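S3 has no native move or rename operation, so `move_file` copies the object to its destination key and then deletes the source, as its docstring states; only the `copy_object` call falls inside this hunk. A minimal standalone sketch of the same copy-then-delete pattern, using boto3 calls that exist with these signatures:

```python
# Standalone illustration of the copy-then-delete move pattern used by move_file;
# the bucket and key names here are placeholders, not values from this codebase.
import boto3

s3 = boto3.client("s3")


def move_within_bucket(bucket: str, source_key: str, destination_key: str) -> None:
    # Copy the object to the new key first...
    s3.copy_object(
        Bucket=bucket,
        CopySource={"Bucket": bucket, "Key": source_key},
        Key=destination_key,
    )
    # ...then remove the original so only the destination copy remains
    s3.delete_object(Bucket=bucket, Key=source_key)
```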
@@ -135,7 +156,15 @@ def invoke_filename_lambda(file_key: str, message_id: str) -> None:
     try:
         lambda_payload = {
             "Records": [
-                {"s3": {"bucket": {"name": SOURCE_BUCKET_NAME}, "object": {"key": file_key}}, "message_id": message_id}
+                {"s3":
+                    {
+                        "bucket": {
+                            "name": get_source_bucket_name()
+                        },
+                        "object": {"key": file_key}
+                    },
+                    "message_id": message_id
+                }
             ]
         }
         lambda_client.invoke(