Skip to content

Commit b12b4f2

Browse files
committed
Added ITOC log and minor fixups
1 parent 2ef9312 commit b12b4f2

File tree

11 files changed

+129
-180
lines changed

11 files changed

+129
-180
lines changed
Lines changed: 35 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,5 @@
11
"""Add the filename to the audit table and check for duplicates."""
22

3-
import time
4-
from typing import Optional
5-
63
from common.clients import dynamodb_client, logger
74
from common.models.errors import UnhandledAuditTableError
85
from constants import AUDIT_TABLE_NAME, AuditTableKeys, FileStatus
@@ -36,63 +33,26 @@ def change_audit_table_status_to_processed(file_key: str, message_id: str) -> No
3633
raise UnhandledAuditTableError(error) from error
3734

3835

39-
def get_record_count_by_message_id(event_message_id: str) -> Optional[int]:
40-
"""Retrieves full audit entry by unique event message ID"""
36+
def get_record_count_and_failures_by_message_id(event_message_id: str) -> tuple[int, int]:
37+
"""Retrieves total record count and total failures by unique event message ID"""
4138
audit_record = dynamodb_client.get_item(
4239
TableName=AUDIT_TABLE_NAME, Key={AuditTableKeys.MESSAGE_ID: {"S": event_message_id}}
4340
)
4441

4542
record_count = audit_record.get("Item", {}).get(AuditTableKeys.RECORD_COUNT, {}).get("N")
43+
failures_count = audit_record.get("Item", {}).get(AuditTableKeys.RECORDS_FAILED, {}).get("N")
4644

47-
if not record_count:
48-
return None
49-
50-
return int(record_count)
51-
52-
53-
def set_records_succeeded_count(message_id: str) -> None:
54-
"""Set the 'records_succeeded' item in the audit table entry"""
55-
audit_record = dynamodb_client.get_item(
56-
TableName=AUDIT_TABLE_NAME, Key={AuditTableKeys.MESSAGE_ID: {"S": message_id}}
57-
)
58-
record_count_item = audit_record.get("Item", {}).get(AuditTableKeys.RECORD_COUNT, {}).get("N")
59-
records_failed_item = audit_record.get("Item", {}).get(AuditTableKeys.RECORDS_FAILED, {}).get("N")
60-
61-
record_count = int(record_count_item) if record_count_item else 0
62-
records_failed = int(records_failed_item) if records_failed_item else 0
63-
records_succeeded = record_count - records_failed
64-
65-
try:
66-
response = dynamodb_client.update_item(
67-
TableName=AUDIT_TABLE_NAME,
68-
Key={AuditTableKeys.MESSAGE_ID: {"S": message_id}},
69-
UpdateExpression="SET #attribute = :value",
70-
ExpressionAttributeNames={"#attribute": AuditTableKeys.RECORDS_SUCCEEDED},
71-
ExpressionAttributeValues={":value": {"N": str(records_succeeded)}},
72-
ConditionExpression=CONDITION_EXPRESSION,
73-
ReturnValues="UPDATED_NEW",
74-
)
75-
result = response.get("Attributes", {}).get(AuditTableKeys.RECORDS_SUCCEEDED).get("N")
76-
logger.info(
77-
"Attribute %s for message id %s set to %s in the audit table",
78-
AuditTableKeys.RECORDS_SUCCEEDED,
79-
message_id,
80-
result,
81-
)
82-
83-
except Exception as error: # pylint: disable = broad-exception-caught
84-
logger.error(error)
85-
raise UnhandledAuditTableError(error) from error
45+
return int(record_count) if record_count else 0, int(failures_count) if failures_count else 0
8646

8747

