Commit 3650a1b

wip

1 parent 10e7d6c

File tree

3 files changed: +83 -28 lines changed

recordprocessor/src/batch_processor.py

Lines changed: 72 additions & 25 deletions
@@ -12,18 +12,24 @@
 from clients import logger
 from file_level_validation import file_level_validation
 from errors import NoOperationPermissions, InvalidHeaders
+from utils_for_recordprocessor import get_csv_content_dict_reader
+from typing import Optional


-def process_csv_to_fhir(incoming_message_body: dict) -> None:
+def process_csv_to_fhir(incoming_message_body: dict) -> int:
     """
     For each row of the csv, attempts to transform into FHIR format, sends a message to kinesis,
     and documents the outcome for each row in the ack file.
+    Returns the number of rows processed. While this is not used by the handler, the number of rows
+    processed must be correct and therefore is returned for logging and test purposes.
     """
+    encoder = "utf-8"  # default encoding
     try:
+        incoming_message_body["encoder"] = encoder
         interim_message_body = file_level_validation(incoming_message_body=incoming_message_body)
-    except (InvalidHeaders, NoOperationPermissions, Exception):  # pylint: disable=broad-exception-caught
-        # If the file is invalid, processing should cease immediately
-        return
+    except (InvalidHeaders, NoOperationPermissions, Exception) as e:  # pylint: disable=broad-exception-caught
+        # If the file is invalid, processing should cease immediately
+        logger.error(f"File level validation failed: {e}")
+        return 0

     file_id = interim_message_body.get("message_id")
     vaccine = interim_message_body.get("vaccine")
@@ -36,40 +42,81 @@ def process_csv_to_fhir(incoming_message_body: dict) -> None:
     target_disease = map_target_disease(vaccine)

     row_count = 0
-    for row in csv_reader:
-        row_count += 1
-        row_id = f"{file_id}^{row_count}"
-        logger.info("MESSAGE ID : %s", row_id)
+    row_count, err = process_rows(file_id, vaccine, supplier, file_key, allowed_operations,
+                                  created_at_formatted_string, csv_reader, target_disease)
+
+    if err:
+        if isinstance(err, UnicodeDecodeError):
+            # Resolves encoding issue VED-754
+            logger.warning(f"Encoding Error: {err}.")
+            new_encoder = "cp1252"
+            logger.info(f"Encode error at row {row_count} with {encoder}. Switching to {new_encoder}")
+            encoder = new_encoder
+
+            # Load the file again with the alternative encoder
+            csv_reader = get_csv_content_dict_reader(file_key, encoder=encoder)
+            # Re-read the file and skip the already-processed rows
+            row_count, _ = process_rows(file_id, vaccine, supplier, file_key, allowed_operations,
+                                        created_at_formatted_string, csv_reader, target_disease, row_count)
+        else:
+            logger.error(f"Row Processing error: {err}")
+            raise err

-        # Process the row to obtain the details needed for the message_body and ack file
-        details_from_processing = process_row(target_disease, allowed_operations, row)
-
-        # Create the message body for sending
-        outgoing_message_body = {
-            "row_id": row_id,
-            "file_key": file_key,
-            "supplier": supplier,
-            "vax_type": vaccine,
-            "created_at_formatted_string": created_at_formatted_string,
-            **details_from_processing,
-        }
-
-        send_to_kinesis(supplier, outgoing_message_body, vaccine)
+    update_audit_table_status(file_key, file_id, FileStatus.PREPROCESSED)
+    return row_count

-    logger.info("Total rows processed: %s", row_count)

-    update_audit_table_status(file_key, file_id, FileStatus.PREPROCESSED)
+# Process each row to obtain the details needed for the message_body and ack file
+def process_rows(file_id, vaccine, supplier, file_key, allowed_operations, created_at_formatted_string,
+                 csv_reader, target_disease,
+                 total_rows_processed_count=0) -> tuple[int, Optional[Exception]]:
+    """
+    Processes each row in the csv_reader, skipping the first total_rows_processed_count rows.
+    Returns the total number of rows processed and the error (if any) that stopped processing.
+    """
+    row_count = 0
+    start_row = total_rows_processed_count
+    try:
+        for row in csv_reader:
+            row_count += 1
+            if row_count > start_row:
+                row_id = f"{file_id}^{row_count}"
+                logger.info("MESSAGE ID : %s", row_id)
+                # Log progress every 1000 rows and the first 10 rows after a restart
+                if total_rows_processed_count % 1000 == 0:
+                    logger.info(f"Process: {total_rows_processed_count + 1}")
+                if start_row > 0 and row_count <= start_row + 10:
+                    logger.info(f"Restarted Process (log up to first 10): {total_rows_processed_count + 1}")
+                # Process the row to obtain the details needed for the message_body and ack file
+                details_from_processing = process_row(target_disease, allowed_operations, row)
+                # Create the message body for sending
+                outgoing_message_body = {
+                    "row_id": row_id,
+                    "file_key": file_key,
+                    "supplier": supplier,
+                    "vax_type": vaccine,
+                    "created_at_formatted_string": created_at_formatted_string,
+                    **details_from_processing,
+                }
+                send_to_kinesis(supplier, outgoing_message_body, vaccine)
+                total_rows_processed_count += 1
+    except UnicodeDecodeError as error:
+        logger.error("Error processing row %s: %s", row_count, error)
+        return total_rows_processed_count, error
+
+    return total_rows_processed_count, None


 def main(event: str) -> None:
     """Process each row of the file"""
     logger.info("task started")
     start = time.time()
+    n_rows_processed = 0
     try:
-        process_csv_to_fhir(incoming_message_body=json.loads(event))
+        n_rows_processed = process_csv_to_fhir(incoming_message_body=json.loads(event))
     except Exception as error:  # pylint: disable=broad-exception-caught
         logger.error("Error processing message: %s", error)
     end = time.time()
+    logger.info("Total rows processed: %s", n_rows_processed)
     logger.info("Total time for completion: %ss", round(end - start, 5))
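
Note: the retry-and-skip behaviour above is easiest to see in isolation. The sketch below is a minimal, self-contained analogue, not the module's code: it uses made-up column names, an in-memory stream in place of the S3 body, and a list append in place of the real row handler. A pipe-delimited payload containing a cp1252-only byte fails a UTF-8 pass with UnicodeDecodeError, and a second pass with cp1252 picks up from whatever count the first pass reached (zero here, since the tiny stream is decoded in one chunk).

# Minimal sketch of the retry-and-skip pattern; names and data are illustrative only.
import csv
import io


def read_rows(text_stream, handle_row, already_processed=0):
    """Return (rows_processed, error), skipping rows handled on a previous pass."""
    row_count = 0
    processed = already_processed
    try:
        for row in csv.DictReader(text_stream, delimiter="|"):
            row_count += 1
            if row_count > already_processed:
                handle_row(row)
                processed += 1
    except UnicodeDecodeError as error:
        return processed, error
    return processed, None


# A pipe-delimited payload containing 0xC9 ("É" in cp1252), which is not valid UTF-8 here.
raw = "NAME|DOSE\nALICE|1\nRENÉ|2\n".encode("cp1252")

rows = []
processed, err = read_rows(io.TextIOWrapper(io.BytesIO(raw), encoding="utf-8", newline=""), rows.append)
if isinstance(err, UnicodeDecodeError):
    # The UTF-8 pass failed; re-open with cp1252 and skip any rows already handled (none here).
    processed, err = read_rows(
        io.TextIOWrapper(io.BytesIO(raw), encoding="cp1252", newline=""),
        rows.append,
        already_processed=processed,
    )

print(processed, err, [r["NAME"] for r in rows])  # 2 None ['ALICE', 'RENÉ']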

recordprocessor/src/file_level_validation.py

Lines changed: 9 additions & 1 deletion
@@ -76,9 +76,17 @@ def file_level_validation(incoming_message_body: dict) -> dict:
     file_key = incoming_message_body.get("filename")
     permission = incoming_message_body.get("permission")
     created_at_formatted_string = incoming_message_body.get("created_at_formatted_string")
+    encoder = incoming_message_body.get("encoder", "utf-8")

     # Fetch the data
-    csv_reader = get_csv_content_dict_reader(file_key)
+    try:
+        csv_reader = get_csv_content_dict_reader(file_key, encoder=encoder)
+        validate_content_headers(csv_reader)
+    except UnicodeDecodeError as e:
+        logger.warning("Invalid Encoding detected: %s", e)
+        # Retry with cp1252 encoding
+        csv_reader = get_csv_content_dict_reader(file_key, encoder="cp1252")
+        validate_content_headers(csv_reader)

     validate_content_headers(csv_reader)
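
Note: validate_content_headers (which presumably inspects the reader's fieldnames) sits inside the try block because reading the header line is the first decode of the stream, so an unexpected byte surfaces as a UnicodeDecodeError at exactly that point. A small self-contained illustration with made-up column names, not the real file layout:

import csv
import io

raw = "NHS_NUMBER|CAFÉ_NOTES\n123|abc\n".encode("cp1252")  # contains a byte sequence that is invalid UTF-8

reader = csv.DictReader(io.TextIOWrapper(io.BytesIO(raw), encoding="utf-8", newline=""), delimiter="|")
try:
    print(reader.fieldnames)  # forces the first decode -> raises UnicodeDecodeError
except UnicodeDecodeError:
    reader = csv.DictReader(io.TextIOWrapper(io.BytesIO(raw), encoding="cp1252", newline=""), delimiter="|")
    print(reader.fieldnames)  # ['NHS_NUMBER', 'CAFÉ_NOTES']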

recordprocessor/src/utils_for_recordprocessor.py

Lines changed: 2 additions & 2 deletions
@@ -13,11 +13,11 @@ def get_environment() -> str:
     return _env if _env in ["internal-dev", "int", "ref", "sandbox", "prod"] else "internal-dev"


-def get_csv_content_dict_reader(file_key: str) -> DictReader:
+def get_csv_content_dict_reader(file_key: str, encoder="utf-8") -> DictReader:
     """Returns the requested file contents from the source bucket in the form of a DictReader"""
     response = s3_client.get_object(Bucket=os.getenv("SOURCE_BUCKET_NAME"), Key=file_key)
     binary_io = response["Body"]
-    text_io = TextIOWrapper(binary_io, encoding="utf-8", newline="")
+    text_io = TextIOWrapper(binary_io, encoding=encoder, newline="")
     return DictReader(text_io, delimiter="|")
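
Note: keeping "utf-8" as the default means existing callers are unaffected; only the retry path passes encoder="cp1252". A local analogue of the updated helper, with the S3 StreamingBody swapped for an in-memory stream (bucket access is not reproduced here):

from csv import DictReader
from io import BytesIO, TextIOWrapper


def get_csv_content_dict_reader_local(body: bytes, encoder: str = "utf-8") -> DictReader:
    """Same shape as the helper above, but reading from memory instead of the source bucket."""
    text_io = TextIOWrapper(BytesIO(body), encoding=encoder, newline="")
    return DictReader(text_io, delimiter="|")


body = "NAME|DOSE\nRENÉ|1\n".encode("cp1252")
print(list(get_csv_content_dict_reader_local(body, encoder="cp1252")))  # decodes cleanly
# list(get_csv_content_dict_reader_local(body)) would raise UnicodeDecodeError under the utf-8 default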

0 commit comments
