Skip to content

Commit d77f6ad

Browse files
committed
Refactored filename processor for new DB statuses and error handling
1 parent 31e80ed commit d77f6ad

File tree

8 files changed

+120
-75
lines changed

8 files changed

+120
-75
lines changed

filenameprocessor/src/audit_table.py

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
"""Add the filename to the audit table and check for duplicates."""
2+
from typing import Optional
23
from clients import dynamodb_client, logger
34
from errors import UnhandledAuditTableError
45
from constants import AUDIT_TABLE_NAME, AuditTableKeys
@@ -10,23 +11,29 @@ def upsert_audit_table(
1011
created_at_formatted_str: str,
1112
expiry_timestamp: int,
1213
queue_name: str,
13-
file_status: str
14+
file_status: str,
15+
error_details: Optional[str] = None
1416
) -> None:
1517
"""
1618
Updates the audit table with the file details
1719
"""
20+
audit_item = {
21+
AuditTableKeys.MESSAGE_ID: {"S": message_id},
22+
AuditTableKeys.FILENAME: {"S": file_key},
23+
AuditTableKeys.QUEUE_NAME: {"S": queue_name},
24+
AuditTableKeys.STATUS: {"S": file_status},
25+
AuditTableKeys.TIMESTAMP: {"S": created_at_formatted_str},
26+
AuditTableKeys.EXPIRES_AT: {"N": str(expiry_timestamp)}
27+
}
28+
29+
if error_details is not None:
30+
audit_item[AuditTableKeys.ERROR_DETAILS] = {"S": error_details}
31+
1832
try:
1933
# Add to the audit table (regardless of whether it is a duplicate)
2034
dynamodb_client.put_item(
2135
TableName=AUDIT_TABLE_NAME,
22-
Item={
23-
AuditTableKeys.MESSAGE_ID: {"S": message_id},
24-
AuditTableKeys.FILENAME: {"S": file_key},
25-
AuditTableKeys.QUEUE_NAME: {"S": queue_name},
26-
AuditTableKeys.STATUS: {"S": file_status},
27-
AuditTableKeys.TIMESTAMP: {"S": created_at_formatted_str},
28-
AuditTableKeys.EXPIRES_AT: {"N": str(expiry_timestamp)},
29-
},
36+
Item=audit_item,
3037
ConditionExpression="attribute_not_exists(message_id)", # Prevents accidental overwrites
3138
)
3239
logger.info("%s file, with message id %s, successfully added to audit table", file_key, message_id)

filenameprocessor/src/constants.py

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,9 @@
66
from errors import (
77
VaccineTypePermissionsError,
88
InvalidFileKeyError,
9-
InvalidSupplierError,
109
UnhandledAuditTableError,
11-
DuplicateFileError,
12-
UnhandledSqsError, EmptyFileError,
10+
EmptyFileError,
11+
UnhandledSqsError
1312
)
1413

1514
SOURCE_BUCKET_NAME = os.getenv("SOURCE_BUCKET_NAME")
@@ -25,9 +24,7 @@
2524
VaccineTypePermissionsError: 403,
2625
InvalidFileKeyError: 400, # Includes invalid ODS code, therefore unable to identify supplier
2726
EmptyFileError: 400,
28-
InvalidSupplierError: 500, # Only raised if supplier variable is not correctly set
2927
UnhandledAuditTableError: 500,
30-
DuplicateFileError: 422,
3128
UnhandledSqsError: 500,
3229
Exception: 500,
3330
}
@@ -42,8 +39,15 @@ class FileStatus(StrEnum):
4239
QUEUED = "Queued"
4340
PROCESSING = "Processing"
4441
PROCESSED = "Processed"
45-
DUPLICATE = "Not processed - duplicate"
46-
EMPTY = "Not processed - empty file"
42+
NOT_PROCESSED = "Not processed"
43+
FAILED = "Failed"
44+
45+
46+
class FileNotProcessedReason(StrEnum):
47+
"""Reasons why a file was not processed"""
48+
EMPTY = "Empty file"
49+
UNAUTHORISED = "Unauthorised"
50+
INVALID_FILENAME = "Invalid filename"
4751

4852

