
Commit 5f9f93f

dlzhry2nhsmfjarvis authored and committed
VED-859 Improve ack backend efficiency and make completion process more robust (#892)
1 parent 164f0da commit 5f9f93f

27 files changed (+470, -308 lines)

.github/workflows/quality-checks.yml

Lines changed: 1 addition & 1 deletion
@@ -175,7 +175,7 @@ jobs:
       working-directory: lambdas/ack_backend
       id: acklambda
       env:
-        PYTHONPATH: ${{ env.LAMBDA_PATH }}/ack_backend/src:${{ github.workspace }}/ack_backend/tests
+        PYTHONPATH: ${{ env.LAMBDA_PATH }}/ack_backend/src:tests:${{ env.SHARED_PATH }}/src
       continue-on-error: true
       run: |
         poetry install

lambdas/ack_backend/poetry.lock

Lines changed: 25 additions & 24 deletions
Some generated files are not rendered by default.

lambdas/ack_backend/pyproject.toml

Lines changed: 1 addition & 1 deletion
@@ -14,7 +14,7 @@ python = "~3.11"
 boto3 = "~1.40.45"
 mypy-boto3-dynamodb = "^1.40.44"
 freezegun = "^1.5.2"
-moto = "^4"
+moto = "^5.1.14"
 coverage = "^7.10.7"
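
The jump from moto ^4 to ^5.1.14 crosses a major-version boundary: moto 5 replaced the per-service decorators (mock_s3, mock_dynamodb, ...) with a single mock_aws entry point, so the ack backend's tests would need their setup updated accordingly. A minimal sketch of the moto 5 idiom (bucket name, key, and region are hypothetical, not taken from this repo):

    # Hypothetical moto 5 test: one mock_aws decorator covers all AWS services.
    import boto3
    from moto import mock_aws

    @mock_aws
    def test_ack_file_roundtrip():
        s3 = boto3.client("s3", region_name="eu-west-2")
        s3.create_bucket(
            Bucket="example-ack-bucket",
            CreateBucketConfiguration={"LocationConstraint": "eu-west-2"},
        )
        s3.put_object(Bucket="example-ack-bucket", Key="TempAck/example.csv", Body=b"HEADER\n")
        assert s3.get_object(Bucket="example-ack-bucket", Key="TempAck/example.csv")["Body"].read() == b"HEADER\n"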

lambdas/ack_backend/src/ack_processor.py

Lines changed: 16 additions & 14 deletions
@@ -2,12 +2,13 @@

 import json
 from logging_decorators import ack_lambda_handler_logging_decorator
-from update_ack_file import update_ack_file
+from update_ack_file import update_ack_file, complete_batch_file_process
+from utils_for_ack_lambda import is_ack_processing_complete
 from convert_message_to_ack_row import convert_message_to_ack_row


 @ack_lambda_handler_logging_decorator
-def lambda_handler(event, context):
+def lambda_handler(event, _):
     """
     Ack lambda handler.
     For each record: each message in the array of messages is converted to an ack row,
@@ -22,6 +23,7 @@ def lambda_handler(event, context):
     message_id = None

     ack_data_rows = []
+    total_ack_rows_processed = 0

     for i, record in enumerate(event["Records"]):
@@ -31,10 +33,8 @@
             raise ValueError("Could not load incoming message body") from body_json_error

         if i == 0:
-            # IMPORTANT NOTE: An assumption is made here that the file_key and created_at_formatted_string are the same
-            # for all messages in the event. The use of FIFO SQS queues ensures that this is the case, provided that
-            # there is only one file processing at a time for each supplier queue (combination of supplier and vaccine
-            # type).
+            # The SQS FIFO MessageGroupId that this lambda consumes from is based on the source filename + created-at
+            # datetime. Therefore, we can safely retrieve the file metadata from the first record in the list.
             file_key = incoming_message_body[0].get("file_key")
             message_id = (incoming_message_body[0].get("row_id", "")).split("^")[0]
             vaccine_type = incoming_message_body[0].get("vaccine_type")
@@ -44,14 +44,16 @@
         for message in incoming_message_body:
             ack_data_rows.append(convert_message_to_ack_row(message, created_at_formatted_string))

-    update_ack_file(
-        file_key,
-        message_id,
-        supplier,
-        vaccine_type,
-        created_at_formatted_string,
-        ack_data_rows,
-    )
+    update_ack_file(file_key, created_at_formatted_string, ack_data_rows)
+
+    # Get the row count of the final processed record.
+    # The format of the row_id is {batch_message_id}^{row_number}.
+    total_ack_rows_processed = int(incoming_message_body[-1].get("row_id", "").split("^")[1])
+
+    if is_ack_processing_complete(message_id, total_ack_rows_processed):
+        complete_batch_file_process(
+            message_id, supplier, vaccine_type, created_at_formatted_string, file_key, total_ack_rows_processed
+        )

     return {
         "statusCode": 200,

lambdas/ack_backend/src/audit_table.py

Lines changed: 15 additions & 0 deletions
@@ -1,5 +1,6 @@
 """Add the filename to the audit table and check for duplicates."""

+from typing import Optional
 from common.clients import dynamodb_client, logger
 from common.models.errors import UnhandledAuditTableError
 from constants import AUDIT_TABLE_NAME, FileStatus, AuditTableKeys
@@ -28,3 +29,17 @@ def change_audit_table_status_to_processed(file_key: str, message_id: str) -> None:
     except Exception as error:  # pylint: disable = broad-exception-caught
         logger.error(error)
         raise UnhandledAuditTableError(error) from error
+
+
+def get_record_count_by_message_id(event_message_id: str) -> Optional[int]:
+    """Retrieves the source file record count from the audit entry for the given event message ID."""
+    audit_record = dynamodb_client.get_item(
+        TableName=AUDIT_TABLE_NAME, Key={AuditTableKeys.MESSAGE_ID: {"S": event_message_id}}
+    )
+
+    record_count = audit_record.get("Item", {}).get(AuditTableKeys.RECORD_COUNT, {}).get("N")
+
+    if not record_count:
+        return None
+
+    return int(record_count)
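
A note on the chained .get() calls above: the low-level boto3 DynamoDB client wraps every attribute value in a type descriptor, so record_count comes back as {"N": "<digits>"} and needs an explicit int() conversion. An illustrative response shape (values hypothetical):

    # Sketch of the get_item response this code expects when record_count is set.
    audit_record = {
        "Item": {
            "message_id": {"S": "0f4b2e9a-example"},
            "record_count": {"N": "100"},
        }
    }

    record_count = audit_record.get("Item", {}).get("record_count", {}).get("N")
    assert int(record_count) == 100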

lambdas/ack_backend/src/constants.py

Lines changed: 6 additions & 0 deletions
@@ -4,6 +4,11 @@

 AUDIT_TABLE_NAME = os.getenv("AUDIT_TABLE_NAME")

+COMPLETED_ACK_DIR = "forwardedFile"
+TEMP_ACK_DIR = "TempAck"
+BATCH_FILE_PROCESSING_DIR = "processing"
+BATCH_FILE_ARCHIVE_DIR = "archive"
+

 def get_source_bucket_name() -> str:
     """Get the SOURCE_BUCKET_NAME environment variable."""
@@ -30,6 +35,7 @@ class AuditTableKeys:
     FILENAME = "filename"
     MESSAGE_ID = "message_id"
     QUEUE_NAME = "queue_name"
+    RECORD_COUNT = "record_count"
     STATUS = "status"
     TIMESTAMP = "timestamp"

lambdas/ack_backend/src/logging_decorators.py

Lines changed: 1 addition & 1 deletion
@@ -68,7 +68,7 @@ def wrapper(message, created_at_formatted_string):
     return wrapper


-def upload_ack_file_logging_decorator(func):
+def complete_batch_file_process_logging_decorator(func):
     """This decorator logs when record processing is complete."""

     @wraps(func)

lambdas/ack_backend/src/update_ack_file.py

Lines changed: 50 additions & 64 deletions
@@ -2,12 +2,18 @@

 from botocore.exceptions import ClientError
 from io import StringIO, BytesIO
-from typing import Optional
 from audit_table import change_audit_table_status_to_processed
 from common.clients import get_s3_client, logger
-from constants import ACK_HEADERS, get_source_bucket_name, get_ack_bucket_name
-from logging_decorators import upload_ack_file_logging_decorator
-from utils_for_ack_lambda import get_row_count
+from constants import (
+    ACK_HEADERS,
+    get_source_bucket_name,
+    get_ack_bucket_name,
+    COMPLETED_ACK_DIR,
+    TEMP_ACK_DIR,
+    BATCH_FILE_PROCESSING_DIR,
+    BATCH_FILE_ARCHIVE_DIR,
+)
+from logging_decorators import complete_batch_file_process_logging_decorator


 def create_ack_data(
@@ -45,6 +51,35 @@ def create_ack_data(
     }


+@complete_batch_file_process_logging_decorator
+def complete_batch_file_process(
+    message_id: str,
+    supplier: str,
+    vaccine_type: str,
+    created_at_formatted_string: str,
+    file_key: str,
+    total_ack_rows_processed: int,
+) -> dict:
+    """Mark the batch file as processed. This involves moving the ack and original files to their destinations and
+    updating the audit table status."""
+    ack_filename = f"{file_key.replace('.csv', f'_BusAck_{created_at_formatted_string}.csv')}"
+
+    move_file(get_ack_bucket_name(), f"{TEMP_ACK_DIR}/{ack_filename}", f"{COMPLETED_ACK_DIR}/{ack_filename}")
+    move_file(
+        get_source_bucket_name(), f"{BATCH_FILE_PROCESSING_DIR}/{file_key}", f"{BATCH_FILE_ARCHIVE_DIR}/{file_key}"
+    )
+
+    change_audit_table_status_to_processed(file_key, message_id)
+
+    return {
+        "message_id": message_id,
+        "file_key": file_key,
+        "supplier": supplier,
+        "vaccine_type": vaccine_type,
+        "row_count": total_ack_rows_processed,
+    }
+
+
 def obtain_current_ack_content(temp_ack_file_key: str) -> StringIO:
     """Returns the current ack file content if the file exists, or else initialises the content with the ack headers."""
     try:
@@ -65,76 +100,27 @@ def obtain_current_ack_content(temp_ack_file_key: str) -> StringIO:
     return accumulated_csv_content


-@upload_ack_file_logging_decorator
-def upload_ack_file(
-    temp_ack_file_key: str,
-    message_id: str,
-    supplier: str,
-    vaccine_type: str,
-    accumulated_csv_content: StringIO,
-    ack_data_rows: list,
-    archive_ack_file_key: str,
+def update_ack_file(
     file_key: str,
-) -> Optional[dict]:
-    """Adds the data row to the uploaded ack file"""
+    created_at_formatted_string: str,
+    ack_data_rows: list,
+) -> None:
+    """Updates the ack file with the new data rows based on the given arguments."""
+    ack_filename = f"{file_key.replace('.csv', f'_BusAck_{created_at_formatted_string}.csv')}"
+    temp_ack_file_key = f"{TEMP_ACK_DIR}/{ack_filename}"
+    archive_ack_file_key = f"{COMPLETED_ACK_DIR}/{ack_filename}"
+    accumulated_csv_content = obtain_current_ack_content(temp_ack_file_key)
+
     for row in ack_data_rows:
         data_row_str = [str(item) for item in row.values()]
         cleaned_row = "|".join(data_row_str).replace(" |", "|").replace("| ", "|").strip()
         accumulated_csv_content.write(cleaned_row + "\n")
-    csv_file_like_object = BytesIO(accumulated_csv_content.getvalue().encode("utf-8"))

+    csv_file_like_object = BytesIO(accumulated_csv_content.getvalue().encode("utf-8"))
     ack_bucket_name = get_ack_bucket_name()
-    source_bucket_name = get_source_bucket_name()

     get_s3_client().upload_fileobj(csv_file_like_object, ack_bucket_name, temp_ack_file_key)
-
-    row_count_source = get_row_count(source_bucket_name, f"processing/{file_key}")
-    row_count_destination = get_row_count(ack_bucket_name, temp_ack_file_key)
-    # TODO: Should we check for > and if so what handling is required
-    if row_count_destination == row_count_source:
-        move_file(ack_bucket_name, temp_ack_file_key, archive_ack_file_key)
-        move_file(source_bucket_name, f"processing/{file_key}", f"archive/{file_key}")
-
-        # Update the audit table
-        change_audit_table_status_to_processed(file_key, message_id)
-
-        # Ingestion of this file is complete
-        result = {
-            "message_id": message_id,
-            "file_key": file_key,
-            "supplier": supplier,
-            "vaccine_type": vaccine_type,
-            "row_count": row_count_source - 1,
-        }
-    else:
-        result = None
     logger.info("Ack file updated to %s: %s", ack_bucket_name, archive_ack_file_key)
-    return result
-
-
-def update_ack_file(
-    file_key: str,
-    message_id: str,
-    supplier: str,
-    vaccine_type: str,
-    created_at_formatted_string: str,
-    ack_data_rows: list,
-) -> None:
-    """Updates the ack file with the new data row based on the given arguments"""
-    ack_filename = f"{file_key.replace('.csv', f'_BusAck_{created_at_formatted_string}.csv')}"
-    temp_ack_file_key = f"TempAck/{ack_filename}"
-    archive_ack_file_key = f"forwardedFile/{ack_filename}"
-    accumulated_csv_content = obtain_current_ack_content(temp_ack_file_key)
-    upload_ack_file(
-        temp_ack_file_key,
-        message_id,
-        supplier,
-        vaccine_type,
-        accumulated_csv_content,
-        ack_data_rows,
-        archive_ack_file_key,
-        file_key,
-    )


 def move_file(bucket_name: str, source_file_key: str, destination_file_key: str) -> None:
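
move_file appears above only as a context line; this commit does not change its body. For orientation, a plausible implementation under the repo's existing get_s3_client helper (a sketch, not the repository's actual code): S3 has no native rename, so a move is a copy followed by a delete.

    from common.clients import get_s3_client

    def move_file(bucket_name: str, source_file_key: str, destination_file_key: str) -> None:
        """Copy the object to the destination key, then delete the source key."""
        s3_client = get_s3_client()
        s3_client.copy_object(
            Bucket=bucket_name,
            CopySource={"Bucket": bucket_name, "Key": source_file_key},
            Key=destination_file_key,
        )
        s3_client.delete_object(Bucket=bucket_name, Key=source_file_key)
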
lambdas/ack_backend/src/utils_for_ack_lambda.py

Lines changed: 17 additions & 8 deletions
@@ -1,12 +1,21 @@
 """Utils for ack lambda"""

-from common.clients import get_s3_client
+from audit_table import get_record_count_by_message_id

+_BATCH_EVENT_ID_TO_RECORD_COUNT_MAP: dict[str, int] = {}

-def get_row_count(bucket_name: str, file_key: str) -> int:
-    """
-    Looks in the given bucket and returns the count of the number of lines in the given file.
-    NOTE: Blank lines are not included in the count.
-    """
-    response = get_s3_client().get_object(Bucket=bucket_name, Key=file_key)
-    return sum(1 for line in response["Body"].iter_lines() if line.strip())
+
+def is_ack_processing_complete(batch_event_message_id: str, processed_ack_count: int) -> bool:
+    """Checks if we have received all the acknowledgement rows for the original source file. Also caches the value of
+    the source file record count to reduce traffic to DynamoDB"""
+    if batch_event_message_id in _BATCH_EVENT_ID_TO_RECORD_COUNT_MAP:
+        return _BATCH_EVENT_ID_TO_RECORD_COUNT_MAP[batch_event_message_id] == processed_ack_count
+
+    record_count = get_record_count_by_message_id(batch_event_message_id)
+
+    if not record_count:
+        # Record count is not set on the audit item until all rows have been preprocessed and sent to Kinesis
+        return False
+
+    _BATCH_EVENT_ID_TO_RECORD_COUNT_MAP[batch_event_message_id] = record_count
+    return record_count == processed_ack_count
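
The module-level _BATCH_EVENT_ID_TO_RECORD_COUNT_MAP relies on AWS Lambda reusing the execution environment between invocations: once the count is cached, warm invocations skip the DynamoDB round trip. A rough usage sketch, assuming the module imports cleanly and stubbing the lookup with a hypothetical count of 100:

    from unittest.mock import patch

    import utils_for_ack_lambda

    with patch("utils_for_ack_lambda.get_record_count_by_message_id", return_value=100) as mock_lookup:
        assert utils_for_ack_lambda.is_ack_processing_complete("batch-1", 60) is False  # not all rows seen yet
        assert utils_for_ack_lambda.is_ack_processing_complete("batch-1", 100) is True  # all 100 rows acknowledged
        assert mock_lookup.call_count == 1  # second call answered from the in-memory map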
