Skip to content

Commit b55b2cc

Browse files
committed
update processor tests
1 parent 837cac1 commit b55b2cc

File tree

5 files changed

+262
-5
lines changed

5 files changed

+262
-5
lines changed
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
"""Tests for audit_table functions"""
2+
3+
from unittest import TestCase
4+
from unittest.mock import patch
5+
from boto3 import client as boto3_client
6+
from moto import mock_dynamodb
7+
8+
from tests.utils_for_recordprocessor_tests.mock_environment_variables import MOCK_ENVIRONMENT_DICT
9+
from tests.utils_for_recordprocessor_tests.generic_setup_and_teardown import GenericSetUp, GenericTearDown
10+
from tests.utils_for_recordprocessor_tests.values_for_recordprocessor_tests import MockFileDetails
11+
from tests.utils_for_recordprocessor_tests.utils_for_recordprocessor_tests import (
12+
deserialize_dynamodb_types,
13+
add_entry_to_table,
14+
)
15+
16+
# Ensure environment variables are mocked before importing from src files
17+
with patch.dict("os.environ", MOCK_ENVIRONMENT_DICT):
18+
from constants import (
19+
AUDIT_TABLE_NAME,
20+
FileStatus,
21+
)
22+
23+
from audit_table import get_next_queued_file_details, change_audit_table_status_to_processed
24+
from clients import REGION_NAME
25+
26+
dynamodb_client = boto3_client("dynamodb", region_name=REGION_NAME)
27+
28+
FILE_DETAILS = MockFileDetails.ravs_rsv_1
29+
30+
31+
@mock_dynamodb
@patch.dict("os.environ", MOCK_ENVIRONMENT_DICT)
class TestAuditTable(TestCase):
    """Tests for audit table functions"""

    def setUp(self):
        """Set up test values to be used for the tests"""
        GenericSetUp(dynamodb_client=dynamodb_client)

    def tearDown(self):
        """Tear down the test values"""
        GenericTearDown(dynamodb_client=dynamodb_client)

    @staticmethod
    def get_table_items() -> list:
        """Return all items in the audit table"""
        return dynamodb_client.scan(TableName=AUDIT_TABLE_NAME).get("Items", [])

    def test_get_next_queued_file_details(self):
        """Test that the get_next_queued_file_details function returns the correct file details"""
        # NOTE: Throughout this test the assertions will be checking for the next queued RAVS_RSV file.
        queue_to_check = "RAVS_RSV"

        # Test case 1: no files in audit table
        self.assertIsNone(get_next_queued_file_details(queue_to_check))

        # Test case 2: files in audit table, but none of the files are in the RAVS_RSV queue
        add_entry_to_table(MockFileDetails.flu_emis, file_status=FileStatus.QUEUED)  # different queue
        add_entry_to_table(MockFileDetails.rsv_emis, file_status=FileStatus.QUEUED)  # different queue
        add_entry_to_table(MockFileDetails.ravs_flu, file_status=FileStatus.QUEUED)  # different queue
        add_entry_to_table(MockFileDetails.ravs_rsv_1, FileStatus.PROCESSED)  # same queue but already processed
        self.assertIsNone(get_next_queued_file_details(queue_to_check))

        # Test case 3: one queued file in the ravs_rsv queue
        add_entry_to_table(MockFileDetails.ravs_rsv_2, file_status=FileStatus.QUEUED)
        expected_table_entry = {**MockFileDetails.ravs_rsv_2.audit_table_entry, "status": {"S": FileStatus.QUEUED}}
        self.assertEqual(get_next_queued_file_details(queue_to_check), deserialize_dynamodb_types(expected_table_entry))

        # Test case 4: multiple queued files in the RAVS_RSV queue
        # Note that ravs_rsv files 3 and 4 have later timestamps than file 2, so file 2 remains the first in the queue
        add_entry_to_table(MockFileDetails.ravs_rsv_3, file_status=FileStatus.QUEUED)
        add_entry_to_table(MockFileDetails.ravs_rsv_4, file_status=FileStatus.QUEUED)
        self.assertEqual(get_next_queued_file_details(queue_to_check), deserialize_dynamodb_types(expected_table_entry))

    def test_change_audit_table_status_to_processed(self):
        """Checks audit table correctly updates a record as processed"""
        add_entry_to_table(MockFileDetails.rsv_ravs, file_status=FileStatus.QUEUED)
        add_entry_to_table(MockFileDetails.flu_emis, file_status=FileStatus.QUEUED)

        expected_table_entry = {**MockFileDetails.rsv_ravs.audit_table_entry, "status": {"S": FileStatus.PROCESSED}}

        file_key = "RSV_Vaccinations_v5_X26_20210730T12000000.csv"
        message_id = "rsv_ravs_test_id_1"

        change_audit_table_status_to_processed(file_key, message_id)
        # Removed a dead pre-call scan whose result was never read; reuse the class helper for the scan
        table_items = self.get_table_items()

        self.assertIn(expected_table_entry, table_items)
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
"""Generic setup and teardown for recordprocessor tests"""
2+
3+
from unittest.mock import patch
4+
5+
from tests.utils_for_recordprocessor_tests.mock_environment_variables import (
6+
BucketNames,
7+
MOCK_ENVIRONMENT_DICT,
8+
Sqs,
9+
Firehose,
10+
)
11+
12+
# Ensure environment variables are mocked before importing from src files
13+
with patch.dict("os.environ", MOCK_ENVIRONMENT_DICT):
14+
from clients import REGION_NAME
15+
from constants import AuditTableKeys, AUDIT_TABLE_QUEUE_NAME_GSI, AUDIT_TABLE_FILENAME_GSI, AUDIT_TABLE_NAME
16+
17+
18+
class GenericSetUp:
    """
    Performs generic setup of mock resources:
    * If s3_client is provided, creates source, destination, config and firehose buckets (firehose bucket is used for
      testing only)
    * If firehose_client is provided, creates a firehose delivery stream
    * If sqs_client is provided, creates the SQS queue
    * If dynamodb_client is provided, creates the audit table
    """

    def __init__(self, s3_client=None, firehose_client=None, sqs_client=None, dynamodb_client=None):
        if s3_client:
            buckets = (BucketNames.SOURCE, BucketNames.DESTINATION, BucketNames.CONFIG, BucketNames.MOCK_FIREHOSE)
            for bucket_name in buckets:
                s3_client.create_bucket(
                    Bucket=bucket_name, CreateBucketConfiguration={"LocationConstraint": REGION_NAME}
                )

        if firehose_client:
            # Destination bucket is mock-only; the stream just needs a syntactically valid target
            s3_destination_config = {
                "RoleARN": "arn:aws:iam::123456789012:role/mock-role",
                "BucketARN": "arn:aws:s3:::" + BucketNames.MOCK_FIREHOSE,
                "Prefix": "firehose-backup/",
            }
            firehose_client.create_delivery_stream(
                DeliveryStreamName=Firehose.STREAM_NAME,
                DeliveryStreamType="DirectPut",
                S3DestinationConfiguration=s3_destination_config,
            )

        if sqs_client:
            sqs_client.create_queue(QueueName=Sqs.QUEUE_NAME, Attributes=Sqs.ATTRIBUTES)

        if dynamodb_client:
            throughput = {"ReadCapacityUnits": 5, "WriteCapacityUnits": 5}
            # GSI for looking an entry up by its filename (keys only)
            filename_gsi = {
                "IndexName": AUDIT_TABLE_FILENAME_GSI,
                "KeySchema": [{"AttributeName": AuditTableKeys.FILENAME, "KeyType": "HASH"}],
                "Projection": {"ProjectionType": "KEYS_ONLY"},
                "ProvisionedThroughput": throughput,
            }
            # GSI for querying a queue's entries by status (full projection)
            queue_name_gsi = {
                "IndexName": AUDIT_TABLE_QUEUE_NAME_GSI,
                "KeySchema": [
                    {"AttributeName": AuditTableKeys.QUEUE_NAME, "KeyType": "HASH"},
                    {"AttributeName": AuditTableKeys.STATUS, "KeyType": "RANGE"},
                ],
                "Projection": {"ProjectionType": "ALL"},
                "ProvisionedThroughput": throughput,
            }
            dynamodb_client.create_table(
                TableName=AUDIT_TABLE_NAME,
                KeySchema=[{"AttributeName": AuditTableKeys.MESSAGE_ID, "KeyType": "HASH"}],
                AttributeDefinitions=[
                    {"AttributeName": AuditTableKeys.MESSAGE_ID, "AttributeType": "S"},
                    {"AttributeName": AuditTableKeys.FILENAME, "AttributeType": "S"},
                    {"AttributeName": AuditTableKeys.QUEUE_NAME, "AttributeType": "S"},
                    {"AttributeName": AuditTableKeys.STATUS, "AttributeType": "S"},
                ],
                ProvisionedThroughput=throughput,
                GlobalSecondaryIndexes=[filename_gsi, queue_name_gsi],
            )
