22import os
33from io import BytesIO
44from unittest .mock import patch
5+ from moto import mock_kinesis , mock_firehose
6+ from batch_processor import process_csv_to_fhir
57
68
79def create_patch (target : str ):
810 patcher = patch (target )
911 return patcher .start ()
1012
1113
14+ @mock_kinesis
15+ @mock_firehose
1216class TestProcessCsvToFhir (unittest .TestCase ):
1317
1418 def setUp (self ):
@@ -18,9 +22,12 @@ def setUp(self):
1822 self .mock_send_to_kinesis = create_patch ("batch_processor.send_to_kinesis" )
1923 self .mock_map_target_disease = create_patch ("batch_processor.map_target_disease" )
2024 self .mock_s3_get_object = create_patch ("utils_for_recordprocessor.s3_client.get_object" )
25+ self .mock_s3_put_object = create_patch ("utils_for_recordprocessor.s3_client.put_object" )
2126 self .mock_make_and_move = create_patch ("file_level_validation.make_and_upload_ack_file" )
2227 self .mock_move_file = create_patch ("file_level_validation.move_file" )
2328 self .mock_get_permitted_operations = create_patch ("file_level_validation.get_permitted_operations" )
29+ self .mock_firehose_client = create_patch ("logging_decorator.firehose_client" )
30+ self .mock_firehose_client .put_record .return_value = True
2431
2532 def tearDown (self ):
2633 patch .stopall ()
@@ -57,106 +64,98 @@ def insert_cp1252_at_end(self, data: list[bytes], new_text: bytes, field: int) -
5764
5865 def test_process_large_file_with_cp1252 (self ):
5966 """ Test processing a large file with cp1252 encoding """
60- with patch ("logging_decorator.file_level_validation_logging_decorator" , lambda f : f ):
61- from batch_processor import process_csv_to_fhir
62- n_rows = 500
63- data = self .create_test_data_from_file ("test-batch-data.csv" )
64- data = self .expand_test_data (data , n_rows )
65- data = self .insert_cp1252_at_end (data , b'D\xe9 cembre' , 2 )
66- ret1 = {"Body" : BytesIO (b"" .join (data ))}
67- ret2 = {"Body" : BytesIO (b"" .join (data ))}
68- self .mock_s3_get_object .side_effect = [ret1 , ret2 ]
69- self .mock_map_target_disease .return_value = "some disease"
70-
71- message_body = {
72- "vaccine_type" : "vax-type-1" ,
73- "supplier" : "test-supplier" ,
74- }
75- self .mock_map_target_disease .return_value = "some disease"
76-
77- n_rows_processed = process_csv_to_fhir (message_body )
78- self .assertEqual (n_rows_processed , n_rows )
79- self .assertEqual (self .mock_send_to_kinesis .call_count , n_rows )
80- # check logger.warning called for decode error
81- self .mock_logger_warning .assert_called ()
82- warning_call_args = self .mock_logger_warning .call_args [0 ][0 ]
83- self .assertTrue (warning_call_args .startswith ("Error processing: 'utf-8' codec can't decode byte" ))
67+ n_rows = 500
68+ data = self .create_test_data_from_file ("test-batch-data.csv" )
69+ data = self .expand_test_data (data , n_rows )
70+ data = self .insert_cp1252_at_end (data , b'D\xe9 cembre' , 2 )
71+ ret1 = {"Body" : BytesIO (b"" .join (data ))}
72+ ret2 = {"Body" : BytesIO (b"" .join (data ))}
73+ self .mock_s3_get_object .side_effect = [ret1 , ret2 ]
74+ self .mock_map_target_disease .return_value = "some disease"
75+
76+ message_body = {
77+ "vaccine_type" : "vax-type-1" ,
78+ "supplier" : "test-supplier" ,
79+ }
80+ self .mock_map_target_disease .return_value = "some disease"
81+
82+ n_rows_processed = process_csv_to_fhir (message_body )
83+ self .assertEqual (n_rows_processed , n_rows )
84+ self .assertEqual (self .mock_send_to_kinesis .call_count , n_rows )
85+ # check logger.warning called for decode error
86+ self .mock_logger_warning .assert_called ()
87+ warning_call_args = self .mock_logger_warning .call_args [0 ][0 ]
88+ self .assertTrue (warning_call_args .startswith ("Error processing: 'utf-8' codec can't decode byte" ))
8489
8590 def test_process_large_file_with_utf8 (self ):
8691 """ Test processing a large file with utf-8 encoding """
87- with patch ("logging_decorator.file_level_validation_logging_decorator" , lambda f : f ):
88- from batch_processor import process_csv_to_fhir
89- n_rows = 500
90- data = self .create_test_data_from_file ("test-batch-data.csv" )
91- data = self .expand_test_data (data , n_rows )
92- ret1 = {"Body" : BytesIO (b"" .join (data ))}
93- ret2 = {"Body" : BytesIO (b"" .join (data ))}
94- self .mock_s3_get_object .side_effect = [ret1 , ret2 ]
95- self .mock_map_target_disease .return_value = "some disease"
96-
97- message_body = {
98- "vaccine_type" : "vax-type-1" ,
99- "supplier" : "test-supplier" ,
100- }
101- self .mock_map_target_disease .return_value = "some disease"
102-
103- n_rows_processed = process_csv_to_fhir (message_body )
104- self .assertEqual (n_rows_processed , n_rows )
105- self .assertEqual (self .mock_send_to_kinesis .call_count , n_rows )
106- self .mock_logger_warning .assert_not_called ()
107- self .mock_logger_error .assert_not_called ()
92+ n_rows = 500
93+ data = self .create_test_data_from_file ("test-batch-data.csv" )
94+ data = self .expand_test_data (data , n_rows )
95+ ret1 = {"Body" : BytesIO (b"" .join (data ))}
96+ ret2 = {"Body" : BytesIO (b"" .join (data ))}
97+ self .mock_s3_get_object .side_effect = [ret1 , ret2 ]
98+ self .mock_map_target_disease .return_value = "some disease"
99+
100+ message_body = {
101+ "vaccine_type" : "vax-type-1" ,
102+ "supplier" : "test-supplier" ,
103+ }
104+ self .mock_map_target_disease .return_value = "some disease"
105+
106+ n_rows_processed = process_csv_to_fhir (message_body )
107+ self .assertEqual (n_rows_processed , n_rows )
108+ self .assertEqual (self .mock_send_to_kinesis .call_count , n_rows )
109+ self .mock_logger_warning .assert_not_called ()
110+ self .mock_logger_error .assert_not_called ()
108111
109112 def test_process_cp1252_small_file (self ):
110113 """ Test processing a small file with cp1252 encoding """
111- with patch ("logging_decorator.file_level_validation_logging_decorator" , lambda f : f ):
112- from batch_processor import process_csv_to_fhir
113- data = self .create_test_data_from_file ("test-batch-data-cp1252.csv" )
114- data = [line if line .endswith (b"\n " ) else line + b"\n " for line in data ]
115- n_rows = len (data ) - 1 # Exclude header
116-
117- ret1 = {"Body" : BytesIO (b"" .join (data ))}
118- ret2 = {"Body" : BytesIO (b"" .join (data ))}
119- self .mock_s3_get_object .side_effect = [ret1 , ret2 ]
120- self .mock_map_target_disease .return_value = "some disease"
121-
122- message_body = {
123- "vaccine_type" : "vax-type-1" ,
124- "supplier" : "test-supplier" ,
125- }
126-
127- self .mock_map_target_disease .return_value = "some disease"
128-
129- n_rows_processed = process_csv_to_fhir (message_body )
130- self .assertEqual (n_rows_processed , n_rows )
131- self .assertEqual (self .mock_send_to_kinesis .call_count , n_rows )
132- self .mock_logger_warning .assert_called ()
133- warning_call_args = self .mock_logger_warning .call_args [0 ][0 ]
134- self .assertTrue (warning_call_args .startswith ("Invalid Encoding detected in process_csv_to_fhir" ))
114+ data = self .create_test_data_from_file ("test-batch-data-cp1252.csv" )
115+ data = [line if line .endswith (b"\n " ) else line + b"\n " for line in data ]
116+ n_rows = len (data ) - 1 # Exclude header
117+
118+ ret1 = {"Body" : BytesIO (b"" .join (data ))}
119+ ret2 = {"Body" : BytesIO (b"" .join (data ))}
120+ self .mock_s3_get_object .side_effect = [ret1 , ret2 ]
121+ self .mock_map_target_disease .return_value = "some disease"
122+
123+ message_body = {
124+ "vaccine_type" : "vax-type-1" ,
125+ "supplier" : "test-supplier" ,
126+ }
127+
128+ self .mock_map_target_disease .return_value = "some disease"
129+
130+ n_rows_processed = process_csv_to_fhir (message_body )
131+ self .assertEqual (n_rows_processed , n_rows )
132+ self .assertEqual (self .mock_send_to_kinesis .call_count , n_rows )
133+ self .mock_logger_warning .assert_called ()
134+ warning_call_args = self .mock_logger_warning .call_args [0 ][0 ]
135+ self .assertTrue (warning_call_args .startswith ("Invalid Encoding detected in process_csv_to_fhir" ))
135136
136137 def test_process_utf8_small_file (self ):
137- """ Test processing a small file with cp1252 encoding """
138- with patch ("logging_decorator.file_level_validation_logging_decorator" , lambda f : f ):
139- from batch_processor import process_csv_to_fhir
140- data = self .create_test_data_from_file ("test-batch-data.csv" )
141- data = [line if line .endswith (b"\n " ) else line + b"\n " for line in data ]
142- n_rows = len (data ) - 1 # Exclude header
143-
144- ret1 = {"Body" : BytesIO (b"" .join (data ))}
145- ret2 = {"Body" : BytesIO (b"" .join (data ))}
146- self .mock_s3_get_object .side_effect = [ret1 , ret2 ]
147- self .mock_map_target_disease .return_value = "some disease"
148-
149- message_body = {
150- "vaccine_type" : "vax-type-1" ,
151- "supplier" : "test-supplier" ,
152- }
153- self .mock_map_target_disease .return_value = "some disease"
154-
155- n_rows_processed = process_csv_to_fhir (message_body )
156- self .assertEqual (n_rows_processed , n_rows )
157- self .assertEqual (self .mock_send_to_kinesis .call_count , n_rows )
158- self .mock_logger_warning .assert_not_called ()
159- self .mock_logger_error .assert_not_called ()
138+ """ Test processing a small file with utf-8 encoding """
139+ data = self .create_test_data_from_file ("test-batch-data.csv" )
140+ data = [line if line .endswith (b"\n " ) else line + b"\n " for line in data ]
141+ n_rows = len (data ) - 1 # Exclude header
142+
143+ ret1 = {"Body" : BytesIO (b"" .join (data ))}
144+ ret2 = {"Body" : BytesIO (b"" .join (data ))}
145+ self .mock_s3_get_object .side_effect = [ret1 , ret2 ]
146+ self .mock_map_target_disease .return_value = "some disease"
147+
148+ message_body = {
149+ "vaccine_type" : "vax-type-1" ,
150+ "supplier" : "test-supplier" ,
151+ }
152+ self .mock_map_target_disease .return_value = "some disease"
153+
154+ n_rows_processed = process_csv_to_fhir (message_body )
155+ self .assertEqual (n_rows_processed , n_rows )
156+ self .assertEqual (self .mock_send_to_kinesis .call_count , n_rows )
157+ self .mock_logger_warning .assert_not_called ()
158+ self .mock_logger_error .assert_not_called ()
160159
161160 def test_fix_cp1252 (self ):
162161 # create a cp1252 string that contains an accented E
0 commit comments