Skip to content

Commit 9b1dd8a

Browse files
committed
local tests work
1 parent bea32b8 commit 9b1dd8a

File tree

3 files changed

+110
-92
lines changed

3 files changed

+110
-92
lines changed

recordprocessor/src/batch_processor.py

Lines changed: 16 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,10 @@
77
from mappings import map_target_disease
88
from send_to_kinesis import send_to_kinesis
99
from clients import logger
10-
from file_level_validation import file_level_validation, validate_content_headers
10+
from file_level_validation import file_level_validation
1111
from errors import NoOperationPermissions, InvalidHeaders, InvalidEncoding
1212
from utils_for_recordprocessor import get_csv_content_dict_reader
13+
from typing import Optional
1314

1415

1516
def process_csv_to_fhir(incoming_message_body: dict) -> None:
@@ -43,21 +44,21 @@ def process_csv_to_fhir(incoming_message_body: dict) -> None:
4344

4445
target_disease = map_target_disease(vaccine)
4546
row_count = 0
47+
logger.info(f"process with encoder {encoder} from row {row_count+1}")
4648
row_count, err = process_rows(file_id, vaccine, supplier, file_key, allowed_operations,
4749
created_at_formatted_string, csv_reader, target_disease)
4850
if err:
49-
print(f"Error processing: {err}.")
50-
# check if it's a decode error, ie error.args[0] begins with "'utf-8' codec can't decode byte"
51+
logger.warning(f"Error processing: {err}.")
52+
# check if it's a decode error
5153
if err.reason == "invalid continuation byte":
5254
new_encoder = "cp1252"
53-
print(f"Encode error at row {row_count} with {encoder}. Switch to {new_encoder}")
54-
# print(f"Detected decode error: {error.reason}")
55+
logger.info(f"Encode error at row {row_count} with {encoder}. Switch to {new_encoder}")
5556
encoder = new_encoder
56-
# if we are here, re-read the file with alternative encoding and skip processed rows
57+
# load alternative encoder
5758
csv_reader = get_csv_content_dict_reader(file_key, encoder=encoder)
58-
validate_content_headers(csv_reader)
59-
row_count = process_rows(file_id, vaccine, supplier, file_key, allowed_operations,
60-
created_at_formatted_string, csv_reader, target_disease, row_count)
59+
# re-read the file and skip processed rows
60+
row_count, err = process_rows(file_id, vaccine, supplier, file_key, allowed_operations,
61+
created_at_formatted_string, csv_reader, target_disease, row_count)
6162
else:
6263
logger.error(f"Non-decode error: {err}. Cannot retry. Call someone.")
6364
raise err
@@ -66,47 +67,28 @@ def process_csv_to_fhir(incoming_message_body: dict) -> None:
6667
return row_count
6768

6869

69-
def process_rows_retry(file_id, vaccine, supplier, file_key, allowed_operations,
70-
created_at_formatted_string, encoder, total_rows_processed_count=0) -> int:
71-
"""
72-
Retry processing rows with a different encoding from a specific row number
73-
"""
74-
print("process_rows_retry...")
75-
new_reader = get_csv_content_dict_reader(file_key, encoder=encoder)
76-
77-
total_rows_processed_count = process_rows(
78-
file_id, vaccine, supplier, file_key, allowed_operations,
79-
created_at_formatted_string, new_reader, total_rows_processed_count)
80-
81-
return total_rows_processed_count
82-
83-
8470
def process_rows(file_id, vaccine, supplier, file_key, allowed_operations, created_at_formatted_string,
8571
csv_reader, target_disease,
86-
total_rows_processed_count=0) -> int:
72+
total_rows_processed_count=0) -> tuple[int, Optional[Exception]]:
8773
"""
8874
Processes each row in the csv_reader starting from start_row.
8975
"""
90-
print("process_rows...")
9176
row_count = 0
9277
start_row = total_rows_processed_count
9378
try:
9479
for row in csv_reader:
95-
9680
row_count += 1
9781
if row_count > start_row:
9882
row_id = f"{file_id}^{row_count}"
9983
logger.info("MESSAGE ID : %s", row_id)
100-
101-
# convert dict to string and print first 20 chars
84+
# Log progress every 1000 rows and the first 10 rows after a restart
10285
if (total_rows_processed_count % 1000 == 0):
103-
print(f"Process: {total_rows_processed_count}")
104-
if (total_rows_processed_count > 19995):
105-
print(f"Process: {total_rows_processed_count} - {row['PERSON_SURNAME']}")
86+
logger.info(f"Process: {total_rows_processed_count+1}")
87+
if (start_row > 0 and row_count <= start_row+10):
88+
logger.info(f"Restarted Process (log up to first 10): {total_rows_processed_count+1}")
10689

