Skip to content
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
733a064
Complete initial adjustments to flow
dlzhry2nhs Dec 2, 2025
d0cfed8
Self review
dlzhry2nhs Dec 2, 2025
62ffc51
Quick fix for batch e2e tests
dlzhry2nhs Dec 3, 2025
360d56f
Review comments
dlzhry2nhs Dec 3, 2025
722c233
Add richer logs to the record forwarder
dlzhry2nhs Dec 3, 2025
da9a81f
Merge branch 'master' into VED-956-Audit-Table-Update
JamesW1-NHS Dec 3, 2025
5b95bd3
redefine audit table
JamesW1-NHS Dec 3, 2025
b15bfde
restore in tf
JamesW1-NHS Dec 3, 2025
041ea67
implemented audit table functions - patched pending unit tests
JamesW1-NHS Dec 3, 2025
e7bbcc6
audit_table & unit tests
JamesW1-NHS Dec 4, 2025
79c49f0
condition_expression
JamesW1-NHS Dec 4, 2025
1f53957
unit tests
JamesW1-NHS Dec 4, 2025
71e8200
CREATED_AT reverted to TIMESTAMP
JamesW1-NHS Dec 4, 2025
2e78eda
CREATED_AT reverted to TIMESTAMP II
JamesW1-NHS Dec 4, 2025
20dad1d
ingestion_started
JamesW1-NHS Dec 5, 2025
50d33f7
cleanup
JamesW1-NHS Dec 5, 2025
63150af
ingestion_complete
JamesW1-NHS Dec 5, 2025
f5c6275
constants
JamesW1-NHS Dec 5, 2025
aa5926f
constants II
JamesW1-NHS Dec 5, 2025
39b204a
Merge branch 'master' into VED-956-Audit-Table-Update
JamesW1-NHS Dec 8, 2025
0b1489d
Adjust tests
dlzhry2nhs Dec 8, 2025
e124135
review fixes
JamesW1-NHS Dec 8, 2025
8bcd956
removed constants
JamesW1-NHS Dec 8, 2025
37ed457
Merge branch 'master' into VED-956-Audit-Table-Update
JamesW1-NHS Dec 8, 2025
5da7c2a
Merge branch 'VED-956-fix-missed-test-changes' into VED-956-Audit-Tab…
JamesW1-NHS Dec 8, 2025
984f43d
removed diagnostics check
JamesW1-NHS Dec 8, 2025
2ef9312
Merge branch 'master' into VED-956-Audit-Table-Update
JamesW1-NHS Dec 8, 2025
08e1404
Added ITOC log and minor fixups (#1051)
dlzhry2nhs Dec 8, 2025
1b77088
Merge branch 'master' into VED-956-Audit-Table-Update
dlzhry2nhs Dec 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions lambdas/ack_backend/src/ack_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import json

from audit_table import increment_records_failed_count
from common.batch.eof_utils import is_eof_message
from convert_message_to_ack_row import convert_message_to_ack_row
from logging_decorators import ack_lambda_handler_logging_decorator
Expand Down Expand Up @@ -49,6 +50,11 @@ def lambda_handler(event, _):
break

ack_data_rows.append(convert_message_to_ack_row(message, created_at_formatted_string))
# Note: we are only currently receiving failure records.
# Nevertheless, we leave this here pending rewrite of tests (which currently still incorporate
# success records and use "diagnostics" to identify failures).
if message.get("diagnostics"):
increment_records_failed_count(message_id)

update_ack_file(file_key, created_at_formatted_string, ack_data_rows)

Expand Down
111 changes: 107 additions & 4 deletions lambdas/ack_backend/src/audit_table.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,34 @@
"""Add the filename to the audit table and check for duplicates."""

import time
from typing import Optional

from common.clients import dynamodb_client, logger
from common.models.errors import UnhandledAuditTableError
from constants import AUDIT_TABLE_NAME, AuditTableKeys, FileStatus

CONDITION_EXPRESSION = "attribute_exists(message_id)"


def change_audit_table_status_to_processed(file_key: str, message_id: str) -> None:
"""Updates the status in the audit table to 'Processed' and returns the queue name."""
try:
# Update the status in the audit table to "Processed"
dynamodb_client.update_item(
response = dynamodb_client.update_item(
TableName=AUDIT_TABLE_NAME,
Key={AuditTableKeys.MESSAGE_ID: {"S": message_id}},
UpdateExpression="SET #status = :status",
ExpressionAttributeNames={"#status": "status"},
ExpressionAttributeValues={":status": {"S": FileStatus.PROCESSED}},
ConditionExpression="attribute_exists(message_id)",
ConditionExpression=CONDITION_EXPRESSION,
ReturnValues="UPDATED_NEW",
)

result = response.get("Attributes", {}).get("status").get("S")
logger.info(
"The status of %s file, with message id %s, was successfully updated to %s in the audit table",
file_key,
message_id,
FileStatus.PROCESSED,
result,
)

except Exception as error: # pylint: disable = broad-exception-caught
Expand All @@ -44,3 +48,102 @@ def get_record_count_by_message_id(event_message_id: str) -> Optional[int]:
return None

return int(record_count)


def set_records_succeeded_count(message_id: str) -> None:
    """Set the 'records_succeeded' item in the audit table entry.

    records_succeeded is derived as record_count - records_failed, with a
    missing attribute treated as 0.

    Args:
        message_id: key of the audit table entry to update.

    Raises:
        UnhandledAuditTableError: if the DynamoDB update fails (including when no
            audit entry exists for message_id, via the condition expression).
    """
    audit_record = dynamodb_client.get_item(
        TableName=AUDIT_TABLE_NAME, Key={AuditTableKeys.MESSAGE_ID: {"S": message_id}}
    )
    # Look up "Item" once instead of repeating the chain for each attribute
    item = audit_record.get("Item", {})
    record_count_item = item.get(AuditTableKeys.RECORD_COUNT, {}).get("N")
    records_failed_item = item.get(AuditTableKeys.RECORDS_FAILED, {}).get("N")

    record_count = int(record_count_item) if record_count_item else 0
    records_failed = int(records_failed_item) if records_failed_item else 0
    records_succeeded = record_count - records_failed

    attribute_name = AuditTableKeys.RECORDS_SUCCEEDED
    try:
        response = dynamodb_client.update_item(
            TableName=AUDIT_TABLE_NAME,
            Key={AuditTableKeys.MESSAGE_ID: {"S": message_id}},
            UpdateExpression="SET #attribute = :value",
            ExpressionAttributeNames={"#attribute": attribute_name},
            ExpressionAttributeValues={":value": {"N": str(records_succeeded)}},
            ConditionExpression=CONDITION_EXPRESSION,
            ReturnValues="UPDATED_NEW",
        )
        # Guard each lookup with a default so a missing attribute cannot raise
        # AttributeError here (which would be mis-reported as an audit table error
        # even though the update itself succeeded).
        result = response.get("Attributes", {}).get(attribute_name, {}).get("N")
        logger.info(
            "Attribute %s for message id %s set to %s in the audit table",
            attribute_name,
            message_id,
            result,
        )

    except Exception as error:  # pylint: disable = broad-exception-caught
        logger.error(error)
        raise UnhandledAuditTableError(error) from error


def increment_records_failed_count(message_id: str) -> None:
    """
    Increment a counter attribute safely, handling the case where it might not exist.
    From https://docs.aws.amazon.com/code-library/latest/ug/dynamodb_example_dynamodb_Scenario_AtomicCounterOperations_section.html

    Args:
        message_id: key of the audit table entry whose 'records_failed' counter is incremented.

    Raises:
        UnhandledAuditTableError: if the DynamoDB update fails (including when no
            audit entry exists for message_id, via the condition expression).
    """

    increment_value = 1
    initial_value = 0
    attribute_name = AuditTableKeys.RECORDS_FAILED
    try:
        # Use SET with if_not_exists to safely increment the counter attribute
        response = dynamodb_client.update_item(
            TableName=AUDIT_TABLE_NAME,
            Key={AuditTableKeys.MESSAGE_ID: {"S": message_id}},
            UpdateExpression="SET #attribute = if_not_exists(#attribute, :initial) + :increment",
            ExpressionAttributeNames={"#attribute": attribute_name},
            ExpressionAttributeValues={":increment": {"N": str(increment_value)}, ":initial": {"N": str(initial_value)}},
            ConditionExpression=CONDITION_EXPRESSION,
            ReturnValues="UPDATED_NEW",
        )
        # The original computed this value and discarded it; log it instead, with
        # defaulted lookups so a missing attribute cannot raise AttributeError
        # (consistent with the sibling audit table functions).
        result = response.get("Attributes", {}).get(attribute_name, {}).get("N")
        logger.info(
            "Attribute %s for message id %s incremented to %s in the audit table",
            attribute_name,
            message_id,
            result,
        )

    except Exception as error:  # pylint: disable = broad-exception-caught
        logger.error(error)
        raise UnhandledAuditTableError(error) from error


def set_audit_table_ingestion_end_time(
    file_key: str,
    message_id: str,
    complete_time: float,
) -> None:
    """Sets the ingestion_end_time in the audit table to the requested time.

    Args:
        file_key: name of the batch file (used for logging only).
        message_id: key of the audit table entry to update.
        complete_time: ingestion end time as a Unix timestamp in seconds.

    Raises:
        UnhandledAuditTableError: if the DynamoDB update fails (including when no
            audit entry exists for message_id, via the condition expression).
    """
    # Format the time as a compact UTC string, e.g. 20250101T12300000
    ingestion_end_time = time.strftime("%Y%m%dT%H%M%S00", time.gmtime(complete_time))

    update_expression = f"SET #{AuditTableKeys.INGESTION_END_TIME} = :{AuditTableKeys.INGESTION_END_TIME}"
    expression_attr_names = {f"#{AuditTableKeys.INGESTION_END_TIME}": AuditTableKeys.INGESTION_END_TIME}
    expression_attr_values = {f":{AuditTableKeys.INGESTION_END_TIME}": {"S": ingestion_end_time}}

    try:
        response = dynamodb_client.update_item(
            TableName=AUDIT_TABLE_NAME,
            Key={AuditTableKeys.MESSAGE_ID: {"S": message_id}},
            UpdateExpression=update_expression,
            ExpressionAttributeNames=expression_attr_names,
            ExpressionAttributeValues=expression_attr_values,
            # Use the shared module constant rather than rebuilding the same
            # expression inline, keeping all updaters in this module consistent.
            ConditionExpression=CONDITION_EXPRESSION,
            ReturnValues="UPDATED_NEW",
        )
        # Guard each lookup with a default so a missing attribute cannot raise AttributeError
        result = response.get("Attributes", {}).get(AuditTableKeys.INGESTION_END_TIME, {}).get("S")
        logger.info(
            "ingestion_end_time for %s file, with message id %s, was successfully updated to %s in the audit table",
            file_key,
            message_id,
            result,
        )

    except Exception as error:  # pylint: disable = broad-exception-caught
        logger.error(error)
        raise UnhandledAuditTableError(error) from error
3 changes: 3 additions & 0 deletions lambdas/ack_backend/src/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ class AuditTableKeys:
RECORD_COUNT = "record_count"
STATUS = "status"
TIMESTAMP = "timestamp"
INGESTION_END_TIME = "ingestion_end_time"
RECORDS_SUCCEEDED = "records_succeeded"
RECORDS_FAILED = "records_failed"


ACK_HEADERS = [
Expand Down
6 changes: 3 additions & 3 deletions lambdas/ack_backend/src/logging_decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,9 @@ def wrapper(*args, **kwargs):
"function_name": f"{PREFIX}_{func.__name__}",
"date_time": str(datetime.now()),
}
start_time = time.time()

# NB this doesn't require a try-catch block as the wrapped function never throws an exception
result = func(*args, **kwargs)
result, ingestion_end_time = func(*args, **kwargs)
if result is not None:
message_for_logs = "Record processing complete"
base_log_data.update(result)
Expand All @@ -90,7 +89,8 @@ def wrapper(*args, **kwargs):
"statusCode": 200,
"message": message_for_logs,
}
generate_and_send_logs(STREAM_NAME, start_time, base_log_data, additional_log_data)
generate_and_send_logs(STREAM_NAME, ingestion_end_time, base_log_data, additional_log_data)

return result

return wrapper
Expand Down
17 changes: 14 additions & 3 deletions lambdas/ack_backend/src/update_ack_file.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
"""Functions for uploading the data to the ack file"""

import time
from io import BytesIO, StringIO

from botocore.exceptions import ClientError

from audit_table import change_audit_table_status_to_processed, get_record_count_by_message_id
from audit_table import (
change_audit_table_status_to_processed,
get_record_count_by_message_id,
set_audit_table_ingestion_end_time,
set_records_succeeded_count,
)
from common.aws_s3_utils import move_file
from common.clients import get_s3_client, logger
from constants import (
Expand Down Expand Up @@ -61,7 +67,7 @@ def complete_batch_file_process(
vaccine_type: str,
created_at_formatted_string: str,
file_key: str,
) -> dict:
) -> tuple[dict, float]:
"""Mark the batch file as processed. This involves moving the ack and original file to destinations and updating
the audit table status"""
ack_filename = f"{file_key.replace('.csv', f'_BusAck_{created_at_formatted_string}.csv')}"
Expand All @@ -73,14 +79,19 @@ def complete_batch_file_process(

total_ack_rows_processed = get_record_count_by_message_id(message_id)
change_audit_table_status_to_processed(file_key, message_id)
set_records_succeeded_count(message_id)

return {
ingestion_end_time = time.time()
set_audit_table_ingestion_end_time(file_key, message_id, ingestion_end_time)

result = {
"message_id": message_id,
"file_key": file_key,
"supplier": supplier,
"vaccine_type": vaccine_type,
"row_count": total_ack_rows_processed,
}
return result, ingestion_end_time


def obtain_current_ack_content(temp_ack_file_key: str) -> StringIO:
Expand Down
95 changes: 94 additions & 1 deletion lambdas/ack_backend/tests/test_ack_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,19 @@ def assert_audit_entry_status_equals(self, message_id: str, status: str) -> None
actual_status = audit_entry.get("status", {}).get("S")
self.assertEqual(actual_status, status)

def assert_audit_entry_counts_equal(self, message_id: str, expected_counts: dict) -> None:
    """Checks the audit entry counts are as expected"""
    audit_entry = self.dynamodb_client.get_item(
        TableName=AUDIT_TABLE_NAME, Key={"message_id": {"S": message_id}}
    ).get("Item")

    # Collect the three count attributes (DynamoDB "N" values, or None if absent)
    actual_counts = {
        count_key: audit_entry.get(count_key, {}).get("N")
        for count_key in ("record_count", "records_succeeded", "records_failed")
    }

    self.assertEqual(actual_counts, expected_counts)

def test_lambda_handler_main_multiple_records(self):
"""Test lambda handler with multiple records."""
# Set up an audit entry which does not yet have record_count recorded
Expand Down Expand Up @@ -164,6 +177,11 @@ def test_lambda_handler_main_multiple_records(self):
{"body": json.dumps(array_of_mixed_success_and_failure_messages)},
]
}
expected_entry_counts = {
"record_count": None,
"records_succeeded": None,
"records_failed": "6",
}

response = lambda_handler(event=event, context={})

Expand All @@ -177,6 +195,7 @@ def test_lambda_handler_main_multiple_records(self):
],
existing_file_content=ValidValues.ack_headers,
)
self.assert_audit_entry_counts_equal("row", expected_entry_counts)

