Skip to content

Commit 2a1ea72

Browse files
authored
VED-714 rearchitect to prevent batch race conditions (#770)
1 parent ef988a0 commit 2a1ea72

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

55 files changed

+1958
-924
lines changed

.github/workflows/sonarcloud.yml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,22 @@ jobs:
4646
poetry run coverage run -m unittest discover || echo "filenameprocessor tests failed" >> ../failed_tests.txt
4747
poetry run coverage xml -o ../filenameprocessor-coverage.xml
4848
49+
- name: Run unittest with batchprocessorfilter-coverage
50+
working-directory: batch_processor_filter
51+
id: batchprocessorfilter
52+
env:
53+
PYTHONPATH: ${{ github.workspace }}/batch_processor_filter/src:${{ github.workspace }}/batch_processor_filter/tests
54+
continue-on-error: true
55+
run: |
56+
poetry install
57+
poetry run coverage run -m unittest discover || echo "batchprocessorfilter tests failed" >> ../failed_tests.txt
58+
poetry run coverage xml -o ../batchprocessorfilter-coverage.xml
59+
4960
- name: Run unittest with recordprocessor-coverage
5061
working-directory: recordprocessor
5162
id: recordprocessor
63+
env:
64+
PYTHONPATH: ${{ github.workspace }}/recordprocessor/src:${{ github.workspace }}/recordprocessor/tests
5265
continue-on-error: true
5366
run: |
5467
poetry install

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
SHELL=/usr/bin/env bash -euo pipefail
22

3-
PYTHON_PROJECT_DIRS_WITH_UNIT_TESTS = ack_backend backend delta_backend filenameprocessor mesh_processor recordprocessor redis_sync lambdas/id_sync lambdas/shared mns_subscription
3+
PYTHON_PROJECT_DIRS_WITH_UNIT_TESTS = ack_backend backend batch_processor_filter delta_backend filenameprocessor mesh_processor recordprocessor redis_sync lambdas/id_sync lambdas/shared mns_subscription
44
PYTHON_PROJECT_DIRS = e2e e2e_batch $(PYTHON_PROJECT_DIRS_WITH_UNIT_TESTS)
55

66
#Installs dependencies using poetry.

ack_backend/src/audit_table.py

Lines changed: 2 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,8 @@
11
"""Add the filename to the audit table and check for duplicates."""
22

3-
from typing import Union
4-
from boto3.dynamodb.conditions import Key
5-
from clients import dynamodb_client, dynamodb_resource, logger
3+
from clients import dynamodb_client, logger
64
from errors import UnhandledAuditTableError
7-
from constants import AUDIT_TABLE_NAME, AUDIT_TABLE_QUEUE_NAME_GSI, FileStatus, AuditTableKeys
8-
9-
10-
def get_next_queued_file_details(queue_name: str) -> Union[dict, None]:
11-
"""
12-
Checks for queued files.
13-
Returns a dictionary containing the details of the oldest queued file, or returns None if no queued files are found.
14-
"""
15-
queued_files_found_in_audit_table: dict = dynamodb_resource.Table(AUDIT_TABLE_NAME).query(
16-
IndexName=AUDIT_TABLE_QUEUE_NAME_GSI,
17-
KeyConditionExpression=Key(AuditTableKeys.QUEUE_NAME).eq(queue_name)
18-
& Key(AuditTableKeys.STATUS).eq(FileStatus.QUEUED),
19-
)
20-
21-
queued_files_details: list = queued_files_found_in_audit_table["Items"]
22-
23-
# Return the oldest queued file
24-
return sorted(queued_files_details, key=lambda x: x["timestamp"])[0] if queued_files_details else None
5+
from constants import AUDIT_TABLE_NAME, FileStatus, AuditTableKeys
256

267

278
def change_audit_table_status_to_processed(file_key: str, message_id: str) -> None:

ack_backend/src/clients.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
REGION_NAME = "eu-west-2"
77

88
firehose_client = boto3_client("firehose", region_name=REGION_NAME)
9-
lambda_client = boto3_client('lambda', region_name=REGION_NAME)
109
dynamodb_client = boto3_client("dynamodb", region_name=REGION_NAME)
1110

1211
dynamodb_resource = boto3_resource("dynamodb", region_name=REGION_NAME)

ack_backend/src/constants.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,6 @@
33
import os
44

55
AUDIT_TABLE_NAME = os.getenv("AUDIT_TABLE_NAME")
6-
FILE_NAME_PROC_LAMBDA_NAME = os.getenv("FILE_NAME_PROC_LAMBDA_NAME")
7-
AUDIT_TABLE_FILENAME_GSI = "filename_index"
8-
AUDIT_TABLE_QUEUE_NAME_GSI = "queue_name_index"
96

107
def get_source_bucket_name() -> str:
118
"""Get the SOURCE_BUCKET_NAME environment from environment variables."""

ack_backend/src/update_ack_file.py

Lines changed: 5 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
11
"""Functions for uploading the data to the ack file"""
22

3-
import json
43
from io import StringIO, BytesIO
54
from typing import Union, Optional
65
from botocore.exceptions import ClientError
7-
from constants import ACK_HEADERS, get_source_bucket_name, get_ack_bucket_name, FILE_NAME_PROC_LAMBDA_NAME
8-
from audit_table import change_audit_table_status_to_processed, get_next_queued_file_details
9-
from clients import get_s3_client, logger, lambda_client
6+
from constants import ACK_HEADERS, get_source_bucket_name, get_ack_bucket_name
7+
from audit_table import change_audit_table_status_to_processed
8+
from clients import get_s3_client, logger
109
from utils_for_ack_lambda import get_row_count
1110
from logging_decorators import upload_ack_file_logging_decorator
1211

@@ -96,12 +95,9 @@ def upload_ack_file(
9695
move_file(ack_bucket_name, temp_ack_file_key, archive_ack_file_key)
9796
move_file(source_bucket_name, f"processing/{file_key}", f"archive/{file_key}")
9897

99-
# Update the audit table and invoke the filename lambda with next file in the queue (if one exists)
98+
# Update the audit table
10099
change_audit_table_status_to_processed(file_key, message_id)
101-
supplier_queue = f"{supplier}_{vaccine_type}"
102-
next_queued_file_details = get_next_queued_file_details(supplier_queue)
103-
if next_queued_file_details:
104-
invoke_filename_lambda(next_queued_file_details["filename"], next_queued_file_details["message_id"])
100+
105101
# Ingestion of this file is complete
106102
result = {
107103
"message_id": message_id,
@@ -149,27 +145,3 @@ def move_file(bucket_name: str, source_file_key: str, destination_file_key: str)
149145
)
150146
s3_client.delete_object(Bucket=bucket_name, Key=source_file_key)
151147
logger.info("File moved from %s to %s", source_file_key, destination_file_key)
152-
153-
154-
def invoke_filename_lambda(file_key: str, message_id: str) -> None:
155-
"""Invokes the filenameprocessor lambda with the given file key and message id"""
156-
try:
157-
lambda_payload = {
158-
"Records": [
159-
{"s3":
160-
{
161-
"bucket": {
162-
"name": get_source_bucket_name()
163-
},
164-
"object": {"key": file_key}
165-
},
166-
"message_id": message_id
167-
}
168-
]
169-
}
170-
lambda_client.invoke(
171-
FunctionName=FILE_NAME_PROC_LAMBDA_NAME, InvocationType="Event", Payload=json.dumps(lambda_payload)
172-
)
173-
except Exception as error:
174-
logger.error("Error invoking filename lambda: %s", error)
175-
raise

ack_backend/tests/test_audit_table.py

Lines changed: 0 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -8,38 +8,13 @@ class TestAuditTable(unittest.TestCase):
88
def setUp(self):
99
self.logger_patcher = patch('audit_table.logger')
1010
self.mock_logger = self.logger_patcher.start()
11-
self.dynamodb_resource_patcher = patch('audit_table.dynamodb_resource')
12-
self.mock_dynamodb_resource = self.dynamodb_resource_patcher.start()
1311
self.dynamodb_client_patcher = patch('audit_table.dynamodb_client')
1412
self.mock_dynamodb_client = self.dynamodb_client_patcher.start()
1513

1614
def tearDown(self):
1715
self.logger_patcher.stop()
18-
self.dynamodb_resource_patcher.stop()
1916
self.dynamodb_client_patcher.stop()
2017

21-
def test_get_next_queued_file_details_returns_oldest(self):
22-
# Arrange
23-
mock_table = MagicMock()
24-
self.mock_dynamodb_resource.Table.return_value = mock_table
25-
mock_table.query.return_value = {
26-
"Items": [
27-
{"timestamp": 2, "my-key": "value2"},
28-
{"timestamp": 1, "my-key": "value1"},
29-
]
30-
}
31-
# Act
32-
result = audit_table.get_next_queued_file_details("queue1")
33-
# Assert
34-
self.assertEqual(result, {"timestamp": 1, "my-key": "value1"})
35-
36-
def test_get_next_queued_file_details_returns_none_if_empty(self):
37-
mock_table = MagicMock()
38-
self.mock_dynamodb_resource.Table.return_value = mock_table
39-
mock_table.query.return_value = {"Items": []}
40-
result = audit_table.get_next_queued_file_details("queue1")
41-
self.assertIsNone(result)
42-
4318
def test_change_audit_table_status_to_processed_success(self):
4419
# Should not raise
4520
self.mock_dynamodb_client.update_item.return_value = {}

ack_backend/tests/test_splunk_logging.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ def extract_all_call_args_for_logger_error(self, mock_logger):
8585

8686
def expected_lambda_handler_logs(self, success: bool, number_of_rows, ingestion_complete=False, diagnostics=None):
8787
"""Returns the expected logs for the lambda handler function."""
88-
# Mocking of timings is such that the time taken is 2 seconds for each row,
88+
# Mocking of timings is such that the time taken is 2 seconds for each row,
8989
# plus 2 seconds for the handler if it succeeds (i.e. it calls update_ack_file) or 1 second if it doesn't;
9090
# plus an extra second if ingestion is complete
9191
if success:
@@ -321,8 +321,6 @@ def test_splunk_update_ack_file_not_logged(self):
321321
patch("logging_decorators.send_log_to_firehose") as mock_send_log_to_firehose,
322322
patch("logging_decorators.logger") as mock_logger,
323323
patch("update_ack_file.change_audit_table_status_to_processed") as mock_change_audit_table_status_to_processed,
324-
patch("update_ack_file.get_next_queued_file_details"),
325-
patch("update_ack_file.invoke_filename_lambda"),
326324
):
327325
result = lambda_handler(generate_event(messages), context={})
328326

@@ -362,8 +360,6 @@ def test_splunk_update_ack_file_logged(self):
362360
patch("logging_decorators.send_log_to_firehose") as mock_send_log_to_firehose,
363361
patch("logging_decorators.logger") as mock_logger,
364362
patch("update_ack_file.change_audit_table_status_to_processed") as mock_change_audit_table_status_to_processed,
365-
patch("update_ack_file.get_next_queued_file_details"),
366-
patch("update_ack_file.invoke_filename_lambda"),
367363
):
368364
result = lambda_handler(generate_event(messages), context={})
369365

