33import json
44import os
55import time
6+ from csv import DictReader
67from json import JSONDecodeError
78
8- from constants import FileStatus
9+ from constants import FileStatus , FileNotProcessedReason , SOURCE_BUCKET_NAME , ARCHIVE_DIR_NAME , PROCESSING_DIR_NAME
910from process_row import process_row
1011from mappings import map_target_disease
1112from audit_table import update_audit_table_status
1213from send_to_kinesis import send_to_kinesis
1314from clients import logger
14- from file_level_validation import file_level_validation
15+ from file_level_validation import file_level_validation , file_is_empty , move_file
1516from errors import NoOperationPermissions , InvalidHeaders
1617from utils_for_recordprocessor import get_csv_content_dict_reader
1718from typing import Optional
@@ -42,7 +43,6 @@ def process_csv_to_fhir(incoming_message_body: dict) -> int:
4243
4344 target_disease = map_target_disease (vaccine )
4445
45- row_count = 0
4646 row_count , err = process_rows (file_id , vaccine , supplier , file_key , allowed_operations ,
4747 created_at_formatted_string , csv_reader , target_disease )
4848
@@ -55,22 +55,37 @@ def process_csv_to_fhir(incoming_message_body: dict) -> int:
5555 encoder = new_encoder
5656
5757 # load alternative encoder
58- csv_reader = get_csv_content_dict_reader (f"processing /{ file_key } " , encoder = encoder )
58+ csv_reader = get_csv_content_dict_reader (f"{ PROCESSING_DIR_NAME } /{ file_key } " , encoder = encoder )
5959 # re-read the file and skip processed rows
6060 row_count , err = process_rows (file_id , vaccine , supplier , file_key , allowed_operations ,
6161 created_at_formatted_string , csv_reader , target_disease , row_count )
6262 else :
6363 logger .error (f"Row Processing error: { err } " )
6464 raise err
6565
66- update_audit_table_status (file_key , file_id , FileStatus .PREPROCESSED )
66+ file_status = FileStatus .PREPROCESSED
67+
68+ if file_is_empty (row_count ):
69+ logger .warning ("File was empty: %s. Moving file to archive directory." , file_key )
70+ move_file (SOURCE_BUCKET_NAME , f"{ PROCESSING_DIR_NAME } /{ file_key } " , f"{ ARCHIVE_DIR_NAME } /{ file_key } " )
71+ file_status = f"{ FileStatus .NOT_PROCESSED } - { FileNotProcessedReason .EMPTY } "
72+
73+ update_audit_table_status (file_key , file_id , file_status )
6774 return row_count
6875
6976
7077# Process the row to obtain the details needed for the message_body and ack file
71- def process_rows (file_id , vaccine , supplier , file_key , allowed_operations , created_at_formatted_string ,
72- csv_reader , target_disease ,
73- total_rows_processed_count = 0 ) -> tuple [int , Optional [Exception ]]:
78+ def process_rows (
79+ file_id : str ,
80+ vaccine : str ,
81+ supplier : str ,
82+ file_key : str ,
83+ allowed_operations : set ,
84+ created_at_formatted_string : str ,
85+ csv_reader : DictReader ,
86+ target_disease : list [dict ],
87+ total_rows_processed_count : int = 0
88+ ) -> tuple [int , Optional [Exception ]]:
7489 """
7590 Processes each row in the csv_reader starting from start_row.
7691 """
0 commit comments