Skip to content

Commit 197207e

Browse files
committed
Refactored filename processor for new DB statuses and error handling
1 parent 50292e2 commit 197207e

File tree

8 files changed, with 102 additions and 71 deletions (+102 -71 lines changed)

8 files changed, with 102 additions and 71 deletions (+102 -71 lines changed)

filenameprocessor/src/audit_table.py

Lines changed: 16 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -1,4 +1,5 @@
11
"""Add the filename to the audit table and check for duplicates."""
2+
from typing import Optional
23
from clients import dynamodb_client, logger
34
from errors import UnhandledAuditTableError
45
from constants import AUDIT_TABLE_NAME, AuditTableKeys
@@ -10,23 +11,29 @@ def upsert_audit_table(
1011
created_at_formatted_str: str,
1112
expiry_timestamp: int,
1213
queue_name: str,
13-
file_status: str
14+
file_status: str,
15+
error_details: Optional[str] = None
1416
) -> None:
1517
"""
1618
Updates the audit table with the file details
1719
"""
20+
audit_item = {
21+
AuditTableKeys.MESSAGE_ID: {"S": message_id},
22+
AuditTableKeys.FILENAME: {"S": file_key},
23+
AuditTableKeys.QUEUE_NAME: {"S": queue_name},
24+
AuditTableKeys.STATUS: {"S": file_status},
25+
AuditTableKeys.TIMESTAMP: {"S": created_at_formatted_str},
26+
AuditTableKeys.EXPIRES_AT: {"N": str(expiry_timestamp)}
27+
}
28+
29+
if error_details is not None:
30+
audit_item[AuditTableKeys.ERROR_DETAILS] = {"S": error_details}
31+
1832
try:
1933
# Add to the audit table (regardless of whether it is a duplicate)
2034
dynamodb_client.put_item(
2135
TableName=AUDIT_TABLE_NAME,
22-
Item={
23-
AuditTableKeys.MESSAGE_ID: {"S": message_id},
24-
AuditTableKeys.FILENAME: {"S": file_key},
25-
AuditTableKeys.QUEUE_NAME: {"S": queue_name},
26-
AuditTableKeys.STATUS: {"S": file_status},
27-
AuditTableKeys.TIMESTAMP: {"S": created_at_formatted_str},
28-
AuditTableKeys.EXPIRES_AT: {"N": str(expiry_timestamp)},
29-
},
36+
Item=audit_item,
3037
ConditionExpression="attribute_not_exists(message_id)", # Prevents accidental overwrites
3138
)
3239
logger.info("%s file, with message id %s, successfully added to audit table", file_key, message_id)

filenameprocessor/src/constants.py

Lines changed: 9 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -6,9 +6,7 @@
66
from errors import (
77
VaccineTypePermissionsError,
88
InvalidFileKeyError,
9-
InvalidSupplierError,
109
UnhandledAuditTableError,
11-
DuplicateFileError,
1210
UnhandledSqsError,
1311
)
1412

@@ -24,9 +22,7 @@
2422
ERROR_TYPE_TO_STATUS_CODE_MAP = {
2523
VaccineTypePermissionsError: 403,
2624
InvalidFileKeyError: 400, # Includes invalid ODS code, therefore unable to identify supplier
27-
InvalidSupplierError: 500, # Only raised if supplier variable is not correctly set
2825
UnhandledAuditTableError: 500,
29-
DuplicateFileError: 422,
3026
UnhandledSqsError: 500,
3127
Exception: 500,
3228
}
@@ -38,7 +34,14 @@ class FileStatus(StrEnum):
3834
QUEUED = "Queued"
3935
PROCESSING = "Processing"
4036
PROCESSED = "Processed"
41-
DUPLICATE = "Not processed - duplicate"
37+
NOT_PROCESSED = "Not processed"
38+
FAILED = "Failed"
39+
40+
41+
class FileNotProcessedReason(StrEnum):
42+
"""Reasons why a file was not processed"""
43+
UNAUTHORISED = "Unauthorised"
44+
INVALID_FILENAME = "Invalid filename"
4245

4346

4447
class AuditTableKeys(StrEnum):
@@ -50,3 +53,4 @@ class AuditTableKeys(StrEnum):
5053
STATUS = "status"
5154
TIMESTAMP = "timestamp"
5255
EXPIRES_AT = "expires_at"
56+
ERROR_DETAILS = "error_details"

filenameprocessor/src/errors.py

Lines changed: 0 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -1,10 +1,6 @@
11
"""Custom exceptions for the Filename Processor."""
22

33

4-
class DuplicateFileError(Exception):
5-
"""A custom exception for when it is identified that the file is a duplicate."""
6-
7-
84
class UnhandledAuditTableError(Exception):
95
"""A custom exception for when an unexpected error occurs whilst adding the file to the audit table."""
106

