Skip to content

Commit e124135

Browse files
committed
review fixes
1 parent 39b204a commit e124135

17 files changed

+112
-93
lines changed

lambdas/ack_backend/src/ack_processor.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,9 @@ def lambda_handler(event, _):
5050
break
5151

5252
ack_data_rows.append(convert_message_to_ack_row(message, created_at_formatted_string))
53+
# Note: we are only currently receiving failure records.
54+
# Nevertheless, we leave this here pending rewrite of tests (which currently still incorporate
55+
# success records and use "diagnostics" to identify failures).
5356
if message.get("diagnostics"):
5457
increment_records_failed_count(message_id)
5558

lambdas/ack_backend/src/audit_table.py

Lines changed: 19 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -62,21 +62,21 @@ def set_records_succeeded_count(message_id: str) -> None:
6262
records_failed = int(records_failed_item) if records_failed_item else 0
6363
records_succeeded = record_count - records_failed
6464

65-
counter_name = AuditTableKeys.RECORDS_SUCCEEDED
65+
attribute_name = AuditTableKeys.RECORDS_SUCCEEDED
6666
try:
6767
response = dynamodb_client.update_item(
6868
TableName=AUDIT_TABLE_NAME,
6969
Key={AuditTableKeys.MESSAGE_ID: {"S": message_id}},
70-
UpdateExpression="SET #counter = :value",
71-
ExpressionAttributeNames={"#counter": counter_name},
70+
UpdateExpression="SET #attribute = :value",
71+
ExpressionAttributeNames={"#attribute": attribute_name},
7272
ExpressionAttributeValues={":value": {"N": str(records_succeeded)}},
7373
ConditionExpression=CONDITION_EXPRESSION,
7474
ReturnValues="UPDATED_NEW",
7575
)
76-
result = response.get("Attributes", {}).get(counter_name).get("N")
76+
result = response.get("Attributes", {}).get(attribute_name).get("N")
7777
logger.info(
78-
"Counter %s for message id %s set to %s in the audit table",
79-
counter_name,
78+
"Attribute %s for message id %s set to %s in the audit table",
79+
attribute_name,
8080
message_id,
8181
result,
8282
)
@@ -94,43 +94,37 @@ def increment_records_failed_count(message_id: str) -> None:
9494

9595
increment_value = 1
9696
initial_value = 0
97-
counter_name = AuditTableKeys.RECORDS_FAILED
97+
attribute_name = AuditTableKeys.RECORDS_FAILED
9898
try:
99-
# Use SET with if_not_exists to safely increment the counter
99+
# Use SET with if_not_exists to safely increment the counter attribute
100100
response = dynamodb_client.update_item(
101101
TableName=AUDIT_TABLE_NAME,
102102
Key={AuditTableKeys.MESSAGE_ID: {"S": message_id}},
103-
UpdateExpression="SET #counter = if_not_exists(#counter, :initial) + :increment",
104-
ExpressionAttributeNames={"#counter": counter_name},
103+
UpdateExpression="SET #attribute = if_not_exists(#attribute, :initial) + :increment",
104+
ExpressionAttributeNames={"#attribute": attribute_name},
105105
ExpressionAttributeValues={":increment": {"N": str(increment_value)}, ":initial": {"N": str(initial_value)}},
106106
ConditionExpression=CONDITION_EXPRESSION,
107107
ReturnValues="UPDATED_NEW",
108108
)
109-
result = response.get("Attributes", {}).get(counter_name).get("N")
110-
logger.info(
111-
"Counter %s for message id %s incremented to %s in the audit table",
112-
counter_name,
113-
message_id,
114-
result,
115-
)
109+
response.get("Attributes", {}).get(attribute_name).get("N")
116110

117111
except Exception as error: # pylint: disable = broad-exception-caught
118112
logger.error(error)
119113
raise UnhandledAuditTableError(error) from error
120114

121115

