
Commit 124928d

VED-859 Improve ack backend efficiency and make completion process more robust (#892)
1 parent da5eb1e commit 124928d

28 files changed: +860 −289 lines

ack_backend/poetry.lock

Lines changed: 130 additions & 92 deletions
Some generated files are not rendered by default.

ack_backend/pyproject.toml

Lines changed: 2 additions & 2 deletions
@@ -10,8 +10,8 @@ python = "~3.11"
 boto3 = "~1.38.42"
 mypy-boto3-dynamodb = "^1.38.4"
 freezegun = "^1.5.2"
-moto = "^4"
-coverage = "^7.9.1"
+moto = "^5.1.14"
+coverage = "^7.10.7"


 [build-system]

ack_backend/src/ack_processor.py

Lines changed: 16 additions & 7 deletions
@@ -2,12 +2,13 @@

 import json
 from logging_decorators import ack_lambda_handler_logging_decorator
-from update_ack_file import update_ack_file
+from update_ack_file import update_ack_file, complete_batch_file_process
+from utils_for_ack_lambda import is_ack_processing_complete
 from convert_message_to_ack_row import convert_message_to_ack_row


 @ack_lambda_handler_logging_decorator
-def lambda_handler(event, context):
+def lambda_handler(event, _):
     """
     Ack lambda handler.
     For each record: each message in the array of messages is converted to an ack row,
@@ -23,6 +24,7 @@ def lambda_handler(event, context):
     supplier_queue = None

     ack_data_rows = []
+    total_ack_rows_processed = 0

     for i, record in enumerate(event["Records"]):

@@ -32,10 +34,8 @@
             raise ValueError("Could not load incoming message body") from body_json_error

         if i == 0:
-            # IMPORTANT NOTE: An assumption is made here that the file_key and created_at_formatted_string are the same
-            # for all messages in the event. The use of FIFO SQS queues ensures that this is the case, provided that
-            # there is only one file processing at a time for each supplier queue (combination of supplier and vaccine
-            # type).
+            # The SQS FIFO MessageGroupId that this lambda consumes from is based on the source filename + created at
+            # datetime. Therefore, can safely retrieve file metadata from the first record in the list
             file_key = incoming_message_body[0].get("file_key")
             message_id = (incoming_message_body[0].get("row_id", "")).split("^")[0]
             vaccine_type = incoming_message_body[0].get("vaccine_type")
@@ -46,6 +46,15 @@
         for message in incoming_message_body:
             ack_data_rows.append(convert_message_to_ack_row(message, created_at_formatted_string))

-        update_ack_file(file_key, message_id, supplier_queue, created_at_formatted_string, ack_data_rows)
+        update_ack_file(file_key, created_at_formatted_string, ack_data_rows)
+
+        # Get the row count of the final processed record
+        # Format of the row id is {batch_message_id}^{row_number}
+        total_ack_rows_processed = int(incoming_message_body[-1].get("row_id", "").split("^")[1])
+
+    if is_ack_processing_complete(message_id, total_ack_rows_processed):
+        complete_batch_file_process(
+            message_id, supplier, vaccine_type, supplier_queue, created_at_formatted_string, file_key, total_ack_rows_processed
+        )

     return {"statusCode": 200, "body": json.dumps("Lambda function executed successfully!")}

ack_backend/src/audit_table.py

Lines changed: 15 additions & 1 deletion
@@ -1,6 +1,6 @@
 """Add the filename to the audit table and check for duplicates."""

-from typing import Union
+from typing import Optional, Union
 from boto3.dynamodb.conditions import Key
 from clients import dynamodb_client, dynamodb_resource, logger
 from errors import UnhandledAuditTableError
@@ -47,3 +47,17 @@ def change_audit_table_status_to_processed(file_key: str, message_id: str) -> No
     except Exception as error:  # pylint: disable = broad-exception-caught
         logger.error(error)
         raise UnhandledAuditTableError(error) from error
+
+
+def get_record_count_by_message_id(event_message_id: str) -> Optional[int]:
+    """Retrieves full audit entry by unique event message ID"""
+    audit_record = dynamodb_client.get_item(
+        TableName=AUDIT_TABLE_NAME, Key={AuditTableKeys.MESSAGE_ID: {"S": event_message_id}}
+    )
+
+    record_count = audit_record.get("Item", {}).get(AuditTableKeys.RECORD_COUNT, {}).get("N")
+
+    if not record_count:
+        return None
+
+    return int(record_count)
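For reference, get_record_count_by_message_id unwraps the low-level attribute-value format returned by the boto3 DynamoDB client. A small sketch of the response shape it handles; the key names follow AuditTableKeys above, the values are illustrative.

# Illustrative get_item response from the boto3 DynamoDB client API; values are made up.
response = {
    "Item": {
        "message_id": {"S": "123e4567-e89b-12d3-a456-426614174000"},
        "record_count": {"N": "100"},  # the client API returns numbers as strings
    }
}
record_count = response.get("Item", {}).get("record_count", {}).get("N")
print(int(record_count) if record_count else None)  # -> 100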

ack_backend/src/constants.py

Lines changed: 16 additions & 2 deletions
@@ -2,13 +2,26 @@

 import os

-SOURCE_BUCKET_NAME = os.getenv("SOURCE_BUCKET_NAME")
-ACK_BUCKET_NAME = os.getenv("ACK_BUCKET_NAME")
 AUDIT_TABLE_NAME = os.getenv("AUDIT_TABLE_NAME")
 AUDIT_TABLE_FILENAME_GSI = "filename_index"
 AUDIT_TABLE_QUEUE_NAME_GSI = "queue_name_index"
 FILE_NAME_PROC_LAMBDA_NAME = os.getenv("FILE_NAME_PROC_LAMBDA_NAME")

+COMPLETED_ACK_DIR = "forwardedFile"
+TEMP_ACK_DIR = "TempAck"
+BATCH_FILE_PROCESSING_DIR = "processing"
+BATCH_FILE_ARCHIVE_DIR = "archive"
+
+
+def get_source_bucket_name() -> str:
+    """Get the SOURCE_BUCKET_NAME environment from environment variables."""
+    return os.getenv("SOURCE_BUCKET_NAME")
+
+
+def get_ack_bucket_name() -> str:
+    """Get the ACK_BUCKET_NAME environment from environment variables."""
+    return os.getenv("ACK_BUCKET_NAME")
+

 class FileStatus:
     """File status constants"""
@@ -25,6 +38,7 @@ class AuditTableKeys:
     FILENAME = "filename"
     MESSAGE_ID = "message_id"
     QUEUE_NAME = "queue_name"
+    RECORD_COUNT = "record_count"
     STATUS = "status"
     TIMESTAMP = "timestamp"
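Replacing the module-level SOURCE_BUCKET_NAME / ACK_BUCKET_NAME constants with getter functions means the environment is read at call time rather than import time, which presumably plays better with tests that set the variables after the module has been imported. A minimal sketch under that assumption:

# Minimal sketch, assuming the motivation is call-time lookup of the env var (e.g. under moto/test setups).
import os
from unittest.mock import patch

from constants import get_ack_bucket_name

with patch.dict(os.environ, {"ACK_BUCKET_NAME": "test-ack-bucket"}):
    assert get_ack_bucket_name() == "test-ack-bucket"  # picks up a value set after import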

ack_backend/src/logging_decorators.py

Lines changed: 29 additions & 1 deletion
@@ -7,7 +7,7 @@
 from functools import wraps
 from clients import firehose_client, logger

-
+PREFIX = "ack_processor"
 STREAM_NAME = os.getenv("SPLUNK_FIREHOSE_NAME", "immunisation-fhir-api-internal-dev-splunk-firehose")


@@ -77,6 +77,34 @@ def wrapper(message, created_at_formatted_string):
     return wrapper


+def complete_batch_file_process_logging_decorator(func):
+    """This decorator logs when record processing is complete."""
+
+    @wraps(func)
+    def wrapper(*args, **kwargs):
+
+        base_log_data = {
+            "function_name": f"{PREFIX}_{func.__name__}",
+            "date_time": str(datetime.now()),
+        }
+        start_time = time.time()
+
+        # NB this doesn't require a try-catch block as the wrapped function never throws an exception
+        result = func(*args, **kwargs)
+        if result is not None:
+            message_for_logs = "Record processing complete"
+            base_log_data.update(result)
+            additional_log_data = {
+                "status": "success",
+                "statusCode": 200,
+                "message": message_for_logs,
+            }
+            generate_and_send_logs(start_time, base_log_data, additional_log_data)
+        return result
+
+    return wrapper
+
+
 def ack_lambda_handler_logging_decorator(func):
     """This decorator logs the execution info for the ack lambda handler."""
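The new decorator merges whatever dict the wrapped function returns into the Firehose log payload. A toy usage sketch follows; the decorated function is hypothetical, and calling it in a real environment would attempt to send a log via generate_and_send_logs.

# Toy illustration only; example_completion is not part of the commit.
from logging_decorators import complete_batch_file_process_logging_decorator

@complete_batch_file_process_logging_decorator
def example_completion() -> dict:
    return {"message_id": "123e4567", "file_key": "example.csv", "row_count": 10}

# The wrapper adds status/statusCode/message, sends the merged payload, then returns the dict unchanged.
result = example_completion()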

ack_backend/src/update_ack_file.py

Lines changed: 62 additions & 51 deletions
@@ -4,10 +4,20 @@
 from io import StringIO, BytesIO
 from typing import Union
 from botocore.exceptions import ClientError
-from constants import ACK_HEADERS, SOURCE_BUCKET_NAME, ACK_BUCKET_NAME, FILE_NAME_PROC_LAMBDA_NAME
-from audit_table import change_audit_table_status_to_processed, get_next_queued_file_details
+from audit_table import change_audit_table_status_to_processed
 from clients import s3_client, logger, lambda_client
-from utils_for_ack_lambda import get_row_count
+from audit_table import change_audit_table_status_to_processed, get_next_queued_file_details
+from constants import (
+    ACK_HEADERS,
+    get_source_bucket_name,
+    get_ack_bucket_name,
+    COMPLETED_ACK_DIR,
+    TEMP_ACK_DIR,
+    BATCH_FILE_PROCESSING_DIR,
+    BATCH_FILE_ARCHIVE_DIR,
+    FILE_NAME_PROC_LAMBDA_NAME
+)
+from logging_decorators import complete_batch_file_process_logging_decorator


 def create_ack_data(
@@ -45,11 +55,45 @@ def create_ack_data(
     }


+@complete_batch_file_process_logging_decorator
+def complete_batch_file_process(
+    message_id: str,
+    supplier: str,
+    vaccine_type: str,
+    supplier_queue: str,
+    created_at_formatted_string: str,
+    file_key: str,
+    total_ack_rows_processed: int,
+) -> dict:
+    """Mark the batch file as processed. This involves moving the ack and original file to destinations and updating
+    the audit table status"""
+    ack_filename = f"{file_key.replace('.csv', f'_BusAck_{created_at_formatted_string}.csv')}"
+
+    move_file(get_ack_bucket_name(), f"{TEMP_ACK_DIR}/{ack_filename}", f"{COMPLETED_ACK_DIR}/{ack_filename}")
+    move_file(
+        get_source_bucket_name(), f"{BATCH_FILE_PROCESSING_DIR}/{file_key}", f"{BATCH_FILE_ARCHIVE_DIR}/{file_key}"
+    )
+
+    change_audit_table_status_to_processed(file_key, message_id)
+
+    next_queued_file_details = get_next_queued_file_details(supplier_queue)
+    if next_queued_file_details:
+        invoke_filename_lambda(next_queued_file_details["filename"], next_queued_file_details["message_id"])
+
+    return {
+        "message_id": message_id,
+        "file_key": file_key,
+        "supplier": supplier,
+        "vaccine_type": vaccine_type,
+        "row_count": total_ack_rows_processed,
+    }
+
+
 def obtain_current_ack_content(temp_ack_file_key: str) -> StringIO:
     """Returns the current ack file content if the file exists, or else initialises the content with the ack headers."""
     try:
         # If ack file exists in S3 download the contents
-        existing_ack_file = s3_client.get_object(Bucket=ACK_BUCKET_NAME, Key=temp_ack_file_key)
+        existing_ack_file = s3_client.get_object(Bucket=get_ack_bucket_name(), Key=temp_ack_file_key)
         existing_content = existing_ack_file["Body"].read().decode("utf-8")
     except ClientError as error:
         # If ack file does not exist in S3 create a new file containing the headers only
@@ -65,60 +109,27 @@ def obtain_current_ack_content(temp_ack_file_key: str) -> StringIO:
     return accumulated_csv_content


-def upload_ack_file(
-    temp_ack_file_key: str,
-    message_id: str,
-    supplier_queue: str,
-    accumulated_csv_content: StringIO,
-    ack_data_rows: list,
-    archive_ack_file_key: str,
-    file_key: str,
-) -> None:
-    """Adds the data row to the uploaded ack file"""
-    for row in ack_data_rows:
-        data_row_str = [str(item) for item in row.values()]
-        cleaned_row = "|".join(data_row_str).replace(" |", "|").replace("| ", "|").strip()
-        accumulated_csv_content.write(cleaned_row + "\n")
-    csv_file_like_object = BytesIO(accumulated_csv_content.getvalue().encode("utf-8"))
-    s3_client.upload_fileobj(csv_file_like_object, ACK_BUCKET_NAME, temp_ack_file_key)
-
-    row_count_source = get_row_count(SOURCE_BUCKET_NAME, f"processing/{file_key}")
-    row_count_destination = get_row_count(ACK_BUCKET_NAME, temp_ack_file_key)
-    # TODO: Should we check for > and if so what handling is required
-    if row_count_destination == row_count_source:
-        move_file(ACK_BUCKET_NAME, temp_ack_file_key, archive_ack_file_key)
-        move_file(SOURCE_BUCKET_NAME, f"processing/{file_key}", f"archive/{file_key}")
-
-        # Update the audit table and invoke the filename lambda with next file in the queue (if one exists)
-        change_audit_table_status_to_processed(file_key, message_id)
-        next_queued_file_details = get_next_queued_file_details(supplier_queue)
-        if next_queued_file_details:
-            invoke_filename_lambda(next_queued_file_details["filename"], next_queued_file_details["message_id"])
-
-    logger.info("Ack file updated to %s: %s", ACK_BUCKET_NAME, archive_ack_file_key)
-
-
 def update_ack_file(
     file_key: str,
-    message_id: str,
-    supplier_queue: str,
     created_at_formatted_string: str,
     ack_data_rows: list,
 ) -> None:
     """Updates the ack file with the new data row based on the given arguments"""
     ack_filename = f"{file_key.replace('.csv', f'_BusAck_{created_at_formatted_string}.csv')}"
-    temp_ack_file_key = f"TempAck/{ack_filename}"
-    archive_ack_file_key = f"forwardedFile/{ack_filename}"
+    temp_ack_file_key = f"{TEMP_ACK_DIR}/{ack_filename}"
+    archive_ack_file_key = f"{COMPLETED_ACK_DIR}/{ack_filename}"
     accumulated_csv_content = obtain_current_ack_content(temp_ack_file_key)
-    upload_ack_file(
-        temp_ack_file_key,
-        message_id,
-        supplier_queue,
-        accumulated_csv_content,
-        ack_data_rows,
-        archive_ack_file_key,
-        file_key,
-    )
+
+    for row in ack_data_rows:
+        data_row_str = [str(item) for item in row.values()]
+        cleaned_row = "|".join(data_row_str).replace(" |", "|").replace("| ", "|").strip()
+        accumulated_csv_content.write(cleaned_row + "\n")
+
+    csv_file_like_object = BytesIO(accumulated_csv_content.getvalue().encode("utf-8"))
+    ack_bucket_name = get_ack_bucket_name()
+
+    s3_client.upload_fileobj(csv_file_like_object, ack_bucket_name, temp_ack_file_key)
+    logger.info("Ack file updated to %s: %s", ack_bucket_name, archive_ack_file_key)


 def move_file(bucket_name: str, source_file_key: str, destination_file_key: str) -> None:
@@ -135,7 +146,7 @@ def invoke_filename_lambda(file_key: str, message_id: str) -> None:
     try:
         lambda_payload = {
             "Records": [
-                {"s3": {"bucket": {"name": SOURCE_BUCKET_NAME}, "object": {"key": file_key}}, "message_id": message_id}
+                {"s3": {"bucket": {"name": get_source_bucket_name()}, "object": {"key": file_key}}, "message_id": message_id}
             ]
         }
         lambda_client.invoke(
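update_ack_file now serialises the ack rows itself (the separate upload_ack_file helper is gone). A small worked example of the pipe-delimiting and whitespace clean-up applied to each row dict; the field names and values here are illustrative only.

# Illustrative row; only the joining/cleaning logic mirrors the code above.
row = {"MESSAGE_HEADER_ID": "1", "HEADER_RESPONSE_CODE": "OK ", "ISSUE_SEVERITY": " Information"}
data_row_str = [str(item) for item in row.values()]
cleaned_row = "|".join(data_row_str).replace(" |", "|").replace("| ", "|").strip()
print(cleaned_row)  # -> 1|OK|Information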
ack_backend/src/utils_for_ack_lambda.py

Lines changed: 17 additions & 8 deletions

@@ -1,12 +1,21 @@
 """Utils for ack lambda"""

-from clients import s3_client
+from audit_table import get_record_count_by_message_id

+_BATCH_EVENT_ID_TO_RECORD_COUNT_MAP: dict[str, int] = {}

-def get_row_count(bucket_name: str, file_key: str) -> int:
-    """
-    Looks in the given bucket and returns the count of the number of lines in the given file.
-    NOTE: Blank lines are not included in the count.
-    """
-    response = s3_client.get_object(Bucket=bucket_name, Key=file_key)
-    return sum(1 for line in response["Body"].iter_lines() if line.strip())
+
+def is_ack_processing_complete(batch_event_message_id: str, processed_ack_count: int) -> bool:
+    """Checks if we have received all the acknowledgement rows for the original source file. Also caches the value of
+    the source file record count to reduce traffic to DynamoDB"""
+    if batch_event_message_id in _BATCH_EVENT_ID_TO_RECORD_COUNT_MAP:
+        return _BATCH_EVENT_ID_TO_RECORD_COUNT_MAP[batch_event_message_id] == processed_ack_count
+
+    record_count = get_record_count_by_message_id(batch_event_message_id)
+
+    if not record_count:
+        # Record count is not set on the audit item until all rows have been preprocessed and sent to Kinesis
+        return False
+
+    _BATCH_EVENT_ID_TO_RECORD_COUNT_MAP[batch_event_message_id] = record_count
+    return record_count == processed_ack_count
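Because _BATCH_EVENT_ID_TO_RECORD_COUNT_MAP is module-level, the cached record count survives across warm invocations of the same Lambda container. A toy check of the completion logic with the DynamoDB lookup stubbed out (test scaffolding only, not part of the commit):

# Toy check; stubs get_record_count_by_message_id so no DynamoDB call is made.
from unittest.mock import patch

import utils_for_ack_lambda

with patch.object(utils_for_ack_lambda, "get_record_count_by_message_id", return_value=100):
    assert utils_for_ack_lambda.is_ack_processing_complete("batch-id-1", 50) is False
    # The second call is served from the in-module cache rather than DynamoDB.
    assert utils_for_ack_lambda.is_ack_processing_complete("batch-id-1", 100) is True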
