Skip to content

Commit 63150af

Browse files
committed
ingestion_complete
1 parent 50d33f7 commit 63150af

File tree

5 files changed

+77
-4
lines changed

5 files changed

+77
-4
lines changed

lambdas/ack_backend/src/audit_table.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
"""Add the filename to the audit table and check for duplicates."""
22

3+
import time
34
from typing import Optional
45

56
from common.clients import dynamodb_client, logger
@@ -116,3 +117,39 @@ def increment_records_failed_count(message_id: str) -> None:
116117
except Exception as error: # pylint: disable = broad-exception-caught
117118
logger.error(error)
118119
raise UnhandledAuditTableError(error) from error
120+
121+
122+
def set_audit_table_ingestion_complete(
123+
file_key: str,
124+
message_id: str,
125+
complete_time: float,
126+
) -> None:
127+
"""Sets the ingestion_complete in the audit table to the requested time"""
128+
# format the time
129+
ingestion_complete = time.strftime("%Y%m%dT%H%M%S00", time.gmtime(complete_time))
130+
131+
update_expression = f"SET #{AuditTableKeys.INGESTION_COMPLETE} = :{AuditTableKeys.INGESTION_COMPLETE}"
132+
expression_attr_names = {f"#{AuditTableKeys.INGESTION_COMPLETE}": AuditTableKeys.INGESTION_COMPLETE}
133+
expression_attr_values = {f":{AuditTableKeys.INGESTION_COMPLETE}": {"S": ingestion_complete}}
134+
135+
try:
136+
response = dynamodb_client.update_item(
137+
TableName=AUDIT_TABLE_NAME,
138+
Key={AuditTableKeys.MESSAGE_ID: {"S": message_id}},
139+
UpdateExpression=update_expression,
140+
ExpressionAttributeNames=expression_attr_names,
141+
ExpressionAttributeValues=expression_attr_values,
142+
ConditionExpression=f"attribute_exists({AuditTableKeys.MESSAGE_ID})",
143+
ReturnValues="UPDATED_NEW",
144+
)
145+
result = response.get("Attributes", {}).get(AuditTableKeys.INGESTION_COMPLETE).get("S")
146+
logger.info(
147+
"ingestion_complete for %s file, with message id %s, was successfully updated to %s in the audit table",
148+
file_key,
149+
message_id,
150+
result,
151+
)
152+
153+
except Exception as error: # pylint: disable = broad-exception-caught
154+
logger.error(error)
155+
raise UnhandledAuditTableError(error) from error

lambdas/ack_backend/src/logging_decorators.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from datetime import datetime
66
from functools import wraps
77

8+
from audit_table import set_audit_table_ingestion_complete
89
from common.log_decorator import generate_and_send_logs
910

1011
PREFIX = "ack_processor"
@@ -78,7 +79,7 @@ def wrapper(*args, **kwargs):
7879
"function_name": f"{PREFIX}_{func.__name__}",
7980
"date_time": str(datetime.now()),
8081
}
81-
start_time = time.time()
82+
complete_time = time.time()
8283

8384
# NB this doesn't require a try-catch block as the wrapped function never throws an exception
8485
result = func(*args, **kwargs)
@@ -90,10 +91,11 @@ def wrapper(*args, **kwargs):
9091
"statusCode": 200,
9192
"message": message_for_logs,
9293
}
94+
file_key = base_log_data.get("filename")
95+
message_id = base_log_data.get("message_id")
96+
set_audit_table_ingestion_complete(file_key, message_id, complete_time)
9397

94-
# here: add start_time to audit table as "ingestion_complete" for message_id
95-
96-
generate_and_send_logs(STREAM_NAME, start_time, base_log_data, additional_log_data)
98+
generate_and_send_logs(STREAM_NAME, complete_time, base_log_data, additional_log_data)
9799
return result
98100

99101
return wrapper

lambdas/ack_backend/tests/test_audit_table.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ def test_set_records_succeeded_count_raises(self):
115115
self.mock_logger.error.assert_called_once()
116116

