Commit b6ebef2

Refactored and updated tests for batch processor
1 parent 4245fd2 commit b6ebef2

14 files changed: +72 / -129 lines

filenameprocessor/src/file_name_processor.py

Lines changed: 3 additions & 3 deletions
@@ -47,7 +47,7 @@ def handle_record(record) -> dict:
     supplier = "unknown"
 
     if bucket_name != SOURCE_BUCKET_NAME:
-        return handle_unexpected_bucket_name(bucket_name, file_key, vaccine_type, supplier)
+        return handle_unexpected_bucket_name(bucket_name, file_key)
 
     # In addition to when a batch file is added to the S3 bucket root for processing, this Lambda is also invoked
     # when the file is moved to the processing/ directory and finally the /archive directory. We want to ignore
@@ -122,7 +122,7 @@ def handle_record(record) -> dict:
     }
 
 
-def handle_unexpected_bucket_name(bucket_name: str, file_key: str, vaccine_type: str, supplier: str) -> dict:
+def handle_unexpected_bucket_name(bucket_name: str, file_key: str) -> dict:
     """Handles scenario where Lambda was not invoked by the data-sources bucket. Should not occur due to terraform
     config and overarching design"""
     try:
@@ -139,7 +139,7 @@ def handle_unexpected_bucket_name(bucket_name: str, file_key: str, vaccine_type:
         message = f"Failed to process file due to unexpected bucket name {bucket_name} and file key {file_key}"
 
         return {"statusCode": 500, "message": message, "file_key": file_key,
-                "vaccine_type": vaccine_type, "supplier": supplier, "error": str(error)}
+                "vaccine_type": "unknown", "supplier": "unknown", "error": str(error)}
 
 
 def lambda_handler(event: dict, context) -> None:  # pylint: disable=unused-argument
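
By the time handle_unexpected_bucket_name is reached, supplier (and, presumably, vaccine_type) are still at their "unknown" defaults, as line 47 above shows, so threading them through the call added nothing; the refactor drops the two parameters and hard-codes "unknown" in the error response instead.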

recordprocessor/Makefile

Lines changed: 7 additions & 6 deletions
@@ -4,16 +4,17 @@ build:
 package: build
 	mkdir -p build
 	docker run --rm -v $(shell pwd)/build:/build processor-lambda-build
+
 test:
-	python -m unittest
+	@PYTHONPATH=src:tests python -m unittest
 
-coverage run:
-	coverage run -m unittest discover
+coverage-run:
+	@PYTHONPATH=src:tests coverage run -m unittest discover
 
-coverage report:
-	coverage report -m
+coverage-report:
+	coverage report -m
 
-coverage html:
+coverage-html:
 	coverage html
 
 .PHONY: build package
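
A note on the renamed targets: in Make, a rule line such as "coverage run:" does not define a single target named "coverage run" — it defines two targets, coverage and run, sharing one recipe, so these tasks were not invocable as written. The hyphenated names (make coverage-run, make coverage-report, make coverage-html) make each task a single unambiguous target, and the @PYTHONPATH=src:tests prefix lets the test runner import modules from src/ and tests/ without installing the package.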

recordprocessor/src/audit_table.py

Lines changed: 6 additions & 25 deletions
@@ -1,49 +1,30 @@
 """Add the filename to the audit table and check for duplicates."""
 
-from typing import Union
-from boto3.dynamodb.conditions import Key
-from clients import dynamodb_client, dynamodb_resource, logger
+from clients import dynamodb_client, logger
 from errors import UnhandledAuditTableError
-from constants import AUDIT_TABLE_NAME, AUDIT_TABLE_QUEUE_NAME_GSI, AuditTableKeys, FileStatus
+from constants import AUDIT_TABLE_NAME, AuditTableKeys
 
 
-def change_audit_table_status_to_processed(file_key: str, message_id: str) -> None:
-    """Updates the status in the audit table to 'Processed' and returns the queue name."""
+def update_audit_table_status(file_key: str, message_id: str, status: str) -> None:
+    """Updates the status in the audit table to the requested value"""
     try:
         # Update the status in the audit table to "Processed"
         dynamodb_client.update_item(
             TableName=AUDIT_TABLE_NAME,
             Key={AuditTableKeys.MESSAGE_ID: {"S": message_id}},
             UpdateExpression="SET #status = :status",
             ExpressionAttributeNames={"#status": "status"},
-            ExpressionAttributeValues={":status": {"S": FileStatus.PROCESSED}},
+            ExpressionAttributeValues={":status": {"S": status}},
             ConditionExpression="attribute_exists(message_id)",
         )
 
         logger.info(
             "The status of %s file, with message id %s, was successfully updated to %s in the audit table",
             file_key,
             message_id,
-            FileStatus.PROCESSED,
+            status
         )
 
     except Exception as error:  # pylint: disable = broad-exception-caught
         logger.error(error)
         raise UnhandledAuditTableError(error) from error
-
-
-def get_next_queued_file_details(queue_name: str) -> Union[dict, None]:
-    """
-    Checks for queued files.
-    Returns a dictionary containing the details of the oldest queued file, or returns None if no queued files are found.
-    """
-    queued_files_found_in_audit_table: dict = dynamodb_resource.Table(AUDIT_TABLE_NAME).query(
-        IndexName=AUDIT_TABLE_QUEUE_NAME_GSI,
-        KeyConditionExpression=Key(AuditTableKeys.QUEUE_NAME).eq(queue_name)
-        & Key(AuditTableKeys.STATUS).eq(FileStatus.QUEUED),
-    )
-
-    queued_files_details: list = queued_files_found_in_audit_table["Items"]
-
-    # Return the oldest queued file
-    return sorted(queued_files_details, key=lambda x: x["timestamp"])[0] if queued_files_details else None
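
With the queue lookup gone and the status parameterised, update_audit_table_status becomes the single audit-table write path. A minimal sketch of how the call sites in this commit drive it (file_key and message_id are illustrative placeholders, not values from the commit):

from audit_table import update_audit_table_status
from constants import FileStatus

file_key = "example.csv"  # placeholder file key
message_id = "example-id"  # placeholder message id

# batch_processor.py marks the file once every row has been converted to
# FHIR and forwarded to Kinesis:
update_audit_table_status(file_key, message_id, FileStatus.PREPROCESSED)

# file_level_validation.py marks a failed file as fully processed when it
# is archived, since none of its rows will be forwarded:
update_audit_table_status(file_key, message_id, FileStatus.PROCESSED)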

recordprocessor/src/batch_processor.py

Lines changed: 5 additions & 0 deletions
@@ -3,8 +3,11 @@
 import json
 import os
 import time
+
+from constants import FileStatus
 from process_row import process_row
 from mappings import map_target_disease
+from audit_table import update_audit_table_status
 from send_to_kinesis import send_to_kinesis
 from clients import logger
 from file_level_validation import file_level_validation
@@ -55,6 +58,8 @@ def process_csv_to_fhir(incoming_message_body: dict) -> None:
 
     logger.info("Total rows processed: %s", row_count)
 
+    update_audit_table_status(file_key, file_id, FileStatus.PREPROCESSED)
+
 
 def main(event: str) -> None:
     """Process each row of the file"""

recordprocessor/src/constants.py

Lines changed: 2 additions & 1 deletion
@@ -53,7 +53,8 @@ class FileStatus:
 
     QUEUED = "Queued"
     PROCESSING = "Processing"
-    PROCESSED = "Processed"
+    PREPROCESSED = "Preprocessed"  # All entries in file converted to FHIR and forwarded to Kinesis
+    PROCESSED = "Processed"  # All entries processed and ack file created
     DUPLICATE = "Duplicate"
 
 
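Taken together, the status constants now trace a file's lifecycle through the audit table — presumably Queued → Processing → Preprocessed (all rows forwarded to Kinesis) → Processed (ack file created), with Duplicate as a terminal state for repeat uploads. The split matters because forwarding rows to Kinesis and writing the ack file now complete at different times.
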
recordprocessor/src/file_level_validation.py

Lines changed: 6 additions & 10 deletions
@@ -4,11 +4,11 @@
 """
 from clients import logger, s3_client
 from make_and_upload_ack_file import make_and_upload_ack_file
-from utils_for_recordprocessor import get_csv_content_dict_reader, invoke_filename_lambda
+from utils_for_recordprocessor import get_csv_content_dict_reader
 from errors import InvalidHeaders, NoOperationPermissions
 from logging_decorator import file_level_validation_logging_decorator
-from audit_table import change_audit_table_status_to_processed, get_next_queued_file_details
-from constants import SOURCE_BUCKET_NAME, EXPECTED_CSV_HEADERS, permission_to_operation_map, Permission
+from audit_table import update_audit_table_status
+from constants import SOURCE_BUCKET_NAME, EXPECTED_CSV_HEADERS, permission_to_operation_map, FileStatus, Permission
 
 
 def validate_content_headers(csv_content_reader) -> None:
@@ -64,7 +64,7 @@ def move_file(bucket_name: str, source_file_key: str, destination_file_key: str)
 def file_level_validation(incoming_message_body: dict) -> dict:
     """
     Validates that the csv headers are correct and that the supplier has permission to perform at least one of
-    the requested operations. Uploades the inf ack file and moves the source file to the processing folder.
+    the requested operations. Uploads the inf ack file and moves the source file to the processing folder.
     Returns an interim message body for row level processing.
     NOTE: If file level validation fails the source file is moved to the archive folder, the audit table is updated
     to reflect the file has been processed and the filename lambda is invoked with the next file in the queue.
@@ -113,10 +113,6 @@ def file_level_validation(incoming_message_body: dict) -> dict:
         except Exception as move_file_error:
             logger.error("Failed to move file to archive: %s", move_file_error)
 
-        # Update the audit table and invoke the filename lambda with next file in the queue (if one exists)
-        change_audit_table_status_to_processed(file_key, message_id)
-        queue_name = f"{supplier}_{vaccine}"
-        next_queued_file_details = get_next_queued_file_details(queue_name)
-        if next_queued_file_details:
-            invoke_filename_lambda(next_queued_file_details["filename"], next_queued_file_details["message_id"])
+        # Update the audit table
+        update_audit_table_status(file_key, message_id, FileStatus.PROCESSED)
         raise
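
Net effect of this hunk: on the failure path the source file is still archived, but the audit row is now marked Processed directly, and the record processor no longer queries the queue GSI or invokes the filename lambda for the next queued file — presumably that chaining now lives elsewhere, since this commit only removes it here. The docstring's NOTE above still describes the old behaviour.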
recordprocessor/src/utils_for_recordprocessor.py

Lines changed: 1 addition & 19 deletions

@@ -1,11 +1,9 @@
 """Utils for filenameprocessor lambda"""
 
 import os
-import json
 from csv import DictReader
 from io import TextIOWrapper
-from clients import s3_client, lambda_client, logger
-from constants import SOURCE_BUCKET_NAME, FILE_NAME_PROC_LAMBDA_NAME
+from clients import s3_client
 
 
 def get_environment() -> str:
@@ -26,19 +24,3 @@ def get_csv_content_dict_reader(file_key: str) -> DictReader:
 def create_diagnostics_dictionary(error_type, status_code, error_message) -> dict:
     """Returns a dictionary containing the error_type, statusCode, and error_message"""
     return {"error_type": error_type, "statusCode": status_code, "error_message": error_message}
-
-
-def invoke_filename_lambda(file_key: str, message_id: str) -> None:
-    """Invokes the filenameprocessor lambda with the given file key and message id"""
-    try:
-        lambda_payload = {
-            "Records": [
-                {"s3": {"bucket": {"name": SOURCE_BUCKET_NAME}, "object": {"key": file_key}}, "message_id": message_id}
-            ]
-        }
-        lambda_client.invoke(
-            FunctionName=FILE_NAME_PROC_LAMBDA_NAME, InvocationType="Event", Payload=json.dumps(lambda_payload)
-        )
-    except Exception as error:
-        logger.error("Error invoking filename lambda: %s", error)
-        raise
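
With invoke_filename_lambda removed, this module no longer touches lambda_client at all, so the record processor's execution role can presumably drop its lambda:InvokeFunction permission on the filenameprocessor — an infrastructure follow-up this commit does not itself make.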
recordprocessor/tests/test_audit_table.py

Lines changed: 14 additions & 43 deletions

@@ -1,5 +1,5 @@
 """Tests for audit_table functions"""
-
+import unittest
 from unittest import TestCase
 from unittest.mock import patch
 from boto3 import client as boto3_client
@@ -10,7 +10,6 @@
 from tests.utils_for_recordprocessor_tests.generic_setup_and_teardown import GenericSetUp, GenericTearDown
 from tests.utils_for_recordprocessor_tests.values_for_recordprocessor_tests import MockFileDetails, FileDetails
 from tests.utils_for_recordprocessor_tests.utils_for_recordprocessor_tests import (
-    deserialize_dynamodb_types,
     add_entry_to_table,
 )
@@ -21,7 +20,7 @@
     FileStatus,
 )
 
-from audit_table import get_next_queued_file_details, change_audit_table_status_to_processed
+from audit_table import update_audit_table_status
 from clients import REGION_NAME
 
 
@@ -49,60 +48,32 @@ def get_table_items() -> list:
 
         return dynamodb_client.scan(TableName=AUDIT_TABLE_NAME).get("Items", [])
 
-    def test_get_next_queued_file_details(self):
-        """Test that the get_next_queued_file_details function returns the correct file details"""
-        # NOTE: Throughout this test the assertions will be checking for the next queued RAVS_RSV file.
-        queue_to_check = "RAVS_RSV"
-
-        # Test case 1: no files in audit table
-        self.assertIsNone(get_next_queued_file_details(queue_to_check))
-
-        # Test case 2: files in audit table, but none of the files are in the RAVS_RSV queue
-        add_entry_to_table(MockFileDetails.flu_emis, file_status=FileStatus.QUEUED)  # different queue
-        add_entry_to_table(MockFileDetails.rsv_emis, file_status=FileStatus.QUEUED)  # different queue
-        add_entry_to_table(MockFileDetails.ravs_flu, file_status=FileStatus.QUEUED)  # different queue
-        add_entry_to_table(MockFileDetails.ravs_rsv_1, FileStatus.PROCESSED)  # same queue but already processed
-        self.assertIsNone(get_next_queued_file_details(queue_to_check))
-
-        # Test case 3: one queued file in the ravs_rsv queue
-        add_entry_to_table(MockFileDetails.ravs_rsv_2, file_status=FileStatus.QUEUED)
-        expected_table_entry = {**MockFileDetails.ravs_rsv_2.audit_table_entry, "status": {"S": FileStatus.QUEUED}}
-        self.assertEqual(get_next_queued_file_details(queue_to_check), deserialize_dynamodb_types(expected_table_entry))
-
-        # # Test case 4: multiple queued files in the RAVS_RSV queue
-        # Note that ravs_rsv files 3 and 4 have later timestamps than file 2, so file 2 remains the first in the queue
-        add_entry_to_table(MockFileDetails.ravs_rsv_3, file_status=FileStatus.QUEUED)
-        add_entry_to_table(MockFileDetails.ravs_rsv_4, file_status=FileStatus.QUEUED)
-        self.assertEqual(get_next_queued_file_details(queue_to_check), deserialize_dynamodb_types(expected_table_entry))
-
-    def test_change_audit_table_status_to_processed(self):
-        """Checks audit table correctly updates a record as processed"""
+    def test_update_audit_table_status(self):
+        """Checks audit table correctly updates a record to the requested status"""
         # Test case 1: file should be updated with status of 'Processed'.
 
-        add_entry_to_table(MockFileDetails.rsv_ravs, file_status=FileStatus.QUEUED)
+        add_entry_to_table(MockFileDetails.rsv_ravs, file_status=FileStatus.PROCESSING)
         add_entry_to_table(MockFileDetails.flu_emis, file_status=FileStatus.QUEUED)
-        table_items = dynamodb_client.scan(TableName=AUDIT_TABLE_NAME).get("Items", [])
 
-        expected_table_entry = {**MockFileDetails.rsv_ravs.audit_table_entry, "status": {"S": FileStatus.PROCESSED}}
+        expected_table_entry = {**MockFileDetails.rsv_ravs.audit_table_entry, "status": {"S": FileStatus.PREPROCESSED}}
         ravs_rsv_test_file = FileDetails("RSV", "RAVS", "X26")
         file_key = ravs_rsv_test_file.file_key
-        message_id = ravs_rsv_test_file.message_id_order
+        message_id = ravs_rsv_test_file.message_id
 
-        change_audit_table_status_to_processed(file_key, message_id)
+        update_audit_table_status(file_key, message_id, FileStatus.PREPROCESSED)
         table_items = dynamodb_client.scan(TableName=AUDIT_TABLE_NAME).get("Items", [])
 
         self.assertIn(expected_table_entry, table_items)
 
-        # Test case 2: # Audit table status should not be updated. Error should be raised.
+    def test_update_audit_table_status_throws_exception_with_invalid_id(self):
        emis_flu_test_file_2 = FileDetails("FLU", "EMIS", "YGM41")
 
         message_id = emis_flu_test_file_2.message_id
         file_key = (emis_flu_test_file_2.file_key,)
+
         with self.assertRaises(UnhandledAuditTableError):
-            change_audit_table_status_to_processed(file_key, message_id)
+            update_audit_table_status(file_key, message_id, FileStatus.PROCESSED)
 
-        # Test case 3: # Audit table status should updated to processed for all values.
-        message_id = emis_flu_test_file_2.message_id_order
-        file_key = emis_flu_test_file_2.file_key
-        change_audit_table_status_to_processed(file_key, message_id)
-        table_items = dynamodb_client.scan(TableName=AUDIT_TABLE_NAME).get("Items", [])
+
+if __name__ == "__main__":
+    unittest.main()
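
The new unittest import and __main__ guard let this module be run directly as well as via discovery; in either case imports such as audit_table and tests.utils_for_recordprocessor_tests only resolve with src and tests on the path, which is what the Makefile's @PYTHONPATH=src:tests prefix above provides (e.g. PYTHONPATH=src:tests python -m unittest).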

recordprocessor/tests/test_logging_decorator.py

Lines changed: 1 addition & 2 deletions
@@ -61,8 +61,7 @@ def run(self, result=None):
         # Set up common patches to be applied to all tests in the class.
         # These patches can be overridden in individual tests.
         common_patches = [
-            patch("file_level_validation.change_audit_table_status_to_processed"),
-            patch("file_level_validation.get_next_queued_file_details", return_value=None),
+            patch("file_level_validation.update_audit_table_status"),
         ]
 
         with ExitStack() as stack:
