Skip to content

Commit 5207702

Browse files
committed
VED-470: Stream records in record processor instead of loading everything into memory.
1 parent d51a18a commit 5207702

12 files changed

+70
-148
lines changed

recordprocessor/src/batch_processing.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ def process_csv_to_fhir(incoming_message_body: dict) -> None:
2020
interim_message_body = file_level_validation(incoming_message_body=incoming_message_body)
2121
except (InvalidHeaders, NoOperationPermissions, Exception): # pylint: disable=broad-exception-caught
2222
# If the file is invalid, processing should cease immediately
23-
return None
23+
return
2424

2525
file_id = interim_message_body.get("message_id")
2626
vaccine = interim_message_body.get("vaccine")

recordprocessor/src/constants.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,8 @@ class Urls:
8787
VACCINATION_PROCEDURE = "https://fhir.hl7.org.uk/StructureDefinition/Extension-UKCore-VaccinationProcedure"
8888

8989

90-
class ActionFlag(StrEnum):
91-
CREATE = "NEW"
90+
class Operation(StrEnum):
91+
CREATE = "CREATE"
9292
UPDATE = "UPDATE"
9393
DELETE = "DELETE"
9494

@@ -99,8 +99,8 @@ class Permission(StrEnum):
9999
DELETE = "D"
100100

101101

102-
permission_to_action_flag_map = {
103-
Permission.CREATE: ActionFlag.CREATE,
104-
Permission.UPDATE: ActionFlag.UPDATE,
105-
Permission.DELETE: ActionFlag.DELETE
106-
}
102+
permission_to_operation_map = {
103+
Permission.CREATE: Operation.CREATE,
104+
Permission.UPDATE: Operation.UPDATE,
105+
Permission.DELETE: Operation.DELETE
106+
}

recordprocessor/src/file_level_validation.py