10790
# Process the row to obtain the details needed for the message_body and ack file
10891
details_from_processing = process_row(target_disease, allowed_operations, row)
109-
11092
# Create the message body for sending
11193
outgoing_message_body = {
11294
"row_id": row_id,
@@ -116,14 +98,13 @@ def process_rows(file_id, vaccine, supplier, file_key, allowed_operations, creat
11698
"created_at_formatted_string": created_at_formatted_string,
11799
**details_from_processing,
118100
}
119-
120101
send_to_kinesis(supplier, outgoing_message_body, vaccine)
121102
total_rows_processed_count += 1
122103
logger.info("Total rows processed: %s", total_rows_processed_count)
123104
except Exception as error: # pylint: disable=broad-exception-caught
124105
logger.error("Error processing row %s: %s", row_count, error)
125106
return total_rows_processed_count, error
126-
return total_rows_processed_count
107+
return total_rows_processed_count, None
127108

128109

129110
def main(event: str) -> None:

recordprocessor/tests/test_batch_processor.py

Lines changed: 93 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ class TestProcessCsvToFhir(unittest.TestCase):
1616

1717
def setUp(self):
1818
self.mock_logger_info = create_patch("logging.Logger.info")
19+
self.mock_logger_warning = create_patch("logging.Logger.warning")
20+
self.mock_logger_error = create_patch("logging.Logger.error")
1921
self.mock_send_to_kinesis = create_patch("batch_processor.send_to_kinesis")
2022
self.mock_map_target_disease = create_patch("batch_processor.map_target_disease")
2123
self.mock_s3_get_object = create_patch("utils_for_recordprocessor.s3_client.get_object")
@@ -35,7 +37,6 @@ def expand_test_data(self, data: list[bytes], num_rows: int) -> list[bytes]:
3537
body = data[1:] * multiplier
3638
data = header + body
3739
data = data[:num_rows + 1]
38-
print(f"Expanded test data to {len(data)-1} rows")
3940
return data
4041

4142
def create_test_data_from_file(self, file_name: str) -> list[bytes]:
@@ -51,70 +52,106 @@ def insert_cp1252_at_end(self, data: list[bytes], new_text: bytes, field: int) -
5152
line = data[i]
5253
# Split fields by pipe
5354
fields = line.strip().split(b"|")
54-
print(f"replace field: {fields[field]}")
5555
fields[field] = new_text
56-
print(f"replaced field: {fields[field]}")
5756
# Reconstruct the line
5857
data[i] = b"|".join(fields) + b"\n"
5958
break
6059
return data
6160

6261
def test_process_large_file_with_cp1252(self):
6362
""" Test processing a large file with cp1252 encoding """
64-
try:
65-
n_rows = 20000
66-
data = self.create_test_data_from_file("test-batch-data.csv")
67-
data = self.expand_test_data(data, n_rows)
68-
data = self.insert_cp1252_at_end(data, b'D\xe9cembre', 2)
69-
self.mock_s3_get_object.side_effect = [{"Body": BytesIO(b"".join(data))},
70-
{"Body": BytesIO(b"".join(data))}]
71-
self.mock_map_target_disease.return_value = "RSV"
72-
73-
message_body = {
74-
"message_id": "file123",
75-
"vaccine_type": "covid",
76-
"supplier": "test-supplier",
77-
"filename": "file-key-1",
78-
"permission": ["COVID.R", "COVID.U", "COVID.D"],
79-
"allowed_operations": ["CREATE", "UPDATE", "DELETE"],
80-
"created_at_formatted_string": "2024-09-05T12:00:00Z"
81-
}
82-
self.mock_get_permitted_operations.return_value = {"CREATE", "UPDATE", "DELETE"}
83-
84-
self.mock_map_target_disease.return_value = "RSV"
85-
86-
n_rows_processed = process_csv_to_fhir(message_body)
87-
self.assertEqual(n_rows_processed, n_rows)
88-
except Exception as e:
89-
print(f"Exception during test: {e}")
90-
91-
def test_process_small_file_with_cp1252(self):
63+
n_rows = 500
64+
data = self.create_test_data_from_file("test-batch-data.csv")
65+
data = self.expand_test_data(data, n_rows)
66+
data = self.insert_cp1252_at_end(data, b'D\xe9cembre', 2)
67+
ret1 = {"Body": BytesIO(b"".join(data))}
68+
ret2 = {"Body": BytesIO(b"".join(data))}
69+
self.mock_s3_get_object.side_effect = [ret1, ret2]
70+
self.mock_map_target_disease.return_value = "some disease"
71+
72+
message_body = {
73+
"vaccine_type": "vax-type-1",
74+
"supplier": "test-supplier",
75+
}
76+
self.mock_map_target_disease.return_value = "some disease"
77+
78+
n_rows_processed = process_csv_to_fhir(message_body)
79+
self.assertEqual(n_rows_processed, n_rows)
80+
self.assertEqual(self.mock_send_to_kinesis.call_count, n_rows)
81+
# check logger.warning called for decode error
82+
self.mock_logger_warning.assert_called()
83+
warning_call_args = self.mock_logger_warning.call_args[0][0]
84+
self.assertTrue(warning_call_args.startswith("Error processing: 'utf-8' codec can't decode byte"))
85+
86+
def test_process_large_file_with_utf8(self):
87+
""" Test processing a large file with utf-8 encoding """
88+
n_rows = 500
89+
data = self.create_test_data_from_file("test-batch-data.csv")
90+
data = self.expand_test_data(data, n_rows)
91+
ret1 = {"Body": BytesIO(b"".join(data))}
92+
ret2 = {"Body": BytesIO(b"".join(data))}
93+
self.mock_s3_get_object.side_effect = [ret1, ret2]
94+
self.mock_map_target_disease.return_value = "some disease"
95+
96+
message_body = {
97+
"vaccine_type": "vax-type-1",
98+
"supplier": "test-supplier",
99+
}
100+
self.mock_map_target_disease.return_value = "some disease"
101+
102+
n_rows_processed = process_csv_to_fhir(message_body)
103+
self.assertEqual(n_rows_processed, n_rows)
104+
self.assertEqual(self.mock_send_to_kinesis.call_count, n_rows)
105+
self.mock_logger_warning.assert_not_called()
106+
self.mock_logger_error.assert_not_called()
107+
108+
def test_process_cp1252_small_file(self):
92109
""" Test processing a small file with cp1252 encoding """
93-
try:
94-
data = self.create_test_data_from_file("test-batch-data-cp1252.csv")
95-
n_rows = len(data) - 1 # Exclude header
96-
# data = self.insert_cp1252_at_end(data, b'D\xe9cembre', 2)
97-
self.mock_s3_get_object.side_effect = [{"Body": BytesIO(b"".join(data))},
98-
{"Body": BytesIO(b"".join(data))}]
99-
self.mock_map_target_disease.return_value = "RSV"
100-
101-
message_body = {
102-
"message_id": "file123",
103-
"vaccine_type": "covid",
104-
"supplier": "test-supplier",
105-
"filename": "file-key-1",
106-
"permission": ["COVID.R", "COVID.U", "COVID.D"],
107-
"allowed_operations": ["CREATE", "UPDATE", "DELETE"],
108-
"created_at_formatted_string": "2024-09-05T12:00:00Z"
109-
}
110-
self.mock_get_permitted_operations.return_value = {"CREATE", "UPDATE", "DELETE"}
111-
112-
self.mock_map_target_disease.return_value = "RSV"
113-
114-
n_rows_processed = process_csv_to_fhir(message_body)
115-
self.assertEqual(n_rows_processed, n_rows)
116-
except Exception as e:
117-
print(f"Exception during test: {e}")
110+
data = self.create_test_data_from_file("test-batch-data-cp1252.csv")
111+
data = [line if line.endswith(b"\n") else line + b"\n" for line in data]
112+
n_rows = len(data) - 1 # Exclude header
113+
114+
ret1 = {"Body": BytesIO(b"".join(data))}
115+
ret2 = {"Body": BytesIO(b"".join(data))}
116+
self.mock_s3_get_object.side_effect = [ret1, ret2]
117+
self.mock_map_target_disease.return_value = "some disease"
118+
119+
message_body = {
120+
"vaccine_type": "vax-type-1",
121+
"supplier": "test-supplier",
122+
}
123+
124+
self.mock_map_target_disease.return_value = "some disease"
125+
126+
n_rows_processed = process_csv_to_fhir(message_body)
127+
self.assertEqual(n_rows_processed, n_rows)
128+
self.assertEqual(self.mock_send_to_kinesis.call_count, n_rows)
129+
self.mock_logger_warning.assert_called()
130+
warning_call_args = self.mock_logger_warning.call_args[0][0]
131+
self.assertTrue(warning_call_args.startswith("Invalid Encoding detected in process_csv_to_fhir"))
132+
133+
def test_process_utf8_small_file(self):
134+
""" Test processing a small file with cp1252 encoding """
135+
data = self.create_test_data_from_file("test-batch-data.csv")
136+
data = [line if line.endswith(b"\n") else line + b"\n" for line in data]
137+
n_rows = len(data) - 1 # Exclude header
138+
139+
ret1 = {"Body": BytesIO(b"".join(data))}
140+
ret2 = {"Body": BytesIO(b"".join(data))}
141+
self.mock_s3_get_object.side_effect = [ret1, ret2]
142+
self.mock_map_target_disease.return_value = "some disease"
143+
144+
message_body = {
145+
"vaccine_type": "vax-type-1",
146+
"supplier": "test-supplier",
147+
}
148+
self.mock_map_target_disease.return_value = "some disease"
149+
150+
n_rows_processed = process_csv_to_fhir(message_body)
151+
self.assertEqual(n_rows_processed, n_rows)
152+
self.assertEqual(self.mock_send_to_kinesis.call_count, n_rows)
153+
self.mock_logger_warning.assert_not_called()
154+
self.mock_logger_error.assert_not_called()
118155

119156
def test_fix_cp1252(self):
120157
# create a cp1252 string that contains an accented E
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
NHS_NUMBER|PERSON_FORENAME|PERSON_SURNAME|PERSON_DOB|PERSON_GENDER_CODE|PERSON_POSTCODE|DATE_AND_TIME|SITE_CODE|SITE_CODE_TYPE_URI|UNIQUE_ID|UNIQUE_ID_URI|ACTION_FLAG|PERFORMING_PROFESSIONAL_FORENAME|PERFORMING_PROFESSIONAL_SURNAME|RECORDED_DATE|PRIMARY_SOURCE|VACCINATION_PROCEDURE_CODE|VACCINATION_PROCEDURE_TERM|DOSE_SEQUENCE|VACCINE_PRODUCT_CODE|VACCINE_PRODUCT_TERM|VACCINE_MANUFACTURER|BATCH_NUMBER|EXPIRY_DATE|SITE_OF_VACCINATION_CODE|SITE_OF_VACCINATION_TERM|ROUTE_OF_VACCINATION_CODE|ROUTE_OF_VACCINATION_TERM|DOSE_AMOUNT|DOSE_UNIT_CODE|DOSE_UNIT_TERM|INDICATION_CODE|LOCATION_CODE|LOCATION_CODE_TYPE_URI
22
"9473081898"|"DELYTH"|"PRESCOT"|"20030731"|"2"|"NE23 1NW"|"20250730T11525000"|"RJ1"|"https://fhir.nhs.uk/Id/ods-organization-code"|"MMR_CDFDPS-1330325--27-0801"|"HTTPS://ravsTest-123"|"new"|""|""|"20250730"|"FALSE"|"308081000000105"|"Measles mumps and rubella vaccination - first dose (procedure)"|"1"|"34925111000001104"|"Priorix vaccine powder and solvent for solution for injection 0.5ml pre-filled syringes (GlaxoSmithKline UK Ltd)"|"GlaxoSmithKline UK Ltd"|"NIQUTMBYAHKSGIERGSCB"|"20230519"|"368209003"|"Right arm"|"78421000"|"Intramuscular"|"0.5"|"3318611000001103"|"Pre-filled disposable injection"|"443684005"|"X99999"|"https://fhir.nhs.uk/Id/ods-organization-code"
33
"9473088736"|"TANESHA"|"SIEGELER"|"20010130"|"2"|"NE46 1EA"|"20250730T14452700"|"B23002"|"https://fhir.nhs.uk/Id/ods-organization-code"|"MMR_CDFDPS-1330325--27-0804"|"HTTPS://ravsTest-123"|"new"|""|""|"20250730"|"FALSE"|"170432003"|"Administration of pre-school booster dose of vaccine product containing only Measles morbillivirus and Mumps orthorubulavirus and Rubella virus antigens"|"2"|"34925111000001104"|"Priorix vaccine powder and solvent for solution for injection 0.5ml pre-filled syringes (GlaxoSmithKline UK Ltd)"|"GlaxoSmithKline UK Ltd"|"YRJNNROADZHBJHBNCHNG"|"20230519"|"368209003"|"Right arm"|"78421000"|"Intramuscular"|"0.5"|"3318611000001103"|"Pre-filled disposable injection"|"443684008"|"X99999"|"https://fhir.nhs.uk/Id/ods-organization-code"
4-
"1111111111"|"DORTHY"|"DICXMBRE"|"20020403"|"2"|"NE29 6BA"|"20250730T11304900"|"RJ1"|"https://fhir.nhs.uk/Id/ods-organization-code"|"MMR_CDFDPS-1330325--27-0807"|"HTTPS://ravsTest-123"|"new"|""|""|"20250730"|"FALSE"|"38598009"|"Administration of vaccine product containing only Measles morbillivirus and Mumps orthorubulavirus and Rubella virus antigens (procedure)"|"3"|"13968211000001108"|"M-M-RVAXPRO vaccine powder and solvent for suspension for injection 0.5ml pre-filled syringes (Merck Sharp & Dohme (UK) Ltd)"|"Merck Sharp & Dohme (UK) Ltd"|"HJIXUMURNAOVKRKIECZH"|"20230519"|"368209003"|"Right arm"|"78421000"|"Intramuscular"|"0.5"|"3318611000001103"|"Pre-filled disposable injection"|"443684006"|"X99999"|"https://fhir.nhs.uk/Id/ods-organization-code"
4+
"9473088733"|"TANESHA"|"SIEGELER"|"20010131"|"2"|"NE46 1EA"|"20250730T14452700"|"B23002"|"https://fhir.nhs.uk/Id/ods-organization-code"|"MMR_CDFDPS-1330325--27-0804"|"HTTPS://ravsTest-123"|"new"|""|""|"20250730"|"FALSE"|"170432003"|"Administration of pre-school booster dose of vaccine product containing only Measles morbillivirus and Mumps orthorubulavirus and Rubella virus antigens"|"2"|"34925111000001104"|"Priorix vaccine powder and solvent for solution for injection 0.5ml pre-filled syringes (GlaxoSmithKline UK Ltd)"|"GlaxoSmithKline UK Ltd"|"YRJNNROADZHBJHBNCHNG"|"20230519"|"368209003"|"Right arm"|"78421000"|"Intramuscular"|"0.5"|"3318611000001103"|"Pre-filled disposable injection"|"443684008"|"X99999"|"https://fhir.nhs.uk/Id/ods-organization-code"

0 commit comments

Comments
 (0)