Skip to content

Commit 6d381b4

Browse files
authored
VED-372: Use configurable disease mappings in batch process. (#593)
* VED-372: Use configurable disease mappings in batch process. * VED-372: Call Redis once per batch instead of once per row. * VED-372: Connect ECS task to Redis. * VED-372: Fix lint errors. * VED-372: Add missing Redis patch. * VED-372: Convert env var value to a string. * VED-372: Fix import. * VED-372: Query for correct key.
1 parent 3958b49 commit 6d381b4

16 files changed

+141
-111
lines changed

recordprocessor/poetry.lock

Lines changed: 34 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

recordprocessor/pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ structlog = "^24.1.0"
2525
pandas = "^2.3.0"
2626
freezegun = "^1.5.2"
2727
coverage = "^7.9.0"
28+
redis = "^6.2.0"
2829

2930
[build-system]
3031
requires = ["poetry-core ~= 1.5.0"]

recordprocessor/src/batch_processing.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import os
55
import time
66
from process_row import process_row
7+
from mappings import map_target_disease
78
from send_to_kinesis import send_to_kinesis
89
from clients import logger
910
from file_level_validation import file_level_validation
@@ -29,26 +30,28 @@ def process_csv_to_fhir(incoming_message_body: dict) -> None:
2930
created_at_formatted_string = interim_message_body.get("created_at_formatted_string")
3031
csv_reader = interim_message_body.get("csv_dict_reader")
3132

33+
target_disease = map_target_disease(vaccine)
34+
3235
row_count = 0 # Initialize a counter for rows
3336
for row in csv_reader:
3437
row_count += 1
3538
row_id = f"{file_id}^{row_count}"
3639
logger.info("MESSAGE ID : %s", row_id)
3740

3841
# Process the row to obtain the details needed for the message_body and ack file
39-
details_from_processing = process_row(vaccine, allowed_operations, row)
42+
details_from_processing = process_row(target_disease, allowed_operations, row)
4043

4144
# Create the message body for sending
4245
outgoing_message_body = {
4346
"row_id": row_id,
4447
"file_key": file_key,
4548
"supplier": supplier,
46-
"vax_type": vaccine.value,
49+
"vax_type": vaccine,
4750
"created_at_formatted_string": created_at_formatted_string,
4851
**details_from_processing,
4952
}
5053

51-
send_to_kinesis(supplier, outgoing_message_body, vaccine.value)
54+
send_to_kinesis(supplier, outgoing_message_body, vaccine)
5255

5356
logger.info("Total rows processed: %s", row_count)
5457

recordprocessor/src/clients.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,15 @@
11
"""Initialise s3 and kinesis clients"""
22

33
import logging
4+
import os
5+
6+
import redis
47
from boto3 import client as boto3_client, resource as boto3_resource
58
from botocore.config import Config
69

710
REGION_NAME = "eu-west-2"
11+
REDIS_HOST = os.getenv("REDIS_HOST", "")
12+
REDIS_PORT = os.getenv("REDIS_PORT", 6379)
813

914
s3_client = boto3_client("s3", region_name=REGION_NAME)
1015
kinesis_client = boto3_client(
@@ -14,6 +19,7 @@
1419
firehose_client = boto3_client("firehose", region_name=REGION_NAME)
1520
dynamodb_client = boto3_client("dynamodb", region_name=REGION_NAME)
1621
lambda_client = boto3_client("lambda", region_name=REGION_NAME)
22+
redis_client = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, decode_responses=True)
1723

1824
dynamodb_resource = boto3_resource("dynamodb", region_name=REGION_NAME)
1925

recordprocessor/src/convert_to_fhir_imms_resource.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
from typing import List, Callable, Dict
44
from utils_for_fhir_conversion import _is_not_empty, Generate, Add, Convert
5-
from mappings import map_target_disease, Vaccine
65
from constants import Urls
76

87

@@ -196,13 +195,14 @@ def _decorate_performer(imms: dict, row: Dict[str, str]) -> None:
196195
]
197196

198197

199-
def convert_to_fhir_imms_resource(row: dict, vaccine: Vaccine) -> dict:
198+
def convert_to_fhir_imms_resource(row: dict, target_disease: list) -> dict:
200199
"""Converts a row of data to a FHIR Immunization Resource"""
201200
# Prepare the imms_resource. Note that all data sent via this service is assumed to be for completed vaccinations.
202-
imms_resource = {"resourceType": "Immunization", "status": "completed"}
203-
204-
# Add the targetDisease element based on the vaccine type
205-
imms_resource["protocolApplied"] = [{"targetDisease": map_target_disease(vaccine)}]
201+
imms_resource = {
202+
"resourceType": "Immunization",
203+
"status": "completed",
204+
"protocolApplied": [{"targetDisease": target_disease}]
205+
}
206206

207207
# Apply all decorators to add the relevant fields to the imms_resource
208208
for decorator in all_decorators:

recordprocessor/src/file_level_validation.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
from unique_permission import get_unique_action_flags_from_s3
77
from clients import logger, s3_client
88
from make_and_upload_ack_file import make_and_upload_ack_file
9-
from mappings import Vaccine
109
from utils_for_recordprocessor import get_csv_content_dict_reader, invoke_filename_lambda
1110
from errors import InvalidHeaders, NoOperationPermissions
1211
from logging_decorator import file_level_validation_logging_decorator
@@ -75,9 +74,7 @@ def file_level_validation(incoming_message_body: dict) -> dict:
7574
"""
7675
try:
7776
message_id = incoming_message_body.get("message_id")
78-
vaccine: Vaccine = next( # Convert vaccine_type to Vaccine enum
79-
vaccine for vaccine in Vaccine if vaccine.value == incoming_message_body.get("vaccine_type").upper()
80-
)
77+
vaccine = incoming_message_body.get("vaccine_type").upper()
8178
supplier = incoming_message_body.get("supplier").upper()
8279
file_key = incoming_message_body.get("filename")
8380
permission = incoming_message_body.get("permission")
@@ -89,7 +86,7 @@ def file_level_validation(incoming_message_body: dict) -> dict:
8986
validate_content_headers(csv_reader)
9087

9188
# Validate has permission to perform at least one of the requested actions
92-
allowed_operations_set = validate_action_flag_permissions(supplier, vaccine.value, permission, csv_data)
89+
allowed_operations_set = validate_action_flag_permissions(supplier, vaccine, permission, csv_data)
9390

9491
make_and_upload_ack_file(message_id, file_key, True, True, created_at_formatted_string)
9592

recordprocessor/src/mappings.py

Lines changed: 7 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -1,70 +1,20 @@
11
"""Mappings for converting vaccine type into target disease FHIR element"""
2-
3-
from enum import Enum
4-
from typing import Dict, List
2+
import json
53
from constants import Urls
4+
from clients import redis_client
65

76

8-
class Vaccine(Enum):
9-
"""Disease Codes"""
10-
11-
COVID_19: str = "COVID19"
12-
FLU: str = "FLU"
13-
MMR: str = "MMR"
14-
RSV: str = "RSV"
15-
16-
17-
class Disease(Enum):
18-
"""Disease Codes"""
19-
20-
COVID_19: str = "COVID19"
21-
FLU: str = "FLU"
22-
MEASLES: str = "MEASLES"
23-
MUMPS: str = "MUMPS"
24-
RUBELLA: str = "RUBELLA"
25-
RSV: str = "RSV"
26-
27-
28-
class DiseaseCode(Enum):
29-
"""Disease Codes"""
30-
31-
COVID_19: str = "840539006"
32-
FLU: str = "6142004"
33-
MEASLES: str = "14189004"
34-
MUMPS: str = "36989005"
35-
RUBELLA: str = "36653000"
36-
RSV: str = "55735004"
37-
38-
39-
class DiseaseDisplayTerm(Enum):
40-
"""Disease display terms which correspond to disease codes"""
41-
42-
COVID_19: str = "Disease caused by severe acute respiratory syndrome coronavirus 2"
43-
FLU: str = "Influenza"
44-
MEASLES: str = "Measles"
45-
MUMPS: str = "Mumps"
46-
RUBELLA: str = "Rubella"
47-
RSV: str = "Respiratory syncytial virus infection (disorder)"
48-
49-
50-
VACCINE_DISEASE_MAPPING: Dict[Vaccine, List[Disease]] = {
51-
Vaccine.COVID_19: [Disease.COVID_19],
52-
Vaccine.FLU: [Disease.FLU],
53-
Vaccine.MMR: [Disease.MEASLES, Disease.MUMPS, Disease.RUBELLA],
54-
Vaccine.RSV: [Disease.RSV],
55-
}
56-
57-
58-
def map_target_disease(vaccine: Vaccine) -> list:
7+
def map_target_disease(vaccine: str) -> list:
598
"""Returns the target disease element for the given vaccine type using the vaccine_disease_mapping"""
60-
diseases = VACCINE_DISEASE_MAPPING.get(vaccine, [])
9+
diseases_str = redis_client.hget("vacc_to_diseases", vaccine)
10+
diseases = json.loads(diseases_str) if diseases_str else []
6111
return [
6212
{
6313
"coding": [
6414
{
6515
"system": Urls.SNOMED,
66-
"code": DiseaseCode[disease.name].value,
67-
"display": DiseaseDisplayTerm[disease.name].value,
16+
"code": disease["code"],
17+
"display": disease["term"],
6818
}
6919
]
7020
}

recordprocessor/src/process_row.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,11 @@
22

33
from convert_to_fhir_imms_resource import convert_to_fhir_imms_resource
44
from constants import Diagnostics
5-
from mappings import Vaccine
65
from clients import logger
76
from utils_for_recordprocessor import create_diagnostics_dictionary
87

98

10-
def process_row(vaccine: Vaccine, allowed_operations: set, row: dict) -> dict:
9+
def process_row(target_disease: list, allowed_operations: set, row: dict) -> dict:
1110
"""
1211
Processes a row of the file and returns a dictionary containing the fhir_json, action_flag, imms_id, local_id
1312
(where applicable), version(where applicable) and any diagnostics.
@@ -51,7 +50,7 @@ def process_row(vaccine: Vaccine, allowed_operations: set, row: dict) -> dict:
5150