@@ -17,9 +13,5 @@ class InvalidFileKeyError(Exception):
1713
"""A custom exception for when the file key is invalid."""
1814

1915

20-
class InvalidSupplierError(Exception):
21-
"""A custom exception for when the supplier has not been correctly identified."""
22-
23-
2416
class UnhandledSqsError(Exception):
2517
"""A custom exception for when an unexpected error occurs whilst sending a message to SQS."""

filenameprocessor/src/file_name_processor.py

Lines changed: 19 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -19,11 +19,10 @@
1919
from errors import (
2020
VaccineTypePermissionsError,
2121
InvalidFileKeyError,
22-
InvalidSupplierError,
2322
UnhandledAuditTableError,
2423
UnhandledSqsError,
2524
)
26-
from constants import FileStatus, ERROR_TYPE_TO_STATUS_CODE_MAP, SOURCE_BUCKET_NAME
25+
from constants import FileNotProcessedReason, FileStatus, ERROR_TYPE_TO_STATUS_CODE_MAP, SOURCE_BUCKET_NAME
2726

2827

2928
# NOTE: logging_decorator is applied to handle_record function, rather than lambda_handler, because
@@ -69,12 +68,12 @@ def handle_record(record) -> dict:
6968
permissions = validate_vaccine_type_permissions(vaccine_type=vaccine_type, supplier=supplier)
7069

7170
queue_name = f"{supplier}_{vaccine_type}"
72-
upsert_audit_table(
73-
message_id, file_key, created_at_formatted_string, expiry_timestamp, queue_name, FileStatus.QUEUED
74-
)
7571
make_and_send_sqs_message(
7672
file_key, message_id, permissions, vaccine_type, supplier, created_at_formatted_string
7773
)
74+
upsert_audit_table(
75+
message_id, file_key, created_at_formatted_string, expiry_timestamp, queue_name, FileStatus.QUEUED
76+
)
7877

7978
logger.info("Lambda invocation successful for file '%s'", file_key)
8079

@@ -91,16 +90,17 @@ def handle_record(record) -> dict:
9190
except ( # pylint: disable=broad-exception-caught
9291
VaccineTypePermissionsError,
9392
InvalidFileKeyError,
94-
InvalidSupplierError,
9593
UnhandledAuditTableError,
9694
UnhandledSqsError,
9795
Exception,
9896
) as error:
9997
logger.error("Error processing file '%s': %s", file_key, str(error))
10098

10199
queue_name = f"{supplier}_{vaccine_type}"
100+
file_status = get_file_status_for_error(error)
101+
102102
upsert_audit_table(
103-
message_id, file_key, created_at_formatted_string, expiry_timestamp, queue_name, FileStatus.PROCESSED
103+
message_id, file_key, created_at_formatted_string, expiry_timestamp, queue_name, file_status, error_details=str(error)
104104
)
105105

106106
# Create ack file
@@ -122,6 +122,18 @@ def handle_record(record) -> dict:
122122
}
123123

124124

125+
def get_file_status_for_error(error: Exception) -> str:
126+
"""Creates a file status based on the type of error that was thrown"""
127+
if not isinstance(error, VaccineTypePermissionsError) and not isinstance(error, InvalidFileKeyError):
128+
return FileStatus.FAILED
129+
130+
if isinstance(error, VaccineTypePermissionsError):
131+
# TODO - come back and clarify behaviour for client errors
132+
return f"{FileStatus.NOT_PROCESSED} - {FileNotProcessedReason.UNAUTHORISED}"
133+
134+
return f"{FileStatus.NOT_PROCESSED} - {FileNotProcessedReason.INVALID_FILENAME}"
135+
136+
125137
def handle_unexpected_bucket_name(bucket_name: str, file_key: str) -> dict:
126138
"""Handles scenario where Lambda was not invoked by the data-sources bucket. Should not occur due to terraform
127139
config and overarching design"""

filenameprocessor/src/send_sqs_message.py

Lines changed: 1 addition & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -3,17 +3,11 @@
33
import os
44
from json import dumps as json_dumps
55
from clients import sqs_client, logger
6-
from errors import InvalidSupplierError, UnhandledSqsError
6+
from errors import UnhandledSqsError
77

88

