@@ -4,9 +4,9 @@
 from io import StringIO, BytesIO
 from typing import Union, Optional
 from botocore.exceptions import ClientError
-from constants import ACK_HEADERS, SOURCE_BUCKET_NAME, ACK_BUCKET_NAME, FILE_NAME_PROC_LAMBDA_NAME
+from constants import ACK_HEADERS, get_source_bucket_name, get_ack_bucket_name, FILE_NAME_PROC_LAMBDA_NAME
 from audit_table import change_audit_table_status_to_processed, get_next_queued_file_details
-from clients import s3_client, logger, lambda_client
+from clients import get_s3_client, logger, lambda_client
 from utils_for_ack_lambda import get_row_count
 from logging_decorators import upload_ack_file_logging_decorator
 
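The getters imported here replace module-level constants, presumably so bucket names and the S3 client are resolved lazily at call time rather than at import time, which makes them easier to patch in tests and keeps the module importable without AWS configuration. A minimal sketch of what such getters might look like, assuming the names come from environment variables (the real `constants.py` and `clients.py` are not part of this diff):

```python
# Hypothetical sketch of the getters this diff imports; the real
# implementations live in constants.py / clients.py and are not shown here.
import os
from functools import lru_cache

import boto3


def get_source_bucket_name() -> str:
    # Resolved at call time rather than import time (assumed env var).
    return os.environ["SOURCE_BUCKET_NAME"]


def get_ack_bucket_name() -> str:
    return os.environ["ACK_BUCKET_NAME"]


@lru_cache(maxsize=1)
def get_s3_client():
    # Create the boto3 client on first use and reuse it thereafter.
    return boto3.client("s3")
```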
@@ -50,7 +50,7 @@ def obtain_current_ack_content(temp_ack_file_key: str) -> StringIO:
5050 """Returns the current ack file content if the file exists, or else initialises the content with the ack headers."""
5151 try :
5252 # If ack file exists in S3 download the contents
53- existing_ack_file = s3_client .get_object (Bucket = ACK_BUCKET_NAME , Key = temp_ack_file_key )
53+ existing_ack_file = get_s3_client () .get_object (Bucket = get_ack_bucket_name () , Key = temp_ack_file_key )
5454 existing_content = existing_ack_file ["Body" ].read ().decode ("utf-8" )
5555 except ClientError as error :
5656 # If ack file does not exist in S3 create a new file containing the headers only
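For context, the `except ClientError` branch falls back to a header-only file. A sketch of the whole function under that assumption (only the `get_object` call is changed by this hunk; the pipe delimiter is assumed from the row handling in `upload_ack_file`):

```python
# Sketch of the full function, assuming ACK_HEADERS is the list of ack
# column names; only the get_object line is changed in this diff.
def obtain_current_ack_content(temp_ack_file_key: str) -> StringIO:
    try:
        existing_ack_file = get_s3_client().get_object(Bucket=get_ack_bucket_name(), Key=temp_ack_file_key)
        existing_content = existing_ack_file["Body"].read().decode("utf-8")
    except ClientError:
        # File not found: start the ack content from the headers only
        existing_content = "|".join(ACK_HEADERS) + "\n"
    accumulated_csv_content = StringIO()
    accumulated_csv_content.write(existing_content)
    return accumulated_csv_content
```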
@@ -83,14 +83,19 @@ def upload_ack_file(
         cleaned_row = "|".join(data_row_str).replace(" |", "|").replace("| ", "|").strip()
         accumulated_csv_content.write(cleaned_row + "\n")
     csv_file_like_object = BytesIO(accumulated_csv_content.getvalue().encode("utf-8"))
-    s3_client.upload_fileobj(csv_file_like_object, ACK_BUCKET_NAME, temp_ack_file_key)
 
-    row_count_source = get_row_count(SOURCE_BUCKET_NAME, f"processing/{file_key}")
-    row_count_destination = get_row_count(ACK_BUCKET_NAME, temp_ack_file_key)
+    ack_bucket_name = get_ack_bucket_name()
+    source_bucket_name = get_source_bucket_name()
+
+    get_s3_client().upload_fileobj(csv_file_like_object, ack_bucket_name, temp_ack_file_key)
+
+    row_count_source = get_row_count(source_bucket_name, f"processing/{file_key}")
+    row_count_destination = get_row_count(ack_bucket_name, temp_ack_file_key)
     # TODO: Should we check for > and if so what handling is required
     if row_count_destination == row_count_source:
-        move_file(ACK_BUCKET_NAME, temp_ack_file_key, archive_ack_file_key)
-        move_file(SOURCE_BUCKET_NAME, f"processing/{file_key}", f"archive/{file_key}")
+        move_file(ack_bucket_name, temp_ack_file_key, archive_ack_file_key)
+        move_file(source_bucket_name, f"processing/{file_key}", f"archive/{file_key}")
+
     # Update the audit table and invoke the filename lambda with next file in the queue (if one exists)
     change_audit_table_status_to_processed(file_key, message_id)
     supplier_queue = f"{supplier}_{vaccine_type}"
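The equality check relies on `get_row_count`, imported from `utils_for_ack_lambda` and not shown in this diff. A hypothetical sketch consistent with how it is called here, counting newline-delimited rows in an S3 object:

```python
# Hypothetical sketch of get_row_count; the real implementation lives in
# utils_for_ack_lambda and is not part of this diff.
def get_row_count(bucket_name: str, file_key: str) -> int:
    body = get_s3_client().get_object(Bucket=bucket_name, Key=file_key)["Body"]
    # StreamingBody.iter_lines yields the object line by line without
    # loading it all into memory; blank lines are skipped.
    return sum(1 for line in body.iter_lines() if line.strip())
```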
@@ -107,7 +112,7 @@ def upload_ack_file(
         }
     else:
         result = None
-    logger.info("Ack file updated to %s: %s", ACK_BUCKET_NAME, archive_ack_file_key)
+    logger.info("Ack file updated to %s: %s", ack_bucket_name, archive_ack_file_key)
     return result
 
 
@@ -138,6 +143,7 @@ def update_ack_file(
 
 def move_file(bucket_name: str, source_file_key: str, destination_file_key: str) -> None:
     """Moves a file from one location to another within a single S3 bucket by copying and then deleting the file."""
+    s3_client = get_s3_client()
     s3_client.copy_object(
         Bucket=bucket_name, CopySource={"Bucket": bucket_name, "Key": source_file_key}, Key=destination_file_key
     )
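One caveat on the copy-then-delete pattern: `copy_object` only supports source objects up to 5 GB. If ack or source files could ever exceed that, boto3's managed `copy` transfer falls back to multipart copy; a sketch of the same move using it:

```python
# Alternative sketch using boto3's managed transfer, which handles
# multipart copy for objects larger than copy_object's 5 GB limit.
def move_file(bucket_name: str, source_file_key: str, destination_file_key: str) -> None:
    s3_client = get_s3_client()
    s3_client.copy({"Bucket": bucket_name, "Key": source_file_key}, bucket_name, destination_file_key)
    s3_client.delete_object(Bucket=bucket_name, Key=source_file_key)
```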
@@ -150,7 +156,15 @@ def invoke_filename_lambda(file_key: str, message_id: str) -> None:
     try:
         lambda_payload = {
             "Records": [
-                {"s3": {"bucket": {"name": SOURCE_BUCKET_NAME}, "object": {"key": file_key}}, "message_id": message_id}
+                {"s3":
+                    {
+                        "bucket": {
+                            "name": get_source_bucket_name()
+                        },
+                        "object": {"key": file_key}
+                    },
+                    "message_id": message_id
+                }
             ]
         }
         lambda_client.invoke(
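The hunk ends mid-statement. Judging from the imports, the call presumably targets `FILE_NAME_PROC_LAMBDA_NAME`; an assumed completion (hypothetical, and requires `json` to be imported):

```python
# Assumed shape of the call the hunk truncates; "Event" makes it a
# fire-and-forget async invocation so the ack lambda does not block.
lambda_client.invoke(
    FunctionName=FILE_NAME_PROC_LAMBDA_NAME,
    InvocationType="Event",
    Payload=json.dumps(lambda_payload),
)
```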