83+
84+
85+
class GenericTearDown:
    """Performs generic tear down of mock resources"""

    def __init__(self, s3_client=None, firehose_client=None, sqs_client=None, dynamodb_client=None):
        if s3_client:
            buckets = (BucketNames.SOURCE, BucketNames.DESTINATION, BucketNames.CONFIG, BucketNames.MOCK_FIREHOSE)
            for bucket_name in buckets:
                # A bucket must be emptied before it can be deleted
                contents = s3_client.list_objects_v2(Bucket=bucket_name).get("Contents", [])
                for obj in contents:
                    s3_client.delete_object(Bucket=bucket_name, Key=obj["Key"])
                s3_client.delete_bucket(Bucket=bucket_name)

        if firehose_client:
            firehose_client.delete_delivery_stream(DeliveryStreamName=Firehose.STREAM_NAME)

        if sqs_client:
            sqs_client.delete_queue(QueueUrl=Sqs.TEST_QUEUE_URL)

        if dynamodb_client:
            dynamodb_client.delete_table(TableName=AUDIT_TABLE_NAME)

recordprocessor/tests/utils_for_recordprocessor_tests/mock_environment_variables.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,14 @@ class Firehose:
2323
STREAM_NAME = "immunisation-fhir-api-internal-dev-splunk-firehose"
2424

