Skip to content

Commit 5f90844

Browse files
committed
Refactored filename processor for new DB statuses and error handling
1 parent 780ac5a commit 5f90844

File tree

8 files changed

+99
-70
lines changed

8 files changed

+99
-70
lines changed

filenameprocessor/src/audit_table.py

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
"""Add the filename to the audit table and check for duplicates."""
2+
from typing import Optional
23
from clients import dynamodb_client, logger
34
from errors import UnhandledAuditTableError
45
from constants import AUDIT_TABLE_NAME, AuditTableKeys
@@ -9,22 +10,28 @@ def upsert_audit_table(
910
file_key: str,
1011
created_at_formatted_str: str,
1112
queue_name: str,
12-
file_status: str
13+
file_status: str,
14+
error_details: Optional[str] = None
1315
) -> None:
1416
"""
1517
Updates the audit table with the file details
1618
"""
19+
audit_item = {
20+
AuditTableKeys.MESSAGE_ID: {"S": message_id},
21+
AuditTableKeys.FILENAME: {"S": file_key},
22+
AuditTableKeys.QUEUE_NAME: {"S": queue_name},
23+
AuditTableKeys.STATUS: {"S": file_status},
24+
AuditTableKeys.TIMESTAMP: {"S": created_at_formatted_str},
25+
}
26+
27+
if error_details is not None:
28+
audit_item[AuditTableKeys.ERROR_DETAILS] = {"S": error_details}
29+
1730
try:
1831
# Add to the audit table (regardless of whether it is a duplicate)
1932
dynamodb_client.put_item(
2033
TableName=AUDIT_TABLE_NAME,
21-
Item={
22-
AuditTableKeys.MESSAGE_ID: {"S": message_id},
23-
AuditTableKeys.FILENAME: {"S": file_key},
24-
AuditTableKeys.QUEUE_NAME: {"S": queue_name},
25-
AuditTableKeys.STATUS: {"S": file_status},
26-
AuditTableKeys.TIMESTAMP: {"S": created_at_formatted_str},
27-
},
34+
Item=audit_item,
2835
ConditionExpression="attribute_not_exists(message_id)", # Prevents accidental overwrites
2936
)
3037
logger.info("%s file, with message id %s, successfully added to audit table", file_key, message_id)

filenameprocessor/src/constants.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,7 @@
66
from errors import (
77
VaccineTypePermissionsError,
88
InvalidFileKeyError,
9-
InvalidSupplierError,
109
UnhandledAuditTableError,
11-
DuplicateFileError,
1210
UnhandledSqsError,
1311
)
1412

@@ -23,9 +21,7 @@
2321
ERROR_TYPE_TO_STATUS_CODE_MAP = {
2422
VaccineTypePermissionsError: 403,
2523
InvalidFileKeyError: 400, # Includes invalid ODS code, therefore unable to identify supplier
26-
InvalidSupplierError: 500, # Only raised if supplier variable is not correctly set
2724
UnhandledAuditTableError: 500,
28-
DuplicateFileError: 422,
2925
UnhandledSqsError: 500,
3026
Exception: 500,
3127
}
@@ -37,7 +33,14 @@ class FileStatus(StrEnum):
3733
QUEUED = "Queued"
3834
PROCESSING = "Processing"
3935
PROCESSED = "Processed"
40-
DUPLICATE = "Not processed - duplicate"
36+
NOT_PROCESSED = "Not processed"
37+
FAILED = "Failed"
38+
39+
40+
class FileNotProcessedReason(StrEnum):
41+
"""Reasons why a file was not processed"""
42+
UNAUTHORISED = "Unauthorised"
43+
INVALID_FILENAME = "Invalid filename"
4144

4245

4346
class AuditTableKeys(StrEnum):
@@ -48,3 +51,4 @@ class AuditTableKeys(StrEnum):
4851
QUEUE_NAME = "queue_name"
4952
STATUS = "status"
5053
TIMESTAMP = "timestamp"
54+
ERROR_DETAILS = "error_details"

filenameprocessor/src/errors.py

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,6 @@
11
"""Custom exceptions for the Filename Processor."""
22

33

4-
class DuplicateFileError(Exception):
5-
"""A custom exception for when it is identified that the file is a duplicate."""
6-
7-
84
class UnhandledAuditTableError(Exception):
95
"""A custom exception for when an unexpected error occurs whilst adding the file to the audit table."""
106

@@ -17,9 +13,5 @@ class InvalidFileKeyError(Exception):
1713
"""A custom exception for when the file key is invalid."""
1814

1915

20-
class InvalidSupplierError(Exception):
21-
"""A custom exception for when the supplier has not been correctly identified."""
22-
23-
2416
class UnhandledSqsError(Exception):
2517
"""A custom exception for when an unexpected error occurs whilst sending a message to SQS."""

filenameprocessor/src/file_name_processor.py

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,10 @@
1919
from errors import (
2020
VaccineTypePermissionsError,
2121
InvalidFileKeyError,
22-
InvalidSupplierError,
2322
UnhandledAuditTableError,
2423
UnhandledSqsError,
2524
)
26-
from constants import FileStatus, ERROR_TYPE_TO_STATUS_CODE_MAP, SOURCE_BUCKET_NAME
25+
from constants import FileNotProcessedReason, FileStatus, ERROR_TYPE_TO_STATUS_CODE_MAP, SOURCE_BUCKET_NAME
2726

2827

2928
# NOTE: logging_decorator is applied to handle_record function, rather than lambda_handler, because
@@ -69,12 +68,12 @@ def handle_record(record) -> dict:
6968
permissions = validate_vaccine_type_permissions(vaccine_type=vaccine_type, supplier=supplier)
7069

7170
queue_name = f"{supplier}_{vaccine_type}"
72-
upsert_audit_table(
73-
message_id, file_key, created_at_formatted_string, queue_name, FileStatus.QUEUED
74-
)
7571
make_and_send_sqs_message(
7672
file_key, message_id, permissions, vaccine_type, supplier, created_at_formatted_string
7773
)
74+
upsert_audit_table(
75+
message_id, file_key, created_at_formatted_string, queue_name, FileStatus.QUEUED
76+
)
7877

7978
logger.info("Lambda invocation successful for file '%s'", file_key)
8079

@@ -91,16 +90,17 @@ def handle_record(record) -> dict:
9190
except ( # pylint: disable=broad-exception-caught
9291
VaccineTypePermissionsError,
9392
InvalidFileKeyError,
94-
InvalidSupplierError,
9593
UnhandledAuditTableError,
9694
UnhandledSqsError,
9795
Exception,
9896
) as error:
9997
logger.error("Error processing file '%s': %s", file_key, str(error))
10098

10199
queue_name = f"{supplier}_{vaccine_type}"
100+
file_status = get_file_status_for_error(error)
101+
102102
upsert_audit_table(
103-
message_id, file_key, created_at_formatted_string, queue_name, FileStatus.PROCESSED
103+
message_id, file_key, created_at_formatted_string, queue_name, file_status, error_details=str(error)
104104
)
105105

106106
# Create ack file
@@ -122,6 +122,18 @@ def handle_record(record) -> dict:
122122
}
123123

124124

125+
def get_file_status_for_error(error: Exception) -> str:
    """Creates the audit table file status for the given error.

    Permission and file key errors are reported as '<NOT_PROCESSED> - <reason>'; any other error type is
    reported as FileStatus.FAILED.
    """
    if isinstance(error, VaccineTypePermissionsError):
        # TODO - come back and clarify behaviour for client errors
        reason = FileNotProcessedReason.UNAUTHORISED
    elif isinstance(error, InvalidFileKeyError):
        reason = FileNotProcessedReason.INVALID_FILENAME
    else:
        # Not one of the recognised client errors, so the file is marked as failed
        return FileStatus.FAILED

    return f"{FileStatus.NOT_PROCESSED} - {reason}"
135+
136+
125137
def handle_unexpected_bucket_name(bucket_name: str, file_key: str) -> dict:
126138
"""Handles scenario where Lambda was not invoked by the data-sources bucket. Should not occur due to terraform
127139
config and overarching design"""

filenameprocessor/src/send_sqs_message.py

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,11 @@
33
import os
44
from json import dumps as json_dumps
55
from clients import sqs_client, logger
6-
from errors import InvalidSupplierError, UnhandledSqsError
6+
from errors import UnhandledSqsError
77

88

99
def send_to_supplier_queue(message_body: dict, vaccine_type: str, supplier: str) -> None:
1010
"""Sends a message to the supplier queue. Raises an exception if the message is not successfully sent."""
11-
# Check the supplier has been identified (this should already have been validated by initial file validation)
12-
if not supplier or not vaccine_type:
13-
error_message = "Message not sent to supplier queue as unable to identify supplier and/ or vaccine type"
14-
logger.error(error_message)
15-
raise InvalidSupplierError(error_message)
16-
1711
try:
1812
queue_url = os.getenv("QUEUE_URL")
1913
sqs_client.send_message(

filenameprocessor/tests/test_lambda_handler.py

Lines changed: 52 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,9 @@
1010
from boto3 import client as boto3_client
1111
from moto import mock_s3, mock_sqs, mock_firehose, mock_dynamodb
1212

13+
from errors import UnhandledSqsError
1314
from tests.utils_for_tests.generic_setup_and_teardown import GenericSetUp, GenericTearDown
1415
from tests.utils_for_tests.utils_for_filenameprocessor_tests import (
15-
add_entry_to_table,
1616
assert_audit_table_entry,
1717
create_mock_hget,
1818
MOCK_ODS_CODE_TO_SUPPLIER
@@ -215,8 +215,8 @@ def test_lambda_handler_non_root_file(self):
215215

216216
def test_lambda_invalid_file_key_no_other_files_in_queue(self):
217217
"""
218-
Tests that when the file_key is invalid, and there are no other files in the supplier_vaccineType queue:
219-
* The file is added to the audit table with a status of 'Processed'
218+
Tests that when the file_key is invalid:
219+
* The file is added to the audit table with a status of 'Not processed - Invalid filename'
220220
* The message is not sent to SQS
221221
* The failure inf_ack file is created
222222
"""
@@ -240,7 +240,8 @@ def test_lambda_invalid_file_key_no_other_files_in_queue(self):
240240
"message_id": {"S": file_details.message_id},
241241
"filename": {"S": file_details.file_key},
242242
"queue_name": {"S": "unknown_unknown"},
243-
"status": {"S": "Processed"},
243+
"status": {"S": "Not processed - Invalid filename"},
244+
"error_details": {"S": "Initial file validation failed: invalid file key"},
244245
"timestamp": {"S": file_details.created_at_formatted_string},
245246
}
246247
]
@@ -249,30 +250,70 @@ def test_lambda_invalid_file_key_no_other_files_in_queue(self):
249250
self.assert_ack_file_contents(file_details)
250251
self.assert_no_sqs_message()
251252

252-
def test_lambda_invalid_permissions_other_files_in_queue(self):
253+
def test_lambda_invalid_permissions(self):
253254
"""
254-
Tests that when the file permissions are invalid, and there are other files in the supplier_vaccineType queue:
255-
* The file is added to the audit table with a status of 'Processed'
255+
Tests that when the file permissions are invalid:
256+
* The file is added to the audit table with a status of 'Not processed - Unauthorised'
256257
* The message is not sent to SQS
257258
* The failure inf_ack file is created
258259
"""
259260
file_details = MockFileDetails.ravs_rsv_1
260261
s3_client.put_object(Bucket=BucketNames.SOURCE, Key=file_details.file_key)
261262

262-
queued_file_details = MockFileDetails.ravs_rsv_2
263-
add_entry_to_table(queued_file_details, FileStatus.QUEUED)
264-
265263
# Mock the supplier permissions with a value which doesn't include the requested Flu permissions
266264
mock_hget = create_mock_hget({"X8E5B": "RAVS"}, {})
267265
with ( # noqa: E999
268266
patch("file_name_processor.uuid4", return_value=file_details.message_id), # noqa: E999
269267
patch("elasticache.redis_client.hget", side_effect=mock_hget), # noqa: E999
270268
): # noqa: E999
271269
lambda_handler(self.make_event([self.make_record(file_details.file_key)]), None)
272-
assert_audit_table_entry(file_details, FileStatus.PROCESSED)
270+
271+
expected_table_items = [
272+
{
273+
"message_id": {"S": file_details.message_id},
274+
"filename": {"S": file_details.file_key},
275+
"queue_name": {"S": "RAVS_RSV"},
276+
"status": {"S": "Not processed - Unauthorised"},
277+
"error_details": {"S": "Initial file validation failed: RAVS does not have permissions for RSV"},
278+
"timestamp": {"S": file_details.created_at_formatted_string},
279+
}
280+
]
281+
self.assertEqual(self.get_audit_table_items(), expected_table_items)
273282
self.assert_no_sqs_message()
274283
self.assert_ack_file_contents(file_details)
275284

