
Commit 066ea5b

VED-924 (Also contains VED-925 and VED-926) Modify ack file creation and completion process (#1023)
1 parent 111e0ae commit 066ea5b

26 files changed: +324 additions, -170 deletions

lambdas/ack_backend/src/ack_processor.py

Lines changed: 10 additions & 10 deletions
@@ -2,10 +2,10 @@
 
 import json
 
+from common.batch.eof_utils import is_eof_message
 from convert_message_to_ack_row import convert_message_to_ack_row
 from logging_decorators import ack_lambda_handler_logging_decorator
 from update_ack_file import complete_batch_file_process, update_ack_file
-from utils_for_ack_lambda import is_ack_processing_complete
 
 
 @ack_lambda_handler_logging_decorator
@@ -22,9 +22,11 @@ def lambda_handler(event, _):
     file_key = None
     created_at_formatted_string = None
     message_id = None
+    supplier = None
+    vaccine_type = None
 
     ack_data_rows = []
-    total_ack_rows_processed = 0
+    file_processing_complete = False
 
     for i, record in enumerate(event["Records"]):
         try:
@@ -42,18 +44,16 @@ def lambda_handler(event, _):
         created_at_formatted_string = incoming_message_body[0].get("created_at_formatted_string")
 
         for message in incoming_message_body:
+            if is_eof_message(message):
+                file_processing_complete = True
+                break
+
             ack_data_rows.append(convert_message_to_ack_row(message, created_at_formatted_string))
 
         update_ack_file(file_key, created_at_formatted_string, ack_data_rows)
 
-        # Get the row count of the final processed record
-        # Format of the row id is {batch_message_id}^{row_number}
-        total_ack_rows_processed = int(incoming_message_body[-1].get("row_id", "").split("^")[1])
-
-        if is_ack_processing_complete(message_id, total_ack_rows_processed):
-            complete_batch_file_process(
-                message_id, supplier, vaccine_type, created_at_formatted_string, file_key, total_ack_rows_processed
-            )
+        if file_processing_complete:
+            complete_batch_file_process(message_id, supplier, vaccine_type, created_at_formatted_string, file_key)
 
     return {
         "statusCode": 200,

lambdas/ack_backend/src/update_ack_file.py

Lines changed: 4 additions & 4 deletions
@@ -4,7 +4,7 @@
 
 from botocore.exceptions import ClientError
 
-from audit_table import change_audit_table_status_to_processed
+from audit_table import change_audit_table_status_to_processed, get_record_count_by_message_id
 from common.aws_s3_utils import move_file
 from common.clients import get_s3_client, logger
 from constants import (
@@ -61,7 +61,6 @@ def complete_batch_file_process(
     vaccine_type: str,
     created_at_formatted_string: str,
     file_key: str,
-    total_ack_rows_processed: int,
 ) -> dict:
     """Mark the batch file as processed. This involves moving the ack and original file to destinations and updating
     the audit table status"""
@@ -72,6 +71,7 @@ def complete_batch_file_process(
         get_source_bucket_name(), f"{BATCH_FILE_PROCESSING_DIR}/{file_key}", f"{BATCH_FILE_ARCHIVE_DIR}/{file_key}"
     )
 
+    total_ack_rows_processed = get_record_count_by_message_id(message_id)
     change_audit_table_status_to_processed(file_key, message_id)
 
     return {
@@ -111,7 +111,7 @@ def update_ack_file(
     """Updates the ack file with the new data row based on the given arguments"""
     ack_filename = f"{file_key.replace('.csv', f'_BusAck_{created_at_formatted_string}.csv')}"
     temp_ack_file_key = f"{TEMP_ACK_DIR}/{ack_filename}"
-    archive_ack_file_key = f"{COMPLETED_ACK_DIR}/{ack_filename}"
+    completed_ack_file_key = f"{COMPLETED_ACK_DIR}/{ack_filename}"
     accumulated_csv_content = obtain_current_ack_content(temp_ack_file_key)
 
     for row in ack_data_rows:
@@ -123,4 +123,4 @@ def update_ack_file(
     ack_bucket_name = get_ack_bucket_name()
 
     get_s3_client().upload_fileobj(csv_file_like_object, ack_bucket_name, temp_ack_file_key)
-    logger.info("Ack file updated to %s: %s", ack_bucket_name, archive_ack_file_key)
+    logger.info("Ack file updated to %s: %s", ack_bucket_name, completed_ack_file_key)

lambdas/ack_backend/src/utils_for_ack_lambda.py

Lines changed: 0 additions & 21 deletions
This file was deleted.
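
The deleted module exposed `is_ack_processing_complete` and an in-memory `_BATCH_EVENT_ID_TO_RECORD_COUNT_MAP` cache, both referenced by the test removed below. An approximate reconstruction, inferred from those call sites rather than from the deleted source itself:

# Approximate reconstruction of the deleted utils_for_ack_lambda.py, inferred from the
# removed test below; the actual deleted code may have differed.
from audit_table import get_record_count_by_message_id

# Cache of batch message id -> total record count, to avoid a DynamoDB read on every invocation
_BATCH_EVENT_ID_TO_RECORD_COUNT_MAP: dict[str, int] = {}


def is_ack_processing_complete(message_id: str, total_ack_rows_processed: int) -> bool:
    """Return True once the number of ack rows processed reaches the source file's record count."""
    if message_id not in _BATCH_EVENT_ID_TO_RECORD_COUNT_MAP:
        _BATCH_EVENT_ID_TO_RECORD_COUNT_MAP[message_id] = get_record_count_by_message_id(message_id)
    return total_ack_rows_processed >= _BATCH_EVENT_ID_TO_RECORD_COUNT_MAP[message_id]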

lambdas/ack_backend/tests/test_ack_processor.py

Lines changed: 7 additions & 42 deletions
@@ -3,8 +3,9 @@
 import json
 import os
 import unittest
+from copy import deepcopy
 from io import StringIO
-from unittest.mock import Mock, patch
+from unittest.mock import patch
 
 from boto3 import client as boto3_client
 from moto import mock_aws
@@ -30,7 +31,6 @@
     DiagnosticsDictionaries,
     ValidValues,
 )
-from utils_for_ack_lambda import _BATCH_EVENT_ID_TO_RECORD_COUNT_MAP
 
 with patch.dict("os.environ", MOCK_ENVIRONMENT_DICT):
     from ack_processor import lambda_handler
@@ -269,45 +269,6 @@ def test_lambda_handler_updates_ack_file_but_does_not_mark_complete_when_records
         )
         self.assert_audit_entry_status_equals(mock_batch_message_id, "Preprocessed")
 
-    @patch("utils_for_ack_lambda.get_record_count_by_message_id", return_value=500)
-    def test_lambda_handler_uses_message_id_to_record_count_cache_to_reduce_ddb_calls(self, mock_get_record_count: Mock):
-        """The DynamoDB Audit table is used to store the total record count for each source file. To reduce calls each
-        time - this test checks that we cache the value as this lambda is called many times for large files"""
-        mock_batch_message_id = "622cdeea-461e-4a83-acb5-7871d47ddbcd"
-
-        # Original source file had 500 records
-        add_audit_entry_to_table(self.dynamodb_client, mock_batch_message_id, record_count=500)
-
-        message_one = [
-            {**BASE_SUCCESS_MESSAGE, "row_id": f"{mock_batch_message_id}^1", "imms_id": "imms_1", "local_id": "local^1"}
-        ]
-        message_two = [
-            {**BASE_SUCCESS_MESSAGE, "row_id": f"{mock_batch_message_id}^2", "imms_id": "imms_2", "local_id": "local^2"}
-        ]
-        test_event_one = {"Records": [{"body": json.dumps(message_one)}]}
-        test_event_two = {"Records": [{"body": json.dumps(message_two)}]}
-
-        response = lambda_handler(event=test_event_one, context={})
-        self.assertEqual(response, EXPECTED_ACK_LAMBDA_RESPONSE_FOR_SUCCESS)
-        second_invocation_response = lambda_handler(event=test_event_two, context={})
-        self.assertEqual(second_invocation_response, EXPECTED_ACK_LAMBDA_RESPONSE_FOR_SUCCESS)
-
-        # Assert that the DDB call is only performed once on the first invocation
-        mock_get_record_count.assert_called_once_with(mock_batch_message_id)
-        validate_ack_file_content(
-            self.s3_client,
-            [*message_one, *message_two],
-            existing_file_content=ValidValues.ack_headers,
-        )
-        self.assert_ack_and_source_file_locations_correct(
-            MOCK_MESSAGE_DETAILS.file_key,
-            MOCK_MESSAGE_DETAILS.temp_ack_file_key,
-            MOCK_MESSAGE_DETAILS.archive_ack_file_key,
-            is_complete=False,
-        )
-        self.assertEqual(_BATCH_EVENT_ID_TO_RECORD_COUNT_MAP[mock_batch_message_id], 500)
-        self.assert_audit_entry_status_equals(mock_batch_message_id, "Preprocessed")
-
     def test_lambda_handler_updates_ack_file_and_marks_complete_when_all_records_processed(self):
         """
         Test that the batch file process is marked as complete when all records have been processed.
@@ -339,7 +300,11 @@ def test_lambda_handler_updates_ack_file_and_marks_complete_when_all_records_pro
             }
             for i in range(50, 101)
         ]
-        test_event = {"Records": [{"body": json.dumps(array_of_success_messages)}]}
+
+        # Include the EoF message in the event
+        all_messages_plus_eof = deepcopy(array_of_success_messages)
+        all_messages_plus_eof.append(MOCK_MESSAGE_DETAILS.eof_message)
+        test_event = {"Records": [{"body": json.dumps(all_messages_plus_eof)}]}
 
         response = lambda_handler(event=test_event, context={})
 

lambdas/ack_backend/tests/test_splunk_logging.py

Lines changed: 6 additions & 12 deletions
@@ -9,13 +9,13 @@
 from boto3 import client as boto3_client
 from moto import mock_aws
 
-from tests.utils.generic_setup_and_teardown_for_ack_backend import (
+from utils.generic_setup_and_teardown_for_ack_backend import (
     GenericSetUp,
     GenericTearDown,
 )
-from tests.utils.mock_environment_variables import MOCK_ENVIRONMENT_DICT, BucketNames
-from tests.utils.utils_for_ack_backend_tests import generate_event
-from tests.utils.values_for_ack_backend_tests import (
+from utils.mock_environment_variables import MOCK_ENVIRONMENT_DICT, BucketNames
+from utils.utils_for_ack_backend_tests import generate_event
+from utils.values_for_ack_backend_tests import (
     EXPECTED_ACK_LAMBDA_RESPONSE_FOR_SUCCESS,
     DiagnosticsDictionaries,
     InvalidValues,
@@ -120,7 +120,6 @@ def test_splunk_logging_successful_rows(self):
         for operation in ["CREATE", "UPDATE", "DELETE"]:
            with ( # noqa: E999
                patch("common.log_decorator.send_log_to_firehose") as mock_send_log_to_firehose, # noqa: E999
-               patch("ack_processor.is_ack_processing_complete", return_value=False),
                patch("common.log_decorator.logger") as mock_logger, # noqa: E999
            ): # noqa: E999
                result = lambda_handler(
@@ -224,7 +223,6 @@ def test_splunk_logging_statuscode_diagnostics(
         for test_case in test_cases:
            with ( # noqa: E999
                patch("common.log_decorator.send_log_to_firehose") as mock_send_log_to_firehose, # noqa: E999
-               patch("ack_processor.is_ack_processing_complete", return_value=False),
                patch("common.log_decorator.logger") as mock_logger, # noqa: E999
            ): # noqa: E999
                result = lambda_handler(
@@ -262,7 +260,6 @@ def test_splunk_logging_multiple_rows(self):
 
         with ( # noqa: E999
             patch("common.log_decorator.send_log_to_firehose") as mock_send_log_to_firehose, # noqa: E999
-            patch("ack_processor.is_ack_processing_complete", return_value=False),
             patch("common.log_decorator.logger") as mock_logger, # noqa: E999
         ): # noqa: E999
             result = lambda_handler(generate_event(messages), context={})
@@ -313,7 +310,6 @@ def test_splunk_logging_multiple_with_diagnostics(
 
         with ( # noqa: E999
             patch("common.log_decorator.send_log_to_firehose") as mock_send_log_to_firehose, # noqa: E999
-            patch("ack_processor.is_ack_processing_complete", return_value=False),
             patch("common.log_decorator.logger") as mock_logger, # noqa: E999
         ): # noqa: E999
             result = lambda_handler(generate_event(messages), context={})
@@ -369,7 +365,6 @@ def test_splunk_logging_multiple_with_diagnostics(
         )
 
     def test_splunk_update_ack_file_not_logged(self):
-        self.maxDiff = None
         """Tests that update_ack_file is not logged if we have sent acks for less than the whole file"""
         # send 98 messages
         messages = []
@@ -380,7 +375,6 @@ def test_splunk_update_ack_file_not_logged(self):
         with ( # noqa: E999
             patch("common.log_decorator.send_log_to_firehose") as mock_send_log_to_firehose, # noqa: E999
             patch("common.log_decorator.logger") as mock_logger, # noqa: E999
-            patch("ack_processor.is_ack_processing_complete", return_value=False),
             patch(
                 "update_ack_file.change_audit_table_status_to_processed"
             ) as mock_change_audit_table_status_to_processed, # noqa: E999
@@ -421,12 +415,12 @@ def test_splunk_update_ack_file_logged(self):
         with ( # noqa: E999
             patch("common.log_decorator.send_log_to_firehose") as mock_send_log_to_firehose, # noqa: E999
             patch("common.log_decorator.logger") as mock_logger, # noqa: E999
-            patch("ack_processor.is_ack_processing_complete", return_value=True),
+            patch("update_ack_file.get_record_count_by_message_id", return_value=99),
             patch(
                 "update_ack_file.change_audit_table_status_to_processed"
             ) as mock_change_audit_table_status_to_processed, # noqa: E999
         ): # noqa: E999
-            result = lambda_handler(generate_event(messages), context={})
+            result = lambda_handler(generate_event(messages, include_eof_message=True), context={})
 
         self.assertEqual(result, EXPECTED_ACK_LAMBDA_RESPONSE_FOR_SUCCESS)
 
lambdas/ack_backend/tests/test_update_ack_file_flow.py

Lines changed: 5 additions & 2 deletions
@@ -37,6 +37,8 @@ def setUp(self):
 
         self.change_audit_status_patcher = patch("update_ack_file.change_audit_table_status_to_processed")
         self.mock_change_audit_status = self.change_audit_status_patcher.start()
+        self.get_record_count_patcher = patch("update_ack_file.get_record_count_by_message_id")
+        self.mock_get_record_count = self.get_record_count_patcher.start()
 
     def tearDown(self):
         self.logger_patcher.stop()
@@ -56,6 +58,7 @@ def test_audit_table_updated_correctly_when_ack_process_complete(self):
         self.s3_client.put_object(
             Bucket=self.ack_bucket_name, Key=f"TempAck/audit_table_test_BusAck_{mock_created_at_string}.csv"
         )
+        self.mock_get_record_count.return_value = 10
 
         # Act
         update_ack_file.complete_batch_file_process(
@@ -64,8 +67,8 @@ def test_audit_table_updated_correctly_when_ack_process_complete(self):
             vaccine_type="vaccine-type",
             created_at_formatted_string=mock_created_at_string,
             file_key=file_key,
-            total_ack_rows_processed=3,
         )
 
-        # Assert: Only check audit table update
+        # Assert: Only check audit table interactions
+        self.mock_get_record_count.assert_called_once_with(message_id)
         self.mock_change_audit_status.assert_called_once_with(file_key, message_id)

lambdas/ack_backend/tests/utils/utils_for_ack_backend_tests.py

Lines changed: 5 additions & 1 deletion
@@ -21,7 +21,7 @@ def add_audit_entry_to_table(dynamodb_client, batch_event_message_id: str, recor
     dynamodb_client.put_item(TableName=AUDIT_TABLE_NAME, Item=audit_table_entry)
 
 
-def generate_event(test_messages: list[dict]) -> dict:
+def generate_event(test_messages: list[dict], include_eof_message: bool = False) -> dict:
     """
     Returns an event where each message in the incoming message body list is based on a standard mock message,
     updated with the details from the corresponsing message in the given test_messages list.
@@ -34,6 +34,10 @@ def generate_event(test_messages: list[dict], include_eof_message: bool = False) -> dict:
         )
         for message in test_messages
     ]
+
+    if include_eof_message:
+        incoming_message_body.append(MOCK_MESSAGE_DETAILS.eof_message)
+
     return {"Records": [{"body": json.dumps(incoming_message_body)}]}
 
 
lambdas/ack_backend/tests/utils/values_for_ack_backend_tests.py

Lines changed: 9 additions & 0 deletions
@@ -139,6 +139,15 @@ def __init__(
             "diagnostics": DiagnosticsDictionaries.NO_PERMISSIONS,
         }
 
+        self.eof_message = {
+            "created_at_formatted_string": self.created_at_formatted_string,
+            "file_key": self.file_key,
+            "message": "EOF",
+            "row_id": self.row_id,
+            "supplier": self.supplier,
+            "vax_type": self.vaccine_type,
+        }
+
 
 class MockMessageDetails:
     """Class containing mock message details for use in tests"""