Lines changed: 9 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,13 @@
22
Functions for completing file-level validation
33
(validating headers and ensuring that the supplier has permission to perform at least one of the requested operations)
44
"""
5-
6-
from unique_permission import get_unique_action_flags_from_s3
75
from clients import logger, s3_client
86
from make_and_upload_ack_file import make_and_upload_ack_file
97
from utils_for_recordprocessor import get_csv_content_dict_reader, invoke_filename_lambda
108
from errors import InvalidHeaders, NoOperationPermissions
119
from logging_decorator import file_level_validation_logging_decorator
1210
from audit_table import change_audit_table_status_to_processed, get_next_queued_file_details
13-
from constants import SOURCE_BUCKET_NAME, EXPECTED_CSV_HEADERS, permission_to_action_flag_map, ActionFlag, Permission
11+
from constants import SOURCE_BUCKET_NAME, EXPECTED_CSV_HEADERS, permission_to_operation_map, Permission
1412

1513

1614
def validate_content_headers(csv_content_reader) -> None:
@@ -19,25 +17,9 @@ def validate_content_headers(csv_content_reader) -> None:
1917
raise InvalidHeaders("File headers are invalid.")
2018

2119

22-
def validate_action_flag_permissions(
23-
supplier: str, vaccine_type: str, allowed_permissions_list: list, csv_data: str
20+
def get_permitted_operations(
21+
supplier: str, vaccine_type: str, allowed_permissions_list: list
2422
) -> set:
25-
"""
26-
Validates that the supplier has permission to perform at least one of the requested operations for the given
27-
vaccine type and returns the set of allowed operations for that vaccine type.
28-
Raises a NoPermissionsError if the supplier does not have permission to perform any of the requested operations.
29-
"""
30-
31-
# Get unique ACTION_FLAG values from the S3 file
32-
required_action_flags = get_unique_action_flags_from_s3(csv_data)
33-
34-
valid_action_flag_values = {flag.value for flag in ActionFlag}
35-
required_action_flags = required_action_flags & valid_action_flag_values # intersection
36-
37-
if not required_action_flags:
38-
logger.warning("No valid ACTION_FLAGs found in file. Skipping permission validation.")
39-
return set()
40-
4123
# Check if supplier has permission for the subject vaccine type and extract permissions
4224
permission_strs_for_vaccine_type = {
4325
permission_str
@@ -54,24 +36,17 @@ def validate_action_flag_permissions(
5436
}
5537

5638
# Map Permission key to action flag
57-
permitted_action_flags_for_vaccine_type = {
58-
permission_to_action_flag_map[permission].value
39+
permitted_operations_for_vaccine_type = {
40+
permission_to_operation_map[permission].value
5941
for permission in permissions_for_vaccine_type
6042
}
6143

62-
if not required_action_flags.intersection(permitted_action_flags_for_vaccine_type):
44+
if not permitted_operations_for_vaccine_type:
6345
raise NoOperationPermissions(
6446
f"{supplier} does not have permissions to perform any of the requested actions."
6547
)
6648

67-
logger.info(
68-
"%s permissions %s match one of the requested permissions required to %s",
69-
supplier,
70-
allowed_permissions_list,
71-
permitted_action_flags_for_vaccine_type,
72-
)
73-
74-
return {permission.name for permission in permissions_for_vaccine_type}
49+
return permitted_operations_for_vaccine_type
7550

7651

7752
def move_file(bucket_name: str, source_file_key: str, destination_file_key: str) -> None:
@@ -103,12 +78,12 @@ def file_level_validation(incoming_message_body: dict) -> dict:
10378
created_at_formatted_string = incoming_message_body.get("created_at_formatted_string")
10479

10580
# Fetch the data
106-
csv_reader, csv_data = get_csv_content_dict_reader(file_key)
81+
csv_reader = get_csv_content_dict_reader(file_key)
10782

10883
validate_content_headers(csv_reader)
10984

11085
# Validate has permission to perform at least one of the requested actions
111-
allowed_operations_set = validate_action_flag_permissions(supplier, vaccine, permission, csv_data)
86+
allowed_operations_set = get_permitted_operations(supplier, vaccine, permission)
11287

11388
make_and_upload_ack_file(message_id, file_key, True, True, created_at_formatted_string)
11489

recordprocessor/src/unique_permission.py

Lines changed: 0 additions & 31 deletions
This file was deleted.

recordprocessor/src/utils_for_recordprocessor.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import os
44
import json
55
from csv import DictReader
6-
from io import StringIO
6+
from io import StringIO, TextIOWrapper
77
from clients import s3_client, lambda_client, logger
88
from constants import SOURCE_BUCKET_NAME, FILE_NAME_PROC_LAMBDA_NAME
99

@@ -15,12 +15,12 @@ def get_environment() -> str:
1515
return _env if _env in ["internal-dev", "int", "ref", "sandbox", "prod"] else "internal-dev"
1616

1717

18-
def get_csv_content_dict_reader(file_key: str) -> (DictReader, str):
18+
def get_csv_content_dict_reader(file_key: str) -> DictReader:
1919
"""Returns the requested file contents from the source bucket in the form of a DictReader"""
2020
response = s3_client.get_object(Bucket=os.getenv("SOURCE_BUCKET_NAME"), Key=file_key)
21-
# TODO - this reads everything into memory! Look into streaming instead
22-
csv_data = response["Body"].read().decode("utf-8")
23-
return DictReader(StringIO(csv_data), delimiter="|"), csv_data
21+
binary_io = response["Body"]
22+
text_io = TextIOWrapper(binary_io, encoding="utf-8")
23+
return DictReader(text_io, delimiter="|")
2424

2525

2626
def create_diagnostics_dictionary(error_type, status_code, error_message) -> dict:

recordprocessor/tests/test_file_level_validation.py

Lines changed: 20 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111

1212
with patch("os.environ", MOCK_ENVIRONMENT_DICT):
1313
from errors import NoOperationPermissions, InvalidHeaders
14-
from file_level_validation import validate_content_headers, validate_action_flag_permissions
14+
from file_level_validation import validate_content_headers, get_permitted_operations
1515

1616

1717
test_file = MockFileDetails.rsv_emis
@@ -43,80 +43,56 @@ def test_validate_content_headers(self):
4343
with self.assertRaises(InvalidHeaders):
4444
validate_content_headers(test_data)
4545

46-
def test_validate_action_flag_permissions(self):
47-
"""
48-
Tests that validate_action_flag_permissions returns True if supplier has permissions to perform at least one
49-
of the requested CRUD operations for the given vaccine type, and False otherwise
50-
"""
46+
def test_get_permitted_operations(self):
5147
# Set up test file content. Note that ValidFileContent has action flags in lower case
52-
valid_file_content = ValidMockFileContent.with_new_and_update
53-
valid_content_new_and_update_lowercase = valid_file_content
54-
valid_content_new_and_update_uppercase = valid_file_content.replace("new", "NEW").replace("update", "UPDATE")
55-
valid_content_new_and_update_mixedcase = valid_file_content.replace("new", "New").replace("update", "uPdAte")
56-
valid_content_new_and_delete_lowercase = valid_file_content.replace("update", "delete")
57-
valid_content_update_and_delete_lowercase = valid_file_content.replace("new", "delete").replace(
58-
"update", "UPDATE"
59-
)
6048

6149
# Case: Supplier has permissions to perform at least one of the requested operations
6250
# Test case tuples are structured as (vaccine_type, vaccine_permissions, file_content, expected_output)
6351
test_cases = [
6452
# FLU, full permissions, lowercase action flags
65-
("FLU", ["FLU.CRUD"], valid_content_new_and_update_lowercase, {"CREATE", "UPDATE", "DELETE"}),
53+
("FLU", ["FLU.CRUD"], {"CREATE", "UPDATE", "DELETE"}),
6654
# FLU, partial permissions, uppercase action flags
67-
("FLU", ["FLU.C"], valid_content_new_and_update_uppercase, {"CREATE"}),
55+
("FLU", ["FLU.C"], {"CREATE"}),
6856
# FLU, full permissions, mixed case action flags
69-
("FLU", ["FLU.CRUD"], valid_content_new_and_update_mixedcase, {"CREATE", "UPDATE", "DELETE"}),
57+
("FLU", ["FLU.CRUD"], {"CREATE", "UPDATE", "DELETE"}),
7058
# FLU, partial permissions (create)
71-
("FLU", ["FLU.D", "FLU.C"], valid_content_new_and_update_lowercase, {"CREATE", "DELETE"}),
59+
("FLU", ["FLU.D", "FLU.C"], {"CREATE", "DELETE"}),
7260
# FLU, partial permissions (update)
73-
("FLU", ["FLU.U"], valid_content_new_and_update_lowercase, {"UPDATE"}),
61+
("FLU", ["FLU.U"], {"UPDATE"}),
7462
# FLU, partial permissions (delete)
75-
("FLU", ["FLU.D"], valid_content_new_and_delete_lowercase, {"DELETE"}),
63+
("FLU", ["FLU.D"], {"DELETE"}),
7664
# COVID19, full permissions
77-
("COVID19", ["COVID19.CRUD"], valid_content_new_and_delete_lowercase, {"CREATE", "UPDATE", "DELETE"}),
65+
("COVID19", ["COVID19.CRUD"], {"CREATE", "UPDATE", "DELETE"}),
7866
# COVID19, partial permissions
79-
("COVID19", ["COVID19.U"], valid_content_update_and_delete_lowercase, {"UPDATE"}),
67+
("COVID19", ["COVID19.U"], {"UPDATE"}),
8068
# RSV, full permissions
81-
("RSV", ["RSV.CRUD"], valid_content_new_and_delete_lowercase, {"CREATE", "UPDATE", "DELETE"}),
69+
("RSV", ["RSV.CRUD"], {"CREATE", "UPDATE", "DELETE"}),
8270
# RSV, partial permissions
83-
("RSV", ["RSV.U"], valid_content_update_and_delete_lowercase, {"UPDATE"}),
71+
("RSV", ["RSV.U"], {"UPDATE"}),
8472
# RSV, full permissions, mixed case action flags
85-
("RSV", ["RSV.CRUD"], valid_content_new_and_update_mixedcase, {"CREATE", "UPDATE", "DELETE"}),
73+
("RSV", ["RSV.CRUD"], {"CREATE", "UPDATE", "DELETE"}),
8674
]
8775

88-
for vaccine_type, vaccine_permissions, file_content, expected_output in test_cases:
76+
for vaccine_type, vaccine_permissions, expected_output in test_cases:
8977
with self.subTest(f"Vaccine_type {vaccine_type} - permissions {vaccine_permissions}"):
9078
self.assertEqual(
91-
validate_action_flag_permissions("TEST_SUPPLIER", vaccine_type, vaccine_permissions, file_content),
79+
get_permitted_operations("TEST_SUPPLIER", vaccine_type, vaccine_permissions),
9280
expected_output,
9381
)
9482

9583
# Case: Supplier has no permissions to perform any of the requested operations
9684
# Test case tuples are structured as (vaccine_type, vaccine_permissions, file_content)
9785
invalid_cases = [
98-
# FLU, no permissions
99-
("FLU", ["FLU.U", "COVID19.CRUDS"], valid_content_new_and_delete_lowercase),
10086
# COVID19, no permissions
101-
("COVID19", ["FLU.C", "FLU.U"], valid_content_update_and_delete_lowercase),
102-
# RSV, no permissions
103-
("RSV", ["FLU.C", "FLU.U"], valid_content_update_and_delete_lowercase),
87+
("COVID19", ["FLU.CRUDS", "RSV.CUD"]),
88+
# RSV, no valid permissions
89+
("RSV", ["FLU.C", "RSV.XYZ"]),
10490
]
10591

106-
for vaccine_type, vaccine_permissions, file_content in invalid_cases:
92+
for vaccine_type, vaccine_permissions in invalid_cases:
10793
with self.subTest():
10894
with self.assertRaises(NoOperationPermissions):
109-
validate_action_flag_permissions("TEST_SUPPLIER", vaccine_type, vaccine_permissions, file_content)
110-
111-
no_flag_cases = [
112-
("FLU", ["FLU.C"], valid_file_content.replace("new", "").replace("update", "")),
113-
("COVID19", ["COVID19.CRUD"], valid_file_content.replace("new", "INVALID").replace("update", "")),
114-
]
115-
116-
for vaccine_type, permissions, file_content in no_flag_cases:
117-
with self.subTest(f"{vaccine_type} with invalid or missing ACTION_FLAGs"):
118-
result = validate_action_flag_permissions("TEST_SUPPLIER", vaccine_type, permissions, file_content)
119-
self.assertEqual(result, set())
95+
get_permitted_operations("TEST_SUPPLIER", vaccine_type, vaccine_permissions)
12096

12197

12298
if __name__ == "__main__":

recordprocessor/tests/test_logging_decorator.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ def test_splunk_logger_handled_failure(self):
171171
# CASE: No operation permissions
172172
(
173173
ValidMockFileContent.with_new_and_update,
174-
MOCK_FILE_DETAILS.event_delete_permissions_only_dict, # No permission for NEW or UPDATE
174+
MOCK_FILE_DETAILS.event_no_permissions_dict, # No permission for NEW or UPDATE
175175
NoOperationPermissions,
176176
403,
177177
f"{MOCK_FILE_DETAILS.supplier} does not have permissions to perform any of the requested actions.",

recordprocessor/tests/test_process_csv_to_fhir.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,13 +81,17 @@ def test_process_csv_to_fhir_partial_permissions(self):
8181
self.assertEqual(mock_send_to_kinesis.call_count, 3)
8282

8383
def test_process_csv_to_fhir_no_permissions(self):
84-
"""Tests that process_csv_to_fhir does not send a message to kinesis when the supplier has no permissions"""
84+
"""Tests that process_csv_to_fhir does not send fhir_json to kinesis when the supplier has no permissions"""
8585
self.upload_source_file(file_key=test_file.file_key, file_content=ValidMockFileContent.with_update_and_delete)
8686

8787
with patch("batch_processing.send_to_kinesis") as mock_send_to_kinesis:
8888
process_csv_to_fhir(deepcopy(test_file.event_create_permissions_only_dict))
8989

90-
self.assertEqual(mock_send_to_kinesis.call_count, 0)
90+
self.assertEqual(mock_send_to_kinesis.call_count, 2)
91+
for (_supplier, message_body, _vaccine), _kwargs in mock_send_to_kinesis.call_args_list:
92+
self.assertIn("diagnostics", message_body)
93+
self.assertNotIn("fhir_json", message_body)
94+
9195

9296
def test_process_csv_to_fhir_invalid_headers(self):
9397
"""Tests that process_csv_to_fhir does not send a message to kinesis when the csv has invalid headers"""

recordprocessor/tests/test_recordprocessor_main.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,7 @@ def test_e2e_partial_permissions(self):
227227
self.make_inf_ack_assertions(file_details=mock_rsv_emis_file, passed_validation=True)
228228
self.make_kinesis_assertions(assertion_cases)
229229

230-
def test_e2e_no_permissions(self):
230+
def test_e2e_no_required_permissions(self):
231231
"""
232232
Tests that a file containing UPDATE and DELETE is successfully processed when the supplier has CREATE permissions
233233
only.
@@ -236,6 +236,23 @@ def test_e2e_no_permissions(self):
236236

