Skip to content

Commit 2c67fc9

Browse files
committed
log rows returned. Tests Pass
1 parent f2d17ed commit 2c67fc9

File tree

5 files changed

+192
-191
lines changed

5 files changed

+192
-191
lines changed

recordprocessor/src/batch_processor.py

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,12 @@
1313
from typing import Optional
1414

1515

16-
def process_csv_to_fhir(incoming_message_body: dict) -> None:
16+
def process_csv_to_fhir(incoming_message_body: dict) -> int:
1717
"""
1818
For each row of the csv, attempts to transform into FHIR format, sends a message to kinesis,
1919
and documents the outcome for each row in the ack file.
20+
Returns the number of rows processed. While this is not used by the handler, the number of rows
21+
processed must be correct and therefore is returned for logging and test purposes.
2022
"""
2123
encoder = "utf-8" # default encoding
2224
try:
@@ -50,10 +52,10 @@ def process_csv_to_fhir(incoming_message_body: dict) -> None:
5052
logger.info(f"process with encoder {encoder} from row {row_count+1}")
5153
row_count, err = process_rows(file_id, vaccine, supplier, file_key, allowed_operations,
5254
created_at_formatted_string, csv_reader, target_disease)
55+
5356
if err:
54-
logger.warning(f"Error processing: {err}.")
55-
# check if it's a decode error
56-
if err.reason == "invalid continuation byte":
57+
logger.warning(f"Processing Error: {err}.")
58+
if isinstance(err, InvalidEncoding):
5759
new_encoder = "cp1252"
5860
logger.info(f"Encode error at row {row_count} with {encoder}. Switch to {new_encoder}")
5961
encoder = new_encoder
@@ -63,10 +65,9 @@ def process_csv_to_fhir(incoming_message_body: dict) -> None:
6365
row_count, err = process_rows(file_id, vaccine, supplier, file_key, allowed_operations,
6466
created_at_formatted_string, csv_reader, target_disease, row_count)
6567
else:
66-
logger.error(f"Non-decode error: {err}. Cannot retry. Call someone.")
68+
logger.error(f"Row Processing error: {err}")
6769
raise err
6870

69-
logger.info("Total rows processed: %s", row_count)
7071
return row_count
7172

7273

@@ -108,7 +109,7 @@ def process_rows(file_id, vaccine, supplier, file_key, allowed_operations, creat
108109
# if error reason is 'invalid continuation byte', then it's a decode error
109110
logger.error("Error processing row %s: %s", row_count, error)
110111
if hasattr(error, 'reason') and error.reason == "invalid continuation byte":
111-
return total_rows_processed_count, error
112+
return total_rows_processed_count, InvalidEncoding("Invalid continuation byte")
112113
else:
113114
raise error
114115
return total_rows_processed_count, None
@@ -118,11 +119,13 @@ def main(event: str) -> None:
118119
"""Process each row of the file"""
119120
logger.info("task started")
120121
start = time.time()
122+
n_rows_processed = 0
121123
try:
122-
process_csv_to_fhir(incoming_message_body=json.loads(event))
124+
n_rows_processed = process_csv_to_fhir(incoming_message_body=json.loads(event))
123125
except Exception as error: # pylint: disable=broad-exception-caught
124126
logger.error("Error processing message: %s", error)
125127
end = time.time()
128+
logger.info("Total rows processed: %s", n_rows_processed)
126129
logger.info("Total time for completion: %ss", round(end - start, 5))
127130

128131

recordprocessor/tests/test_batch_processor.py

Lines changed: 0 additions & 183 deletions
This file was deleted.

0 commit comments

Comments
 (0)