ack_backend/tests/test_update_ack_file.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@
22

33
import unittest
44
import os
5-
from unittest.mock import patch
6-
from io import StringIO
75
from boto3 import client as boto3_client
86
from moto import mock_s3
97

ack_backend/tests/test_update_ack_file_flow.py

Lines changed: 1 addition & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
1-
import unittest
2-
from unittest.mock import patch, MagicMock, call
1+
from unittest.mock import patch
32
from io import StringIO
43

54
import update_ack_file
6-
from update_ack_file import invoke_filename_lambda
75
import unittest
86
import boto3
97

@@ -42,22 +40,10 @@ def setUp(self):
4240
self.change_audit_status_patcher = patch('update_ack_file.change_audit_table_status_to_processed')
4341
self.mock_change_audit_status = self.change_audit_status_patcher.start()
4442

45-
self.get_next_queued_file_details_patcher = patch('update_ack_file.get_next_queued_file_details')
46-
self.mock_get_next_queued_file_details = self.get_next_queued_file_details_patcher.start()
47-
48-
self.invoke_filename_lambda_patcher = patch('update_ack_file.invoke_filename_lambda')
49-
self.mock_invoke_filename_lambda = self.invoke_filename_lambda_patcher.start()
50-
51-
self.lambda_client_patcher = patch('update_ack_file.lambda_client')
52-
self.mock_lambda_client = self.lambda_client_patcher.start()
53-
5443
def tearDown(self):
5544
self.logger_patcher.stop()
5645
self.get_row_count_patcher.stop()
5746
self.change_audit_status_patcher.stop()
58-
self.get_next_queued_file_details_patcher.stop()
59-
self.invoke_filename_lambda_patcher.stop()
60-
self.lambda_client_patcher.stop()
6147