122-
def set_audit_table_ingestion_complete(
116+
def set_audit_table_ingestion_end_time(
123117
file_key: str,
124118
message_id: str,
125119
complete_time: float,
126120
) -> None:
127-
"""Sets the ingestion_complete in the audit table to the requested time"""
121+
"""Sets the ingestion_end_time in the audit table to the requested time"""
128122
# format the time
129-
ingestion_complete = time.strftime("%Y%m%dT%H%M%S00", time.gmtime(complete_time))
123+
ingestion_end_time = time.strftime("%Y%m%dT%H%M%S00", time.gmtime(complete_time))
130124

131-
update_expression = f"SET #{AuditTableKeys.INGESTION_COMPLETE} = :{AuditTableKeys.INGESTION_COMPLETE}"
132-
expression_attr_names = {f"#{AuditTableKeys.INGESTION_COMPLETE}": AuditTableKeys.INGESTION_COMPLETE}
133-
expression_attr_values = {f":{AuditTableKeys.INGESTION_COMPLETE}": {"S": ingestion_complete}}
125+
update_expression = f"SET #{AuditTableKeys.INGESTION_END_TIME} = :{AuditTableKeys.INGESTION_END_TIME}"
126+
expression_attr_names = {f"#{AuditTableKeys.INGESTION_END_TIME}": AuditTableKeys.INGESTION_END_TIME}
127+
expression_attr_values = {f":{AuditTableKeys.INGESTION_END_TIME}": {"S": ingestion_end_time}}
134128

135129
try:
136130
response = dynamodb_client.update_item(
@@ -142,9 +136,9 @@ def set_audit_table_ingestion_complete(
142136
ConditionExpression=f"attribute_exists({AuditTableKeys.MESSAGE_ID})",
143137
ReturnValues="UPDATED_NEW",
144138
)
145-
result = response.get("Attributes", {}).get(AuditTableKeys.INGESTION_COMPLETE).get("S")
139+
result = response.get("Attributes", {}).get(AuditTableKeys.INGESTION_END_TIME).get("S")
146140
logger.info(
147-
"ingestion_complete for %s file, with message id %s, was successfully updated to %s in the audit table",
141+
"ingestion_end_time for %s file, with message id %s, was successfully updated to %s in the audit table",
148142
file_key,
149143
message_id,
150144
result,

lambdas/ack_backend/src/constants.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ class AuditTableKeys:
3838
RECORD_COUNT = "record_count"
3939
STATUS = "status"
4040
TIMESTAMP = "timestamp"
41-
INGESTION_COMPLETE = "ingestion_complete"
41+
INGESTION_END_TIME = "ingestion_end_time"
4242
RECORDS_SUCCEEDED = "records_succeeded"
4343
RECORDS_FAILED = "records_failed"
4444

lambdas/ack_backend/src/logging_decorators.py

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
from datetime import datetime
66
from functools import wraps
77

8-
from audit_table import set_audit_table_ingestion_complete
98
from common.log_decorator import generate_and_send_logs
109

1110
PREFIX = "ack_processor"
@@ -79,10 +78,9 @@ def wrapper(*args, **kwargs):
7978
"function_name": f"{PREFIX}_{func.__name__}",
8079
"date_time": str(datetime.now()),
8180
}
82-
complete_time = time.time()
8381

8482
# NB this doesn't require a try-catch block as the wrapped function never throws an exception
85-
result = func(*args, **kwargs)
83+
result, ingestion_end_time = func(*args, **kwargs)
8684
if result is not None:
8785
message_for_logs = "Record processing complete"
8886
base_log_data.update(result)
@@ -91,11 +89,8 @@ def wrapper(*args, **kwargs):
9189
"statusCode": 200,
9290
"message": message_for_logs,
9391
}
94-
file_key = base_log_data.get("filename")
95-
message_id = base_log_data.get("message_id")
96-
set_audit_table_ingestion_complete(file_key, message_id, complete_time)
92+
generate_and_send_logs(STREAM_NAME, ingestion_end_time, base_log_data, additional_log_data)
9793

98-
generate_and_send_logs(STREAM_NAME, complete_time, base_log_data, additional_log_data)
9994
return result
10095

10196
return wrapper

lambdas/ack_backend/src/update_ack_file.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
"""Functions for uploading the data to the ack file"""
22

3+
import time
34
from io import BytesIO, StringIO
45

56
from botocore.exceptions import ClientError
67

78
from audit_table import (
89
change_audit_table_status_to_processed,
910
get_record_count_by_message_id,
11+
set_audit_table_ingestion_end_time,
1012
set_records_succeeded_count,
1113
)
1214
from common.aws_s3_utils import move_file
@@ -65,7 +67,7 @@ def complete_batch_file_process(
6567
vaccine_type: str,
6668
created_at_formatted_string: str,
6769
file_key: str,
68-
) -> dict:
70+
) -> tuple[dict, float]:
6971
"""Mark the batch file as processed. This involves moving the ack and original file to destinations and updating
7072
the audit table status"""
7173
ack_filename = f"{file_key.replace('.csv', f'_BusAck_{created_at_formatted_string}.csv')}"
@@ -79,13 +81,17 @@ def complete_batch_file_process(
7981
change_audit_table_status_to_processed(file_key, message_id)
8082
set_records_succeeded_count(message_id)
8183

82-
return {
84+
ingestion_end_time = time.time()
85+
set_audit_table_ingestion_end_time(file_key, message_id, ingestion_end_time)
86+
87+
result = {
8388
"message_id": message_id,
8489
"file_key": file_key,
8590
"supplier": supplier,
8691
"vaccine_type": vaccine_type,
8792
"row_count": total_ack_rows_processed,
8893
}
94+
return result, ingestion_end_time
8995

9096

9197
def obtain_current_ack_content(temp_ack_file_key: str) -> StringIO:

lambdas/ack_backend/tests/test_audit_table.py

Lines changed: 18 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,8 @@ def test_set_records_succeeded_count(self):
6565
self.mock_dynamodb_client.update_item.assert_called_once_with(
6666
TableName=AUDIT_TABLE_NAME,
6767
Key={AuditTableKeys.MESSAGE_ID: {"S": test_message_id}},
68-
UpdateExpression="SET #counter = :value",
69-
ExpressionAttributeNames={"#counter": AuditTableKeys.RECORDS_SUCCEEDED},
68+
UpdateExpression="SET #attribute = :value",
69+
ExpressionAttributeNames={"#attribute": AuditTableKeys.RECORDS_SUCCEEDED},
7070
ExpressionAttributeValues={":value": {"N": "958"}},
7171
ConditionExpression="attribute_exists(message_id)",
7272
ReturnValues="UPDATED_NEW",
@@ -83,8 +83,8 @@ def test_set_records_succeeded_count_no_failures(self):
8383
self.mock_dynamodb_client.update_item.assert_called_once_with(
8484
TableName=AUDIT_TABLE_NAME,
8585
Key={AuditTableKeys.MESSAGE_ID: {"S": test_message_id}},
86-
UpdateExpression="SET #counter = :value",
87-
ExpressionAttributeNames={"#counter": AuditTableKeys.RECORDS_SUCCEEDED},
86+
UpdateExpression="SET #attribute = :value",
87+
ExpressionAttributeNames={"#attribute": AuditTableKeys.RECORDS_SUCCEEDED},
8888
ExpressionAttributeValues={":value": {"N": "1000"}},
8989
ConditionExpression="attribute_exists(message_id)",
9090
ReturnValues="UPDATED_NEW",
@@ -99,8 +99,8 @@ def test_set_records_succeeded_count_no_records(self):
9999
self.mock_dynamodb_client.update_item.assert_called_once_with(
100100
TableName=AUDIT_TABLE_NAME,
101101
Key={AuditTableKeys.MESSAGE_ID: {"S": test_message_id}},
102-
UpdateExpression="SET #counter = :value",
103-
ExpressionAttributeNames={"#counter": AuditTableKeys.RECORDS_SUCCEEDED},
102+
UpdateExpression="SET #attribute = :value",
103+
ExpressionAttributeNames={"#attribute": AuditTableKeys.RECORDS_SUCCEEDED},
104104
ExpressionAttributeValues={":value": {"N": "0"}},
105105
ConditionExpression="attribute_exists(message_id)",
106106
ReturnValues="UPDATED_NEW",
@@ -121,13 +121,12 @@ def test_increment_records_failed_count(self):
121121
self.mock_dynamodb_client.update_item.assert_called_once_with(
122122
TableName=AUDIT_TABLE_NAME,
123123
Key={AuditTableKeys.MESSAGE_ID: {"S": test_message_id}},
124-
UpdateExpression="SET #counter = if_not_exists(#counter, :initial) + :increment",
125-
ExpressionAttributeNames={"#counter": AuditTableKeys.RECORDS_FAILED},
124+
UpdateExpression="SET #attribute = if_not_exists(#attribute, :initial) + :increment",
125+
ExpressionAttributeNames={"#attribute": AuditTableKeys.RECORDS_FAILED},
126126
ExpressionAttributeValues={":increment": {"N": "1"}, ":initial": {"N": "0"}},
127127
ConditionExpression="attribute_exists(message_id)",
128128
ReturnValues="UPDATED_NEW",
129129
)
130-
self.mock_logger.info.assert_called_once()
131130

132131
def test_increment_records_failed_count_raises(self):
133132
self.mock_dynamodb_client.update_item.side_effect = Exception("fail!")
@@ -136,29 +135,29 @@ def test_increment_records_failed_count_raises(self):
136135
self.assertIn("fail!", str(ctx.exception))
137136
self.mock_logger.error.assert_called_once()
138137

139-
def test_set_audit_table_ingestion_complete(self):
140-
"""Checks audit table correctly sets ingestion_complete to the requested value"""
138+
def test_set_audit_table_ingestion_end_time(self):
139+
"""Checks audit table correctly sets ingestion_end_time to the requested value"""
141140
test_file_key = "RSV_Vaccinations_v5_X26_20210730T12000000.csv"
142141
test_message_id = "1234"
143-
test_start_time = 1627647000
144-
audit_table.set_audit_table_ingestion_complete(test_file_key, test_message_id, test_start_time)
142+
test_end_time = 1627647000
143+
audit_table.set_audit_table_ingestion_end_time(test_file_key, test_message_id, test_end_time)
145144
self.mock_dynamodb_client.update_item.assert_called_once_with(
146145
TableName=AUDIT_TABLE_NAME,
147146
Key={AuditTableKeys.MESSAGE_ID: {"S": test_message_id}},
148-
UpdateExpression=f"SET #{AuditTableKeys.INGESTION_COMPLETE} = :{AuditTableKeys.INGESTION_COMPLETE}",
149-
ExpressionAttributeNames={f"#{AuditTableKeys.INGESTION_COMPLETE}": AuditTableKeys.INGESTION_COMPLETE},
150-
ExpressionAttributeValues={f":{AuditTableKeys.INGESTION_COMPLETE}": {"S": "20210730T12100000"}},
147+
UpdateExpression=f"SET #{AuditTableKeys.INGESTION_END_TIME} = :{AuditTableKeys.INGESTION_END_TIME}",
148+
ExpressionAttributeNames={f"#{AuditTableKeys.INGESTION_END_TIME}": AuditTableKeys.INGESTION_END_TIME},
149+
ExpressionAttributeValues={f":{AuditTableKeys.INGESTION_END_TIME}": {"S": "20210730T12100000"}},
151150
ConditionExpression="attribute_exists(message_id)",
152151
ReturnValues="UPDATED_NEW",
153152
)
154153
self.mock_logger.info.assert_called_once()
155154

156-
def test_set_audit_table_ingestion_complete_throws_exception_with_invalid_id(self):
155+
def test_set_audit_table_ingestion_end_time_throws_exception_with_invalid_id(self):
157156
test_file_key = "RSV_Vaccinations_v5_X26_20210730T12000000.csv"
158157
test_message_id = "1234"
159-
test_start_time = 1627647000
158+
test_end_time = 1627647000
160159
self.mock_dynamodb_client.update_item.side_effect = Exception("fail!")
161160
with self.assertRaises(UnhandledAuditTableError) as ctx:
162-
audit_table.set_audit_table_ingestion_complete(test_file_key, test_message_id, test_start_time)
161+
audit_table.set_audit_table_ingestion_end_time(test_file_key, test_message_id, test_end_time)
163162
self.assertIn("fail!", str(ctx.exception))
164163
self.mock_logger.error.assert_called_once()

lambdas/ack_backend/tests/test_splunk_logging.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ def run(self, result=None):
6666
# Time is incremented by 1.0 for each call to time.time for ease of testing.
6767
# Range is set to a large number (300) due to many calls being made to time.time for some tests.
6868
patch(
69-
"logging_decorators.time.time",
69+
"update_ack_file.time.time",
7070
side_effect=[0.0 + i for i in range(300)],
7171
),
7272
]
@@ -121,6 +121,7 @@ def test_splunk_logging_successful_rows(self):
121121
with ( # noqa: E999
122122
patch("common.log_decorator.send_log_to_firehose") as mock_send_log_to_firehose, # noqa: E999
123123
patch("common.log_decorator.logger") as mock_logger, # noqa: E999
124+
patch("ack_processor.increment_records_failed_count"), # noqa: E999
124125
): # noqa: E999
125126
result = lambda_handler(
126127
event=generate_event([{"operation_requested": operation}]),
@@ -161,6 +162,7 @@ def test_splunk_logging_missing_data(self):
161162
with ( # noqa: E999
162163
patch("common.log_decorator.send_log_to_firehose") as mock_send_log_to_firehose, # noqa: E999
163164
patch("common.log_decorator.logger") as mock_logger, # noqa: E999
165+
patch("ack_processor.increment_records_failed_count"), # noqa: E999
164166
): # noqa: E999
165167
with self.assertRaises(AttributeError):
166168
lambda_handler(event={"Records": [{"body": json.dumps([{"": "456", "row_id": "test^1"}])}]}, context={})
@@ -263,6 +265,7 @@ def test_splunk_logging_multiple_rows(self):
263265
with ( # noqa: E999
264266
patch("common.log_decorator.send_log_to_firehose") as mock_send_log_to_firehose, # noqa: E999
265267
patch("common.log_decorator.logger") as mock_logger, # noqa: E999
268+
patch("ack_processor.increment_records_failed_count"), # noqa: E999
266269
): # noqa: E999
267270
result = lambda_handler(generate_event(messages), context={})
268271

@@ -382,6 +385,7 @@ def test_splunk_update_ack_file_not_logged(self):
382385
patch(
383386
"update_ack_file.change_audit_table_status_to_processed"
384387
) as mock_change_audit_table_status_to_processed, # noqa: E999
388+
patch("ack_processor.increment_records_failed_count"), # noqa: E999
385389
): # noqa: E999
386390
result = lambda_handler(generate_event(messages), context={})
387391

@@ -424,7 +428,8 @@ def test_splunk_update_ack_file_logged(self):
424428
"update_ack_file.change_audit_table_status_to_processed"
425429
) as mock_change_audit_table_status_to_processed, # noqa: E999
426430
patch("update_ack_file.set_records_succeeded_count") as mock_set_records_succeeded_count, # noqa: E999
427-
patch("logging_decorators.set_audit_table_ingestion_complete"), # noqa: E999
431+
patch("ack_processor.increment_records_failed_count"), # noqa: E999
432+
patch("update_ack_file.set_audit_table_ingestion_end_time"), # noqa: E999
428433
): # noqa: E999
429434
result = lambda_handler(generate_event(messages, include_eof_message=True), context={})
430435

lambdas/ack_backend/tests/test_update_ack_file_flow.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,15 +41,15 @@ def setUp(self):
4141
self.mock_get_record_count = self.get_record_count_patcher.start()
4242
self.set_records_succeeded_count_patcher = patch("update_ack_file.set_records_succeeded_count")
4343
self.mock_set_records_succeeded_count = self.set_records_succeeded_count_patcher.start()
44-
self.set_audit_table_ingestion_complete_patcher = patch("logging_decorators.set_audit_table_ingestion_complete")
45-
self.mock_set_audit_table_ingestion_complete = self.set_audit_table_ingestion_complete_patcher.start()
44+
self.set_audit_table_ingestion_end_time_patcher = patch("update_ack_file.set_audit_table_ingestion_end_time")
45+
self.mock_set_audit_table_ingestion_end_time = self.set_audit_table_ingestion_end_time_patcher.start()
4646

4747
def tearDown(self):
4848
self.logger_patcher.stop()
4949
self.change_audit_status_patcher.stop()
5050
self.get_record_count_patcher.stop()
5151
self.set_records_succeeded_count_patcher.stop()
52-
self.set_audit_table_ingestion_complete_patcher.stop()
52+
self.set_audit_table_ingestion_end_time_patcher.stop()
5353

5454
def test_audit_table_updated_correctly_when_ack_process_complete(self):
5555
"""VED-167 - Test that the audit table has been updated correctly"""

0 commit comments

Comments
 (0)