Skip to content

Commit d256d8d

Browse files
committed
Minor refactoring
1 parent a9d755c commit d256d8d

File tree

5 files changed

+25
-11
lines changed

5 files changed

+25
-11
lines changed

lambdas/ack_backend/src/ack_processor.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,9 @@ def lambda_handler(event, _):
4444
for message in incoming_message_body:
4545
ack_data_rows.append(convert_message_to_ack_row(message, created_at_formatted_string))
4646

47+
# Get the row number of the final record processed
4748
if i == len(event["Records"]) - 1:
49+
# Format of the row id is {batch_message_id}^{row_number}
4850
total_ack_rows_processed = int(incoming_message_body[-1].get("row_id", "").split("^")[1])
4951

5052
update_ack_file(file_key, created_at_formatted_string, ack_data_rows)

lambdas/ack_backend/src/constants.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,11 @@
44

55
AUDIT_TABLE_NAME = os.getenv("AUDIT_TABLE_NAME")
66

7+
COMPLETED_ACK_DIR = "forwardedFile"
8+
TEMP_ACK_DIR = "TempAck"
9+
BATCH_FILE_PROCESSING_DIR = "processing"
10+
BATCH_FILE_ARCHIVE_DIR = "archive"
11+
712

813
def get_source_bucket_name() -> str:
914
"""Get the SOURCE_BUCKET_NAME environment from environment variables."""

lambdas/ack_backend/src/logging_decorators.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ def wrapper(message, created_at_formatted_string):
4949
return wrapper
5050

5151

52-
def upload_ack_file_logging_decorator(func):
52+
def complete_batch_file_process_logging_decorator(func):
5353
"""This decorator logs when record processing is complete."""
5454

5555
@wraps(func)

lambdas/ack_backend/src/update_ack_file.py

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,9 @@
44
from io import StringIO, BytesIO
55
from audit_table import change_audit_table_status_to_processed
66
from common.clients import get_s3_client, logger
7-
from constants import ACK_HEADERS, get_source_bucket_name, get_ack_bucket_name
8-
from logging_decorators import upload_ack_file_logging_decorator
7+
from constants import ACK_HEADERS, get_source_bucket_name, get_ack_bucket_name, COMPLETED_ACK_DIR, TEMP_ACK_DIR, \
8+
BATCH_FILE_PROCESSING_DIR, BATCH_FILE_ARCHIVE_DIR
9+
from logging_decorators import complete_batch_file_process_logging_decorator
910

1011

1112
def create_ack_data(
@@ -43,7 +44,7 @@ def create_ack_data(
4344
}
4445

4546

46-
@upload_ack_file_logging_decorator
47+
@complete_batch_file_process_logging_decorator
4748
def complete_batch_file_process(
4849
message_id: str,
4950
supplier: str,
@@ -54,12 +55,18 @@ def complete_batch_file_process(
5455
) -> dict:
5556
"""Mark the batch file as processed. This involves moving the ack and original file to destinations and updating
5657
the audit table status"""
57-
ack_bucket_name = get_ack_bucket_name()
58-
source_bucket_name = get_source_bucket_name()
5958
ack_filename = f"{file_key.replace('.csv', f'_BusAck_{created_at_formatted_string}.csv')}"
6059

61-
move_file(ack_bucket_name, f"TempAck/{ack_filename}", f"forwardedFile/{ack_filename}")
62-
move_file(source_bucket_name, f"processing/{file_key}", f"archive/{file_key}")
60+
move_file(
61+
get_ack_bucket_name(),
62+
f"{TEMP_ACK_DIR}/{ack_filename}",
63+
f"{COMPLETED_ACK_DIR}/{ack_filename}"
64+
)
65+
move_file(
66+
get_source_bucket_name(),
67+
f"{BATCH_FILE_PROCESSING_DIR}/{file_key}",
68+
f"{BATCH_FILE_ARCHIVE_DIR}/{file_key}"
69+
)
6370

6471
change_audit_table_status_to_processed(file_key, message_id)
6572

@@ -99,8 +106,8 @@ def update_ack_file(
99106
) -> None:
100107
"""Updates the ack file with the new data row based on the given arguments"""
101108
ack_filename = f"{file_key.replace('.csv', f'_BusAck_{created_at_formatted_string}.csv')}"
102-
temp_ack_file_key = f"TempAck/{ack_filename}"
103-
archive_ack_file_key = f"forwardedFile/{ack_filename}"
109+
temp_ack_file_key = f"{TEMP_ACK_DIR}/{ack_filename}"
110+
archive_ack_file_key = f"{COMPLETED_ACK_DIR}/{ack_filename}"
104111
accumulated_csv_content = obtain_current_ack_content(temp_ack_file_key)
105112

106113
for row in ack_data_rows:

lambdas/ack_backend/src/utils_for_ack_lambda.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ def is_ack_processing_complete(batch_event_message_id: str, processed_ack_count:
1313
record_count = get_record_count_by_message_id(batch_event_message_id)
1414

1515
if not record_count:
16-
# Record Count is not set on the audit item until all rows have been preprocessed and sent to Kinesis
16+
# Record count is not set on the audit item until all rows have been preprocessed and sent to Kinesis
1717
return False
1818

1919
_BATCH_EVENT_ID_TO_RECORD_COUNT_MAP[batch_event_message_id] = record_count

0 commit comments

Comments
 (0)