
Commit 857ea87

works!

committed · 1 parent 262dd5e · commit 857ea87

3 files changed: +87 -61 lines changed

recordprocessor/src/batch_processor.py

Lines changed: 41 additions & 21 deletions
@@ -9,14 +9,14 @@
 from clients import logger
 from file_level_validation import file_level_validation
 from errors import NoOperationPermissions, InvalidHeaders
+from utils_for_recordprocessor import get_csv_content_dict_reader


 def process_csv_to_fhir(incoming_message_body: dict) -> None:
     """
     For each row of the csv, attempts to transform into FHIR format, sends a message to kinesis,
     and documents the outcome for each row in the ack file.
     """
-    encoder = "utf-8"  # default encoding
     try:
         interim_message_body = file_level_validation(incoming_message_body=incoming_message_body)
     except (InvalidHeaders, NoOperationPermissions, Exception):  # pylint: disable=broad-exception-caught
@@ -32,53 +32,72 @@ def process_csv_to_fhir(incoming_message_body: dict) -> None:
     csv_reader = interim_message_body.get("csv_dict_reader")

     target_disease = map_target_disease(vaccine)
+    print("process csv to fhir")
     row_count = 0
+    encoder = "utf-8"  # default encoding
     try:
         row_count = process_rows(file_id, vaccine, supplier, file_key, allowed_operations,
                                  created_at_formatted_string, csv_reader, target_disease)
     except Exception as error:  # pylint: disable=broad-exception-caught
         new_encoder = "cp1252"
         print(f"Error processing: {error}.")
-        # check if it's a decode error, ie error.args[0] begins with "'utf-8' codec can't decode byte"
-        if error.args[0].startswith("'utf-8' codec can't decode byte"):
+        # check if it's a decode error, i.e. a UnicodeDecodeError whose reason is an invalid continuation byte
+        if isinstance(error, UnicodeDecodeError) and error.reason == "invalid continuation byte":
             print(f"Decode error at row {row_count} with {encoder}. Switch to {new_encoder}")
-            print(f"Detected decode error: {error.args[0]}")
-            # if we are here, re-read the file with correct encoding and ignore the processed rows
-            # if error.args[0] == "'utf-8' codec can't decode byte 0xe9 in position 2996: invalid continuation byte":
-            # cp1252
-            row_count += process_rows_retry(file_id, vaccine, supplier, file_key,
-                                            allowed_operations, created_at_formatted_string,
-                                            "cp1252", start_row=row_count)
+            # print(f"Detected decode error: {error.reason}")
+            encoder = new_encoder
+            # if we are here, re-read the file with the alternative encoding and skip the already-processed rows
+            row_count = process_rows_retry(file_id, vaccine, supplier, file_key,
+                                           allowed_operations, created_at_formatted_string,
+                                           encoder, target_disease, row_count)
         else:
-            logger.error(f"Non-decode error: {error}. Cannot retry.")
+            logger.error(f"Non-decode error: {error}. Cannot retry. Call someone.")
             raise error from error

     logger.info("Total rows processed: %s", row_count)
-    update_audit_table_status(file_key, file_id, FileStatus.PREPROCESSED)


 def process_rows_retry(file_id, vaccine, supplier, file_key, allowed_operations,
-                       created_at_formatted_string, encoder, target_disease, start_row=0) -> int:
-    new_reader = get_csv_content_dict_reader(file_key, encoding=encoder)
-    return process_rows(file_id, vaccine, supplier, file_key, allowed_operations,
-                        created_at_formatted_string, new_reader, start_row)
+                       created_at_formatted_string, encoder, target_disease, total_rows_processed_count=0) -> int:
+    """
+    Retry processing rows with a different encoding, starting from a specific row number
+    """
+    print("process_rows_retry...")
+    new_reader = get_csv_content_dict_reader(file_key, encoder=encoder)
+
+    total_rows_processed_count = process_rows(
+        file_id, vaccine, supplier, file_key, allowed_operations,
+        created_at_formatted_string, new_reader, target_disease, total_rows_processed_count)
+
+    return total_rows_processed_count


 def process_rows(file_id, vaccine, supplier, file_key, allowed_operations, created_at_formatted_string,
-                 csv_reader, target_disease, start_row=0) -> int:
+                 csv_reader, target_disease,
+                 total_rows_processed_count=0) -> int:
     """
     Processes each row in the csv_reader starting from start_row.
     """
-
+    print("process_rows...")
     row_count = 0
+    start_row = total_rows_processed_count
     for row in csv_reader:
-        if row_count >= start_row:
-            row_count += 1
+
+        row_count += 1
+        if row_count > start_row:
             row_id = f"{file_id}^{row_count}"
             logger.info("MESSAGE ID : %s", row_id)

+            # progress tracing: report every 1000 rows, and include the surname field near the end of a 20k-row file
+            if total_rows_processed_count % 1000 == 0:
+                print(f"Process: {total_rows_processed_count}")
+            if total_rows_processed_count > 19995:
+                print(f"Process: {total_rows_processed_count} - {row['PERSON_SURNAME']}")
+
+            # Process the row to obtain the details needed for the message_body and ack file
             details_from_processing = process_row(target_disease, allowed_operations, row)

+            # Create the message body for sending
             outgoing_message_body = {
                 "row_id": row_id,
                 "file_key": file_key,
@@ -89,8 +108,9 @@ def process_rows(file_id, vaccine, supplier, file_key, allowed_operations, created_at_formatted_string,
             }

             send_to_kinesis(supplier, outgoing_message_body, vaccine)
-
-    return row_count
+            total_rows_processed_count += 1
+
+    logger.info("Total rows processed: %s", total_rows_processed_count)
+    return total_rows_processed_count


 def main(event: str) -> None:
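One note on the new check: UnicodeDecodeError carries structured attributes (encoding, object, start, end, reason), which is why the handler can match on error.reason instead of string-parsing error.args[0]. A minimal standalone sketch (not part of the commit) of the detection-and-fallback idea:

    try:
        b"D\xe9cembre".decode("utf-8")  # 0xe9 is not valid UTF-8 in this position
    except UnicodeDecodeError as error:
        print(error.encoding)  # utf-8
        print(error.reason)    # invalid continuation byte
        print(b"D\xe9cembre".decode("cp1252"))  # Décembre: the cp1252 fallback succeeds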

recordprocessor/src/utils_for_recordprocessor.py

Lines changed: 2 additions & 2 deletions
@@ -15,11 +15,11 @@ def get_environment() -> str:
     return _env if _env in ["internal-dev", "int", "ref", "sandbox", "prod"] else "internal-dev"


-def get_csv_content_dict_reader(file_key: str) -> DictReader:
+def get_csv_content_dict_reader(file_key: str, encoder="utf-8") -> DictReader:
     """Returns the requested file contents from the source bucket in the form of a DictReader"""
     response = s3_client.get_object(Bucket=os.getenv("SOURCE_BUCKET_NAME"), Key=file_key)
     binary_io = response["Body"]
-    text_io = TextIOWrapper(binary_io, encoding="utf-8", newline="")
+    text_io = TextIOWrapper(binary_io, encoding=encoder, newline="")
     return DictReader(text_io, delimiter="|")

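Since get_csv_content_dict_reader reads from S3, here is a hedged local sketch of how the new encoder argument flows into TextIOWrapper and DictReader, with BytesIO standing in for response["Body"] and an illustrative pipe-delimited payload (the column names are made up for the example):

    from csv import DictReader
    from io import BytesIO, TextIOWrapper

    # cp1252-encoded payload standing in for the S3 object body
    binary_io = BytesIO(b"NHS_NUMBER|PERSON_SURNAME\n9000000009|D\xe9cembre\n")
    text_io = TextIOWrapper(binary_io, encoding="cp1252", newline="")
    for row in DictReader(text_io, delimiter="|"):
        print(row["PERSON_SURNAME"])  # Décembre

With encoding="utf-8" the same payload raises UnicodeDecodeError, which is what triggers the retry path in batch_processor.py.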
recordprocessor/tests/test_batch_processor.py

Lines changed: 44 additions & 38 deletions
@@ -2,7 +2,7 @@
 import os
 from io import BytesIO
 from unittest.mock import patch
-from utils_for_recordprocessor import dict_decode
+# from utils_for_recordprocessor import dict_decode

 with patch("logging_decorator.file_level_validation_logging_decorator", lambda f: f):
     # from file_level_validation import file_level_validation
@@ -14,8 +14,8 @@ class TestProcessCsvToFhir(unittest.TestCase):
     def setUp(self):
         self.logger_info_patcher = patch("logging.Logger.info")
         self.mock_logger_info = self.logger_info_patcher.start()
-        self.update_audit_table_status_patcher = patch("batch_processor.update_audit_table_status")
-        self.mock_update_audit_table_status = self.update_audit_table_status_patcher.start()
+        # self.update_audit_table_status_patcher = patch("batch_processor.update_audit_table_status")
+        # self.mock_update_audit_table_status = self.update_audit_table_status_patcher.start()
         self.send_to_kinesis_patcher = patch("batch_processor.send_to_kinesis")
         self.mock_send_to_kinesis = self.send_to_kinesis_patcher.start()
         self.map_target_disease_patcher = patch("batch_processor.map_target_disease")
@@ -36,7 +36,7 @@ def setUp(self):
     def tearDown(self):
         patch.stopall()

-    def create_large_test_data(self, data: list[bytes], num_rows: int) -> list[bytes]:
+    def expand_test_data(self, data: list[bytes], num_rows: int) -> list[bytes]:
         n_rows = len(data) - 1  # Exclude header

         if n_rows < num_rows:
@@ -45,53 +45,38 @@ def create_large_test_data(self, data: list[bytes], num_rows: int) -> list[bytes]:
             body = data[1:] * multiplier
             data = header + body
             data = data[:num_rows + 1]
-
+        print(f"Expanded test data to {len(data)-1} rows")
         return data

-    def create_test_data_from_file(self, file_name: str, num_rows: int) -> list[bytes]:
+    def create_test_data_from_file(self, file_name: str) -> list[bytes]:
         test_csv_path = os.path.join(
             os.path.dirname(__file__), "test_data", file_name
         )
-
         with open(test_csv_path, "rb") as f:
             data = f.readlines()
+        return data

-        n_rows = len(data) - 1  # Exclude header
-
-        if n_rows < num_rows:
-            multiplier = (num_rows // n_rows) + 1
-            header = data[0:1]
-            body = data[1:] * multiplier
-            data = header + body
-            data = data[:num_rows + 1]
-
+    def insert_cp1252_at_end(self, data: list[bytes], new_text: bytes, field: int) -> list[bytes]:
+        for i in reversed(range(len(data))):
+            line = data[i]
+            # Split fields by pipe
+            fields = line.strip().split(b"|")
+            print(f"replace field: {fields[field]}")
+            fields[field] = new_text
+            print(f"replaced field: {fields[field]}")
+
+            # Reconstruct the line
+            data[i] = b"|".join(fields) + b"\n"
+            break
         return data

     def test_process_csv_to_fhir_success(self):
         # Setup mocks
         print("test_process_csv_to_fhir_success")
         try:
-            data = self.create_test_data_from_file("test_batch_processor.csv", 20_000)
-            test_csv_path = os.path.join(
-                os.path.dirname(__file__), "test_data", "test_batch_processor.csv"
-            )
-            with open(test_csv_path, "rb") as f:
-                data = f.readlines()
-
-            # insert source_text into last row of cp1252_bytes
-            for i in reversed(range(len(data))):
-                line = data[i]
-                # Split fields by pipe
-                fields = line.strip().split(b"|")
-                print(f"replace field: {fields[2]}")
-                fields[2] = b'D\xe9cembre'
-                print(f"replaced field: {fields[2]}")
-
-                # Reconstruct the line
-                data[i] = b"|".join(fields) + b"\n"
-                break
-
-            # manually add
+            data = self.create_test_data_from_file("test-batch-data.csv")
+            data = self.expand_test_data(data, 20_000)
+            data = self.insert_cp1252_at_end(data, b'D\xe9cembre', 2)

             # Read CSV from test_csv_path as utf-8
             ret1 = {"Body": BytesIO(b"".join(data))}
@@ -107,7 +92,6 @@ def test_process_csv_to_fhir_success(self):
             "permission": ["COVID.R", "COVID.U", "COVID.D"],
             "allowed_operations": ["CREATE", "UPDATE", "DELETE"],
             "created_at_formatted_string": "2024-09-05T12:00:00Z"
-            # "csv_dict_reader": csv_rows
         }
         # self.mock_file_level_validation.return_value = message_body
         self.mock_get_permitted_operations.return_value = {"CREATE", "UPDATE", "DELETE"}
@@ -127,3 +111,25 @@ def test_fix_cp1252(self):
         utf8_dict = dict_decode(test_dict, "cp1252")
         self.assertEqual(utf8_dict["date"], "Décembre")
         self.assertEqual(utf8_dict["name"], "Test Name")
+
+    def test_dict_decode(self):
+        source_text = b'D\xe9cembre'
+        test_dict = {
+            "date": source_text,
+            "name": "Test Name"}
+        utf8_dict = dict_decode(test_dict, "cp1252")
+        self.assertEqual(utf8_dict["date"], "Décembre")
+        self.assertEqual(utf8_dict["name"], "Test Name")
+
+
+def dict_decode(input_dict: dict, encoding: str) -> dict:
+    """
+    Decode all byte-string values in a dictionary to str using the specified encoding.
+    """
+    decoded_dict = {}
+    for key, value in input_dict.items():
+        if isinstance(value, bytes):
+            decoded_dict[key] = value.decode(encoding)
+        else:
+            decoded_dict[key] = value
+    return decoded_dict
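Given the module-level dict_decode helper added above, a short usage sketch (the row values mirror the test fixtures):

    raw_row = {"date": b'D\xe9cembre', "name": "Test Name"}
    decoded = dict_decode(raw_row, "cp1252")
    print(decoded)  # {'date': 'Décembre', 'name': 'Test Name'}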
