
Commit f1caf32

Refactored record processor
1 parent 922f97c commit f1caf32

7 files changed: 180 additions, 24 deletions

recordprocessor/src/audit_table.py

Lines changed: 19 additions & 4 deletions
@@ -1,20 +1,35 @@
 """Add the filename to the audit table and check for duplicates."""
+from typing import Optional

 from clients import dynamodb_client, logger
 from errors import UnhandledAuditTableError
 from constants import AUDIT_TABLE_NAME, AuditTableKeys


-def update_audit_table_status(file_key: str, message_id: str, status: str) -> None:
+def update_audit_table_status(
+    file_key: str,
+    message_id: str,
+    status: str,
+    error_details: Optional[str] = None
+) -> None:
     """Updates the status in the audit table to the requested value"""
+    update_expression = f"SET #{AuditTableKeys.STATUS} = :status"
+    expression_attr_names = {f"#{AuditTableKeys.STATUS}": "status"}
+    expression_attr_values = {":status": {"S": status}}
+
+    if error_details is not None:
+        update_expression = update_expression + f", #{AuditTableKeys.ERROR_DETAILS} = :error_details"
+        expression_attr_names[f"#{AuditTableKeys.ERROR_DETAILS}"] = "error_details"
+        expression_attr_values[":error_details"] = {"S": error_details}
+
     try:
         # Update the status in the audit table to "Processed"
         dynamodb_client.update_item(
             TableName=AUDIT_TABLE_NAME,
             Key={AuditTableKeys.MESSAGE_ID: {"S": message_id}},
-            UpdateExpression="SET #status = :status",
-            ExpressionAttributeNames={"#status": "status"},
-            ExpressionAttributeValues={":status": {"S": status}},
+            UpdateExpression=update_expression,
+            ExpressionAttributeNames=expression_attr_names,
+            ExpressionAttributeValues=expression_attr_values,
             ConditionExpression="attribute_exists(message_id)",
         )
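
For context on the DynamoDB idioms above: the `#...` tokens are expression attribute name placeholders (needed because `status` is a reserved word in DynamoDB), and the `ConditionExpression` makes the update raise a `ConditionalCheckFailedException` for an unknown `message_id` rather than silently creating a new item. A minimal sketch of the two call shapes the new signature supports, with illustrative file key and message id values rather than real ones:

    # Status-only update: existing call sites keep working unchanged.
    update_audit_table_status("some_file.csv", "some-message-id", "Processed")

    # Failure path: status and the new error_details attribute in a single update.
    update_audit_table_status(
        "some_file.csv",
        "some-message-id",
        "Failed",
        error_details="Stream not found",
    )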

recordprocessor/src/batch_processor.py

Lines changed: 16 additions & 1 deletion
@@ -3,6 +3,7 @@
 import json
 import os
 import time
+from json import JSONDecodeError

 from constants import FileStatus
 from process_row import process_row
@@ -66,9 +67,23 @@ def main(event: str) -> None:
     logger.info("task started")
     start = time.time()
     try:
-        process_csv_to_fhir(incoming_message_body=json.loads(event))
+        incoming_message_body = json.loads(event)
+    except JSONDecodeError as error:
+        logger.error("Error decoding incoming message: %s", error)
+        return
+
+    try:
+        process_csv_to_fhir(incoming_message_body=incoming_message_body)
     except Exception as error:  # pylint: disable=broad-exception-caught
         logger.error("Error processing message: %s", error)
+        message_id = incoming_message_body.get("message_id")
+        file_key = incoming_message_body.get("file_key")
+
+        # If an unexpected error occurs, attempt to mark the event as failed. If the event is so malformed that this
+        # also fails, we will still get the error alert and the event will remain in processing meaning the supplier +
+        # vacc type queue is blocked until we resolve the issue
+        update_audit_table_status(file_key, message_id, FileStatus.FAILED, error_details=str(error))
+
     end = time.time()
     logger.info("Total time for completion: %ss", round(end - start, 5))
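
Splitting the decode into its own try block means a malformed event is logged and the function returns early: without a parseable `message_id` there is no audit row to update. Any exception after a successful decode is logged and the row is best-effort marked as `Failed`. A sketch of the two paths, using made-up payloads:

    main('{"broken json')  # JSONDecodeError: logged as a decode error, audit table untouched
    main('{"message_id": "m1", "file_key": "f.csv"}')  # decodes; a later processing error
                                                       # would mark m1 as Failed with details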

recordprocessor/src/constants.py

Lines changed: 9 additions & 1 deletion
@@ -55,7 +55,14 @@ class FileStatus:
     PROCESSING = "Processing"
     PREPROCESSED = "Preprocessed"  # All entries in file converted to FHIR and forwarded to Kinesis
     PROCESSED = "Processed"  # All entries processed and ack file created
-    DUPLICATE = "Duplicate"
+    NOT_PROCESSED = "Not processed"
+    FAILED = "Failed"
+
+
+class FileNotProcessedReason(StrEnum):
+    """Reasons why a file was not processed"""
+    UNAUTHORISED = "Unauthorised"
+    INVALID_FILE_HEADERS = "Invalid file headers"


 class AuditTableKeys:
@@ -66,6 +73,7 @@ class AuditTableKeys:
     QUEUE_NAME = "queue_name"
     STATUS = "status"
     TIMESTAMP = "timestamp"
+    ERROR_DETAILS = "error_details"


 class Diagnostics:
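
A note on `StrEnum` (Python 3.11+): its members format as their plain string values, which is what lets the validation code compose readable audit statuses from these constants. A quick illustration:

    from enum import StrEnum

    class FileNotProcessedReason(StrEnum):
        UNAUTHORISED = "Unauthorised"

    print(f"Not processed - {FileNotProcessedReason.UNAUTHORISED}")
    # prints: Not processed - Unauthorised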

recordprocessor/src/file_level_validation.py

Lines changed: 16 additions & 2 deletions
@@ -8,7 +8,8 @@
 from errors import InvalidHeaders, NoOperationPermissions
 from logging_decorator import file_level_validation_logging_decorator
 from audit_table import update_audit_table_status
-from constants import SOURCE_BUCKET_NAME, EXPECTED_CSV_HEADERS, permission_to_operation_map, FileStatus, Permission
+from constants import SOURCE_BUCKET_NAME, EXPECTED_CSV_HEADERS, permission_to_operation_map, FileStatus, Permission, \
+    FileNotProcessedReason


 def validate_content_headers(csv_content_reader) -> None:
@@ -17,6 +18,18 @@ def validate_content_headers(csv_content_reader) -> None:
         raise InvalidHeaders("File headers are invalid.")


+def get_file_status_for_error(error: Exception) -> str:
+    """Returns the appropriate file status based on the error that occurred"""
+    if isinstance(error, NoOperationPermissions):
+        return f"{FileStatus.NOT_PROCESSED} - {FileNotProcessedReason.UNAUTHORISED}"
+
+    # TODO - discuss with team. Do we want client errors to leave pipeline unblocked. Or block and investigate?
+    elif isinstance(error, InvalidHeaders):
+        return f"{FileStatus.NOT_PROCESSED} - {FileNotProcessedReason.INVALID_FILE_HEADERS}"
+
+    return FileStatus.FAILED
+
+
 def get_permitted_operations(
     supplier: str, vaccine_type: str, allowed_permissions_list: list
 ) -> set:
@@ -107,12 +120,13 @@ def file_level_validation(incoming_message_body: dict) -> dict:
         file_key = file_key or "Unable to ascertain file_key"
         created_at_formatted_string = created_at_formatted_string or "Unable to ascertain created_at_formatted_string"
         make_and_upload_ack_file(message_id, file_key, False, False, created_at_formatted_string)
+        file_status = get_file_status_for_error(error)

         try:
             move_file(SOURCE_BUCKET_NAME, file_key, f"archive/{file_key}")
         except Exception as move_file_error:
             logger.error("Failed to move file to archive: %s", move_file_error)

         # Update the audit table
-        update_audit_table_status(file_key, message_id, FileStatus.PROCESSED)
+        update_audit_table_status(file_key, message_id, file_status, error_details=str(error))
         raise
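
The new helper makes the failure semantics explicit: known client-side problems (missing permissions, bad headers) are recorded as `Not processed - <reason>` so the supplier + vaccine type queue is unblocked, while anything unexpected falls through to `Failed`; the `raise` at the end preserves the original exception for the caller either way. A sketch of the mapping, with illustrative messages:

    get_file_status_for_error(NoOperationPermissions("..."))  # "Not processed - Unauthorised"
    get_file_status_for_error(InvalidHeaders("..."))          # "Not processed - Invalid file headers"
    get_file_status_for_error(ValueError("..."))              # "Failed"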

recordprocessor/tests/test_audit_table.py

Lines changed: 16 additions & 0 deletions
@@ -65,6 +65,22 @@ def test_update_audit_table_status(self):

         self.assertIn(expected_table_entry, table_items)

+    def test_update_audit_table_status_including_error_details(self):
+        """Checks audit table correctly updates a record including some error details"""
+        add_entry_to_table(MockFileDetails.rsv_ravs, file_status=FileStatus.QUEUED)
+        ravs_rsv_test_file = FileDetails("RSV", "RAVS", "X26")
+
+        update_audit_table_status(ravs_rsv_test_file.file_key, ravs_rsv_test_file.message_id, FileStatus.FAILED,
+                                  error_details="Test error details")
+
+        table_items = dynamodb_client.scan(TableName=AUDIT_TABLE_NAME).get("Items", [])
+        self.assertEqual(1, len(table_items))
+        self.assertEqual({
+            **MockFileDetails.rsv_ravs.audit_table_entry,
+            "status": {"S": FileStatus.FAILED},
+            "error_details": {"S": "Test error details"}
+        }, table_items[0])
+
     def test_update_audit_table_status_throws_exception_with_invalid_id(self):
         emis_flu_test_file_2 = FileDetails("FLU", "EMIS", "YGM41")
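
Because the test goes through the low-level boto3 client, every attribute in the scanned item is wrapped in a type descriptor. Roughly what the asserted item looks like, with illustrative values for the keys that come from `audit_table_entry`:

    {
        "message_id": {"S": "some-message-id"},
        "status": {"S": "Failed"},
        "error_details": {"S": "Test error details"},
    }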

recordprocessor/tests/test_recordprocessor_main.py

Lines changed: 67 additions & 13 deletions
@@ -3,14 +3,17 @@
 import unittest
 import json
 from decimal import Decimal
+from json import JSONDecodeError
 from unittest.mock import patch
 from datetime import datetime, timedelta, timezone
-from moto import mock_s3, mock_kinesis, mock_firehose
+from moto import mock_s3, mock_kinesis, mock_firehose, mock_dynamodb
 from boto3 import client as boto3_client

 from tests.utils_for_recordprocessor_tests.utils_for_recordprocessor_tests import (
     GenericSetUp,
     GenericTearDown,
+    add_entry_to_table,
+    assert_audit_table_entry
 )
 from tests.utils_for_recordprocessor_tests.values_for_recordprocessor_tests import (
     MockFileDetails,
@@ -25,36 +28,41 @@
 from tests.utils_for_recordprocessor_tests.mock_environment_variables import MOCK_ENVIRONMENT_DICT, BucketNames, Kinesis

 with patch("os.environ", MOCK_ENVIRONMENT_DICT):
-    from constants import Diagnostics
+    from constants import Diagnostics, FileStatus, FileNotProcessedReason, AUDIT_TABLE_NAME, AuditTableKeys
     from batch_processor import main

 s3_client = boto3_client("s3", region_name=REGION_NAME)
 kinesis_client = boto3_client("kinesis", region_name=REGION_NAME)
 firehose_client = boto3_client("firehose", region_name=REGION_NAME)
+dynamo_db_client = boto3_client("dynamodb", region_name=REGION_NAME)
 yesterday = datetime.now(timezone.utc) - timedelta(days=1)
 mock_rsv_emis_file = MockFileDetails.rsv_emis


 @patch.dict("os.environ", MOCK_ENVIRONMENT_DICT)
+@mock_dynamodb
 @mock_s3
 @mock_kinesis
 @mock_firehose
 class TestRecordProcessor(unittest.TestCase):
     """Tests for main function for RecordProcessor"""

     def setUp(self) -> None:
-        GenericSetUp(s3_client, firehose_client, kinesis_client)
+        GenericSetUp(s3_client, firehose_client, kinesis_client, dynamo_db_client)

         redis_patcher = patch("mappings.redis_client")
+        batch_processor_logger_patcher = patch("batch_processor.logger")
         self.addCleanup(redis_patcher.stop)
+        self.mock_batch_processor_logger = batch_processor_logger_patcher.start()
         mock_redis_client = redis_patcher.start()
         mock_redis_client.hget.return_value = json.dumps([{
             "code": "55735004",
             "term": "Respiratory syncytial virus infection (disorder)"
         }])

     def tearDown(self) -> None:
-        GenericTearDown(s3_client, firehose_client, kinesis_client)
+        GenericTearDown(s3_client, firehose_client, kinesis_client, dynamo_db_client)
+        patch.stopall()

     @staticmethod
     def upload_source_files(source_file_content):  # pylint: disable=dangerous-default-value
@@ -148,9 +156,11 @@ def test_e2e_full_permissions(self):
         Tests that file containing CREATE, UPDATE and DELETE is successfully processed when the supplier has
         full permissions.
         """
+        test_file = mock_rsv_emis_file
         self.upload_source_files(ValidMockFileContent.with_new_and_update_and_delete)
+        add_entry_to_table(test_file, FileStatus.PROCESSING)

-        main(mock_rsv_emis_file.event_full_permissions)
+        main(test_file.event_full_permissions)

         # Assertion case tuples are stuctured as
         # (test_name, index, expected_kinesis_data_ignoring_fhir_json,expect_success)
@@ -176,15 +186,18 @@ def test_e2e_full_permissions(self):
         ]
         self.make_inf_ack_assertions(file_details=mock_rsv_emis_file, passed_validation=True)
         self.make_kinesis_assertions(assertion_cases)
+        assert_audit_table_entry(test_file, FileStatus.PREPROCESSED)

     def test_e2e_partial_permissions(self):
         """
         Tests that file containing CREATE, UPDATE and DELETE is successfully processed when the supplier only has CREATE
         permissions.
         """
+        test_file = mock_rsv_emis_file
+        add_entry_to_table(test_file, FileStatus.PROCESSING)
         self.upload_source_files(ValidMockFileContent.with_new_and_update_and_delete)

-        main(mock_rsv_emis_file.event_create_permissions_only)
+        main(test_file.event_create_permissions_only)

         # Assertion case tuples are stuctured as
         # (test_name, index, expected_kinesis_data_ignoring_fhir_json,expect_success)
@@ -226,15 +239,18 @@
         ]
         self.make_inf_ack_assertions(file_details=mock_rsv_emis_file, passed_validation=True)
         self.make_kinesis_assertions(assertion_cases)
+        assert_audit_table_entry(test_file, FileStatus.PREPROCESSED)

     def test_e2e_no_required_permissions(self):
         """
         Tests that file containing UPDATE and DELETE is successfully processed when the supplier has CREATE permissions
         only.
         """
+        test_file = mock_rsv_emis_file
+        add_entry_to_table(test_file, FileStatus.PROCESSING)
         self.upload_source_files(ValidMockFileContent.with_update_and_delete)

-        main(mock_rsv_emis_file.event_create_permissions_only)
+        main(test_file.event_create_permissions_only)

         kinesis_records = kinesis_client.get_records(ShardIterator=self.get_shard_iterator(), Limit=10)["Records"]
         self.assertEqual(len(kinesis_records), 2)
@@ -244,27 +260,39 @@
         self.assertIn("diagnostics", data_dict)
         self.assertNotIn("fhir_json", data_dict)
         self.make_inf_ack_assertions(file_details=mock_rsv_emis_file, passed_validation=True)
+        assert_audit_table_entry(test_file, FileStatus.PREPROCESSED)

     def test_e2e_no_permissions(self):
         """
         Tests that file containing UPDATE and DELETE is successfully processed when the supplier has no permissions.
         """
+        test_file = mock_rsv_emis_file
+        add_entry_to_table(test_file, FileStatus.PROCESSING)
         self.upload_source_files(ValidMockFileContent.with_update_and_delete)

-        main(mock_rsv_emis_file.event_no_permissions)
+        main(test_file.event_no_permissions)

         kinesis_records = kinesis_client.get_records(ShardIterator=self.get_shard_iterator(), Limit=10)["Records"]
+        table_entry = dynamo_db_client.get_item(
+            TableName=AUDIT_TABLE_NAME, Key={AuditTableKeys.MESSAGE_ID: {"S": test_file.message_id}}
+        ).get("Item")
         self.assertEqual(len(kinesis_records), 0)
         self.make_inf_ack_assertions(file_details=mock_rsv_emis_file, passed_validation=False)
+        self.assertDictEqual(table_entry, {
+            **test_file.audit_table_entry,
+            "status": {"S": f"{FileStatus.NOT_PROCESSED} - {FileNotProcessedReason.UNAUTHORISED}"},
+            "error_details": {"S": "EMIS does not have permissions to perform any of the requested actions."}
+        })

     def test_e2e_invalid_action_flags(self):
         """Tests that file is successfully processed when the ACTION_FLAG field is empty or invalid."""
-
+        test_file = mock_rsv_emis_file
+        add_entry_to_table(test_file, FileStatus.PROCESSING)
         self.upload_source_files(
             ValidMockFileContent.with_update_and_delete.replace("update", "").replace("delete", "INVALID")
         )

-        main(mock_rsv_emis_file.event_full_permissions)
+        main(test_file.event_full_permissions)

         expected_kinesis_data = {
             "diagnostics": {
@@ -288,14 +316,16 @@ def test_e2e_invalid_action_flags(self):
     def test_e2e_differing_amounts_of_data(self):
         """Tests that file containing rows with differing amounts of data present is processed as expected"""
         # Create file content with different amounts of data present in each row
+        test_file = mock_rsv_emis_file
+        add_entry_to_table(test_file, FileStatus.PROCESSING)
         headers = "|".join(MockFieldDictionaries.all_fields.keys())
         all_fields_values = "|".join(f'"{v}"' for v in MockFieldDictionaries.all_fields.values())
         mandatory_fields_only_values = "|".join(f'"{v}"' for v in MockFieldDictionaries.mandatory_fields_only.values())
         critical_fields_only_values = "|".join(f'"{v}"' for v in MockFieldDictionaries.critical_fields_only.values())
         file_content = f"{headers}\n{all_fields_values}\n{mandatory_fields_only_values}\n{critical_fields_only_values}"
         self.upload_source_files(file_content)

-        main(mock_rsv_emis_file.event_full_permissions)
+        main(test_file.event_full_permissions)

         all_fields_row_expected_kinesis_data = {
             "operation_requested": "UPDATE",
@@ -329,6 +359,8 @@ def test_e2e_kinesis_failed(self):
         Tests that, for a file with valid content and supplier with full permissions, when the kinesis send fails, the
         ack file is created and documents an error.
         """
+        test_file = mock_rsv_emis_file
+        add_entry_to_table(test_file, FileStatus.PROCESSING)
         self.upload_source_files(ValidMockFileContent.with_new_and_update)
         # Delete the kinesis stream, to cause kinesis send to fail
         kinesis_client.delete_stream(StreamName=Kinesis.STREAM_NAME, EnforceConsumerDeletion=True)
@@ -340,11 +372,14 @@
         ):  # noqa: E999
             mock_time.time.side_effect = [1672531200, 1672531200.123456]
             mock_datetime.now.return_value = datetime(2024, 1, 1, 12, 0, 0)
-            main(mock_rsv_emis_file.event_full_permissions)
+            main(test_file.event_full_permissions)

         # Since the failure occured at row level, not file level, the ack file should still be created
         # and firehose logs should indicate a successful file level validation
-        self.make_inf_ack_assertions(file_details=mock_rsv_emis_file, passed_validation=True)
+        table_entry = dynamo_db_client.get_item(
+            TableName=AUDIT_TABLE_NAME, Key={AuditTableKeys.MESSAGE_ID: {"S": test_file.message_id}}
+        ).get("Item")
+        self.make_inf_ack_assertions(file_details=test_file, passed_validation=True)
         expected_log_data = {
             "function_name": "record_processor_file_level_validation",
             "date_time": "2024-01-01 12:00:00",
@@ -357,6 +392,25 @@
             "message": "Successfully sent for record processing",
         }
         mock_send_log_to_firehose.assert_called_with(expected_log_data)
+        self.assertDictEqual(table_entry, {
+            **test_file.audit_table_entry,
+            "status": {"S": FileStatus.FAILED},
+            "error_details": {"S": "An error occurred (ResourceNotFoundException) when calling the PutRecord operation"
+                                   ": Stream imms-batch-internal-dev-processingdata-stream under account 123456789012"
+                                   " not found."}
+        })
+
+    def test_e2e_error_is_logged_if_invalid_json_provided(self):
+        """This scenario should not happen. If it does, it means our batch processing system config is broken and we
+        have received malformed content from SQS -> EventBridge. In this case we log the error so we will be alerted.
+        However, we cannot do anything with the AuditDB record as we cannot retrieve information from the event"""
+        malformed_event = '{"test": {}'
+        main(malformed_event)
+
+        logged_message = self.mock_batch_processor_logger.error.call_args[0][0]
+        exception = self.mock_batch_processor_logger.error.call_args[0][1]
+        self.assertEqual(logged_message, "Error decoding incoming message: %s")
+        self.assertIsInstance(exception, JSONDecodeError)


 if __name__ == "__main__":
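
Two notes on the harness changes: `@mock_dynamodb` sits alongside the other moto decorators so the audit-table reads above hit an in-memory table, and `patch.stopall()` in tearDown stops the new logger patcher, which is started without a matching `addCleanup`. The `add_entry_to_table` and `assert_audit_table_entry` helpers come from the shared test utils (presumably the seventh changed file, not shown in this view); a hypothetical minimal shape of the assertion helper, for orientation only:

    def assert_audit_table_entry(test_file, expected_status):
        # Hypothetical sketch, not the repo's actual implementation: fetch the
        # audit row by message_id and compare the typed status attribute.
        item = dynamo_db_client.get_item(
            TableName=AUDIT_TABLE_NAME,
            Key={AuditTableKeys.MESSAGE_ID: {"S": test_file.message_id}},
        ).get("Item")
        assert item["status"] == {"S": expected_status}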