6248
def test_audit_table_updated_correctly(self):
6349
""" VED-167 - Test that the audit table has been updated correctly"""
@@ -116,39 +102,3 @@ def test_move_file(self):
116102

117103
# Logger assertion (if logger is mocked)
118104
self.mock_logger.info.assert_called_with("File moved from %s to %s", file_key, dest_key)
119-
120-
def test_next_queued_file_triggers_lambda(self):
121-
""" VED-167 Test that the next queued file details are used to re-invoke the lambda."""
122-
# Setup
123-
self.mock_get_row_count.side_effect = [3, 3]
124-
next_file = "next_for_lambda.csv"
125-
next_message_id = "msg-next-lambda"
126-
supplier = "lambda-trigger-supplier"
127-
vaccine_type = "vaccine-type"
128-
queue_name = f"{supplier}_{vaccine_type}"
129-
self.mock_get_next_queued_file_details.return_value = {"filename": next_file, "message_id": next_message_id}
130-
accumulated_csv_content = StringIO("header1|header2\n")
131-
ack_data_rows = [
132-
{"a": 1, "b": 2, "row": "lambda1"},
133-
{"a": 3, "b": 4, "row": "lambda2"},
134-
{"a": 5, "b": 6, "row": "lambda3"}
135-
]
136-
next_key="next_lambda_test.csv"
137-
self.s3_client.put_object(
138-
Bucket=self.source_bucket_name,
139-
Key=f"processing/{next_key}",
140-
Body="dummy content"
141-
)
142-
# Act
143-
update_ack_file.upload_ack_file(
144-
temp_ack_file_key=f"TempAck/{next_key}",
145-
message_id="msg-lambda-trigger",
146-
supplier=supplier,
147-
vaccine_type=vaccine_type,
148-
accumulated_csv_content=accumulated_csv_content,
149-
ack_data_rows=ack_data_rows,
150-
archive_ack_file_key=f"forwardedFile/{next_key}",
151-
file_key=next_key
152-
)
153-
# Assert: Check that the next queued file was used to re-invoke the lambda
154-
self.mock_get_next_queued_file_details.assert_called_once_with(queue_name)

0 commit comments

Comments
 (0)