Skip to content

Commit fe14345

Browse files
committed
changes applied
1 parent 3650a1b commit fe14345

File tree

6 files changed

+175
-3
lines changed

6 files changed

+175
-3
lines changed

recordprocessor/src/batch_processor.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ def process_csv_to_fhir(incoming_message_body: dict) -> int:
2828
incoming_message_body["encoder"] = encoder
2929
interim_message_body = file_level_validation(incoming_message_body=incoming_message_body)
3030
except (InvalidHeaders, NoOperationPermissions, Exception) as e: # pylint: disable=broad-exception-caught
31-
logger.error(f"File level validation failed: {e}") # If the file is invalid, processing should cease immediately
31+
logger.error(f"File level validation failed: {e}") # If the file is invalid, processing should cease
3232
return 0
3333

3434
file_id = interim_message_body.get("message_id")
@@ -56,8 +56,8 @@ def process_csv_to_fhir(incoming_message_body: dict) -> int:
5656
# load alternative encoder
5757
csv_reader = get_csv_content_dict_reader(file_key, encoder=encoder)
5858
# re-read the file and skip processed rows
59-
row_count, = process_rows(file_id, vaccine, supplier, file_key, allowed_operations,
60-
created_at_formatted_string, csv_reader, target_disease, row_count)
59+
row_count, err = process_rows(file_id, vaccine, supplier, file_key, allowed_operations,
60+
created_at_formatted_string, csv_reader, target_disease, row_count)
6161
else:
6262
logger.error(f"Row Processing error: {err}")
6363
raise err
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
NHS_NUMBER|PERSON_FORENAME|PERSON_SURNAME|PERSON_DOB|PERSON_GENDER_CODE|PERSON_POSTCODE|DATE_AND_TIME|SITE_CODE|SITE_CODE_TYPE_URI|UNIQUE_ID|UNIQUE_ID_URI|ACTION_FLAG|PERFORMING_PROFESSIONAL_FORENAME|PERFORMING_PROFESSIONAL_SURNAME|RECORDED_DATE|PRIMARY_SOURCE|VACCINATION_PROCEDURE_CODE|VACCINATION_PROCEDURE_TERM|DOSE_SEQUENCE|VACCINE_PRODUCT_CODE|VACCINE_PRODUCT_TERM|VACCINE_MANUFACTURER|BATCH_NUMBER|EXPIRY_DATE|SITE_OF_VACCINATION_CODE|SITE_OF_VACCINATION_TERM|ROUTE_OF_VACCINATION_CODE|ROUTE_OF_VACCINATION_TERM|DOSE_AMOUNT|DOSE_UNIT_CODE|DOSE_UNIT_TERM|INDICATION_CODE|LOCATION_CODE|LOCATION_CODE_TYPE_URI
2+
"9473081898"|"DELYTH"|"PRESCOT"|"20030731"|"2"|"NE23 1NW"|"20250730T11525000"|"RJ1"|"https://fhir.nhs.uk/Id/ods-organization-code"|"MMR_CDFDPS-1330325--27-0801"|"HTTPS://ravsTest-123"|"new"|""|""|"20250730"|"FALSE"|"308081000000105"|"Measles mumps and rubella vaccination - first dose (procedure)"|"1"|"34925111000001104"|"Priorix vaccine powder and solvent for solution for injection 0.5ml pre-filled syringes (GlaxoSmithKline UK Ltd)"|"GlaxoSmithKline UK Ltd"|"NIQUTMBYAHKSGIERGSCB"|"20230519"|"368209003"|"Right arm"|"78421000"|"Intramuscular"|"0.5"|"3318611000001103"|"Pre-filled disposable injection"|"443684005"|"X99999"|"https://fhir.nhs.uk/Id/ods-organization-code"
3+
"9473084064"|"DORTHY"|"DICEMBRE"|"20020403"|"2"|"NE29 6BA"|"20250730T11304900"|"RJ1"|"https://fhir.nhs.uk/Id/ods-organization-code"|"MMR_CDFDPS-1330325--27-0802"|"HTTPS://ravsTest-123"|"new"|""|""|"20250730"|"FALSE"|"38598009"|"Administration of vaccine product containing only Measles morbillivirus and Mumps orthorubulavirus and Rubella virus antigens (procedure)"|"3"|"13968211000001108"|"M-M-RVAXPRO vaccine powder and solvent for suspension for injection 0.5ml pre-filled syringes (Merck Sharp & Dohme (UK) Ltd)"|"Merck Sharp & Dohme (UK) Ltd"|"HJIXUMURNAOVKRKIECZH"|"20230519"|"368209003"|"Right arm"|"78421000"|"Intramuscular"|"0.5"|"3318611000001103"|"Pre-filled disposable injection"|"443684006"|"X99999"|"https://fhir.nhs.uk/Id/ods-organization-code"
4+
"9473087667"|"SHAYE"|"PUHAR"|"20010903"|"0"|"NE39 2HB"|"20250730T18191400"|"RJ1"|"https://fhir.nhs.uk/Id/ods-organization-code"|"MMR_CDFDPS-1330325--27-0803"|"HTTPS://ravsTest-123"|"new"|""|""|"20250730"|"FALSE"|"170431005"|"Administration of booster dose of vaccine product containing only Measles morbillivirus and Mumps orthorubulavirus and Rubella virus antigens (procedure)"|"3"|"13968211000001108"|"M-M-RVAXPRO vaccine powder and solvent for suspension for injection 0.5ml pre-filled syringes (Merck Sharp & Dohme (UK) Ltd)"|"Merck Sharp & Dohme (UK) Ltd"|"MSBDJRRNHLWPFZQYDFGM"|"20230519"|"368209003"|"Right arm"|"78421000"|"Intramuscular"|"0.5"|"3318611000001103"|"Pre-filled disposable injection"|"443684007"|"X99999"|"https://fhir.nhs.uk/Id/ods-organization-code"
5+
"9473088736"|"TANESHA"|"SIEGELER"|"20010130"|"2"|"NE46 1EA"|"20250730T14452700"|"B23002"|"https://fhir.nhs.uk/Id/ods-organization-code"|"MMR_CDFDPS-1330325--27-0804"|"HTTPS://ravsTest-123"|"new"|""|""|"20250730"|"FALSE"|"170432003"|"Administration of pre-school booster dose of vaccine product containing only Measles morbillivirus and Mumps orthorubulavirus and Rubella virus antigens"|"2"|"34925111000001104"|"Priorix vaccine powder and solvent for solution for injection 0.5ml pre-filled syringes (GlaxoSmithKline UK Ltd)"|"GlaxoSmithKline UK Ltd"|"YRJNNROADZHBJHBNCHNG"|"20230519"|"368209003"|"Right arm"|"78421000"|"Intramuscular"|"0.5"|"3318611000001103"|"Pre-filled disposable injection"|"443684008"|"X99999"|"https://fhir.nhs.uk/Id/ods-organization-code"
6+
"9473089333"|"HELAH"|"JUDD"|"20010908"|"2"|"NE47 9HQ"|"20250730T13504900"|"B23002"|"https://fhir.nhs.uk/Id/ods-organization-code"|"MMR_CDFDPS-1330325--27-0805"|"HTTPS://ravsTest-123"|"new"|""|""|"20250730"|"FALSE"|"170433008"|"Administration of second dose of vaccine product containing only Measles morbillivirus and Mumps orthorubulavirus and Rubella virus antigens (procedure)"|"1"|"13968211000001108"|"M-M-RVAXPRO vaccine powder and solvent for suspension for injection 0.5ml pre-filled syringes (Merck Sharp & Dohme (UK) Ltd)"|"Merck Sharp & Dohme (UK) Ltd"|"SNOJRTGYMZLPMGDNRBVD"|"20230519"|"368209003"|"Right arm"|"78421000"|"Intramuscular"|"0.5"|"3318611000001103"|"Pre-filled disposable injection"|"443684009"|"X99999"|"https://fhir.nhs.uk/Id/ods-organization-code"
7+
""|"DELYTH"|"PRESCOT"|"20030731"|"2"|"NE23 1NW"|"20250730T11525000"|"RJ1"|"https://fhir.nhs.uk/Id/ods-organization-code"|"MMR_CDFDPS-1330325--27-0806"|"HTTPS://ravsTest-123"|"new"|""|""|"20250730"|"FALSE"|"308081000000105"|"Measles mumps and rubella vaccination - first dose (procedure)"|"1"|"34925111000001104"|"Priorix vaccine powder and solvent for solution for injection 0.5ml pre-filled syringes (GlaxoSmithKline UK Ltd)"|"GlaxoSmithKline UK Ltd"|"NIQUTMBYAHKSGIERGSCB"|"20230519"|"368209003"|"Right arm"|"78421000"|"Intramuscular"|"0.5"|"3318611000001103"|"Pre-filled disposable injection"|"443684005"|"X99999"|"https://fhir.nhs.uk/Id/ods-organization-code"
8+
"0"|"DORTHY"|"DIC�MBRE"|"20020403"|"2"|"NE29 6BA"|"20250730T11304900"|"RJ1"|"https://fhir.nhs.uk/Id/ods-organization-code"|"MMR_CDFDPS-1330325--27-0807"|"HTTPS://ravsTest-123"|"new"|""|""|"20250730"|"FALSE"|"38598009"|"Administration of vaccine product containing only Measles morbillivirus and Mumps orthorubulavirus and Rubella virus antigens (procedure)"|"3"|"13968211000001108"|"M-M-RVAXPRO vaccine powder and solvent for suspension for injection 0.5ml pre-filled syringes (Merck Sharp & Dohme (UK) Ltd)"|"Merck Sharp & Dohme (UK) Ltd"|"HJIXUMURNAOVKRKIECZH"|"20230519"|"368209003"|"Right arm"|"78421000"|"Intramuscular"|"0.5"|"3318611000001103"|"Pre-filled disposable injection"|"443684006"|"X99999"|"https://fhir.nhs.uk/Id/ods-organization-code"
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
NHS_NUMBER|PERSON_FORENAME|PERSON_SURNAME|PERSON_DOB|PERSON_GENDER_CODE|PERSON_POSTCODE|DATE_AND_TIME|SITE_CODE|SITE_CODE_TYPE_URI|UNIQUE_ID|UNIQUE_ID_URI|ACTION_FLAG|PERFORMING_PROFESSIONAL_FORENAME|PERFORMING_PROFESSIONAL_SURNAME|RECORDED_DATE|PRIMARY_SOURCE|VACCINATION_PROCEDURE_CODE|VACCINATION_PROCEDURE_TERM|DOSE_SEQUENCE|VACCINE_PRODUCT_CODE|VACCINE_PRODUCT_TERM|VACCINE_MANUFACTURER|BATCH_NUMBER|EXPIRY_DATE|SITE_OF_VACCINATION_CODE|SITE_OF_VACCINATION_TERM|ROUTE_OF_VACCINATION_CODE|ROUTE_OF_VACCINATION_TERM|DOSE_AMOUNT|DOSE_UNIT_CODE|DOSE_UNIT_TERM|INDICATION_CODE|LOCATION_CODE|LOCATION_CODE_TYPE_URI
2+
"9473081898"|"DELYTH"|"PRESCOT"|"20030731"|"2"|"NE23 1NW"|"20250730T11525000"|"RJ1"|"https://fhir.nhs.uk/Id/ods-organization-code"|"MMR_CDFDPS-1330325--27-0801"|"HTTPS://ravsTest-123"|"new"|""|""|"20250730"|"FALSE"|"308081000000105"|"Measles mumps and rubella vaccination - first dose (procedure)"|"1"|"34925111000001104"|"Priorix vaccine powder and solvent for solution for injection 0.5ml pre-filled syringes (GlaxoSmithKline UK Ltd)"|"GlaxoSmithKline UK Ltd"|"NIQUTMBYAHKSGIERGSCB"|"20230519"|"368209003"|"Right arm"|"78421000"|"Intramuscular"|"0.5"|"3318611000001103"|"Pre-filled disposable injection"|"443684005"|"X99999"|"https://fhir.nhs.uk/Id/ods-organization-code"
3+
"9473088736"|"TANESHA"|"SIEGELER"|"20010130"|"2"|"NE46 1EA"|"20250730T14452700"|"B23002"|"https://fhir.nhs.uk/Id/ods-organization-code"|"MMR_CDFDPS-1330325--27-0804"|"HTTPS://ravsTest-123"|"new"|""|""|"20250730"|"FALSE"|"170432003"|"Administration of pre-school booster dose of vaccine product containing only Measles morbillivirus and Mumps orthorubulavirus and Rubella virus antigens"|"2"|"34925111000001104"|"Priorix vaccine powder and solvent for solution for injection 0.5ml pre-filled syringes (GlaxoSmithKline UK Ltd)"|"GlaxoSmithKline UK Ltd"|"YRJNNROADZHBJHBNCHNG"|"20230519"|"368209003"|"Right arm"|"78421000"|"Intramuscular"|"0.5"|"3318611000001103"|"Pre-filled disposable injection"|"443684008"|"X99999"|"https://fhir.nhs.uk/Id/ods-organization-code"
4+
"9473088733"|"TANESHA"|"SIEGELER"|"20010131"|"2"|"NE46 1EA"|"20250730T14452700"|"B23002"|"https://fhir.nhs.uk/Id/ods-organization-code"|"MMR_CDFDPS-1330325--27-0804"|"HTTPS://ravsTest-123"|"new"|""|""|"20250730"|"FALSE"|"170432003"|"Administration of pre-school booster dose of vaccine product containing only Measles morbillivirus and Mumps orthorubulavirus and Rubella virus antigens"|"2"|"34925111000001104"|"Priorix vaccine powder and solvent for solution for injection 0.5ml pre-filled syringes (GlaxoSmithKline UK Ltd)"|"GlaxoSmithKline UK Ltd"|"YRJNNROADZHBJHBNCHNG"|"20230519"|"368209003"|"Right arm"|"78421000"|"Intramuscular"|"0.5"|"3318611000001103"|"Pre-filled disposable injection"|"443684008"|"X99999"|"https://fhir.nhs.uk/Id/ods-organization-code"
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
import unittest
2+
import os
3+
from io import BytesIO
4+
from unittest.mock import patch
5+
from batch_processor import process_csv_to_fhir
6+
from tests.utils_for_recordprocessor_tests.utils_for_recordprocessor_tests import create_patch
7+
8+
9+
class TestProcessorEdgeCases(unittest.TestCase):
10+
11+
def setUp(self):
12+
self.mock_logger_info = create_patch("logging.Logger.info")
13+
self.mock_logger_warning = create_patch("logging.Logger.warning")
14+
self.mock_logger_error = create_patch("logging.Logger.error")
15+
self.mock_send_to_kinesis = create_patch("batch_processor.send_to_kinesis")
16+
self.mock_map_target_disease = create_patch("batch_processor.map_target_disease")
17+
self.mock_s3_get_object = create_patch("utils_for_recordprocessor.s3_client.get_object")
18+
self.mock_s3_put_object = create_patch("utils_for_recordprocessor.s3_client.put_object")
19+
self.mock_make_and_move = create_patch("file_level_validation.make_and_upload_ack_file")
20+
self.mock_move_file = create_patch("file_level_validation.move_file")
21+
self.mock_get_permitted_operations = create_patch("file_level_validation.get_permitted_operations")
22+
self.mock_firehose_client = create_patch("logging_decorator.firehose_client")
23+
self.mock_update_audit_table_status = create_patch("batch_processor.update_audit_table_status")
24+
25+
def tearDown(self):
26+
patch.stopall()
27+
28+
def expand_test_data(self, data: list[bytes], num_rows: int) -> list[bytes]:
29+
n_rows = len(data) - 1 # Exclude header
30+
31+
if n_rows < num_rows:
32+
multiplier = (num_rows // n_rows) + 1
33+
header = data[0:1]
34+
body = data[1:] * multiplier
35+
data = header + body
36+
data = data[:num_rows + 1]
37+
return data
38+
39+
def create_test_data_from_file(self, file_name: str) -> list[bytes]:
40+
test_csv_path = os.path.join(
41+
os.path.dirname(__file__), "test_data", file_name
42+
)
43+
with open(test_csv_path, "rb") as f:
44+
data = f.readlines()
45+
return data
46+
47+
def insert_cp1252_at_end(self, data: list[bytes], new_text: bytes, field: int) -> list[bytes]:
48+
for i in reversed(range(len(data))):
49+
line = data[i]
50+
# Split fields by pipe
51+
fields = line.strip().split(b"|")
52+
fields[field] = new_text
53+
# Reconstruct the line
54+
data[i] = b"|".join(fields) + b"\n"
55+
break
56+
return data
57+
58+
def test_process_large_file_cp1252(self):
59+
""" Test processing a large file with cp1252 encoding """
60+
n_rows = 500
61+
data = self.create_test_data_from_file("test-batch-data.csv")
62+
data = self.expand_test_data(data, n_rows)
63+
data = self.insert_cp1252_at_end(data, b'D\xe9cembre', 2)
64+
ret1 = {"Body": BytesIO(b"".join(data))}
65+
ret2 = {"Body": BytesIO(b"".join(data))}
66+
self.mock_s3_get_object.side_effect = [ret1, ret2]
67+
self.mock_map_target_disease.return_value = "some disease"
68+
69+
message_body = {
70+
"vaccine_type": "vax-type-1",
71+
"supplier": "test-supplier",
72+
}
73+
self.mock_map_target_disease.return_value = "some disease"
74+
75+
n_rows_processed = process_csv_to_fhir(message_body)
76+
self.assertEqual(n_rows_processed, n_rows)
77+
self.assertEqual(self.mock_send_to_kinesis.call_count, n_rows)
78+
# check logger.warning called for decode error
79+
self.mock_logger_warning.assert_called()
80+
warning_call_args = self.mock_logger_warning.call_args[0][0]
81+
self.assertTrue(warning_call_args.startswith("Encoding Error: 'utf-8' codec can't decode byte 0xe9"))
82+
83+
def test_process_large_file_utf8(self):
84+
""" Test processing a large file with utf-8 encoding """
85+
n_rows = 500
86+
data = self.create_test_data_from_file("test-batch-data.csv")
87+
data = self.expand_test_data(data, n_rows)
88+
ret1 = {"Body": BytesIO(b"".join(data))}
89+
ret2 = {"Body": BytesIO(b"".join(data))}
90+
self.mock_s3_get_object.side_effect = [ret1, ret2]
91+
self.mock_map_target_disease.return_value = "some disease"
92+
93+
message_body = {
94+
"vaccine_type": "vax-type-1",
95+
"supplier": "test-supplier",
96+
}
97+
self.mock_map_target_disease.return_value = "some disease"
98+
99+
n_rows_processed = process_csv_to_fhir(message_body)
100+
self.assertEqual(n_rows_processed, n_rows)
101+
self.assertEqual(self.mock_send_to_kinesis.call_count, n_rows)
102+
self.mock_logger_warning.assert_not_called()
103+
self.mock_logger_error.assert_not_called()
104+
105+
def test_process_small_file_cp1252(self):
106+
""" Test processing a small file with cp1252 encoding """
107+
data = self.create_test_data_from_file("test-batch-data-cp1252.csv")
108+
data = self.insert_cp1252_at_end(data, b'D\xe9cembre', 2)
109+
data = [line if line.endswith(b"\n") else line + b"\n" for line in data]
110+
n_rows = len(data) - 1 # Exclude header
111+
112+
ret1 = {"Body": BytesIO(b"".join(data))}
113+
ret2 = {"Body": BytesIO(b"".join(data))}
114+
self.mock_s3_get_object.side_effect = [ret1, ret2]
115+
self.mock_map_target_disease.return_value = "some disease"
116+
117+
message_body = {
118+
"vaccine_type": "vax-type-1",
119+
"supplier": "test-supplier",
120+
}
121+
122+
self.mock_map_target_disease.return_value = "some disease"
123+
124+
n_rows_processed = process_csv_to_fhir(message_body)
125+
self.assertEqual(n_rows_processed, n_rows)
126+
self.assertEqual(self.mock_send_to_kinesis.call_count, n_rows)
127+
self.mock_logger_warning.assert_called()
128+
warning_call_args = self.mock_logger_warning.call_args[0][0]
129+
self.assertTrue(warning_call_args.startswith("Invalid Encoding detected"))
130+
131+
def test_process_small_file_utf8(self):
132+
""" Test processing a small file with utf-8 encoding """
133+
data = self.create_test_data_from_file("test-batch-data.csv")
134+
data = [line if line.endswith(b"\n") else line + b"\n" for line in data]
135+
n_rows = len(data) - 1 # Exclude header
136+
137+
ret1 = {"Body": BytesIO(b"".join(data))}
138+
ret2 = {"Body": BytesIO(b"".join(data))}
139+
self.mock_s3_get_object.side_effect = [ret1, ret2]
140+
self.mock_map_target_disease.return_value = "some disease"
141+
142+
message_body = {
143+
"vaccine_type": "vax-type-1",
144+
"supplier": "test-supplier",
145+
}
146+
self.mock_map_target_disease.return_value = "some disease"
147+
148+
n_rows_processed = process_csv_to_fhir(message_body)
149+
self.assertEqual(n_rows_processed, n_rows)
150+
self.assertEqual(self.mock_send_to_kinesis.call_count, n_rows)
151+
self.mock_logger_warning.assert_not_called()
152+
self.mock_logger_error.assert_not_called()

recordprocessor/tests/test_recordprocessor_main.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
REGION_NAME,
2424
)
2525
from tests.utils_for_recordprocessor_tests.mock_environment_variables import MOCK_ENVIRONMENT_DICT, BucketNames, Kinesis
26+
from tests.utils_for_recordprocessor_tests.utils_for_recordprocessor_tests import create_patch
2627

2728
with patch("os.environ", MOCK_ENVIRONMENT_DICT):
2829
from constants import Diagnostics
@@ -52,8 +53,10 @@ def setUp(self) -> None:
5253
"code": "55735004",
5354
"term": "Respiratory syncytial virus infection (disorder)"
5455
}])
56+
self.mock_logger_info = create_patch("logging.Logger.info")
5557

5658
def tearDown(self) -> None:
59+
patch.stopall()
5760
GenericTearDown(s3_client, firehose_client, kinesis_client)
5861

5962
@staticmethod

recordprocessor/tests/utils_for_recordprocessor_tests/utils_for_recordprocessor_tests.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,3 +102,8 @@ def assert_audit_table_entry(file_details: FileDetails, expected_status: FileSta
102102
TableName=AUDIT_TABLE_NAME, Key={AuditTableKeys.MESSAGE_ID: {"S": file_details.message_id}}
103103
).get("Item")
104104
assert table_entry == {**file_details.audit_table_entry, "status": {"S": expected_status}}
105+
106+
107+
def create_patch(target: str):
108+
patcher = patch(target)
109+
return patcher.start()

0 commit comments

Comments
 (0)