Commit f0b6011

merge
1 parent ac1a001 commit f0b6011

File tree

1 file changed: +56 -20 lines changed


recordprocessor/src/batch_processor.py

Lines changed: 56 additions & 20 deletions
@@ -31,29 +31,65 @@ def process_csv_to_fhir(incoming_message_body: dict) -> None:
     csv_reader = interim_message_body.get("csv_dict_reader")

     target_disease = map_target_disease(vaccine)
+    row_count = 0
+    try:
+        row_count = process_rows(file_id, vaccine, supplier, file_key, allowed_operations,
+                                 created_at_formatted_string, csv_reader, target_disease)
+    except Exception as error:  # pylint: disable=broad-exception-caught
+        encoder = "cp1252"
+        print(f"Error processing: {error}.")
+        print(f"Encode error at row {row_count} with utf-8. Switching to {encoder}.")
+        # Check whether this is a decode error, i.e. the message begins with "'utf-8' codec can't decode byte"
+        if str(error).startswith("'utf-8' codec can't decode byte"):
+            print(f"Detected decode error: {error}")
+            # Re-read the file with the fallback encoding and skip the rows already processed
+            row_count = process_rows_retry(file_id, vaccine, supplier, file_key,
+                                           allowed_operations, created_at_formatted_string,
+                                           encoder, target_disease, start_row=row_count)
+        else:
+            logger.error("Non-decode error: %s. Cannot retry.", error)
+            raise
+
+    logger.info("Total rows processed: %s", row_count)
+    update_audit_table_status(file_key, file_id, FileStatus.PREPROCESSED)
+
+
+def process_rows_retry(file_id, vaccine, supplier, file_key, allowed_operations,
+                       created_at_formatted_string, encoder, target_disease, start_row=0) -> int:
+    """
+    Re-reads the file with the given encoder and resumes processing after start_row.
+    """
+    new_reader = get_csv_content_dict_reader(file_key, encoding=encoder)
+    return process_rows(file_id, vaccine, supplier, file_key, allowed_operations,
+                        created_at_formatted_string, new_reader, target_disease, start_row=start_row)
+
+
+def process_rows(file_id, vaccine, supplier, file_key, allowed_operations, created_at_formatted_string,
+                 csv_reader, target_disease, start_row=0) -> int:
+    """
+    Processes each row in the csv_reader, starting after start_row.
+    """

     row_count = 0
     for row in csv_reader:
-        row_count += 1
-        row_id = f"{file_id}^{row_count}"
-        logger.info("MESSAGE ID : %s", row_id)
-
-        # Process the row to obtain the details needed for the message_body and ack file
-        details_from_processing = process_row(target_disease, allowed_operations, row)
-
-        # Create the message body for sending
-        outgoing_message_body = {
-            "row_id": row_id,
-            "file_key": file_key,
-            "supplier": supplier,
-            "vax_type": vaccine,
-            "created_at_formatted_string": created_at_formatted_string,
-            **details_from_processing,
-        }
-
-        send_to_kinesis(supplier, outgoing_message_body, vaccine)
-
-    logger.info("Total rows processed: %s", row_count)
+        row_count += 1
+        # Only process rows beyond start_row; earlier rows were already handled before a retry
+        if row_count > start_row:
+            row_id = f"{file_id}^{row_count}"
+            logger.info("MESSAGE ID : %s", row_id)
+
+            # Process the row to obtain the details needed for the message_body and ack file
+            details_from_processing = process_row(target_disease, allowed_operations, row)
+
+            # Create the message body for sending
+            outgoing_message_body = {
+                "row_id": row_id,
+                "file_key": file_key,
+                "supplier": supplier,
+                "vax_type": vaccine,
+                "created_at_formatted_string": created_at_formatted_string,
+                **details_from_processing,
+            }
+
+            send_to_kinesis(supplier, outgoing_message_body, vaccine)
+
+    return row_count


 def main(event: str) -> None:
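
For reference, below is a minimal, self-contained sketch of the encoding-fallback pattern this commit applies: read the file as UTF-8 and, if a decode error surfaces part-way through, re-read it with cp1252 and skip the rows that were already sent. This is not the module's code; RAW_BYTES, get_reader, handle and process_with_fallback are hypothetical stand-ins for the file content, get_csv_content_dict_reader and the row/Kinesis handling, and the comma-delimited sample is illustrative only.

import csv
import io
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Hypothetical sample: a UTF-8 header and first row, then a row containing a
# cp1252-encoded "é" (byte 0xe9), which is invalid as UTF-8.
RAW_BYTES = "NAME,DOSE\nAlice,1\n".encode("utf-8") + "Ren\xe9e,2\n".encode("cp1252")


def get_reader(raw: bytes, encoding: str) -> csv.DictReader:
    # Stand-in for get_csv_content_dict_reader: decode line by line so a bad byte
    # only raises when the offending row is actually read.
    decoded_lines = (line.decode(encoding) for line in io.BytesIO(raw))
    return csv.DictReader(decoded_lines)


def handle(row: dict) -> None:
    # Stand-in for building the outgoing message body and sending it to Kinesis.
    logger.info("processed row: %s", row)


def process_with_fallback(raw: bytes) -> int:
    processed = 0
    try:
        for processed, row in enumerate(get_reader(raw, "utf-8"), start=1):
            handle(row)
    except UnicodeDecodeError as error:
        # 'processed' still holds the number of rows handled before the bad byte,
        # because the error is raised by the iterator before the next assignment.
        logger.warning("utf-8 decode failed after %s rows (%s); retrying with cp1252",
                       processed, error)
        for row_number, row in enumerate(get_reader(raw, "cp1252"), start=1):
            if row_number > processed:  # skip rows that were already sent
                handle(row)
                processed = row_number
    return processed


if __name__ == "__main__":
    print("Total rows processed:", process_with_fallback(RAW_BYTES))

In this sketch the decode failure is caught as UnicodeDecodeError directly rather than by matching the message text, which is one way to keep the retry limited to genuine decode errors.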
