
Commit e207e02

Refactored record processor
1 parent fdb4cd7 commit e207e02

File tree

7 files changed: +179 -23 lines changed


recordprocessor/src/audit_table.py

Lines changed: 19 additions & 4 deletions
@@ -1,20 +1,35 @@
 """Add the filename to the audit table and check for duplicates."""
+from typing import Optional

 from clients import dynamodb_client, logger
 from errors import UnhandledAuditTableError
 from constants import AUDIT_TABLE_NAME, AuditTableKeys


-def update_audit_table_status(file_key: str, message_id: str, status: str) -> None:
+def update_audit_table_status(
+    file_key: str,
+    message_id: str,
+    status: str,
+    error_details: Optional[str] = None
+) -> None:
     """Updates the status in the audit table to the requested value"""
+    update_expression = f"SET #{AuditTableKeys.STATUS} = :status"
+    expression_attr_names = {f"#{AuditTableKeys.STATUS}": "status"}
+    expression_attr_values = {":status": {"S": status}}
+
+    if error_details is not None:
+        update_expression = update_expression + f", #{AuditTableKeys.ERROR_DETAILS} = :error_details"
+        expression_attr_names[f"#{AuditTableKeys.ERROR_DETAILS}"] = "error_details"
+        expression_attr_values[":error_details"] = {"S": error_details}
+
     try:
         # Update the status in the audit table to "Processed"
         dynamodb_client.update_item(
             TableName=AUDIT_TABLE_NAME,
             Key={AuditTableKeys.MESSAGE_ID: {"S": message_id}},
-            UpdateExpression="SET #status = :status",
-            ExpressionAttributeNames={"#status": "status"},
-            ExpressionAttributeValues={":status": {"S": status}},
+            UpdateExpression=update_expression,
+            ExpressionAttributeNames=expression_attr_names,
+            ExpressionAttributeValues=expression_attr_values,
             ConditionExpression="attribute_exists(message_id)",
         )
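With error_details supplied, the refactored function issues a single conditional UpdateItem that writes both attributes at once. A minimal sketch of the request it would build for a hypothetical failure (values are illustrative; attribute names follow AuditTableKeys, so the placeholders resolve to "#status" and "#error_details"):

    # update_audit_table_status(file_key, message_id, "Failed", error_details="Stream not found")
    # resolves to a call equivalent to:
    dynamodb_client.update_item(
        TableName=AUDIT_TABLE_NAME,
        Key={"message_id": {"S": message_id}},
        UpdateExpression="SET #status = :status, #error_details = :error_details",
        ExpressionAttributeNames={"#status": "status", "#error_details": "error_details"},
        ExpressionAttributeValues={":status": {"S": "Failed"}, ":error_details": {"S": "Stream not found"}},
        ConditionExpression="attribute_exists(message_id)",  # raises instead of creating a new record
    )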

recordprocessor/src/batch_processor.py

Lines changed: 17 additions & 1 deletion
@@ -3,6 +3,7 @@
 import json
 import os
 import time
+from json import JSONDecodeError

 from constants import FileStatus
 from process_row import process_row
@@ -111,10 +112,25 @@ def main(event: str) -> None:
     logger.info("task started")
     start = time.time()
     n_rows_processed = 0
+
+    try:
+        incoming_message_body = json.loads(event)
+    except JSONDecodeError as error:
+        logger.error("Error decoding incoming message: %s", error)
+        return
+
     try:
-        n_rows_processed = process_csv_to_fhir(incoming_message_body=json.loads(event))
+        n_rows_processed = process_csv_to_fhir(incoming_message_body=incoming_message_body)
     except Exception as error:  # pylint: disable=broad-exception-caught
         logger.error("Error processing message: %s", error)
+        message_id = incoming_message_body.get("message_id")
+        file_key = incoming_message_body.get("file_key")
+
+        # If an unexpected error occurs, attempt to mark the event as failed. If the event is so malformed that this
+        # also fails, we will still get the error alert and the event will remain in Processing, meaning the supplier +
+        # vacc type queue is blocked until we resolve the issue.
+        update_audit_table_status(file_key, message_id, FileStatus.FAILED, error_details=str(error))
+
     end = time.time()
     logger.info("Total rows processed: %s", n_rows_processed)
     logger.info("Total time for completion: %ss", round(end - start, 5))

recordprocessor/src/constants.py

Lines changed: 9 additions & 1 deletion
@@ -55,7 +55,14 @@ class FileStatus:
     PROCESSING = "Processing"
     PREPROCESSED = "Preprocessed"  # All entries in file converted to FHIR and forwarded to Kinesis
     PROCESSED = "Processed"  # All entries processed and ack file created
-    DUPLICATE = "Duplicate"
+    NOT_PROCESSED = "Not processed"
+    FAILED = "Failed"
+
+
+class FileNotProcessedReason(StrEnum):
+    """Reasons why a file was not processed"""
+    UNAUTHORISED = "Unauthorised"
+    INVALID_FILE_HEADERS = "Invalid file headers"


 class AuditTableKeys:
@@ -66,6 +73,7 @@ class AuditTableKeys:
     QUEUE_NAME = "queue_name"
     STATUS = "status"
     TIMESTAMP = "timestamp"
+    ERROR_DETAILS = "error_details"


 class Diagnostics:
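Because FileNotProcessedReason subclasses StrEnum (available from Python 3.11), its members interpolate as their plain string values, which is what makes the composite statuses written to the audit table come out as ordinary strings. A minimal sketch:

    status = f"{FileStatus.NOT_PROCESSED} - {FileNotProcessedReason.UNAUTHORISED}"
    assert status == "Not processed - Unauthorised"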

recordprocessor/src/file_level_validation.py

Lines changed: 16 additions & 2 deletions
@@ -8,7 +8,8 @@
 from errors import InvalidHeaders, NoOperationPermissions
 from logging_decorator import file_level_validation_logging_decorator
 from audit_table import update_audit_table_status
-from constants import SOURCE_BUCKET_NAME, EXPECTED_CSV_HEADERS, permission_to_operation_map, FileStatus, Permission
+from constants import SOURCE_BUCKET_NAME, EXPECTED_CSV_HEADERS, permission_to_operation_map, FileStatus, Permission, \
+    FileNotProcessedReason


 def validate_content_headers(csv_content_reader) -> None:
@@ -17,6 +18,18 @@ def validate_content_headers(csv_content_reader) -> None:
         raise InvalidHeaders("File headers are invalid.")


+def get_file_status_for_error(error: Exception) -> str:
+    """Returns the appropriate file status based on the error that occurred"""
+    if isinstance(error, NoOperationPermissions):
+        return f"{FileStatus.NOT_PROCESSED} - {FileNotProcessedReason.UNAUTHORISED}"
+
+    # TODO - discuss with team: do we want client errors to leave the pipeline unblocked, or to block and investigate?
+    elif isinstance(error, InvalidHeaders):
+        return f"{FileStatus.NOT_PROCESSED} - {FileNotProcessedReason.INVALID_FILE_HEADERS}"
+
+    return FileStatus.FAILED
+
+
 def get_permitted_operations(
     supplier: str, vaccine_type: str, allowed_permissions_list: list
 ) -> set:
@@ -115,12 +128,13 @@ def file_level_validation(incoming_message_body: dict) -> dict:
         file_key = file_key or "Unable to ascertain file_key"
         created_at_formatted_string = created_at_formatted_string or "Unable to ascertain created_at_formatted_string"
         make_and_upload_ack_file(message_id, file_key, False, False, created_at_formatted_string)
+        file_status = get_file_status_for_error(error)

         try:
             move_file(SOURCE_BUCKET_NAME, file_key, f"archive/{file_key}")
         except Exception as move_file_error:
             logger.error("Failed to move file to archive: %s", move_file_error)

         # Update the audit table
-        update_audit_table_status(file_key, message_id, FileStatus.PROCESSED)
+        update_audit_table_status(file_key, message_id, file_status, error_details=str(error))
         raise
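Taken together, get_file_status_for_error maps the two anticipated validation failures to "Not processed" statuses and falls back to "Failed" for anything unexpected. Illustrative outputs (the error messages are placeholders):

    get_file_status_for_error(NoOperationPermissions("..."))  # "Not processed - Unauthorised"
    get_file_status_for_error(InvalidHeaders("..."))          # "Not processed - Invalid file headers"
    get_file_status_for_error(RuntimeError("..."))            # "Failed"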

recordprocessor/tests/test_audit_table.py

Lines changed: 16 additions & 0 deletions
@@ -65,6 +65,22 @@ def test_update_audit_table_status(self):

         self.assertIn(expected_table_entry, table_items)

+    def test_update_audit_table_status_including_error_details(self):
+        """Checks that the audit table correctly updates a record, including error details"""
+        add_entry_to_table(MockFileDetails.rsv_ravs, file_status=FileStatus.QUEUED)
+        ravs_rsv_test_file = FileDetails("RSV", "RAVS", "X26")
+
+        update_audit_table_status(ravs_rsv_test_file.file_key, ravs_rsv_test_file.message_id, FileStatus.FAILED,
+                                  error_details="Test error details")
+
+        table_items = dynamodb_client.scan(TableName=AUDIT_TABLE_NAME).get("Items", [])
+        self.assertEqual(1, len(table_items))
+        self.assertEqual({
+            **MockFileDetails.rsv_ravs.audit_table_entry,
+            "status": {"S": FileStatus.FAILED},
+            "error_details": {"S": "Test error details"}
+        }, table_items[0])
+
     def test_update_audit_table_status_throws_exception_with_invalid_id(self):
         emis_flu_test_file_2 = FileDetails("FLU", "EMIS", "YGM41")

recordprocessor/tests/test_recordprocessor_main.py

Lines changed: 65 additions & 12 deletions
@@ -3,14 +3,17 @@
 import unittest
 import json
 from decimal import Decimal
+from json import JSONDecodeError
 from unittest.mock import patch
 from datetime import datetime, timedelta, timezone
-from moto import mock_s3, mock_kinesis, mock_firehose
+from moto import mock_s3, mock_kinesis, mock_firehose, mock_dynamodb
 from boto3 import client as boto3_client

 from tests.utils_for_recordprocessor_tests.utils_for_recordprocessor_tests import (
     GenericSetUp,
     GenericTearDown,
+    add_entry_to_table,
+    assert_audit_table_entry
 )
 from tests.utils_for_recordprocessor_tests.values_for_recordprocessor_tests import (
     MockFileDetails,
@@ -26,28 +29,32 @@
 from tests.utils_for_recordprocessor_tests.utils_for_recordprocessor_tests import create_patch

 with patch("os.environ", MOCK_ENVIRONMENT_DICT):
-    from constants import Diagnostics
+    from constants import Diagnostics, FileStatus, FileNotProcessedReason, AUDIT_TABLE_NAME, AuditTableKeys
     from batch_processor import main

 s3_client = boto3_client("s3", region_name=REGION_NAME)
 kinesis_client = boto3_client("kinesis", region_name=REGION_NAME)
 firehose_client = boto3_client("firehose", region_name=REGION_NAME)
+dynamo_db_client = boto3_client("dynamodb", region_name=REGION_NAME)
 yesterday = datetime.now(timezone.utc) - timedelta(days=1)
 mock_rsv_emis_file = MockFileDetails.rsv_emis


 @patch.dict("os.environ", MOCK_ENVIRONMENT_DICT)
+@mock_dynamodb
 @mock_s3
 @mock_kinesis
 @mock_firehose
 class TestRecordProcessor(unittest.TestCase):
     """Tests for main function for RecordProcessor"""

     def setUp(self) -> None:
-        GenericSetUp(s3_client, firehose_client, kinesis_client)
+        GenericSetUp(s3_client, firehose_client, kinesis_client, dynamo_db_client)

         redis_patcher = patch("mappings.redis_client")
+        batch_processor_logger_patcher = patch("batch_processor.logger")
         self.addCleanup(redis_patcher.stop)
+        self.mock_batch_processor_logger = batch_processor_logger_patcher.start()
         mock_redis_client = redis_patcher.start()
         mock_redis_client.hget.return_value = json.dumps([{
             "code": "55735004",
@@ -151,9 +158,11 @@ def test_e2e_full_permissions(self):
         Tests that file containing CREATE, UPDATE and DELETE is successfully processed when the supplier has
         full permissions.
         """
+        test_file = mock_rsv_emis_file
         self.upload_source_files(ValidMockFileContent.with_new_and_update_and_delete)
+        add_entry_to_table(test_file, FileStatus.PROCESSING)

-        main(mock_rsv_emis_file.event_full_permissions)
+        main(test_file.event_full_permissions)

         # Assertion case tuples are structured as
         # (test_name, index, expected_kinesis_data_ignoring_fhir_json, expect_success)
@@ -179,15 +188,18 @@ def test_e2e_full_permissions(self):
         ]
         self.make_inf_ack_assertions(file_details=mock_rsv_emis_file, passed_validation=True)
         self.make_kinesis_assertions(assertion_cases)
+        assert_audit_table_entry(test_file, FileStatus.PREPROCESSED)

     def test_e2e_partial_permissions(self):
         """
         Tests that file containing CREATE, UPDATE and DELETE is successfully processed when the supplier only has CREATE
         permissions.
         """
+        test_file = mock_rsv_emis_file
+        add_entry_to_table(test_file, FileStatus.PROCESSING)
         self.upload_source_files(ValidMockFileContent.with_new_and_update_and_delete)

-        main(mock_rsv_emis_file.event_create_permissions_only)
+        main(test_file.event_create_permissions_only)

         # Assertion case tuples are structured as
         # (test_name, index, expected_kinesis_data_ignoring_fhir_json, expect_success)
@@ -229,15 +241,18 @@ def test_e2e_partial_permissions(self):
         ]
         self.make_inf_ack_assertions(file_details=mock_rsv_emis_file, passed_validation=True)
         self.make_kinesis_assertions(assertion_cases)
+        assert_audit_table_entry(test_file, FileStatus.PREPROCESSED)

     def test_e2e_no_required_permissions(self):
         """
         Tests that file containing UPDATE and DELETE is successfully processed when the supplier has CREATE permissions
         only.
         """
+        test_file = mock_rsv_emis_file
+        add_entry_to_table(test_file, FileStatus.PROCESSING)
         self.upload_source_files(ValidMockFileContent.with_update_and_delete)

-        main(mock_rsv_emis_file.event_create_permissions_only)
+        main(test_file.event_create_permissions_only)

         kinesis_records = kinesis_client.get_records(ShardIterator=self.get_shard_iterator(), Limit=10)["Records"]
         self.assertEqual(len(kinesis_records), 2)
@@ -247,27 +262,39 @@ def test_e2e_no_required_permissions(self):
         self.assertIn("diagnostics", data_dict)
         self.assertNotIn("fhir_json", data_dict)
         self.make_inf_ack_assertions(file_details=mock_rsv_emis_file, passed_validation=True)
+        assert_audit_table_entry(test_file, FileStatus.PREPROCESSED)

     def test_e2e_no_permissions(self):
         """
         Tests that file containing UPDATE and DELETE is successfully processed when the supplier has no permissions.
         """
+        test_file = mock_rsv_emis_file
+        add_entry_to_table(test_file, FileStatus.PROCESSING)
         self.upload_source_files(ValidMockFileContent.with_update_and_delete)

-        main(mock_rsv_emis_file.event_no_permissions)
+        main(test_file.event_no_permissions)

         kinesis_records = kinesis_client.get_records(ShardIterator=self.get_shard_iterator(), Limit=10)["Records"]
+        table_entry = dynamo_db_client.get_item(
+            TableName=AUDIT_TABLE_NAME, Key={AuditTableKeys.MESSAGE_ID: {"S": test_file.message_id}}
+        ).get("Item")
         self.assertEqual(len(kinesis_records), 0)
         self.make_inf_ack_assertions(file_details=mock_rsv_emis_file, passed_validation=False)
+        self.assertDictEqual(table_entry, {
+            **test_file.audit_table_entry,
+            "status": {"S": f"{FileStatus.NOT_PROCESSED} - {FileNotProcessedReason.UNAUTHORISED}"},
+            "error_details": {"S": "EMIS does not have permissions to perform any of the requested actions."}
+        })

     def test_e2e_invalid_action_flags(self):
         """Tests that file is successfully processed when the ACTION_FLAG field is empty or invalid."""
-
+        test_file = mock_rsv_emis_file
+        add_entry_to_table(test_file, FileStatus.PROCESSING)
         self.upload_source_files(
             ValidMockFileContent.with_update_and_delete.replace("update", "").replace("delete", "INVALID")
         )

-        main(mock_rsv_emis_file.event_full_permissions)
+        main(test_file.event_full_permissions)

         expected_kinesis_data = {
             "diagnostics": {
@@ -291,14 +318,16 @@ def test_e2e_invalid_action_flags(self):
     def test_e2e_differing_amounts_of_data(self):
         """Tests that file containing rows with differing amounts of data present is processed as expected"""
         # Create file content with different amounts of data present in each row
+        test_file = mock_rsv_emis_file
+        add_entry_to_table(test_file, FileStatus.PROCESSING)
         headers = "|".join(MockFieldDictionaries.all_fields.keys())
         all_fields_values = "|".join(f'"{v}"' for v in MockFieldDictionaries.all_fields.values())
         mandatory_fields_only_values = "|".join(f'"{v}"' for v in MockFieldDictionaries.mandatory_fields_only.values())
         critical_fields_only_values = "|".join(f'"{v}"' for v in MockFieldDictionaries.critical_fields_only.values())
         file_content = f"{headers}\n{all_fields_values}\n{mandatory_fields_only_values}\n{critical_fields_only_values}"
         self.upload_source_files(file_content)

-        main(mock_rsv_emis_file.event_full_permissions)
+        main(test_file.event_full_permissions)

         all_fields_row_expected_kinesis_data = {
             "operation_requested": "UPDATE",
@@ -332,6 +361,8 @@ def test_e2e_kinesis_failed(self):
         Tests that, for a file with valid content and supplier with full permissions, when the kinesis send fails, the
         ack file is created and documents an error.
         """
+        test_file = mock_rsv_emis_file
+        add_entry_to_table(test_file, FileStatus.PROCESSING)
         self.upload_source_files(ValidMockFileContent.with_new_and_update)
         # Delete the kinesis stream, to cause kinesis send to fail
         kinesis_client.delete_stream(StreamName=Kinesis.STREAM_NAME, EnforceConsumerDeletion=True)
@@ -343,11 +374,14 @@ def test_e2e_kinesis_failed(self):
         ):  # noqa: E999
             mock_time.time.side_effect = [1672531200, 1672531200.123456]
             mock_datetime.now.return_value = datetime(2024, 1, 1, 12, 0, 0)
-            main(mock_rsv_emis_file.event_full_permissions)
+            main(test_file.event_full_permissions)

         # Since the failure occurred at row level, not file level, the ack file should still be created
         # and firehose logs should indicate a successful file level validation
-        self.make_inf_ack_assertions(file_details=mock_rsv_emis_file, passed_validation=True)
+        table_entry = dynamo_db_client.get_item(
+            TableName=AUDIT_TABLE_NAME, Key={AuditTableKeys.MESSAGE_ID: {"S": test_file.message_id}}
+        ).get("Item")
+        self.make_inf_ack_assertions(file_details=test_file, passed_validation=True)
         expected_log_data = {
             "function_name": "record_processor_file_level_validation",
             "date_time": "2024-01-01 12:00:00",
@@ -360,6 +394,25 @@ def test_e2e_kinesis_failed(self):
             "message": "Successfully sent for record processing",
         }
         mock_send_log_to_firehose.assert_called_with(expected_log_data)
+        self.assertDictEqual(table_entry, {
+            **test_file.audit_table_entry,
+            "status": {"S": FileStatus.FAILED},
+            "error_details": {"S": "An error occurred (ResourceNotFoundException) when calling the PutRecord operation"
+                                   ": Stream imms-batch-internal-dev-processingdata-stream under account 123456789012"
+                                   " not found."}
+        })
+
+    def test_e2e_error_is_logged_if_invalid_json_provided(self):
+        """This scenario should not happen. If it does, it means our batch processing system config is broken and we
+        have received malformed content from SQS -> EventBridge. In this case we log the error so we will be alerted.
+        However, we cannot do anything with the AuditDB record as we cannot retrieve information from the event."""
+        malformed_event = '{"test": {}'
+        main(malformed_event)
+
+        logged_message = self.mock_batch_processor_logger.error.call_args[0][0]
+        exception = self.mock_batch_processor_logger.error.call_args[0][1]
+        self.assertEqual(logged_message, "Error decoding incoming message: %s")
+        self.assertIsInstance(exception, JSONDecodeError)


 if __name__ == "__main__":