4953
class AuditTableKeys(StrEnum):
@@ -55,3 +59,4 @@ class AuditTableKeys(StrEnum):
5559
STATUS = "status"
5660
TIMESTAMP = "timestamp"
5761
EXPIRES_AT = "expires_at"
62+
ERROR_DETAILS = "error_details"

filenameprocessor/src/errors.py

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,6 @@
11
"""Custom exceptions for the Filename Processor."""
22

33

4-
class DuplicateFileError(Exception):
5-
"""A custom exception for when it is identified that the file is a duplicate."""
6-
7-
84
class EmptyFileError(Exception):
95
"""A custom exception for when the batch file contains only the header row or is completely empty"""
106

@@ -21,9 +17,5 @@ class InvalidFileKeyError(Exception):
2117
"""A custom exception for when the file key is invalid."""
2218

2319

24-
class InvalidSupplierError(Exception):
25-
"""A custom exception for when the supplier has not been correctly identified."""
26-
27-
2820
class UnhandledSqsError(Exception):
2921
"""A custom exception for when an unexpected error occurs whilst sending a message to SQS."""

filenameprocessor/src/file_name_processor.py

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,11 @@
1919
from errors import (
2020
VaccineTypePermissionsError,
2121
InvalidFileKeyError,
22-
InvalidSupplierError,
2322
UnhandledAuditTableError,
2423
UnhandledSqsError,
2524
EmptyFileError
2625
)
27-
from constants import FileStatus, ERROR_TYPE_TO_STATUS_CODE_MAP, SOURCE_BUCKET_NAME
26+
from constants import FileNotProcessedReason, FileStatus, ERROR_TYPE_TO_STATUS_CODE_MAP, SOURCE_BUCKET_NAME
2827

2928

3029
# NOTE: logging_decorator is applied to handle_record function, rather than lambda_handler, because
@@ -46,6 +45,7 @@ def handle_record(record) -> dict:
4645

4746
vaccine_type = "unknown"
4847
supplier = "unknown"
48+
expiry_timestamp = "not_set"
4949

5050
if bucket_name != SOURCE_BUCKET_NAME:
5151
return handle_unexpected_bucket_name(bucket_name, file_key)
@@ -73,12 +73,12 @@ def handle_record(record) -> dict:
7373
permissions = validate_vaccine_type_permissions(vaccine_type=vaccine_type, supplier=supplier)
7474

7575
queue_name = f"{supplier}_{vaccine_type}"
76-
upsert_audit_table(
77-
message_id, file_key, created_at_formatted_string, expiry_timestamp, queue_name, FileStatus.QUEUED
78-
)
7976
make_and_send_sqs_message(
8077
file_key, message_id, permissions, vaccine_type, supplier, created_at_formatted_string
8178
)
79+
upsert_audit_table(
80+
message_id, file_key, created_at_formatted_string, expiry_timestamp, queue_name, FileStatus.QUEUED
81+
)
8282

8383
logger.info("Lambda invocation successful for file '%s'", file_key)
8484

@@ -96,18 +96,18 @@ def handle_record(record) -> dict:
9696
VaccineTypePermissionsError,
9797
EmptyFileError,
9898
InvalidFileKeyError,
99-
InvalidSupplierError,
10099
UnhandledAuditTableError,
101100
UnhandledSqsError,
102101
Exception,
103102
) as error:
104103
logger.error("Error processing file '%s': %s", file_key, str(error))
105104

106105
queue_name = f"{supplier}_{vaccine_type}"
107-
file_status = FileStatus.EMPTY if isinstance(error, EmptyFileError) else FileStatus.PROCESSED
106+
file_status = get_file_status_for_error(error)
108107

109108
upsert_audit_table(
110-
message_id, file_key, created_at_formatted_string, expiry_timestamp, queue_name, file_status
109+
message_id, file_key, created_at_formatted_string, expiry_timestamp, queue_name, file_status,
110+
error_details=str(error)
111111
)
112112

113113
# Create ack file
@@ -129,6 +129,20 @@ def handle_record(record) -> dict:
129129
}
130130

131131