5251
# Handle success
5352
return {
54-
"fhir_json": convert_to_fhir_imms_resource(row, vaccine),
53+
"fhir_json": convert_to_fhir_imms_resource(row, target_disease),
5554
"operation_requested": operation_requested,
5655
"local_id": local_id,
5756
}

recordprocessor/tests/test_convert_to_fhir_imms_resource.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,15 @@
11
"""Tests for convert_to_fhir_imms_resource"""
2-
32
import unittest
43
from unittest.mock import patch
54

65
from tests.utils_for_recordprocessor_tests.values_for_recordprocessor_tests import (
76
MockFhirImmsResources,
87
MockFieldDictionaries,
8+
TargetDiseaseElements
99
)
1010
from tests.utils_for_recordprocessor_tests.mock_environment_variables import MOCK_ENVIRONMENT_DICT
1111

1212
with patch("os.environ", MOCK_ENVIRONMENT_DICT):
13-
# Do not attempt 'from src.mappings import Vaccine' as this imports a different instance of Vaccine
14-
# and tests will break
15-
from mappings import Vaccine
1613
from convert_to_fhir_imms_resource import convert_to_fhir_imms_resource
1714

1815

@@ -24,6 +21,7 @@ def test_convert_to_fhir_imms_resource(self):
2421
Test that convert_to_fhir_imms_resource gives the expected output. These tests check that the entire
2522
outputted FHIR Immunization Resource matches the expected output.
2623
"""
24+
2725
# Test cases tuples are structure as (test_name, input_values, expected_output)
2826
cases = [
2927
("All fields", MockFieldDictionaries.all_fields, MockFhirImmsResources.all_fields),
@@ -41,4 +39,5 @@ def test_convert_to_fhir_imms_resource(self):
4139

4240
for test_name, input_values, expected_output in cases:
4341
with self.subTest(test_name):
44-
self.assertEqual(convert_to_fhir_imms_resource(input_values, Vaccine.RSV), expected_output)
42+
output = convert_to_fhir_imms_resource(input_values, TargetDiseaseElements.RSV)
43+
self.assertEqual(output, expected_output)
Lines changed: 25 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,46 @@
11
"""Tests for map_target_disease"""
2-
2+
import json
33
import unittest
44
from unittest.mock import patch
5-
from typing import List
6-
from tests.utils_for_recordprocessor_tests.values_for_recordprocessor_tests import TargetDiseaseElements
75
from tests.utils_for_recordprocessor_tests.mock_environment_variables import MOCK_ENVIRONMENT_DICT
86

97
with patch("os.environ", MOCK_ENVIRONMENT_DICT):
10-
from mappings import map_target_disease, Vaccine
8+
from mappings import map_target_disease
119

1210

11+
@patch("mappings.redis_client")
1312
class TestMapTargetDisease(unittest.TestCase):
1413
"""
1514
Test that map_target_disease returns the correct target disease element for valid vaccine types, or [] for
1615
invalid vaccine types.
1716
"""
1817

19-
def test_map_target_disease_valid(self):
18+
def test_map_target_disease_valid(self, mock_redis_client):
2019
"""Tests map_target_disease returns the disease coding information when using valid vaccine types"""
21-
# NOTE: TEST CASES SHOULD INCLUDE ALL VACCINE TYPES WHICH ARE VALID FOR THIS PRODUCT.
22-
# A NEW VACCINE TYPE SHOULD BE ADDED TO THIS TEST EVERY TIME THERE IS A VACCINE TYPE UPLIFT
23-
# (note that this will require adding the vaccine type to the TargetDiseaseElements class).
24-
# Target disease elements are intentionally hardcoded as a way of ensuring that the correct code and display
25-
# values are being used, and that the element is being built up correctly.
26-
vaccines: List[Vaccine] = [Vaccine.RSV, Vaccine.COVID_19, Vaccine.FLU, Vaccine.MMR]
27-
for vaccine in vaccines:
28-
with self.subTest(vaccine=vaccine):
29-
self.assertEqual(map_target_disease(vaccine), getattr(TargetDiseaseElements, vaccine.value))
30-
31-
def test_map_target_disease_invalid(self):
20+
mock_redis_client.hget.return_value = json.dumps([{
21+
"code": "55735004",
22+
"term": "Respiratory syncytial virus infection (disorder)"
23+
}])
24+
25+
self.assertEqual(map_target_disease("RSV"), [{
26+
"coding": [{
27+
"system": "http://snomed.info/sct",
28+
"code": "55735004",
29+
"display": "Respiratory syncytial virus infection (disorder)"
30+
}]
31+
}])
32+
33+
mock_redis_client.hget.assert_called_with("vacc_to_diseases", "RSV")
34+
35+
def test_map_target_disease_invalid(self, mock_redis_client):
3236
"""Tests map_target_disease does not return the disease coding information when using invalid vaccine types."""
37+
38+
mock_redis_client.hget.return_value = None
39+
3340
self.assertEqual(map_target_disease("invalid_vaccine"), [])
3441

42+
mock_redis_client.hget.assert_called_with("vacc_to_diseases", "invalid_vaccine")
43+
3544

3645
if __name__ == "__main__":
3746
unittest.main()

0 commit comments

Comments
 (0)