2525

26+
class Sqs:
    """Class to hold SQS values for use in tests"""

    # FIFO queue with content-based deduplication enabled
    ATTRIBUTES = {"FifoQueue": "true", "ContentBasedDeduplication": "true"}
    QUEUE_NAME = "imms-batch-internal-dev-metadata-queue.fifo"
    # URL moto-style clients use to address the mock queue
    TEST_QUEUE_URL = "https://sqs." + REGION_NAME + ".amazonaws.com/999999999/" + QUEUE_NAME
32+
33+
2634
MOCK_ENVIRONMENT_DICT = {
2735
"ENVIRONMENT": "internal-dev",
2836
"LOCAL_ACCOUNT_ID": "123456789012",
@@ -32,4 +40,5 @@ class Firehose:
3240
"KINESIS_STREAM_NAME": Kinesis.STREAM_NAME,
3341
"KINESIS_STREAM_ARN": f"arn:aws:kinesis:{REGION_NAME}:123456789012:stream/{Kinesis.STREAM_NAME}",
3442
"FIREHOSE_STREAM_NAME": Firehose.STREAM_NAME,
43+
"AUDIT_TABLE_NAME": "immunisation-batch-internal-dev-audit-table",
3544
}

recordprocessor/tests/utils_for_recordprocessor_tests/utils_for_recordprocessor_tests.py

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,20 @@
11
"""Utils for the recordprocessor tests"""
22

3-
from csv import DictReader
43
from io import StringIO
5-
from tests.utils_for_recordprocessor_tests.values_for_recordprocessor_tests import REGION_NAME
64
from tests.utils_for_recordprocessor_tests.mock_environment_variables import BucketNames, Firehose, Kinesis
5+
from tests.utils_for_recordprocessor_tests.values_for_recordprocessor_tests import MockFileDetails, FileDetails
6+
from boto3.dynamodb.types import TypeDeserializer
7+
from boto3 import client as boto3_client
8+
from unittest.mock import patch
9+
from tests.utils_for_recordprocessor_tests.mock_environment_variables import MOCK_ENVIRONMENT_DICT
10+
11+
# Ensure environment variables are mocked before importing from src files
12+
with patch.dict("os.environ", MOCK_ENVIRONMENT_DICT):
13+
from clients import REGION_NAME
14+
from csv import DictReader
15+
from constants import AuditTableKeys, AUDIT_TABLE_NAME, FileStatus
16+
17+
dynamodb_client = boto3_client("dynamodb", region_name=REGION_NAME)
718

819

920
def convert_string_to_dict_reader(data_string: str):
@@ -69,3 +80,25 @@ def __init__(self, s3_client=None, firehose_client=None, kinesis_client=None):
6980
kinesis_client.delete_stream(StreamName=Kinesis.STREAM_NAME, EnforceConsumerDeletion=True)
7081
except kinesis_client.exceptions.ResourceNotFoundException:
7182
pass
83+
84+
85+
def add_entry_to_table(file_details: MockFileDetails, file_status: FileStatus) -> None:
    """Add an entry to the audit table"""
    entry = dict(file_details.audit_table_entry)
    entry["status"] = {"S": file_status}
    dynamodb_client.put_item(TableName=AUDIT_TABLE_NAME, Item=entry)
89+
90+
91+
def deserialize_dynamodb_types(dynamodb_table_entry_with_types: dict) -> dict:
    """
    Convert a dynamodb table entry with types to a table entry without types
    e.g. {'Attr1': {'S': 'val1'}} becomes {'Attr1': 'val1'}
    """
    # Instantiate the deserializer once, rather than once per attribute
    deserializer = TypeDeserializer()
    return {key: deserializer.deserialize(value) for key, value in dynamodb_table_entry_with_types.items()}