99
def send_to_supplier_queue(message_body: dict, vaccine_type: str, supplier: str) -> None:
1010
"""Sends a message to the supplier queue. Raises an exception if the message is not successfully sent."""
11-
# Check the supplier has been identified (this should already have been validated by initial file validation)
12-
if not supplier or not vaccine_type:
13-
error_message = "Message not sent to supplier queue as unable to identify supplier and/ or vaccine type"
14-
logger.error(error_message)
15-
raise InvalidSupplierError(error_message)
16-
1711
try:
1812
queue_url = os.getenv("QUEUE_URL")
1913
sqs_client.send_message(

filenameprocessor/tests/test_lambda_handler.py

Lines changed: 54 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -10,9 +10,9 @@
1010
from boto3 import client as boto3_client
1111
from moto import mock_s3, mock_sqs, mock_firehose, mock_dynamodb
1212

13+
from errors import UnhandledSqsError
1314
from tests.utils_for_tests.generic_setup_and_teardown import GenericSetUp, GenericTearDown
1415
from tests.utils_for_tests.utils_for_filenameprocessor_tests import (
15-
add_entry_to_table,
1616
assert_audit_table_entry,
1717
create_mock_hget,
1818
MOCK_ODS_CODE_TO_SUPPLIER
@@ -216,8 +216,8 @@ def test_lambda_handler_non_root_file(self):
216216

217217
def test_lambda_invalid_file_key_no_other_files_in_queue(self):
218218
"""
219-
Tests that when the file_key is invalid, and there are no other files in the supplier_vaccineType queue:
220-
* The file is added to the audit table with a status of 'Processed'
219+
Tests that when the file_key is invalid:
220+
* The file is added to the audit table with a status of 'Not processed - Invalid filename'
221221
* The message is not sent to SQS
222222
* The failure inf_ack file is created
223223
"""
@@ -241,7 +241,8 @@ def test_lambda_invalid_file_key_no_other_files_in_queue(self):
241241
"message_id": {"S": file_details.message_id},
242242
"filename": {"S": file_details.file_key},
243243
"queue_name": {"S": "unknown_unknown"},
244-
"status": {"S": "Processed"},
244+
"status": {"S": "Not processed - Invalid filename"},
245+
"error_details": {"S": "Initial file validation failed: invalid file key"},
245246
"timestamp": {"S": file_details.created_at_formatted_string},
246247
"expires_at": {"N": str(file_details.expires_at)},
247248
}
@@ -251,30 +252,72 @@ def test_lambda_invalid_file_key_no_other_files_in_queue(self):
251252
self.assert_ack_file_contents(file_details)
252253
self.assert_no_sqs_message()
253254

254-
def test_lambda_invalid_permissions_other_files_in_queue(self):
255+
def test_lambda_invalid_permissions(self):
255256
"""
256-
Tests that when the file permissions are invalid, and there are other files in the supplier_vaccineType queue:
257-
* The file is added to the audit table with a status of 'Processed'
257+
Tests that when the file permissions are invalid:
258+
* The file is added to the audit table with a status of 'Not processed - Unauthorised'
258259
* The message is not sent to SQS
259260
* The failure inf_ack file is created
260261
"""
261262
file_details = MockFileDetails.ravs_rsv_1
262263
s3_client.put_object(Bucket=BucketNames.SOURCE, Key=file_details.file_key)
263264

264-
queued_file_details = MockFileDetails.ravs_rsv_2
265-
add_entry_to_table(queued_file_details, FileStatus.QUEUED)
266-
267265
# Mock the supplier permissions with a value which doesn't include the requested Flu permissions
268266
mock_hget = create_mock_hget({"X8E5B": "RAVS"}, {})
269267
with ( # noqa: E999
270268
patch("file_name_processor.uuid4", return_value=file_details.message_id), # noqa: E999
271269
patch("elasticache.redis_client.hget", side_effect=mock_hget), # noqa: E999
272270
): # noqa: E999
273271
lambda_handler(self.make_event([self.make_record(file_details.file_key)]), None)
274-
assert_audit_table_entry(file_details, FileStatus.PROCESSED)
272+
273+
expected_table_items = [
274+
{
275+
"message_id": {"S": file_details.message_id},
276+
"filename": {"S": file_details.file_key},
277+
"queue_name": {"S": "RAVS_RSV"},
278+
"status": {"S": "Not processed - Unauthorised"},
279+
"error_details": {"S": "Initial file validation failed: RAVS does not have permissions for RSV"},
280+
"timestamp": {"S": file_details.created_at_formatted_string},
281+
"expires_at": {"N": str(file_details.expires_at)}
282+
}
283+
]
284+
self.assertEqual(self.get_audit_table_items(), expected_table_items)
275285
self.assert_no_sqs_message()
276286
self.assert_ack_file_contents(file_details)
277287

288+
def test_lambda_adds_event_to_audit_table_as_failed_when_unexpected_exception_is_caught(self):
289+
"""
290+
Tests that when an unexpected error occurs e.g. sending to SQS (maybe in case of bad deployment):
291+
* The file is added to the audit table with a status of 'Failed' and the reason
292+
* The message is not sent to SQS
293+
* The failure inf_ack file is created
294+
"""
295+
test_file_details = MockFileDetails.emis_flu
296+
s3_client.put_object(Bucket=BucketNames.SOURCE, Key=test_file_details.file_key)
297+
298+
with ( # noqa: E999
299+
patch("file_name_processor.uuid4", return_value=test_file_details.message_id), # noqa: E999
300+
patch("file_name_processor.make_and_send_sqs_message", side_effect=UnhandledSqsError(
301+
"Some client error with SQS"
302+
))
303+
): # noqa: E999
304+
lambda_handler(self.make_event([self.make_record(test_file_details.file_key)]), None)
305+
306+
expected_table_items = [
307+
{
308+
"message_id": {"S": test_file_details.message_id},
309+
"filename": {"S": test_file_details.file_key},
310+
"queue_name": {"S": "EMIS_FLU"},
311+
"status": {"S": "Failed"},
312+
"error_details": {"S": "Some client error with SQS"},
313+
"timestamp": {"S": test_file_details.created_at_formatted_string},
314+
"expires_at": {"N": str(test_file_details.expires_at)},
315+
}
316+
]
317+
self.assertEqual(self.get_audit_table_items(), expected_table_items)
318+
self.assert_ack_file_contents(test_file_details)
319+
self.assert_no_sqs_message()
320+
278321

279322
@patch.dict("os.environ", MOCK_ENVIRONMENT_DICT)
280323
@mock_s3

filenameprocessor/tests/test_send_sqs_message.py

Lines changed: 2 additions & 23 deletions
Original file line number | Diff line number | Diff line change
@@ -1,7 +1,7 @@
11
"""Tests for send_sqs_message functions"""
22

33
from unittest import TestCase
4-
from unittest.mock import patch, MagicMock
4+
from unittest.mock import patch
55
from json import loads as json_loads
66
from copy import deepcopy
77
from moto import mock_sqs
@@ -13,7 +13,7 @@
1313
# Ensure environment variables are mocked before importing from src files
1414
with patch.dict("os.environ", MOCK_ENVIRONMENT_DICT):
1515
from send_sqs_message import send_to_supplier_queue, make_and_send_sqs_message
16-
from errors import UnhandledSqsError, InvalidSupplierError
16+
from errors import UnhandledSqsError
1717
from clients import REGION_NAME
1818

1919
sqs_client = boto3_client("sqs", region_name=REGION_NAME)
@@ -76,27 +76,6 @@ def test_send_to_supplier_queue_failure_due_to_queue_does_not_exist(self):
7676
)
7777
self.assertEqual(NON_EXISTENT_QUEUE_ERROR_MESSAGE, str(context.exception))
7878

