44from io import StringIO , BytesIO
55from typing import Union
66from botocore .exceptions import ClientError
7- from constants import ACK_HEADERS , SOURCE_BUCKET_NAME , ACK_BUCKET_NAME , FILE_NAME_PROC_LAMBDA_NAME
7+ from constants import ACK_HEADERS , get_source_bucket_name , get_ack_bucket_name , FILE_NAME_PROC_LAMBDA_NAME
88from audit_table import change_audit_table_status_to_processed , get_next_queued_file_details
9- from clients import s3_client , logger , lambda_client
9+ from clients import get_s3_client , logger , lambda_client
1010from utils_for_ack_lambda import get_row_count
1111
1212
@@ -49,7 +49,7 @@ def obtain_current_ack_content(temp_ack_file_key: str) -> StringIO:
4949 """Returns the current ack file content if the file exists, or else initialises the content with the ack headers."""
5050 try :
5151 # If ack file exists in S3 download the contents
52- existing_ack_file = s3_client .get_object (Bucket = ACK_BUCKET_NAME , Key = temp_ack_file_key )
52+ existing_ack_file = get_s3_client () .get_object (Bucket = get_ack_bucket_name () , Key = temp_ack_file_key )
5353 existing_content = existing_ack_file ["Body" ].read ().decode ("utf-8" )
5454 except ClientError as error :
5555 # If ack file does not exist in S3 create a new file containing the headers only
@@ -80,22 +80,26 @@ def upload_ack_file(
8080 cleaned_row = "|" .join (data_row_str ).replace (" |" , "|" ).replace ("| " , "|" ).strip ()
8181 accumulated_csv_content .write (cleaned_row + "\n " )
8282 csv_file_like_object = BytesIO (accumulated_csv_content .getvalue ().encode ("utf-8" ))
83- s3_client .upload_fileobj (csv_file_like_object , ACK_BUCKET_NAME , temp_ack_file_key )
8483
85- row_count_source = get_row_count (SOURCE_BUCKET_NAME , f"processing/{ file_key } " )
86- row_count_destination = get_row_count (ACK_BUCKET_NAME , temp_ack_file_key )
84+ ack_bucket_name = get_ack_bucket_name ()
85+ source_bucket_name = get_source_bucket_name ()
86+
87+ get_s3_client ().upload_fileobj (csv_file_like_object , ack_bucket_name , temp_ack_file_key )
88+
89+ row_count_source = get_row_count (source_bucket_name , f"processing/{ file_key } " )
90+ row_count_destination = get_row_count (ack_bucket_name , temp_ack_file_key )
8791 # TODO: Should we check for > and if so what handling is required
8892 if row_count_destination == row_count_source :
89- move_file (ACK_BUCKET_NAME , temp_ack_file_key , archive_ack_file_key )
90- move_file (SOURCE_BUCKET_NAME , f"processing/{ file_key } " , f"archive/{ file_key } " )
93+ move_file (ack_bucket_name , temp_ack_file_key , archive_ack_file_key )
94+ move_file (source_bucket_name , f"processing/{ file_key } " , f"archive/{ file_key } " )
9195
9296 # Update the audit table and invoke the filename lambda with next file in the queue (if one exists)
9397 change_audit_table_status_to_processed (file_key , message_id )
9498 next_queued_file_details = get_next_queued_file_details (supplier_queue )
9599 if next_queued_file_details :
96100 invoke_filename_lambda (next_queued_file_details ["filename" ], next_queued_file_details ["message_id" ])
97101
98- logger .info ("Ack file updated to %s: %s" , ACK_BUCKET_NAME , archive_ack_file_key )
102+ logger .info ("Ack file updated to %s: %s" , ack_bucket_name , archive_ack_file_key )
99103
100104
101105def update_ack_file (
@@ -123,6 +127,7 @@ def update_ack_file(
123127
124128def move_file (bucket_name : str , source_file_key : str , destination_file_key : str ) -> None :
125129 """Moves a file from one location to another within a single S3 bucket by copying and then deleting the file."""
130+ s3_client = get_s3_client ()
126131 s3_client .copy_object (
127132 Bucket = bucket_name , CopySource = {"Bucket" : bucket_name , "Key" : source_file_key }, Key = destination_file_key
128133 )
@@ -135,7 +140,15 @@ def invoke_filename_lambda(file_key: str, message_id: str) -> None:
135140 try :
136141 lambda_payload = {
137142 "Records" : [
138- {"s3" : {"bucket" : {"name" : SOURCE_BUCKET_NAME }, "object" : {"key" : file_key }}, "message_id" : message_id }
143+ {"s3" :
144+ {
145+ "bucket" : {
146+ "name" : get_source_bucket_name ()
147+ },
148+ "object" : {"key" : file_key }
149+ },
150+ "message_id" : message_id
151+ }
139152 ]
140153 }
141154 lambda_client .invoke (
0 commit comments