132+
def get_file_status_for_error(error: Exception) -> str:
    """Return the audit-table file status for an error raised while handling a file.

    Args:
        error: The exception that was caught while processing the file.

    Returns:
        ``"<FileStatus.NOT_PROCESSED> - <reason>"`` when the error is a
        VaccineTypePermissionsError, EmptyFileError or InvalidFileKeyError;
        ``FileStatus.FAILED`` for any other exception.
    """
    if isinstance(error, VaccineTypePermissionsError):
        # TODO - come back and clarify behaviour for client errors
        reason = FileNotProcessedReason.UNAUTHORISED
    elif isinstance(error, EmptyFileError):
        reason = FileNotProcessedReason.EMPTY
    elif isinstance(error, InvalidFileKeyError):
        reason = FileNotProcessedReason.INVALID_FILENAME
    else:
        # Not one of the known validation errors, so report the file as failed.
        return FileStatus.FAILED

    return f"{FileStatus.NOT_PROCESSED} - {reason}"
144+
145+
132146
def handle_unexpected_bucket_name(bucket_name: str, file_key: str) -> dict:
133147
"""Handles scenario where Lambda was not invoked by the data-sources bucket. Should not occur due to terraform
134148
config and overarching design"""

filenameprocessor/src/send_sqs_message.py

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,11 @@
33
import os
44
from json import dumps as json_dumps
55
from clients import sqs_client, logger
6-
from errors import InvalidSupplierError, UnhandledSqsError
6+
from errors import UnhandledSqsError
77

88

99
def send_to_supplier_queue(message_body: dict, vaccine_type: str, supplier: str) -> None:
1010
"""Sends a message to the supplier queue. Raises an exception if the message is not successfully sent."""
11-
# Check the supplier has been identified (this should already have been validated by initial file validation)
12-
if not supplier or not vaccine_type:
13-
error_message = "Message not sent to supplier queue as unable to identify supplier and/ or vaccine type"
14-
logger.error(error_message)
15-
raise InvalidSupplierError(error_message)
16-
1711
try:
1812
queue_url = os.getenv("QUEUE_URL")
1913
sqs_client.send_message(

filenameprocessor/tests/test_lambda_handler.py

Lines changed: 66 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,9 @@
1010
from boto3 import client as boto3_client
1111
from moto import mock_s3, mock_sqs, mock_firehose, mock_dynamodb
1212

13+
from errors import UnhandledSqsError
1314
from tests.utils_for_tests.generic_setup_and_teardown import GenericSetUp, GenericTearDown
1415
from tests.utils_for_tests.utils_for_filenameprocessor_tests import (
15-
add_entry_to_table,
1616
assert_audit_table_entry,
1717
create_mock_hget,
1818
MOCK_ODS_CODE_TO_SUPPLIER
@@ -215,7 +215,18 @@ def test_lambda_handler_correctly_flags_empty_file(self):
215215
None,
216216
)
217217

218-
assert_audit_table_entry(file_details, FileStatus.EMPTY)
218+
expected_table_items = [
219+
{
220+
"message_id": {"S": file_details.message_id},
221+
"filename": {"S": file_details.file_key},
222+
"queue_name": {"S": "RAVS_RSV"},
223+
"status": {"S": "Not processed - Empty file"},
224+
"error_details": {"S": "Initial file validation failed: batch file was empty"},
225+
"timestamp": {"S": file_details.created_at_formatted_string},
226+
"expires_at": {"N": str(file_details.expires_at)},
227+
}
228+
]
229+
self.assertEqual(self.get_audit_table_items(), expected_table_items)
219230
self.assert_no_sqs_message()
220231
self.assert_ack_file_contents(file_details)
221232

@@ -240,8 +251,8 @@ def test_lambda_handler_non_root_file(self):
240251

241252
def test_lambda_invalid_file_key_no_other_files_in_queue(self):
242253
"""
243-
Tests that when the file_key is invalid, and there are no other files in the supplier_vaccineType queue:
244-
* The file is added to the audit table with a status of 'Processed'
254+
Tests that when the file_key is invalid:
255+
* The file is added to the audit table with a status of 'Not processed - Invalid filename'
245256
* The message is not sent to SQS
246257
* The failure inf_ack file is created
247258
"""
@@ -265,7 +276,8 @@ def test_lambda_invalid_file_key_no_other_files_in_queue(self):
265276
"message_id": {"S": file_details.message_id},
266277
"filename": {"S": file_details.file_key},
267278
"queue_name": {"S": "unknown_unknown"},
268-
"status": {"S": "Processed"},
279+
"status": {"S": "Not processed - Invalid filename"},
280+
"error_details": {"S": "Initial file validation failed: invalid file key"},
269281
"timestamp": {"S": file_details.created_at_formatted_string},
270282
"expires_at": {"N": str(file_details.expires_at)},
271283
}
@@ -275,30 +287,72 @@ def test_lambda_invalid_file_key_no_other_files_in_queue(self):
275287
self.assert_ack_file_contents(file_details)
276288
self.assert_no_sqs_message()
277289

278-
def test_lambda_invalid_permissions_other_files_in_queue(self):
290+
def test_lambda_invalid_permissions(self):
279291
"""
280-
Tests that when the file permissions are invalid, and there are other files in the supplier_vaccineType queue:
281-
* The file is added to the audit table with a status of 'Processed'
292+
Tests that when the file permissions are invalid:
293+
* The file is added to the audit table with a status of 'Not processed - Unauthorised'
282294
* The message is not sent to SQS
283295
* The failure inf_ack file is created
284296
"""
285297
file_details = MockFileDetails.ravs_rsv_1
286298
s3_client.put_object(Bucket=BucketNames.SOURCE, Key=file_details.file_key, Body=MOCK_BATCH_FILE_CONTENT)
287299

288-
queued_file_details = MockFileDetails.ravs_rsv_2
289-
add_entry_to_table(queued_file_details, FileStatus.QUEUED)
290-
291300
# Mock the supplier permissions with a value which doesn't include the requested Flu permissions
292301
mock_hget = create_mock_hget({"X8E5B": "RAVS"}, {})
293302
with ( # noqa: E999
294303
patch("file_name_processor.uuid4", return_value=file_details.message_id), # noqa: E999
295304
patch("elasticache.redis_client.hget", side_effect=mock_hget), # noqa: E999
296305
): # noqa: E999
297306
lambda_handler(self.make_event([self.make_record(file_details.file_key)]), None)
298-
assert_audit_table_entry(file_details, FileStatus.PROCESSED)
307+
308+
expected_table_items = [
309+
{
310+
"message_id": {"S": file_details.message_id},
311+
"filename": {"S": file_details.file_key},
312+
"queue_name": {"S": "RAVS_RSV"},
313+
"status": {"S": "Not processed - Unauthorised"},
314+
"error_details": {"S": "Initial file validation failed: RAVS does not have permissions for RSV"},
315+
"timestamp": {"S": file_details.created_at_formatted_string},
316+
"expires_at": {"N": str(file_details.expires_at)}
317+
}
318+
]
319+
self.assertEqual(self.get_audit_table_items(), expected_table_items)
299320
self.assert_no_sqs_message()
300321
self.assert_ack_file_contents(file_details)
301322

323+
def test_lambda_adds_event_to_audit_table_as_failed_when_unexpected_exception_is_caught(self):
    """
    Tests that when an unexpected error occurs, e.g. whilst sending to SQS (possibly due to a bad deployment):
    * The file is added to the audit table with a status of 'Failed' and the error details
    * The message is not sent to SQS
    * The failure inf_ack file is created
    """
    test_file_details = MockFileDetails.emis_flu
    s3_client.put_object(Bucket=BucketNames.SOURCE, Key=test_file_details.file_key, Body=MOCK_BATCH_FILE_CONTENT)

    # Force the SQS send step to raise so that the handler's error path is exercised
    with (  # noqa: E999
        patch("file_name_processor.uuid4", return_value=test_file_details.message_id),  # noqa: E999
        patch(  # noqa: E999
            "file_name_processor.make_and_send_sqs_message",
            side_effect=UnhandledSqsError("Some client error with SQS"),
        ),
    ):  # noqa: E999
        lambda_handler(self.make_event([self.make_record(test_file_details.file_key)]), None)

    # error_details is expected to carry the message of the raised exception
    expected_table_items = [
        {
            "message_id": {"S": test_file_details.message_id},
            "filename": {"S": test_file_details.file_key},
            "queue_name": {"S": "EMIS_FLU"},
            "status": {"S": "Failed"},
            "error_details": {"S": "Some client error with SQS"},
            "timestamp": {"S": test_file_details.created_at_formatted_string},
            "expires_at": {"N": str(test_file_details.expires_at)},
        }
    ]
    self.assertEqual(self.get_audit_table_items(), expected_table_items)
    self.assert_ack_file_contents(test_file_details)
    self.assert_no_sqs_message()