def test_lambda_handler_main(self):
"""Test lambda handler with consitent ack_file_name and message_template."""
Expand Down Expand Up @@ -217,12 +236,20 @@ def test_lambda_handler_main(self):
"messages": [{"row_id": "row^1", "diagnostics": "SHOULD BE A DICTIONARY, NOT A STRING"}],
},
]
expected_records_failed = [None, "3", "8", "8", "9"]
expected_entry_counts = {
"record_count": None,
"records_succeeded": None,
"records_failed": None,
}

for test_case in test_cases:
for test_case, expected_failures in zip(test_cases, expected_records_failed):
expected_entry_counts["records_failed"] = expected_failures
# Test scenario where there is no existing ack file
with self.subTest(msg=f"No existing ack file: {test_case['description']}"):
response = lambda_handler(event=self.generate_event(test_case["messages"]), context={})
self.assertEqual(response, EXPECTED_ACK_LAMBDA_RESPONSE_FOR_SUCCESS)
self.assert_audit_entry_counts_equal("row", expected_entry_counts)
validate_ack_file_content(self.s3_client, test_case["messages"])

self.s3_client.delete_object(
Expand Down Expand Up @@ -252,6 +279,11 @@ def test_lambda_handler_updates_ack_file_but_does_not_mark_complete_when_records
for i in range(1, 4)
]
test_event = {"Records": [{"body": json.dumps(array_of_success_messages)}]}
expected_entry_counts = {
"record_count": "100",
"records_succeeded": None,
"records_failed": None,
}

