Skip to content

Commit 7e34919

Browse files
authored
VED-956: Audit table update (#1026)
1 parent 13f44aa commit 7e34919

19 files changed

+407
-81
lines changed

lambdas/ack_backend/src/ack_processor.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import json
44

5+
from audit_table import increment_records_failed_count
56
from common.batch.eof_utils import is_eof_message
67
from convert_message_to_ack_row import convert_message_to_ack_row
78
from logging_decorators import ack_lambda_handler_logging_decorator
@@ -49,6 +50,7 @@ def lambda_handler(event, _):
4950
break
5051

5152
ack_data_rows.append(convert_message_to_ack_row(message, created_at_formatted_string))
53+
increment_records_failed_count(message_id)
5254

5355
update_ack_file(file_key, created_at_formatted_string, ack_data_rows)
5456

Lines changed: 79 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,46 +1,114 @@
11
"""Add the filename to the audit table and check for duplicates."""
22

3-
from typing import Optional
4-
53
from common.clients import dynamodb_client, logger
64
from common.models.errors import UnhandledAuditTableError
75
from constants import AUDIT_TABLE_NAME, AuditTableKeys, FileStatus
86

7+
CONDITION_EXPRESSION = "attribute_exists(message_id)"
8+
99

1010
def change_audit_table_status_to_processed(file_key: str, message_id: str) -> None:
    """Set the status of an existing audit table record to 'Processed'.

    Args:
        file_key: Key of the batch file; used only in the log message.
        message_id: Value of the audit record's message-id key attribute.

    Raises:
        UnhandledAuditTableError: If the DynamoDB update raises for any reason
            (the update is conditional on the record already existing).
    """
    try:
        response = dynamodb_client.update_item(
            TableName=AUDIT_TABLE_NAME,
            Key={AuditTableKeys.MESSAGE_ID: {"S": message_id}},
            UpdateExpression="SET #status = :status",
            ExpressionAttributeNames={"#status": "status"},
            ExpressionAttributeValues={":status": {"S": FileStatus.PROCESSED}},
            ConditionExpression=CONDITION_EXPRESSION,
            ReturnValues="UPDATED_NEW",
        )
    except Exception as error:  # pylint: disable = broad-exception-caught
        logger.error(error)
        raise UnhandledAuditTableError(error) from error

    # Every level of the lookup is defaulted so that a response without the
    # echoed attribute cannot turn a successful update into a reported failure;
    # in that case fall back to the status that was just written.
    updated_status = response.get("Attributes", {}).get("status", {}).get("S", FileStatus.PROCESSED)
    logger.info(
        "The status of %s file, with message id %s, was successfully updated to %s in the audit table",
        file_key,
        message_id,
        updated_status,
    )
3334

3435

35-
def get_record_count_by_message_id(event_message_id: str) -> Optional[int]:
36-
"""Retrieves full audit entry by unique event message ID"""
36+
def get_record_count_and_failures_by_message_id(event_message_id: str) -> tuple[int, int]:
    """Look up the audit record for a message ID and return its counters.

    Returns:
        A ``(record_count, records_failed)`` tuple. A counter that is absent
        from the record (or a record that is absent altogether) is reported
        as 0.
    """
    response = dynamodb_client.get_item(
        TableName=AUDIT_TABLE_NAME,
        Key={AuditTableKeys.MESSAGE_ID: {"S": event_message_id}},
    )
    item = response.get("Item", {})

    # Read both raw numeric strings first, then convert them.
    raw_total = item.get(AuditTableKeys.RECORD_COUNT, {}).get("N")
    raw_failed = item.get(AuditTableKeys.RECORDS_FAILED, {}).get("N")

    total = int(raw_total) if raw_total else 0
    failed = int(raw_failed) if raw_failed else 0
    return total, failed
46+
47+
48+
def increment_records_failed_count(message_id: str) -> None:
    """Add one to the failed-records counter of an existing audit record.

    The update expression uses ``if_not_exists`` so that a record which does
    not yet carry the counter starts from zero before the increment is applied.
    Pattern taken from
    https://docs.aws.amazon.com/code-library/latest/ug/dynamodb_example_dynamodb_Scenario_AtomicCounterOperations_section.html

    Raises:
        UnhandledAuditTableError: If the DynamoDB update raises for any reason.
    """
    step, start = 1, 0

    try:
        update_kwargs = {
            "TableName": AUDIT_TABLE_NAME,
            "Key": {AuditTableKeys.MESSAGE_ID: {"S": message_id}},
            "UpdateExpression": "SET #attribute = if_not_exists(#attribute, :initial) + :increment",
            "ExpressionAttributeNames": {"#attribute": AuditTableKeys.RECORDS_FAILED},
            "ExpressionAttributeValues": {
                ":increment": {"N": str(step)},
                ":initial": {"N": str(start)},
            },
            "ConditionExpression": CONDITION_EXPRESSION,
            "ReturnValues": "UPDATED_NEW",
        }
        dynamodb_client.update_item(**update_kwargs)
    except Exception as error:  # pylint: disable = broad-exception-caught
        logger.error(error)
        raise UnhandledAuditTableError(error) from error
4271

43-
if not record_count:
44-
return None
4572

46-
return int(record_count)
73+
def set_audit_record_success_count_and_end_time(
    file_key: str, message_id: str, success_count: int, ingestion_end_time: str
) -> None:
    """Write the ingestion end time and the succeeded-records count to an audit record.

    Args:
        file_key: Key of the batch file; used only in the log messages.
        message_id: Value of the audit record's message-id key attribute.
        success_count: Number stored in the records-succeeded attribute.
        ingestion_end_time: String stored in the ingestion-end-time attribute.

    Raises:
        UnhandledAuditTableError: If the DynamoDB update raises for any reason.
    """
    end_time_attr = AuditTableKeys.INGESTION_END_TIME
    succeeded_attr = AuditTableKeys.RECORDS_SUCCEEDED

    # Each attribute gets a "#name" placeholder and a ":name" value placeholder
    # derived from the attribute name itself.
    update_expression = f"SET #{end_time_attr} = :{end_time_attr}, #{succeeded_attr} = :{succeeded_attr}"
    placeholder_names = {
        f"#{end_time_attr}": end_time_attr,
        f"#{succeeded_attr}": succeeded_attr,
    }
    placeholder_values = {
        f":{end_time_attr}": {"S": ingestion_end_time},
        f":{succeeded_attr}": {"N": str(success_count)},
    }

    try:
        dynamodb_client.update_item(
            TableName=AUDIT_TABLE_NAME,
            Key={AuditTableKeys.MESSAGE_ID: {"S": message_id}},
            UpdateExpression=update_expression,
            ExpressionAttributeNames=placeholder_names,
            ExpressionAttributeValues=placeholder_values,
            ConditionExpression=CONDITION_EXPRESSION,
        )
    except Exception as error:  # pylint: disable = broad-exception-caught
        logger.error(error)
        raise UnhandledAuditTableError(error) from error

    # Reached only when the update succeeded; one log line per attribute written.
    logger.info(
        "ingestion_end_time for %s file, with message id %s, was successfully updated to %s in the audit table",
        file_key,
        message_id,
        ingestion_end_time,
    )
    logger.info(
        "records_succeeded for %s file, with message id %s, was successfully updated to %s in the audit table",
        file_key,
        message_id,
        str(success_count),
    )

lambdas/ack_backend/src/constants.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ class AuditTableKeys:
3838
RECORD_COUNT = "record_count"
3939
STATUS = "status"
4040
TIMESTAMP = "timestamp"
41+
INGESTION_END_TIME = "ingestion_end_time"
42+
RECORDS_SUCCEEDED = "records_succeeded"
43+
RECORDS_FAILED = "records_failed"
4144

4245

4346
ACK_HEADERS = [

lambdas/ack_backend/src/logging_decorators.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ def wrapper(*args, **kwargs):
9191
"message": message_for_logs,
9292
}
9393
generate_and_send_logs(STREAM_NAME, start_time, base_log_data, additional_log_data)
94+
9495
return result
9596

9697
return wrapper

lambdas/ack_backend/src/update_ack_file.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,15 @@
11
"""Functions for uploading the data to the ack file"""
22

3+
import time
34
from io import BytesIO, StringIO
45

56
from botocore.exceptions import ClientError
67

7-
from audit_table import change_audit_table_status_to_processed, get_record_count_by_message_id
8+
from audit_table import (
9+
change_audit_table_status_to_processed,
10+
get_record_count_and_failures_by_message_id,
11+
set_audit_record_success_count_and_end_time,
12+
)
813
from common.aws_s3_utils import move_file
914
from common.clients import get_s3_client, logger
1015
from constants import (
@@ -71,15 +76,22 @@ def complete_batch_file_process(
7176
get_source_bucket_name(), f"{BATCH_FILE_PROCESSING_DIR}/{file_key}", f"{BATCH_FILE_ARCHIVE_DIR}/{file_key}"
7277
)
7378

74-
total_ack_rows_processed = get_record_count_by_message_id(message_id)
79+
total_ack_rows_processed, total_failures = get_record_count_and_failures_by_message_id(message_id)
7580
change_audit_table_status_to_processed(file_key, message_id)
7681

82+
# Consider creating time utils and using datetime instead of time
83+
ingestion_end_time = time.strftime("%Y%m%dT%H%M%S00", time.gmtime())
84+
successful_record_count = total_ack_rows_processed - total_failures
85+
set_audit_record_success_count_and_end_time(file_key, message_id, successful_record_count, ingestion_end_time)
86+
7787
return {
7888
"message_id": message_id,
7989
"file_key": file_key,
8090
"supplier": supplier,
8191
"vaccine_type": vaccine_type,
8292
"row_count": total_ack_rows_processed,
93+
"success_count": successful_record_count,
94+
"failure_count": total_failures,
8395
}
8496

8597

0 commit comments

Comments
 (0)