97+
98+
99+
def assert_audit_table_entry(file_details: FileDetails, expected_status: FileStatus) -> None:
    """Assert that the file details are in the audit table"""
    # NOTE(review): audit_table_entry is keyed on message_id_order, so the lookup must use the
    # same id — a get_item by file_details.message_id would never find entries written by
    # add_entry_to_table. Confirm no caller puts items keyed on the plain message_id.
    table_entry = dynamodb_client.get_item(
        TableName=AUDIT_TABLE_NAME, Key={AuditTableKeys.MESSAGE_ID: {"S": file_details.message_id_order}}
    ).get("Item")
    assert table_entry == {**file_details.audit_table_entry, "status": {"S": expected_status}}

recordprocessor/tests/utils_for_recordprocessor_tests/values_for_recordprocessor_tests.py

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,10 @@
66
from tests.utils_for_recordprocessor_tests.mock_environment_variables import MOCK_ENVIRONMENT_DICT
77

88
with patch("os.environ", MOCK_ENVIRONMENT_DICT):
9-
from constants import Urls
9+
from constants import Urls, AuditTableKeys
1010
from mappings import Vaccine
1111

12+
1213
REGION_NAME = "eu-west-2"
1314

1415

@@ -135,9 +136,9 @@ class FileDetails:
135136
vaccine type.
136137
"""
137138

138-
def __init__(self, vaccine_type: str, supplier: str, ods_code: str):
139+
def __init__(self, vaccine_type: str, supplier: str, ods_code: str, file_number: int = 1):
139140
self.name = f"{vaccine_type.upper()}/ {supplier.upper()} file"
140-
self.created_at_formatted_string = "20211120T12000000"
141+
self.created_at_formatted_string = f"202{file_number}1120T12000000"
141142
self.file_key = f"{vaccine_type}_Vaccinations_v5_{ods_code}_20210730T12000000.csv"
142143
self.inf_ack_file_key = (
143144
f"ack/{vaccine_type}_Vaccinations_v5_{ods_code}_20210730T12000000"
@@ -147,7 +148,9 @@ def __init__(self, vaccine_type: str, supplier: str, ods_code: str):
147148
self.vaccine_type = vaccine_type
148149
self.ods_code = ods_code
149150
self.supplier = supplier
151+
self.file_date_and_time_string = f"20000101T0000000{file_number}"
150152
self.message_id = f"{vaccine_type.lower()}_{supplier.lower()}_test_id"
153+
self.message_id_order = f"{vaccine_type.lower()}_{supplier.lower()}_test_id_{file_number}"
151154
self.full_permissions_list = [f"{vaccine_type}_FULL"]
152155
self.create_permissions_only = [f"{vaccine_type}_CREATE"]
153156
self.update_permissions_only = [f"{vaccine_type}_UPDATE"]
@@ -173,13 +176,26 @@ def __init__(self, vaccine_type: str, supplier: str, ods_code: str):
173176
self.event_update_permissions_only = json.dumps(self.event_update_permissions_only_dict)
174177
self.event_delete_permissions_only = json.dumps(self.event_delete_permissions_only_dict)
175178

179+
self.audit_table_entry = {
180+
AuditTableKeys.MESSAGE_ID: {"S": self.message_id_order},
181+
AuditTableKeys.FILENAME: {"S": self.file_key},
182+
AuditTableKeys.QUEUE_NAME: {"S": self.queue_name},
183+
AuditTableKeys.TIMESTAMP: {"S": self.created_at_formatted_string},
184+
}
185+
176186

177187
class MockFileDetails:
    """Class containing mock file details for use in tests"""

    ravs_rsv_1 = FileDetails("RSV", "RAVS", "X26", file_number=1)
    ravs_rsv_2 = FileDetails("RSV", "RAVS", "X26", file_number=2)
    ravs_rsv_3 = FileDetails("RSV", "RAVS", "X26", file_number=3)
    ravs_rsv_4 = FileDetails("RSV", "RAVS", "X26", file_number=4)
    ravs_rsv_5 = FileDetails("RSV", "RAVS", "X26", file_number=5)
    rsv_ravs = FileDetails("RSV", "RAVS", "X26")
    rsv_emis = FileDetails("RSV", "EMIS", "8HK48")
    flu_emis = FileDetails("FLU", "EMIS", "YGM41")
    # NOTE(review): supplier was "RSV", which looks like a typo for "RAVS" (name is ravs_flu and
    # every other RAVS entry uses "RAVS"). Confirm no test depends on the old "flu_rsv_test_id"
    # message_id before keeping this fix.
    ravs_flu = FileDetails("FLU", "RAVS", "X26")
183199

184200

185201
class UnorderedFieldDictionaries:

0 commit comments

Comments
 (0)