285+
def test_lambda_adds_event_to_audit_table_as_failed_when_unexpected_exception_is_caught(self):
    """
    Tests that when an unexpected error occurs e.g. sending to SQS (maybe in case of bad deployment):
    * The file is added to the audit table with a status of 'Failed' and the error details
    * The message is not sent to SQS
    * The failure inf_ack file is created
    """
    file_details = MockFileDetails.emis_flu
    s3_client.put_object(Bucket=BucketNames.SOURCE, Key=file_details.file_key)

    # Force the SQS step to fail so that the error handling path of the lambda is exercised
    sqs_failure = UnhandledSqsError("Some client error with SQS")
    event = self.make_event([self.make_record(file_details.file_key)])

    with (  # noqa: E999
        patch("file_name_processor.uuid4", return_value=file_details.message_id),  # noqa: E999
        patch("file_name_processor.make_and_send_sqs_message", side_effect=sqs_failure),  # noqa: E999
    ):  # noqa: E999
        lambda_handler(event, None)

    # error_details is expected to carry the message of the raised error
    expected_audit_entry = {
        "message_id": {"S": file_details.message_id},
        "filename": {"S": file_details.file_key},
        "queue_name": {"S": "EMIS_FLU"},
        "status": {"S": "Failed"},
        "error_details": {"S": "Some client error with SQS"},
        "timestamp": {"S": file_details.created_at_formatted_string},
    }
    self.assertEqual(self.get_audit_table_items(), [expected_audit_entry])
    self.assert_ack_file_contents(file_details)
    self.assert_no_sqs_message()
316+
276317

277318
@patch.dict("os.environ", MOCK_ENVIRONMENT_DICT)
278319
@mock_s3

filenameprocessor/tests/test_send_sqs_message.py

Lines changed: 2 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
"""Tests for send_sqs_message functions"""
22

33
from unittest import TestCase
4-
from unittest.mock import patch, MagicMock
4+
from unittest.mock import patch
55
from json import loads as json_loads
66
from copy import deepcopy
77
from moto import mock_sqs
@@ -13,7 +13,7 @@
1313
# Ensure environment variables are mocked before importing from src files
1414
with patch.dict("os.environ", MOCK_ENVIRONMENT_DICT):
1515
from send_sqs_message import send_to_supplier_queue, make_and_send_sqs_message
16-
from errors import UnhandledSqsError, InvalidSupplierError
16+
from errors import UnhandledSqsError
1717
from clients import REGION_NAME
1818

1919
sqs_client = boto3_client("sqs", region_name=REGION_NAME)
@@ -76,27 +76,6 @@ def test_send_to_supplier_queue_failure_due_to_queue_does_not_exist(self):
7676
)
7777
self.assertEqual(NON_EXISTENT_QUEUE_ERROR_MESSAGE, str(context.exception))
7878

79-
def test_send_to_supplier_queue_failure_due_to_absent_supplier_or_vaccine_type(self):
80-
"""Test send_to_supplier_queue function for a failed message send"""
81-
# Set up the sqs_queue
82-
sqs_client.create_queue(QueueName=Sqs.QUEUE_NAME, Attributes=Sqs.ATTRIBUTES)
83-
expected_error_message = (
84-
"Message not sent to supplier queue as unable to identify supplier and/ or vaccine type"
85-
)
86-
87-
keys_to_set_to_empty = ["supplier", "vaccine_type"]
88-
for key_to_set_to_empty in keys_to_set_to_empty:
89-
with self.subTest(f"{key_to_set_to_empty} set to empty string"):
90-
mock_sqs_client = MagicMock()
91-
with patch("send_sqs_message.sqs_client", mock_sqs_client):
92-
with self.assertRaises(InvalidSupplierError) as context:
93-
message_body = {**FLU_EMIS_FILE_DETAILS.sqs_message_body, key_to_set_to_empty: ""}
94-
vaccine_type = message_body["vaccine_type"]
95-
supplier = message_body["supplier"]
96-
send_to_supplier_queue(message_body, supplier, vaccine_type)
97-
self.assertEqual(str(context.exception), expected_error_message)
98-
mock_sqs_client.send_message.assert_not_called()
99-
10079
def test_make_and_send_sqs_message_success(self):
10180
"""Test make_and_send_sqs_message function for a successful message send"""
10281
# Create a mock SQS queue

filenameprocessor/tests/utils_for_tests/utils_for_filenameprocessor_tests.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ def add_entry_to_table(file_details: MockFileDetails, file_status: FileStatus) -
3939
dynamodb_client.put_item(TableName=AUDIT_TABLE_NAME, Item=audit_table_entry)
4040

4141

42-
def assert_audit_table_entry(file_details: FileDetails, expected_status: FileStatus) -> None:
42+
def assert_audit_table_entry(file_details: FileDetails, expected_status: str) -> None:
4343
"""Assert that the file details are in the audit table"""
4444
table_entry = dynamodb_client.get_item(
4545
TableName=AUDIT_TABLE_NAME, Key={AuditTableKeys.MESSAGE_ID: {"S": file_details.message_id}}

0 commit comments

Comments
 (0)