8848
def increment_records_failed_count(message_id: str) -> None:
8949
"""
9050
Increment a counter attribute safely, handling the case where it might not exist.
9151
From https://docs.aws.amazon.com/code-library/latest/ug/dynamodb_example_dynamodb_Scenario_AtomicCounterOperations_section.html
9252
"""
93-
9453
increment_value = 1
9554
initial_value = 0
55+
9656
try:
9757
# Use SET with if_not_exists to safely increment the counter attribute
9858
dynamodb_client.update_item(
@@ -110,37 +70,45 @@ def increment_records_failed_count(message_id: str) -> None:
11070
raise UnhandledAuditTableError(error) from error
11171

11272

113-
def set_audit_table_ingestion_end_time(
114-
file_key: str,
115-
message_id: str,
116-
complete_time: float,
73+
def set_audit_record_success_count_and_end_time(
74+
file_key: str, message_id: str, success_count: int, ingestion_end_time: str
11775
) -> None:
118-
"""Sets the ingestion_end_time in the audit table to the requested time"""
119-
# format the time
120-
ingestion_end_time = time.strftime("%Y%m%dT%H%M%S00", time.gmtime(complete_time))
121-
122-
update_expression = f"SET #{AuditTableKeys.INGESTION_END_TIME} = :{AuditTableKeys.INGESTION_END_TIME}"
123-
expression_attr_names = {f"#{AuditTableKeys.INGESTION_END_TIME}": AuditTableKeys.INGESTION_END_TIME}
124-
expression_attr_values = {f":{AuditTableKeys.INGESTION_END_TIME}": {"S": ingestion_end_time}}
76+
"""Sets the 'records_succeeded' and 'ingestion_end_time' attributes for the given audit record"""
77+
update_expression = (
78+
f"SET #{AuditTableKeys.INGESTION_END_TIME} = :{AuditTableKeys.INGESTION_END_TIME}"
79+
f", #{AuditTableKeys.RECORDS_SUCCEEDED} = :{AuditTableKeys.RECORDS_SUCCEEDED}"
80+
)
81+
expression_attr_names = {
82+
f"#{AuditTableKeys.INGESTION_END_TIME}": AuditTableKeys.INGESTION_END_TIME,
83+
f"#{AuditTableKeys.RECORDS_SUCCEEDED}": AuditTableKeys.RECORDS_SUCCEEDED,
84+
}
85+
expression_attr_values = {
86+
f":{AuditTableKeys.INGESTION_END_TIME}": {"S": ingestion_end_time},
87+
f":{AuditTableKeys.RECORDS_SUCCEEDED}": {"N": str(success_count)},
88+
}
12589

12690
try:
127-
response = dynamodb_client.update_item(
91+
dynamodb_client.update_item(
12892
TableName=AUDIT_TABLE_NAME,
12993
Key={AuditTableKeys.MESSAGE_ID: {"S": message_id}},
13094
UpdateExpression=update_expression,
13195
ExpressionAttributeNames=expression_attr_names,
13296
ExpressionAttributeValues=expression_attr_values,
133-
ConditionExpression=f"attribute_exists({AuditTableKeys.MESSAGE_ID})",
134-
ReturnValues="UPDATED_NEW",
135-
)
136-
result = response.get("Attributes", {}).get(AuditTableKeys.INGESTION_END_TIME).get("S")
137-
logger.info(
138-
"ingestion_end_time for %s file, with message id %s, was successfully updated to %s in the audit table",
139-
file_key,
140-
message_id,
141-
result,
97+
ConditionExpression=CONDITION_EXPRESSION,
14298
)
143-
14499
except Exception as error: # pylint: disable = broad-exception-caught
145100
logger.error(error)
146101
raise UnhandledAuditTableError(error) from error
102+
103+
logger.info(
104+
"ingestion_end_time for %s file, with message id %s, was successfully updated to %s in the audit table",
105+
file_key,
106+
message_id,
107+
ingestion_end_time,
108+
)
109+
logger.info(
110+
"records_succeeded for %s file, with message id %s, was successfully updated to %s in the audit table",
111+
file_key,
112+
message_id,
113+
str(success_count),
114+
)

lambdas/ack_backend/src/logging_decorators.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,9 +78,10 @@ def wrapper(*args, **kwargs):
7878
"function_name": f"{PREFIX}_{func.__name__}",
7979
"date_time": str(datetime.now()),
8080
}
81+
start_time = time.time()
8182

8283
# NB this doesn't require a try-catch block as the wrapped function never throws an exception
83-
result, ingestion_end_time = func(*args, **kwargs)
84+
result = func(*args, **kwargs)
8485
if result is not None:
8586
message_for_logs = "Record processing complete"
8687
base_log_data.update(result)
@@ -89,7 +90,7 @@ def wrapper(*args, **kwargs):
8990
"statusCode": 200,
9091
"message": message_for_logs,
9192
}
92-
generate_and_send_logs(STREAM_NAME, ingestion_end_time, base_log_data, additional_log_data)
93+
generate_and_send_logs(STREAM_NAME, start_time, base_log_data, additional_log_data)
9394

9495
return result
9596

lambdas/ack_backend/src/update_ack_file.py

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,8 @@
77

88
from audit_table import (
99
change_audit_table_status_to_processed,
10-
get_record_count_by_message_id,
11-
set_audit_table_ingestion_end_time,
12-
set_records_succeeded_count,
10+
get_record_count_and_failures_by_message_id,
11+
set_audit_record_success_count_and_end_time,
1312
)
1413
from common.aws_s3_utils import move_file
1514
from common.clients import get_s3_client, logger
@@ -67,7 +66,7 @@ def complete_batch_file_process(
6766
vaccine_type: str,
6867
created_at_formatted_string: str,
6968
file_key: str,
70-
) -> tuple[dict, float]:
69+
) -> dict:
7170
"""Mark the batch file as processed. This involves moving the ack and original file to destinations and updating
7271
the audit table status"""
7372
ack_filename = f"{file_key.replace('.csv', f'_BusAck_{created_at_formatted_string}.csv')}"
@@ -77,21 +76,23 @@ def complete_batch_file_process(
7776
get_source_bucket_name(), f"{BATCH_FILE_PROCESSING_DIR}/{file_key}", f"{BATCH_FILE_ARCHIVE_DIR}/{file_key}"
7877
)
7978

