1313from typing import Optional
1414
1515
16- def process_csv_to_fhir (incoming_message_body : dict ) -> None :
16+ def process_csv_to_fhir (incoming_message_body : dict ) -> int :
1717 """
1818 For each row of the csv, attempts to transform into FHIR format, sends a message to kinesis,
1919 and documents the outcome for each row in the ack file.
20+ Returns the number of rows processed. While this is not used by the handler, the number of rows
21+ processed must be correct and therefore is returned for logging and test purposes.
2022 """
2123 encoder = "utf-8" # default encoding
2224 try :
@@ -50,10 +52,10 @@ def process_csv_to_fhir(incoming_message_body: dict) -> None:
5052 logger .info (f"process with encoder { encoder } from row { row_count + 1 } " )
5153 row_count , err = process_rows (file_id , vaccine , supplier , file_key , allowed_operations ,
5254 created_at_formatted_string , csv_reader , target_disease )
55+
5356 if err :
54- logger .warning (f"Error processing: { err } ." )
55- # check if it's a decode error
56- if err .reason == "invalid continuation byte" :
57+ logger .warning (f"Processing Error: { err } ." )
58+ if isinstance (err , InvalidEncoding ):
5759 new_encoder = "cp1252"
5860 logger .info (f"Encode error at row { row_count } with { encoder } . Switch to { new_encoder } " )
5961 encoder = new_encoder
@@ -63,10 +65,9 @@ def process_csv_to_fhir(incoming_message_body: dict) -> None:
6365 row_count , err = process_rows (file_id , vaccine , supplier , file_key , allowed_operations ,
6466 created_at_formatted_string , csv_reader , target_disease , row_count )
6567 else :
66- logger .error (f"Non-decode error: { err } . Cannot retry. Call someone. " )
68+ logger .error (f"Row Processing error: { err } " )
6769 raise err
6870
69- logger .info ("Total rows processed: %s" , row_count )
7071 return row_count
7172
7273
@@ -108,7 +109,7 @@ def process_rows(file_id, vaccine, supplier, file_key, allowed_operations, creat
108109 # if error reason is 'invalid continuation byte', then it's a decode error
109110 logger .error ("Error processing row %s: %s" , row_count , error )
110111 if hasattr (error , 'reason' ) and error .reason == "invalid continuation byte" :
111- return total_rows_processed_count , error
112+ return total_rows_processed_count , InvalidEncoding ( "Invalid continuation byte" )
112113 else :
113114 raise error
114115 return total_rows_processed_count , None
@@ -118,11 +119,13 @@ def main(event: str) -> None:
118119 """Process each row of the file"""
119120 logger .info ("task started" )
120121 start = time .time ()
122+ n_rows_processed = 0
121123 try :
122- process_csv_to_fhir (incoming_message_body = json .loads (event ))
124+ n_rows_processed = process_csv_to_fhir (incoming_message_body = json .loads (event ))
123125 except Exception as error : # pylint: disable=broad-exception-caught
124126 logger .error ("Error processing message: %s" , error )
125127 end = time .time ()
128+ logger .info ("Total rows processed: %s" , n_rows_processed )
126129 logger .info ("Total time for completion: %ss" , round (end - start , 5 ))
127130
128131
0 commit comments