response = lambda_handler(event=test_event, context={})

Expand All @@ -268,6 +300,7 @@ def test_lambda_handler_updates_ack_file_but_does_not_mark_complete_when_records
is_complete=False,
)
self.assert_audit_entry_status_equals(mock_batch_message_id, "Preprocessed")
self.assert_audit_entry_counts_equal(mock_batch_message_id, expected_entry_counts)

def test_lambda_handler_updates_ack_file_and_marks_complete_when_all_records_processed(self):
"""
Expand Down Expand Up @@ -305,6 +338,11 @@ def test_lambda_handler_updates_ack_file_and_marks_complete_when_all_records_pro
all_messages_plus_eof = deepcopy(array_of_success_messages)
all_messages_plus_eof.append(MOCK_MESSAGE_DETAILS.eof_message)
test_event = {"Records": [{"body": json.dumps(all_messages_plus_eof)}]}
expected_entry_counts = {
"record_count": "100",
"records_succeeded": "100",
"records_failed": None,
}

response = lambda_handler(event=test_event, context={})

Expand All @@ -319,6 +357,61 @@ def test_lambda_handler_updates_ack_file_and_marks_complete_when_all_records_pro
is_complete=True,
)
self.assert_audit_entry_status_equals(mock_batch_message_id, "Processed")
self.assert_audit_entry_counts_equal(mock_batch_message_id, expected_entry_counts)

def test_lambda_handler_sets_records_succeeded(self):
    """
    Test that the records_succeeded count is set when all records have been processed.
    """
    mock_batch_message_id = "75db20e6-c0b5-4012-a8bc-f861a1dd4b22"

    # Original source file had 100 records
    add_audit_entry_to_table(self.dynamodb_client, mock_batch_message_id, record_count=100)

    # Previous invocations have already created and added to the temp ack file
    existing_ack_content = generate_sample_existing_ack_content()

    self.s3_client.put_object(
        Bucket=BucketNames.DESTINATION,
        Key=MOCK_MESSAGE_DETAILS.temp_ack_file_key,
        Body=StringIO(existing_ack_content).getvalue(),
    )

    def make_success_message(row: int) -> dict:
        return {
            **BASE_SUCCESS_MESSAGE,
            "row_id": f"{mock_batch_message_id}^{row}",
            "imms_id": f"imms_{row}",
            "local_id": f"local^{row}",
        }

    def make_failure_message(row: int) -> dict:
        return {
            **BASE_FAILURE_MESSAGE,
            "row_id": f"{mock_batch_message_id}^{row}",
            "local_id": f"local^{row}",
            "diagnostics": DiagnosticsDictionaries.UNHANDLED_ERROR,
        }

    # Rows 50-74 succeed, rows 75-99 fail, then the EoF message closes the batch
    all_messages_plus_eof = [make_success_message(row) for row in range(50, 75)]
    all_messages_plus_eof += [make_failure_message(row) for row in range(75, 100)]
    all_messages_plus_eof.append(MOCK_MESSAGE_DETAILS.eof_message)
    test_event = {"Records": [{"body": json.dumps(all_messages_plus_eof)}]}
    expected_entry_counts = {
        "record_count": "100",
        "records_succeeded": "75",
        "records_failed": "25",
    }

    response = lambda_handler(event=test_event, context={})

    self.assertEqual(response, EXPECTED_ACK_LAMBDA_RESPONSE_FOR_SUCCESS)
    self.assert_audit_entry_status_equals(mock_batch_message_id, "Processed")
    self.assert_audit_entry_counts_equal(mock_batch_message_id, expected_entry_counts)

def test_lambda_handler_error_scenarios(self):
"""Test that the lambda handler raises appropriate exceptions for malformed event data."""
Expand Down
Loading