Commit 0ea9e8e

Merge branch 'master' into VED-39-Read-response-identifier
2 parents cbb2782 + 8324e54 commit 0ea9e8e

6 files changed: +297 -6 lines changed
Lines changed: 108 additions & 0 deletions
@@ -0,0 +1,108 @@
"""Tests for audit_table functions"""

from unittest import TestCase
from unittest.mock import patch
from boto3 import client as boto3_client
from moto import mock_dynamodb
from errors import UnhandledAuditTableError

from tests.utils_for_recordprocessor_tests.mock_environment_variables import MOCK_ENVIRONMENT_DICT
from tests.utils_for_recordprocessor_tests.generic_setup_and_teardown import GenericSetUp, GenericTearDown
from tests.utils_for_recordprocessor_tests.values_for_recordprocessor_tests import MockFileDetails, FileDetails
from tests.utils_for_recordprocessor_tests.utils_for_recordprocessor_tests import (
    deserialize_dynamodb_types,
    add_entry_to_table,
)

# Ensure environment variables are mocked before importing from src files
with patch.dict("os.environ", MOCK_ENVIRONMENT_DICT):
    from constants import (
        AUDIT_TABLE_NAME,
        FileStatus,
    )

    from audit_table import get_next_queued_file_details, change_audit_table_status_to_processed
    from clients import REGION_NAME


dynamodb_client = boto3_client("dynamodb", region_name=REGION_NAME)

FILE_DETAILS = MockFileDetails.ravs_rsv_1


@mock_dynamodb
@patch.dict("os.environ", MOCK_ENVIRONMENT_DICT)
class TestAuditTable(TestCase):
    """Tests for audit table functions"""

    def setUp(self):
        """Set up test values to be used for the tests"""
        GenericSetUp(dynamodb_client=dynamodb_client)

    def tearDown(self):
        """Tear down the test values"""
        GenericTearDown(dynamodb_client=dynamodb_client)

    @staticmethod
    def get_table_items() -> list:
        """Return all items in the audit table"""

        return dynamodb_client.scan(TableName=AUDIT_TABLE_NAME).get("Items", [])

    def test_get_next_queued_file_details(self):
        """Test that the get_next_queued_file_details function returns the correct file details"""
        # NOTE: Throughout this test the assertions check for the next queued RAVS_RSV file.
        queue_to_check = "RAVS_RSV"

        # Test case 1: no files in audit table
        self.assertIsNone(get_next_queued_file_details(queue_to_check))

        # Test case 2: files in audit table, but none of the files are in the RAVS_RSV queue
        add_entry_to_table(MockFileDetails.flu_emis, file_status=FileStatus.QUEUED)  # different queue
        add_entry_to_table(MockFileDetails.rsv_emis, file_status=FileStatus.QUEUED)  # different queue
        add_entry_to_table(MockFileDetails.ravs_flu, file_status=FileStatus.QUEUED)  # different queue
        add_entry_to_table(MockFileDetails.ravs_rsv_1, FileStatus.PROCESSED)  # same queue but already processed
        self.assertIsNone(get_next_queued_file_details(queue_to_check))

        # Test case 3: one queued file in the RAVS_RSV queue
        add_entry_to_table(MockFileDetails.ravs_rsv_2, file_status=FileStatus.QUEUED)
        expected_table_entry = {**MockFileDetails.ravs_rsv_2.audit_table_entry, "status": {"S": FileStatus.QUEUED}}
        self.assertEqual(get_next_queued_file_details(queue_to_check), deserialize_dynamodb_types(expected_table_entry))

        # Test case 4: multiple queued files in the RAVS_RSV queue
        # Note that ravs_rsv files 3 and 4 have later timestamps than file 2, so file 2 remains the first in the queue
        add_entry_to_table(MockFileDetails.ravs_rsv_3, file_status=FileStatus.QUEUED)
        add_entry_to_table(MockFileDetails.ravs_rsv_4, file_status=FileStatus.QUEUED)
        self.assertEqual(get_next_queued_file_details(queue_to_check), deserialize_dynamodb_types(expected_table_entry))

    def test_change_audit_table_status_to_processed(self):
        """Checks that the audit table correctly updates a record as processed"""
        # Test case 1: file should be updated with a status of 'Processed'

        add_entry_to_table(MockFileDetails.rsv_ravs, file_status=FileStatus.QUEUED)
        add_entry_to_table(MockFileDetails.flu_emis, file_status=FileStatus.QUEUED)
        table_items = dynamodb_client.scan(TableName=AUDIT_TABLE_NAME).get("Items", [])

        expected_table_entry = {**MockFileDetails.rsv_ravs.audit_table_entry, "status": {"S": FileStatus.PROCESSED}}
        ravs_rsv_test_file = FileDetails("RSV", "RAVS", "X26")
        file_key = ravs_rsv_test_file.file_key
        message_id = ravs_rsv_test_file.message_id_order

        change_audit_table_status_to_processed(file_key, message_id)
        table_items = dynamodb_client.scan(TableName=AUDIT_TABLE_NAME).get("Items", [])

        self.assertIn(expected_table_entry, table_items)

        # Test case 2: audit table status should not be updated and an error should be raised
        emis_flu_test_file_2 = FileDetails("FLU", "EMIS", "YGM41")

        message_id = emis_flu_test_file_2.message_id
        file_key = (emis_flu_test_file_2.file_key,)
        with self.assertRaises(UnhandledAuditTableError):
            change_audit_table_status_to_processed(file_key, message_id)

        # Test case 3: audit table status should be updated to processed for all values
        message_id = emis_flu_test_file_2.message_id_order
        file_key = emis_flu_test_file_2.file_key
        change_audit_table_status_to_processed(file_key, message_id)
        table_items = dynamodb_client.scan(TableName=AUDIT_TABLE_NAME).get("Items", [])
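The audit_table module exercised by these tests is not part of this commit, so its expected behaviour can only be read off the assertions above: return None when nothing is queued for the given queue, otherwise return the queued entry with the earliest timestamp, deserialized to plain values. The sketch below is consistent with that and with the queue-name GSI created in generic_setup_and_teardown.py further down; it is an assumption, not the project's actual implementation.

# Hypothetical sketch only: audit_table.py is not shown in this diff.
from boto3 import client as boto3_client

from clients import REGION_NAME
from constants import AUDIT_TABLE_NAME, AUDIT_TABLE_QUEUE_NAME_GSI, AuditTableKeys, FileStatus

dynamodb_client = boto3_client("dynamodb", region_name=REGION_NAME)


def get_next_queued_file_details(queue_name: str):
    """Return the earliest queued file for the given queue, or None if the queue is empty (assumed behaviour)."""
    queued_files = dynamodb_client.query(
        TableName=AUDIT_TABLE_NAME,
        IndexName=AUDIT_TABLE_QUEUE_NAME_GSI,
        KeyConditionExpression="#queue_name = :queue_name AND #status = :queued",
        ExpressionAttributeNames={"#queue_name": AuditTableKeys.QUEUE_NAME, "#status": AuditTableKeys.STATUS},
        ExpressionAttributeValues={":queue_name": {"S": queue_name}, ":queued": {"S": FileStatus.QUEUED}},
    ).get("Items", [])
    if not queued_files:
        return None
    # The tests require the entry with the earliest timestamp to win when several files are queued
    earliest = min(queued_files, key=lambda item: item[AuditTableKeys.TIMESTAMP]["S"])
    return {key: value["S"] for key, value in earliest.items()}

The GSI keeps the read narrow (queue name plus status), but in this sketch the "first in the queue" guarantee still comes from the timestamp comparison rather than from index ordering.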

recordprocessor/tests/test_utils_for_recordprocessor.py

Lines changed: 18 additions & 1 deletion
@@ -16,7 +16,7 @@

 with patch("os.environ", MOCK_ENVIRONMENT_DICT):
     from utils_for_recordprocessor import get_environment, get_csv_content_dict_reader, create_diagnostics_dictionary
-
+    from file_level_validation import move_file

 s3_client = boto3.client("s3", region_name=REGION_NAME)
 test_file = MockFileDetails.rsv_emis

@@ -76,6 +76,23 @@ def test_create_diagnostics_dictionary(self):
            },
        )

+    def test_move_file(self):
+        """Tests that move_file correctly moves a file from one location to another within a single S3 bucket"""
+        source_file_key = "test_file_key"
+        destination_file_key = "archive/test_file_key"
+        source_file_content = "test_content"
+        s3_client.put_object(Bucket=BucketNames.SOURCE, Key=source_file_key, Body=source_file_content)
+
+        move_file(BucketNames.SOURCE, source_file_key, destination_file_key)
+
+        keys_of_objects_in_bucket = [
+            obj["Key"] for obj in s3_client.list_objects_v2(Bucket=BucketNames.SOURCE).get("Contents")
+        ]
+        self.assertNotIn(source_file_key, keys_of_objects_in_bucket)
+        self.assertIn(destination_file_key, keys_of_objects_in_bucket)
+        destination_file_content = s3_client.get_object(Bucket=BucketNames.SOURCE, Key=destination_file_key)
+        self.assertEqual(destination_file_content["Body"].read().decode("utf-8"), source_file_content)

 if __name__ == "__main__":
     unittest.main()
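file_level_validation.move_file itself is not shown in this commit; the test above only pins down its observable behaviour: the object disappears from the source key and reappears, byte for byte, at the destination key within the same bucket. A minimal sketch that would satisfy those assertions, assuming plain S3 copy-then-delete semantics:

# Hypothetical sketch only: the real move_file implementation is not part of this diff.
import boto3

s3_client = boto3.client("s3", region_name="eu-west-2")


def move_file(bucket_name: str, source_file_key: str, destination_file_key: str) -> None:
    """Move a file within a single bucket by copying it to the destination key and deleting the original."""
    s3_client.copy_object(
        Bucket=bucket_name,
        CopySource={"Bucket": bucket_name, "Key": source_file_key},
        Key=destination_file_key,
    )
    s3_client.delete_object(Bucket=bucket_name, Key=source_file_key)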
recordprocessor/tests/utils_for_recordprocessor_tests/generic_setup_and_teardown.py

Lines changed: 108 additions & 0 deletions
@@ -0,0 +1,108 @@
"""Generic setup and teardown for recordprocessor tests"""

from unittest.mock import patch

from tests.utils_for_recordprocessor_tests.mock_environment_variables import (
    BucketNames,
    MOCK_ENVIRONMENT_DICT,
    Sqs,
    Firehose,
)

# Ensure environment variables are mocked before importing from src files
with patch.dict("os.environ", MOCK_ENVIRONMENT_DICT):
    from clients import REGION_NAME
    from constants import AuditTableKeys, AUDIT_TABLE_QUEUE_NAME_GSI, AUDIT_TABLE_FILENAME_GSI, AUDIT_TABLE_NAME


class GenericSetUp:
    """
    Performs generic setup of mock resources:
    * If s3_client is provided, creates source, destination, config and firehose buckets (firehose bucket is used for
      testing only)
    * If firehose_client is provided, creates a firehose delivery stream
    * If sqs_client is provided, creates the SQS queue
    * If dynamodb_client is provided, creates the audit table
    """

    def __init__(self, s3_client=None, firehose_client=None, sqs_client=None, dynamodb_client=None):
        if s3_client:
            for bucket_name in [
                BucketNames.SOURCE,
                BucketNames.DESTINATION,
                BucketNames.CONFIG,
                BucketNames.MOCK_FIREHOSE,
            ]:
                s3_client.create_bucket(
                    Bucket=bucket_name, CreateBucketConfiguration={"LocationConstraint": REGION_NAME}
                )

        if firehose_client:
            firehose_client.create_delivery_stream(
                DeliveryStreamName=Firehose.STREAM_NAME,
                DeliveryStreamType="DirectPut",
                S3DestinationConfiguration={
                    "RoleARN": "arn:aws:iam::123456789012:role/mock-role",
                    "BucketARN": "arn:aws:s3:::" + BucketNames.MOCK_FIREHOSE,
                    "Prefix": "firehose-backup/",
                },
            )

        if sqs_client:
            sqs_client.create_queue(QueueName=Sqs.QUEUE_NAME, Attributes=Sqs.ATTRIBUTES)

        if dynamodb_client:
            dynamodb_client.create_table(
                TableName=AUDIT_TABLE_NAME,
                KeySchema=[{"AttributeName": AuditTableKeys.MESSAGE_ID, "KeyType": "HASH"}],
                AttributeDefinitions=[
                    {"AttributeName": AuditTableKeys.MESSAGE_ID, "AttributeType": "S"},
                    {"AttributeName": AuditTableKeys.FILENAME, "AttributeType": "S"},
                    {"AttributeName": AuditTableKeys.QUEUE_NAME, "AttributeType": "S"},
                    {"AttributeName": AuditTableKeys.STATUS, "AttributeType": "S"},
                ],
                ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},
                GlobalSecondaryIndexes=[
                    {
                        "IndexName": AUDIT_TABLE_FILENAME_GSI,
                        "KeySchema": [{"AttributeName": AuditTableKeys.FILENAME, "KeyType": "HASH"}],
                        "Projection": {"ProjectionType": "KEYS_ONLY"},
                        "ProvisionedThroughput": {"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},
                    },
                    {
                        "IndexName": AUDIT_TABLE_QUEUE_NAME_GSI,
                        "KeySchema": [
                            {"AttributeName": AuditTableKeys.QUEUE_NAME, "KeyType": "HASH"},
                            {"AttributeName": AuditTableKeys.STATUS, "KeyType": "RANGE"},
                        ],
                        "Projection": {"ProjectionType": "ALL"},
                        "ProvisionedThroughput": {"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},
                    },
                ],
            )


class GenericTearDown:
    """Performs generic tear down of mock resources"""

    def __init__(self, s3_client=None, firehose_client=None, sqs_client=None, dynamodb_client=None):

        if s3_client:
            for bucket_name in [
                BucketNames.SOURCE,
                BucketNames.DESTINATION,
                BucketNames.CONFIG,
                BucketNames.MOCK_FIREHOSE,
            ]:
                for obj in s3_client.list_objects_v2(Bucket=bucket_name).get("Contents", []):
                    s3_client.delete_object(Bucket=bucket_name, Key=obj["Key"])
                s3_client.delete_bucket(Bucket=bucket_name)

        if firehose_client:
            firehose_client.delete_delivery_stream(DeliveryStreamName=Firehose.STREAM_NAME)

        if sqs_client:
            sqs_client.delete_queue(QueueUrl=Sqs.TEST_QUEUE_URL)

        if dynamodb_client:
            dynamodb_client.delete_table(TableName=AUDIT_TABLE_NAME)
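For a test module that needs more than one mocked service, the helpers compose the same way as the DynamoDB-only usage in the audit table tests above. The class below is illustrative only; its name and client mix are not part of this commit.

# Illustrative usage sketch; only the helpers and MOCK_ENVIRONMENT_DICT come from this commit.
from unittest import TestCase
from unittest.mock import patch

from boto3 import client as boto3_client
from moto import mock_dynamodb, mock_s3  # moto 4.x style decorators, matching the imports used in these tests

from tests.utils_for_recordprocessor_tests.mock_environment_variables import MOCK_ENVIRONMENT_DICT
from tests.utils_for_recordprocessor_tests.generic_setup_and_teardown import GenericSetUp, GenericTearDown

s3_client = boto3_client("s3", region_name="eu-west-2")
dynamodb_client = boto3_client("dynamodb", region_name="eu-west-2")


@mock_s3
@mock_dynamodb
@patch.dict("os.environ", MOCK_ENVIRONMENT_DICT)
class TestWithS3AndDynamoDb(TestCase):
    def setUp(self):
        # Only the clients passed in are created, so each test class mocks just what it needs
        GenericSetUp(s3_client=s3_client, dynamodb_client=dynamodb_client)

    def tearDown(self):
        GenericTearDown(s3_client=s3_client, dynamodb_client=dynamodb_client)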

recordprocessor/tests/utils_for_recordprocessor_tests/mock_environment_variables.py

Lines changed: 9 additions & 0 deletions
@@ -23,6 +23,14 @@ class Firehose:
     STREAM_NAME = "immunisation-fhir-api-internal-dev-splunk-firehose"


+class Sqs:
+    """Class to hold SQS values for use in tests"""
+
+    ATTRIBUTES = {"FifoQueue": "true", "ContentBasedDeduplication": "true"}
+    QUEUE_NAME = "imms-batch-internal-dev-metadata-queue.fifo"
+    TEST_QUEUE_URL = f"https://sqs.{REGION_NAME}.amazonaws.com/999999999/{QUEUE_NAME}"
+
+
 MOCK_ENVIRONMENT_DICT = {
     "ENVIRONMENT": "internal-dev",
     "LOCAL_ACCOUNT_ID": "123456789012",

@@ -32,4 +40,5 @@ class Firehose:
     "KINESIS_STREAM_NAME": Kinesis.STREAM_NAME,
     "KINESIS_STREAM_ARN": f"arn:aws:kinesis:{REGION_NAME}:123456789012:stream/{Kinesis.STREAM_NAME}",
     "FIREHOSE_STREAM_NAME": Firehose.STREAM_NAME,
+    "AUDIT_TABLE_NAME": "immunisation-batch-internal-dev-audit-table",
 }
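The ATTRIBUTES value is what makes the queue creatable at all: a queue whose name ends in .fifo must be created with FifoQueue set to "true", and ContentBasedDeduplication lets tests send messages without supplying an explicit MessageDeduplicationId. A quick illustrative check under moto follows; note that moto's default account id typically differs from the 999999999 placeholder baked into TEST_QUEUE_URL, so matching on the queue name is the safer assertion.

# Illustrative check only, not part of this commit.
from boto3 import client as boto3_client
from moto import mock_sqs

from tests.utils_for_recordprocessor_tests.mock_environment_variables import Sqs


@mock_sqs
def check_sqs_constants():
    sqs_client = boto3_client("sqs", region_name="eu-west-2")
    sqs_client.create_queue(QueueName=Sqs.QUEUE_NAME, Attributes=Sqs.ATTRIBUTES)
    queue_url = sqs_client.get_queue_url(QueueName=Sqs.QUEUE_NAME)["QueueUrl"]
    assert queue_url.endswith(Sqs.QUEUE_NAME)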

recordprocessor/tests/utils_for_recordprocessor_tests/utils_for_recordprocessor_tests.py

Lines changed: 35 additions & 2 deletions
@@ -1,9 +1,20 @@
 """Utils for the recordprocessor tests"""

-from csv import DictReader
 from io import StringIO
-from tests.utils_for_recordprocessor_tests.values_for_recordprocessor_tests import REGION_NAME
 from tests.utils_for_recordprocessor_tests.mock_environment_variables import BucketNames, Firehose, Kinesis
+from tests.utils_for_recordprocessor_tests.values_for_recordprocessor_tests import MockFileDetails, FileDetails
+from boto3.dynamodb.types import TypeDeserializer
+from boto3 import client as boto3_client
+from unittest.mock import patch
+from tests.utils_for_recordprocessor_tests.mock_environment_variables import MOCK_ENVIRONMENT_DICT
+
+# Ensure environment variables are mocked before importing from src files
+with patch.dict("os.environ", MOCK_ENVIRONMENT_DICT):
+    from clients import REGION_NAME
+    from csv import DictReader
+    from constants import AuditTableKeys, AUDIT_TABLE_NAME, FileStatus
+
+dynamodb_client = boto3_client("dynamodb", region_name=REGION_NAME)


 def convert_string_to_dict_reader(data_string: str):

@@ -69,3 +80,25 @@ def __init__(self, s3_client=None, firehose_client=None, kinesis_client=None):
             kinesis_client.delete_stream(StreamName=Kinesis.STREAM_NAME, EnforceConsumerDeletion=True)
         except kinesis_client.exceptions.ResourceNotFoundException:
             pass
+
+
+def add_entry_to_table(file_details: MockFileDetails, file_status: FileStatus) -> None:
+    """Add an entry to the audit table"""
+    audit_table_entry = {**file_details.audit_table_entry, "status": {"S": file_status}}
+    dynamodb_client.put_item(TableName=AUDIT_TABLE_NAME, Item=audit_table_entry)
+
+
+def deserialize_dynamodb_types(dynamodb_table_entry_with_types):
+    """
+    Convert a dynamodb table entry with types to a table entry without types
+    e.g. {'Attr1': {'S': 'val1'}} becomes {'Attr1': 'val1'}
+    """
+    return {k: TypeDeserializer().deserialize(v) for k, v in dynamodb_table_entry_with_types.items()}
+
+
+def assert_audit_table_entry(file_details: FileDetails, expected_status: FileStatus) -> None:
+    """Assert that the file details are in the audit table"""
+    table_entry = dynamodb_client.get_item(
+        TableName=AUDIT_TABLE_NAME, Key={AuditTableKeys.MESSAGE_ID: {"S": file_details.message_id}}
+    ).get("Item")
+    assert table_entry == {**file_details.audit_table_entry, "status": {"S": expected_status}}
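A quick worked example of what deserialize_dynamodb_types returns; the attribute names and values below are illustrative, not the real audit table schema.

# Illustrative values only; note that DynamoDB 'N' attributes deserialize to Decimal, not int
from decimal import Decimal

from boto3.dynamodb.types import TypeDeserializer

entry_with_types = {"status": {"S": "Queued"}, "attempts": {"N": "2"}}
deserialized = {k: TypeDeserializer().deserialize(v) for k, v in entry_with_types.items()}
assert deserialized == {"status": "Queued", "attempts": Decimal("2")}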

recordprocessor/tests/utils_for_recordprocessor_tests/values_for_recordprocessor_tests.py

Lines changed: 19 additions & 3 deletions
@@ -6,9 +6,10 @@
 from tests.utils_for_recordprocessor_tests.mock_environment_variables import MOCK_ENVIRONMENT_DICT

 with patch("os.environ", MOCK_ENVIRONMENT_DICT):
-    from constants import Urls
+    from constants import Urls, AuditTableKeys
     from mappings import Vaccine

+
 REGION_NAME = "eu-west-2"


@@ -135,9 +136,9 @@ class FileDetails:
     vaccine type.
     """

-    def __init__(self, vaccine_type: str, supplier: str, ods_code: str):
+    def __init__(self, vaccine_type: str, supplier: str, ods_code: str, file_number: int = 1):
         self.name = f"{vaccine_type.upper()}/ {supplier.upper()} file"
-        self.created_at_formatted_string = "20211120T12000000"
+        self.created_at_formatted_string = f"202{file_number}1120T12000000"
         self.file_key = f"{vaccine_type}_Vaccinations_v5_{ods_code}_20210730T12000000.csv"
         self.inf_ack_file_key = (
             f"ack/{vaccine_type}_Vaccinations_v5_{ods_code}_20210730T12000000"

@@ -147,7 +148,9 @@ def __init__(self, vaccine_type: str, supplier: str, ods_code: str):
         self.vaccine_type = vaccine_type
         self.ods_code = ods_code
         self.supplier = supplier
+        self.file_date_and_time_string = f"20000101T0000000{file_number}"
         self.message_id = f"{vaccine_type.lower()}_{supplier.lower()}_test_id"
+        self.message_id_order = f"{vaccine_type.lower()}_{supplier.lower()}_test_id_{file_number}"
         self.full_permissions_list = [f"{vaccine_type}_FULL"]
         self.create_permissions_only = [f"{vaccine_type}_CREATE"]
         self.update_permissions_only = [f"{vaccine_type}_UPDATE"]

@@ -173,13 +176,26 @@ def __init__(self, vaccine_type: str, supplier: str, ods_code: str):
         self.event_update_permissions_only = json.dumps(self.event_update_permissions_only_dict)
         self.event_delete_permissions_only = json.dumps(self.event_delete_permissions_only_dict)

+        self.audit_table_entry = {
+            AuditTableKeys.MESSAGE_ID: {"S": self.message_id_order},
+            AuditTableKeys.FILENAME: {"S": self.file_key},
+            AuditTableKeys.QUEUE_NAME: {"S": self.queue_name},
+            AuditTableKeys.TIMESTAMP: {"S": self.created_at_formatted_string},
+        }
+

 class MockFileDetails:
     """Class containing mock file details for use in tests"""

+    ravs_rsv_1 = FileDetails("RSV", "RAVS", "X26", file_number=1)
+    ravs_rsv_2 = FileDetails("RSV", "RAVS", "X26", file_number=2)
+    ravs_rsv_3 = FileDetails("RSV", "RAVS", "X26", file_number=3)
+    ravs_rsv_4 = FileDetails("RSV", "RAVS", "X26", file_number=4)
+    ravs_rsv_5 = FileDetails("RSV", "RAVS", "X26", file_number=5)
     rsv_ravs = FileDetails("RSV", "RAVS", "X26")
     rsv_emis = FileDetails("RSV", "EMIS", "8HK48")
     flu_emis = FileDetails("FLU", "EMIS", "YGM41")
+    ravs_flu = FileDetails("FLU", "RSV", "X26")


 class UnorderedFieldDictionaries:
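The file_number parameter lets several RAVS_RSV fixtures share a supplier and ODS code while still carrying distinct timestamps and message ids, which is exactly what the audit table tests above rely on when asserting that ravs_rsv_2 stays at the front of the queue after files 3 and 4 are added. Working the f-strings through by hand for ravs_rsv_2:

# Values follow directly from the FileDetails constructor above
file_2 = FileDetails("RSV", "RAVS", "X26", file_number=2)
assert file_2.created_at_formatted_string == "20221120T12000000"  # "202" + "2" + "1120T12000000"
assert file_2.file_date_and_time_string == "20000101T00000002"
assert file_2.message_id_order == "rsv_ravs_test_id_2"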
