Skip to content

Commit d3b3147

Browse files
committed
Tests Pass
1 parent f2d17ed commit d3b3147

File tree

5 files changed

+185
-187
lines changed

5 files changed

+185
-187
lines changed

recordprocessor/src/batch_processor.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,10 @@ def process_csv_to_fhir(incoming_message_body: dict) -> None:
5050
logger.info(f"process with encoder {encoder} from row {row_count+1}")
5151
row_count, err = process_rows(file_id, vaccine, supplier, file_key, allowed_operations,
5252
created_at_formatted_string, csv_reader, target_disease)
53+
5354
if err:
54-
logger.warning(f"Error processing: {err}.")
55-
# check if it's a decode error
56-
if err.reason == "invalid continuation byte":
55+
logger.warning(f"Processing Error: {err}.")
56+
if isinstance(err, InvalidEncoding):
5757
new_encoder = "cp1252"
5858
logger.info(f"Encode error at row {row_count} with {encoder}. Switch to {new_encoder}")
5959
encoder = new_encoder
@@ -108,7 +108,7 @@ def process_rows(file_id, vaccine, supplier, file_key, allowed_operations, creat
108108
# if error reason is 'invalid continuation byte', then it's a decode error
109109
logger.error("Error processing row %s: %s", row_count, error)
110110
if hasattr(error, 'reason') and error.reason == "invalid continuation byte":
111-
return total_rows_processed_count, error
111+
return total_rows_processed_count, InvalidEncoding("Invalid continuation byte")
112112
else:
113113
raise error
114114
return total_rows_processed_count, None

recordprocessor/tests/test_batch_processor.py

Lines changed: 0 additions & 183 deletions
This file was deleted.
Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
import unittest
2+
import os
3+
from io import BytesIO
4+
from unittest.mock import patch
5+
from batch_processor import process_csv_to_fhir
6+
from tests.utils_for_recordprocessor_tests.utils_for_recordprocessor_tests import create_patch
7+
8+
9+
class TestProcessorEdgeCases(unittest.TestCase):
    """Edge-case tests for process_csv_to_fhir.

    Covers large (expanded) files and small files in both utf-8 and cp1252
    encodings, asserting that the decode-error recovery path logs the expected
    warnings and that every data row is still forwarded to Kinesis.
    """

    def setUp(self):
        # Patch loggers so log output can be asserted on.
        self.mock_logger_info = create_patch("logging.Logger.info")
        self.mock_logger_warning = create_patch("logging.Logger.warning")
        self.mock_logger_error = create_patch("logging.Logger.error")
        # Patch all external side effects (Kinesis, S3, ack files, Firehose).
        self.mock_send_to_kinesis = create_patch("batch_processor.send_to_kinesis")
        self.mock_map_target_disease = create_patch("batch_processor.map_target_disease")
        self.mock_s3_get_object = create_patch("utils_for_recordprocessor.s3_client.get_object")
        self.mock_s3_put_object = create_patch("utils_for_recordprocessor.s3_client.put_object")
        self.mock_make_and_move = create_patch("file_level_validation.make_and_upload_ack_file")
        self.mock_move_file = create_patch("file_level_validation.move_file")
        self.mock_get_permitted_operations = create_patch("file_level_validation.get_permitted_operations")
        self.mock_firehose_client = create_patch("logging_decorator.firehose_client")

    def tearDown(self):
        # create_patch starts patches without context managers; stop them all here.
        patch.stopall()

    def expand_test_data(self, data: list[bytes], num_rows: int) -> list[bytes]:
        """Repeat the data rows (excluding the header) until at least num_rows
        rows exist, then truncate to exactly num_rows rows plus the header.

        NOTE(review): requires at least one data row after the header —
        an empty body would raise ZeroDivisionError.
        """
        n_rows = len(data) - 1  # Exclude header
        if n_rows < num_rows:
            multiplier = (num_rows // n_rows) + 1
            header = data[0:1]
            body = data[1:] * multiplier
            data = header + body
        data = data[:num_rows + 1]
        return data

    def create_test_data_from_file(self, file_name: str) -> list[bytes]:
        """Read a CSV from the test_data directory as a list of raw byte lines."""
        test_csv_path = os.path.join(
            os.path.dirname(__file__), "test_data", file_name
        )
        with open(test_csv_path, "rb") as f:
            data = f.readlines()
        return data

    def insert_cp1252_at_end(self, data: list[bytes], new_text: bytes, field: int) -> list[bytes]:
        """Overwrite one pipe-delimited field of the LAST line with new_text.

        new_text is typically bytes valid in cp1252 but invalid as utf-8, so a
        decode error is triggered near the end of the file. No-op on empty data.
        """
        # The original looped from the end and broke after one iteration;
        # only the last line is ever modified.
        if data:
            fields = data[-1].strip().split(b"|")
            fields[field] = new_text
            data[-1] = b"|".join(fields) + b"\n"
        return data

    def test_process_large_file_with_cp1252(self):
        """ Test processing a large file with cp1252 encoding """
        n_rows = 500
        data = self.create_test_data_from_file("test-batch-data.csv")
        data = self.expand_test_data(data, n_rows)
        # 'Décembre' in cp1252: \xe9 is an invalid continuation byte in utf-8.
        data = self.insert_cp1252_at_end(data, b'D\xe9cembre', 2)
        # Two S3 reads are mocked — presumably one per processing pass; confirm
        # against process_csv_to_fhir if the read count changes.
        ret1 = {"Body": BytesIO(b"".join(data))}
        ret2 = {"Body": BytesIO(b"".join(data))}
        self.mock_s3_get_object.side_effect = [ret1, ret2]
        self.mock_map_target_disease.return_value = "some disease"

        message_body = {
            "vaccine_type": "vax-type-1",
            "supplier": "test-supplier",
        }

        n_rows_processed = process_csv_to_fhir(message_body)
        self.assertEqual(n_rows_processed, n_rows)
        self.assertEqual(self.mock_send_to_kinesis.call_count, n_rows)
        # check logger.warning called for decode error
        self.mock_logger_warning.assert_called()
        warning_call_args = self.mock_logger_warning.call_args[0][0]
        self.assertEqual(warning_call_args, "Processing Error: Invalid continuation byte.")

    def test_process_large_file_with_utf8(self):
        """ Test processing a large file with utf-8 encoding """
        n_rows = 500
        data = self.create_test_data_from_file("test-batch-data.csv")
        data = self.expand_test_data(data, n_rows)
        ret1 = {"Body": BytesIO(b"".join(data))}
        ret2 = {"Body": BytesIO(b"".join(data))}
        self.mock_s3_get_object.side_effect = [ret1, ret2]
        self.mock_map_target_disease.return_value = "some disease"

        message_body = {
            "vaccine_type": "vax-type-1",
            "supplier": "test-supplier",
        }

        n_rows_processed = process_csv_to_fhir(message_body)
        self.assertEqual(n_rows_processed, n_rows)
        self.assertEqual(self.mock_send_to_kinesis.call_count, n_rows)
        # Clean utf-8 input must not hit the warning or error paths.
        self.mock_logger_warning.assert_not_called()
        self.mock_logger_error.assert_not_called()

    def test_process_cp1252_small_file(self):
        """ Test processing a small file with cp1252 encoding """
        data = self.create_test_data_from_file("test-batch-data-cp1252.csv")
        # Normalise line endings so joining produces well-formed rows.
        data = [line if line.endswith(b"\n") else line + b"\n" for line in data]
        n_rows = len(data) - 1  # Exclude header

        ret1 = {"Body": BytesIO(b"".join(data))}
        ret2 = {"Body": BytesIO(b"".join(data))}
        self.mock_s3_get_object.side_effect = [ret1, ret2]
        self.mock_map_target_disease.return_value = "some disease"

        message_body = {
            "vaccine_type": "vax-type-1",
            "supplier": "test-supplier",
        }

        n_rows_processed = process_csv_to_fhir(message_body)
        self.assertEqual(n_rows_processed, n_rows)
        self.assertEqual(self.mock_send_to_kinesis.call_count, n_rows)
        self.mock_logger_warning.assert_called()
        warning_call_args = self.mock_logger_warning.call_args[0][0]
        self.assertTrue(warning_call_args.startswith("Invalid Encoding detected in process_csv_to_fhir"))

    def test_process_utf8_small_file(self):
        """ Test processing a small file with utf-8 encoding """
        data = self.create_test_data_from_file("test-batch-data.csv")
        data = [line if line.endswith(b"\n") else line + b"\n" for line in data]
        n_rows = len(data) - 1  # Exclude header

        ret1 = {"Body": BytesIO(b"".join(data))}
        ret2 = {"Body": BytesIO(b"".join(data))}
        self.mock_s3_get_object.side_effect = [ret1, ret2]
        self.mock_map_target_disease.return_value = "some disease"

        message_body = {
            "vaccine_type": "vax-type-1",
            "supplier": "test-supplier",
        }

        n_rows_processed = process_csv_to_fhir(message_body)
        self.assertEqual(n_rows_processed, n_rows)
        self.assertEqual(self.mock_send_to_kinesis.call_count, n_rows)
        self.mock_logger_warning.assert_not_called()
        self.mock_logger_error.assert_not_called()

    def test_fix_cp1252(self):
        # create a cp1252 string that contains an accented E
        # this is not a unit test as such but checks/confirms encoding assumptions
        source_text = b'D\xe9cembre'
        test_dict = {
            "date": source_text,
            "name": "Test Name"}
        utf8_dict = dict_decode(test_dict, "cp1252")
        self.assertEqual(utf8_dict["date"], "Décembre")
        self.assertEqual(utf8_dict["name"], "Test Name")
164+
def dict_decode(input_dict: dict, encoding: str) -> dict:
    """
    Return a copy of input_dict in which every bytes value is decoded to str
    using the given encoding; non-bytes values are carried over unchanged.
    """
    return {
        key: (value.decode(encoding) if isinstance(value, bytes) else value)
        for key, value in input_dict.items()
    }

recordprocessor/tests/test_recordprocessor_main.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
REGION_NAME,
2424
)
2525
from tests.utils_for_recordprocessor_tests.mock_environment_variables import MOCK_ENVIRONMENT_DICT, BucketNames, Kinesis
26+
from tests.utils_for_recordprocessor_tests.utils_for_recordprocessor_tests import create_patch
2627

2728
with patch("os.environ", MOCK_ENVIRONMENT_DICT):
2829
from constants import Diagnostics
@@ -52,6 +53,7 @@ def setUp(self) -> None:
5253
"code": "55735004",
5354
"term": "Respiratory syncytial virus infection (disorder)"
5455
}])
56+
self.mock_logger_info = create_patch("logging.Logger.info")
5557

5658
def tearDown(self) -> None:
5759
GenericTearDown(s3_client, firehose_client, kinesis_client)

0 commit comments

Comments
 (0)