79-
def test_send_to_supplier_queue_failure_due_to_absent_supplier_or_vaccine_type(self):
80-
"""Test send_to_supplier_queue function for a failed message send"""
81-
# Set up the sqs_queue
82-
sqs_client.create_queue(QueueName=Sqs.QUEUE_NAME, Attributes=Sqs.ATTRIBUTES)
83-
expected_error_message = (
84-
"Message not sent to supplier queue as unable to identify supplier and/ or vaccine type"
85-
)
86-
87-
keys_to_set_to_empty = ["supplier", "vaccine_type"]
88-
for key_to_set_to_empty in keys_to_set_to_empty:
89-
with self.subTest(f"{key_to_set_to_empty} set to empty string"):
90-
mock_sqs_client = MagicMock()
91-
with patch("send_sqs_message.sqs_client", mock_sqs_client):
92-
with self.assertRaises(InvalidSupplierError) as context:
93-
message_body = {**FLU_EMIS_FILE_DETAILS.sqs_message_body, key_to_set_to_empty: ""}
94-
vaccine_type = message_body["vaccine_type"]
95-
supplier = message_body["supplier"]
96-
send_to_supplier_queue(message_body, supplier, vaccine_type)
97-
self.assertEqual(str(context.exception), expected_error_message)
98-
mock_sqs_client.send_message.assert_not_called()
99-
10079
def test_make_and_send_sqs_message_success(self):
10180
"""Test make_and_send_sqs_message function for a successful message send"""
10281
# Create a mock SQS queue

filenameprocessor/tests/utils_for_tests/utils_for_filenameprocessor_tests.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -39,7 +39,7 @@ def add_entry_to_table(file_details: MockFileDetails, file_status: FileStatus) -
3939
dynamodb_client.put_item(TableName=AUDIT_TABLE_NAME, Item=audit_table_entry)
4040

4141

42-
def assert_audit_table_entry(file_details: FileDetails, expected_status: FileStatus) -> None:
42+
def assert_audit_table_entry(file_details: FileDetails, expected_status: str) -> None:
4343
"""Assert that the file details are in the audit table"""
4444
table_entry = dynamodb_client.get_item(
4545
TableName=AUDIT_TABLE_NAME, Key={AuditTableKeys.MESSAGE_ID: {"S": file_details.message_id}}

0 commit comments

Comments
 (0)