237237
main(mock_rsv_emis_file.event_create_permissions_only)
238238

239+
kinesis_records = kinesis_client.get_records(ShardIterator=self.get_shard_iterator(), Limit=10)["Records"]
240+
self.assertEqual(len(kinesis_records), 2)
241+
for record in kinesis_records:
242+
data_bytes = record["Data"]
243+
data_dict = json.loads(data_bytes)
244+
self.assertIn("diagnostics", data_dict)
245+
self.assertNotIn("fhir_json", data_dict)
246+
self.make_inf_ack_assertions(file_details=mock_rsv_emis_file, passed_validation=True)
247+
248+
def test_e2e_no_permissions(self):
249+
"""
250+
Tests that a file containing UPDATE and DELETE is successfully processed when the supplier has no permissions.
251+
"""
252+
self.upload_source_files(ValidMockFileContent.with_update_and_delete)
253+
254+
main(mock_rsv_emis_file.event_no_permissions)
255+
239256
kinesis_records = kinesis_client.get_records(ShardIterator=self.get_shard_iterator(), Limit=10)["Records"]
240257
self.assertEqual(len(kinesis_records), 0)
241258
self.make_inf_ack_assertions(file_details=mock_rsv_emis_file, passed_validation=False)

recordprocessor/tests/test_unique_permission.py

Lines changed: 0 additions & 20 deletions
This file was deleted.

0 commit comments

Comments
 (0)