80-
total_ack_rows_processed = get_record_count_by_message_id(message_id)
79+
total_ack_rows_processed, total_failures = get_record_count_and_failures_by_message_id(message_id)
8180
change_audit_table_status_to_processed(file_key, message_id)
82-
set_records_succeeded_count(message_id)
8381

84-
ingestion_end_time = time.time()
85-
set_audit_table_ingestion_end_time(file_key, message_id, ingestion_end_time)
82+
# Consider creating time utils and using datetime instead of time
83+
ingestion_end_time = time.strftime("%Y%m%dT%H%M%S00", time.gmtime())
84+
successful_record_count = total_ack_rows_processed - total_failures
85+
set_audit_record_success_count_and_end_time(file_key, message_id, successful_record_count, ingestion_end_time)
8686

87-
result = {
87+
return {
8888
"message_id": message_id,
8989
"file_key": file_key,
9090
"supplier": supplier,
9191
"vaccine_type": vaccine_type,
9292
"row_count": total_ack_rows_processed,
93+
"success_count": successful_record_count,
94+
"failure_count": total_failures,
9395
}
94-
return result, ingestion_end_time
9596

9697

9798
def obtain_current_ack_content(temp_ack_file_key: str) -> StringIO:
Lines changed: 60 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import unittest
2-
from unittest.mock import patch
2+
from unittest.mock import call, patch
33

44
import audit_table
55
from common.models.errors import UnhandledAuditTableError
@@ -37,82 +37,29 @@ def test_change_audit_table_status_to_processed_raises(self):
3737
self.assertIn("fail!", str(ctx.exception))
3838
self.mock_logger.error.assert_called_once()
3939

40-
def test_get_record_count_by_message_id_returns_the_record_count(self):
41-
"""Test that get_record_count_by_message_id retrieves the integer value of the total record count"""
40+
def test_get_record_count_and_failures_by_message_id_returns_the_record_count_and_failures(self):
41+
"""Test that get_record_count_by_message_id retrieves the integer values of the total record count and
42+
failures"""
4243
test_message_id = "1234"
43-
4444
self.mock_dynamodb_client.get_item.return_value = {
45-
"Item": {"message_id": {"S": test_message_id}, "record_count": {"N": "1000"}}
45+
"Item": {"message_id": {"S": test_message_id}, "record_count": {"N": "1000"}, "records_failed": {"N": "5"}}
4646
}
4747

48-
self.assertEqual(audit_table.get_record_count_by_message_id(test_message_id), 1000)
49-
50-
def test_get_record_count_by_message_id_returns_none_if_record_count_not_set(self):
51-
"""Test that if the record count has not yet been set on the audit item then None is returned"""
52-
test_message_id = "1234"
53-
54-
self.mock_dynamodb_client.get_item.return_value = {"Item": {"message_id": {"S": test_message_id}}}
48+
record_count, failed_count = audit_table.get_record_count_and_failures_by_message_id(test_message_id)
5549

56-
self.assertIsNone(audit_table.get_record_count_by_message_id(test_message_id))
50+
self.assertEqual(record_count, 1000)
51+
self.assertEqual(failed_count, 5)
5752

58-
def test_set_records_succeeded_count(self):
53+
def test_get_record_count_and_failures_by_message_id_returns_zero_if_values_not_set(self):
54+
"""Test that if the record count has not yet been set on the audit item then zero is returned"""
5955
test_message_id = "1234"
60-
self.mock_dynamodb_client.get_item.return_value = {
61-
"Item": {"message_id": {"S": test_message_id}, "record_count": {"N": "1000"}, "records_failed": {"N": "42"}}
62-
}
63-
audit_table.set_records_succeeded_count(test_message_id)
64-
self.mock_dynamodb_client.get_item.assert_called_once()
65-
self.mock_dynamodb_client.update_item.assert_called_once_with(
66-
TableName=AUDIT_TABLE_NAME,
67-
Key={AuditTableKeys.MESSAGE_ID: {"S": test_message_id}},
68-
UpdateExpression="SET #attribute = :value",
69-
ExpressionAttributeNames={"#attribute": AuditTableKeys.RECORDS_SUCCEEDED},
70-
ExpressionAttributeValues={":value": {"N": "958"}},
71-
ConditionExpression="attribute_exists(message_id)",
72-
ReturnValues="UPDATED_NEW",
73-
)
74-
self.mock_logger.info.assert_called_once()
7556

76-
def test_set_records_succeeded_count_no_failures(self):
77-
test_message_id = "1234"
78-
self.mock_dynamodb_client.get_item.return_value = {
79-
"Item": {"message_id": {"S": test_message_id}, "record_count": {"N": "1000"}}
80-
}
81-
audit_table.set_records_succeeded_count(test_message_id)
82-
self.mock_dynamodb_client.get_item.assert_called_once()
83-
self.mock_dynamodb_client.update_item.assert_called_once_with(
84-
TableName=AUDIT_TABLE_NAME,
85-
Key={AuditTableKeys.MESSAGE_ID: {"S": test_message_id}},
86-
UpdateExpression="SET #attribute = :value",
87-
ExpressionAttributeNames={"#attribute": AuditTableKeys.RECORDS_SUCCEEDED},
88-
ExpressionAttributeValues={":value": {"N": "1000"}},
89-
ConditionExpression="attribute_exists(message_id)",
90-
ReturnValues="UPDATED_NEW",
91-
)
92-
self.mock_logger.info.assert_called_once()
93-
94-
def test_set_records_succeeded_count_no_records(self):
95-
test_message_id = "1234"
9657
self.mock_dynamodb_client.get_item.return_value = {"Item": {"message_id": {"S": test_message_id}}}
97-
audit_table.set_records_succeeded_count(test_message_id)
98-
self.mock_dynamodb_client.get_item.assert_called_once()
99-
self.mock_dynamodb_client.update_item.assert_called_once_with(
100-
TableName=AUDIT_TABLE_NAME,
101-
Key={AuditTableKeys.MESSAGE_ID: {"S": test_message_id}},
102-
UpdateExpression="SET #attribute = :value",
103-
ExpressionAttributeNames={"#attribute": AuditTableKeys.RECORDS_SUCCEEDED},
104-
ExpressionAttributeValues={":value": {"N": "0"}},
105-
ConditionExpression="attribute_exists(message_id)",
106-
ReturnValues="UPDATED_NEW",
107-
)
108-
self.mock_logger.info.assert_called_once()
10958

110-
def test_set_records_succeeded_count_raises(self):
111-
self.mock_dynamodb_client.update_item.side_effect = Exception("fail!")
112-
with self.assertRaises(UnhandledAuditTableError) as ctx:
113-
audit_table.set_records_succeeded_count("msg1")
114-
self.assertIn("fail!", str(ctx.exception))
115-
self.mock_logger.error.assert_called_once()
59+
record_count, failed_count = audit_table.get_record_count_and_failures_by_message_id(test_message_id)
60+
61+
self.assertEqual(record_count, 0)
62+
self.assertEqual(failed_count, 0)
11663

11764
def test_increment_records_failed_count(self):
11865
"""Checks audit table correctly increments the records_failed count"""
@@ -135,29 +82,61 @@ def test_increment_records_failed_count_raises(self):
13582
self.assertIn("fail!", str(ctx.exception))
13683
self.mock_logger.error.assert_called_once()
13784

138-
def test_set_audit_table_ingestion_end_time(self):
139-
"""Checks audit table correctly sets ingestion_end_time to the requested value"""
85+
def test_set_audit_record_success_count_and_end_time(self):
86+
"""Checks audit table correctly sets ingestion_end_time and success count to the requested value"""
14087
test_file_key = "RSV_Vaccinations_v5_X26_20210730T12000000.csv"
14188
test_message_id = "1234"
142-
test_end_time = 1627647000
143-
audit_table.set_audit_table_ingestion_end_time(test_file_key, test_message_id, test_end_time)
89+
test_end_time = "20251208T14430000"
90+
test_success_count = 5
91+
92+
audit_table.set_audit_record_success_count_and_end_time(
93+
test_file_key, test_message_id, test_success_count, test_end_time
94+
)
95+
14496
self.mock_dynamodb_client.update_item.assert_called_once_with(
14597
TableName=AUDIT_TABLE_NAME,
14698
Key={AuditTableKeys.MESSAGE_ID: {"S": test_message_id}},
147-
UpdateExpression=f"SET #{AuditTableKeys.INGESTION_END_TIME} = :{AuditTableKeys.INGESTION_END_TIME}",
148-
ExpressionAttributeNames={f"#{AuditTableKeys.INGESTION_END_TIME}": AuditTableKeys.INGESTION_END_TIME},
149-
ExpressionAttributeValues={f":{AuditTableKeys.INGESTION_END_TIME}": {"S": "20210730T12100000"}},
99+
UpdateExpression=(
100+
f"SET #{AuditTableKeys.INGESTION_END_TIME} = :{AuditTableKeys.INGESTION_END_TIME}"
101+
f", #{AuditTableKeys.RECORDS_SUCCEEDED} = :{AuditTableKeys.RECORDS_SUCCEEDED}"
102+
),
103+
ExpressionAttributeNames={
104+
f"#{AuditTableKeys.INGESTION_END_TIME}": AuditTableKeys.INGESTION_END_TIME,
105+
f"#{AuditTableKeys.RECORDS_SUCCEEDED}": AuditTableKeys.RECORDS_SUCCEEDED,
106+
},
107+
ExpressionAttributeValues={
108+
f":{AuditTableKeys.INGESTION_END_TIME}": {"S": test_end_time},
109+
f":{AuditTableKeys.RECORDS_SUCCEEDED}": {"N": str(test_success_count)},
110+
},
150111
ConditionExpression="attribute_exists(message_id)",
151-
ReturnValues="UPDATED_NEW",
152112
)
153-
self.mock_logger.info.assert_called_once()
113+
self.mock_logger.info.assert_has_calls(
114+
[
115+
call(
116+
"ingestion_end_time for %s file, with message id %s, was successfully updated to %s in the audit table",
117+
"RSV_Vaccinations_v5_X26_20210730T12000000.csv",
118+
"1234",
119+
"20251208T14430000",
120+
),
121+
call(
122+
"records_succeeded for %s file, with message id %s, was successfully updated to %s in the audit table",
123+
"RSV_Vaccinations_v5_X26_20210730T12000000.csv",
124+
"1234",
125+
"5",
126+
),
127+
]
128+
)
154129

155-
def test_set_audit_table_ingestion_end_time_throws_exception_with_invalid_id(self):
130+
def test_set_audit_record_success_count_and_end_time_throws_exception_with_invalid_id(self):
156131
test_file_key = "RSV_Vaccinations_v5_X26_20210730T12000000.csv"
157132
test_message_id = "1234"
158-
test_end_time = 1627647000
159-
self.mock_dynamodb_client.update_item.side_effect = Exception("fail!")
133+
test_end_time = "20251208T14430000"
134+
test_success_count = 5
135+
self.mock_dynamodb_client.update_item.side_effect = Exception("Unhandled error")
136+
160137
with self.assertRaises(UnhandledAuditTableError) as ctx:
161-
audit_table.set_audit_table_ingestion_end_time(test_file_key, test_message_id, test_end_time)
162-
self.assertIn("fail!", str(ctx.exception))
138+
audit_table.set_audit_record_success_count_and_end_time(
139+
test_file_key, test_message_id, test_success_count, test_end_time
140+
)
141+
self.assertIn("Unhandled error", str(ctx.exception))
163142
self.mock_logger.error.assert_called_once()

0 commit comments

Comments
 (0)