117117
def test_increment_records_failed_count(self):
118+
"""Checks audit table correctly increments the records_failed count"""
118119
test_message_id = "1234"
119120
audit_table.increment_records_failed_count(test_message_id)
120121
self.mock_dynamodb_client.update_item.assert_called_once_with(
@@ -134,3 +135,30 @@ def test_increment_records_failed_count_raises(self):
134135
audit_table.increment_records_failed_count("msg1")
135136
self.assertIn("fail!", str(ctx.exception))
136137
self.mock_logger.error.assert_called_once()
138+
139+
def test_set_audit_table_ingestion_complete(self):
140+
"""Checks audit table correctly sets ingestion_complete to the requested value"""
141+
test_file_key = "RSV_Vaccinations_v5_X26_20210730T12000000.csv"
142+
test_message_id = "1234"
143+
test_start_time = 1627647000
144+
audit_table.set_audit_table_ingestion_complete(test_file_key, test_message_id, test_start_time)
145+
self.mock_dynamodb_client.update_item.assert_called_once_with(
146+
TableName=AUDIT_TABLE_NAME,
147+
Key={AuditTableKeys.MESSAGE_ID: {"S": test_message_id}},
148+
UpdateExpression=f"SET #{AuditTableKeys.INGESTION_COMPLETE} = :{AuditTableKeys.INGESTION_COMPLETE}",
149+
ExpressionAttributeNames={f"#{AuditTableKeys.INGESTION_COMPLETE}": AuditTableKeys.INGESTION_COMPLETE},
150+
ExpressionAttributeValues={f":{AuditTableKeys.INGESTION_COMPLETE}": {"S": "20210730T12100000"}},
151+
ConditionExpression="attribute_exists(message_id)",
152+
ReturnValues="UPDATED_NEW",
153+
)
154+
self.mock_logger.info.assert_called_once()
155+
156+
def test_set_audit_table_ingestion_complete_throws_exception_with_invalid_id(self):
157+
test_file_key = "RSV_Vaccinations_v5_X26_20210730T12000000.csv"
158+
test_message_id = "1234"
159+
test_start_time = 1627647000
160+
self.mock_dynamodb_client.update_item.side_effect = Exception("fail!")
161+
with self.assertRaises(UnhandledAuditTableError) as ctx:
162+
audit_table.set_audit_table_ingestion_complete(test_file_key, test_message_id, test_start_time)
163+
self.assertIn("fail!", str(ctx.exception))
164+
self.mock_logger.error.assert_called_once()

lambdas/ack_backend/tests/test_splunk_logging.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -424,6 +424,7 @@ def test_splunk_update_ack_file_logged(self):
424424
"update_ack_file.change_audit_table_status_to_processed"
425425
) as mock_change_audit_table_status_to_processed, # noqa: E999
426426
patch("update_ack_file.set_records_succeeded_count") as mock_set_records_succeeded_count, # noqa: E999
427+
patch("logging_decorators.set_audit_table_ingestion_complete"), # noqa: E999
427428
): # noqa: E999
428429
result = lambda_handler(generate_event(messages, include_eof_message=True), context={})
429430

lambdas/ack_backend/tests/test_update_ack_file_flow.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,15 @@ def setUp(self):
4141
self.mock_get_record_count = self.get_record_count_patcher.start()
4242
self.set_records_succeeded_count_patcher = patch("update_ack_file.set_records_succeeded_count")
4343
self.mock_set_records_succeeded_count = self.set_records_succeeded_count_patcher.start()
44+
self.set_audit_table_ingestion_complete_patcher = patch("logging_decorators.set_audit_table_ingestion_complete")
45+
self.mock_set_audit_table_ingestion_complete = self.set_audit_table_ingestion_complete_patcher.start()
4446

4547
def tearDown(self):
4648
self.logger_patcher.stop()
4749
self.change_audit_status_patcher.stop()
50+
self.get_record_count_patcher.stop()
51+
self.set_records_succeeded_count_patcher.stop()
52+
self.set_audit_table_ingestion_complete_patcher.stop()
4853

4954
def test_audit_table_updated_correctly_when_ack_process_complete(self):
5055
"""VED-167 - Test that the audit table has been updated correctly"""

0 commit comments

Comments
 (0)