355+
302356

303357
@patch.dict("os.environ", MOCK_ENVIRONMENT_DICT)
304358
@mock_s3

filenameprocessor/tests/test_send_sqs_message.py

Lines changed: 2 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
"""Tests for send_sqs_message functions"""
22

33
from unittest import TestCase
4-
from unittest.mock import patch, MagicMock
4+
from unittest.mock import patch
55
from json import loads as json_loads
66
from copy import deepcopy
77
from moto import mock_sqs
@@ -13,7 +13,7 @@
1313
# Ensure environment variables are mocked before importing from src files
1414
with patch.dict("os.environ", MOCK_ENVIRONMENT_DICT):
1515
from send_sqs_message import send_to_supplier_queue, make_and_send_sqs_message
16-
from errors import UnhandledSqsError, InvalidSupplierError
16+
from errors import UnhandledSqsError
1717
from clients import REGION_NAME
1818

1919
sqs_client = boto3_client("sqs", region_name=REGION_NAME)
@@ -76,27 +76,6 @@ def test_send_to_supplier_queue_failure_due_to_queue_does_not_exist(self):
7676
)
7777
self.assertEqual(NON_EXISTENT_QUEUE_ERROR_MESSAGE, str(context.exception))
7878

79-
def test_send_to_supplier_queue_failure_due_to_absent_supplier_or_vaccine_type(self):
80-
"""Test send_to_supplier_queue function for a failed message send"""
81-
# Set up the sqs_queue
82-
sqs_client.create_queue(QueueName=Sqs.QUEUE_NAME, Attributes=Sqs.ATTRIBUTES)
83-
expected_error_message = (
84-
"Message not sent to supplier queue as unable to identify supplier and/ or vaccine type"
85-
)
86-
87-
keys_to_set_to_empty = ["supplier", "vaccine_type"]
88-
for key_to_set_to_empty in keys_to_set_to_empty:
89-
with self.subTest(f"{key_to_set_to_empty} set to empty string"):
90-
mock_sqs_client = MagicMock()
91-
with patch("send_sqs_message.sqs_client", mock_sqs_client):
92-
with self.assertRaises(InvalidSupplierError) as context:
93-
message_body = {**FLU_EMIS_FILE_DETAILS.sqs_message_body, key_to_set_to_empty: ""}
94-
vaccine_type = message_body["vaccine_type"]
95-
supplier = message_body["supplier"]
96-
send_to_supplier_queue(message_body, supplier, vaccine_type)
97-
self.assertEqual(str(context.exception), expected_error_message)
98-
mock_sqs_client.send_message.assert_not_called()
99-
10079
def test_make_and_send_sqs_message_success(self):
10180
"""Test make_and_send_sqs_message function for a successful message send"""
10281
# Create a mock SQS queue

filenameprocessor/tests/utils_for_tests/utils_for_filenameprocessor_tests.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ def add_entry_to_table(file_details: MockFileDetails, file_status: FileStatus) -
3939
dynamodb_client.put_item(TableName=AUDIT_TABLE_NAME, Item=audit_table_entry)
4040

4141

42-
def assert_audit_table_entry(file_details: FileDetails, expected_status: FileStatus) -> None:
42+
def assert_audit_table_entry(file_details: FileDetails, expected_status: str) -> None:
4343
"""Assert that the file details are in the audit table"""
4444
table_entry = dynamodb_client.get_item(
4545
TableName=AUDIT_TABLE_NAME, Key={AuditTableKeys.MESSAGE_ID: {"S": file_details.message_id}}

0 commit comments

Comments
 (0)