11"""Focused tests for import_threaded to improve coverage."""
22
3- import csv
43import io
54import tempfile
65from pathlib import Path
1211from odoo_data_flow .import_threaded import (
1312 _convert_external_id_field ,
1413 _create_batch_individually ,
15- _execute_load_batch ,
1614 _filter_ignored_columns ,
1715 _format_odoo_error ,
1816 _get_model_fields ,
2220 _read_data_file ,
2321 _recursive_create_batches ,
2422 _setup_fail_file ,
25- RPCThreadImport ,
26- import_data ,
2723)
28- from odoo_data_flow .enums import PreflightMode
2924
3025
3126class TestFormatOdooError :
@@ -90,6 +85,7 @@ def test_read_data_file_success(self) -> None:
9085 assert data [0 ][0 ] == "1" # id
9186 assert data [0 ][1 ] == "Test" # name
9287 import os
88+
9389 os .unlink (filepath )
9490
9591 def test_read_data_file_not_found (self ) -> None :
@@ -109,6 +105,7 @@ def test_read_data_file_no_id_column(self) -> None:
109105 with pytest .raises (ValueError , match = "Source file must contain an 'id' column" ):
110106 header , data = _read_data_file (filepath , ";" , "utf-8" , 0 )
111107 import os
108+
112109 os .unlink (filepath )
113110
114111
@@ -120,7 +117,7 @@ def test_filter_ignored_columns(self) -> None:
120117 header = ["id" , "name" , "to_ignore" , "value" ]
121118 data = [
122119 ["1" , "Test" , "ignore_value" , "val1" ],
123- ["2" , "Test2" , "ignore_value2" , "val2" ]
120+ ["2" , "Test2" , "ignore_value2" , "val2" ],
124121 ]
125122 ignore_list = ["to_ignore" ]
126123 new_header , new_data = _filter_ignored_columns (ignore_list , header , data )
@@ -133,10 +130,7 @@ def test_filter_ignored_columns(self) -> None:
133130 def test_filter_ignored_columns_none (self ) -> None :
134131 """Test filtering with empty ignored list."""
135132 header = ["id" , "name" , "value" ]
136- data = [
137- ["1" , "Test" , "val1" ],
138- ["2" , "Test2" , "val2" ]
139- ]
133+ data = [["1" , "Test" , "val1" ], ["2" , "Test2" , "val2" ]]
140134 ignore_list : list [str ] = []
141135 new_header , new_data = _filter_ignored_columns (ignore_list , header , data )
142136 assert new_header == header
@@ -150,14 +144,18 @@ def test_setup_fail_file_success(self) -> None:
150144 """Test setting up fail file."""
151145 with tempfile .TemporaryDirectory () as tmpdir :
152146 fail_filename = Path (tmpdir ) / "fail.csv"
153- writer , handle = _setup_fail_file (str (fail_filename ), ["id" , "name" ], ";" , "utf-8" )
147+ writer , handle = _setup_fail_file (
148+ str (fail_filename ), ["id" , "name" ], ";" , "utf-8"
149+ )
154150 assert writer is not None
155151 assert handle is not None
156152 handle .close ()
157153
158154 def test_setup_fail_file_os_error (self ) -> None :
159155 """Test setting up fail file with OS error."""
160- writer , handle = _setup_fail_file ("/root/nonexistent/fail.csv" , ["id" , "name" ], ";" , "utf-8" )
156+ writer , handle = _setup_fail_file (
157+ "/root/nonexistent/fail.csv" , ["id" , "name" ], ";" , "utf-8"
158+ )
161159 assert writer is None
162160 assert handle is None
163161
@@ -178,7 +176,7 @@ def test_prepare_pass_2_data(self) -> None:
178176 result = _prepare_pass_2_data (
179177 all_data , header , unique_id_field_index , id_map , deferred_fields
180178 )
181-
179+
182180 # Should return list of (db_id, update_vals) tuples
183181 assert isinstance (result , list )
184182 if result :
@@ -200,7 +198,9 @@ def test_recursive_create_batches_single_column(self) -> None:
200198 group_cols : list [str ] = []
201199 batch_size = 2
202200 o2m = False
203- result = list (_recursive_create_batches (current_data , group_cols , header , batch_size , o2m ))
201+ result = list (
202+ _recursive_create_batches (current_data , group_cols , header , batch_size , o2m )
203+ )
204204 assert len (result ) >= 1
205205
206206 def test_recursive_create_batches_multiple_columns (self ) -> None :
@@ -214,7 +214,9 @@ def test_recursive_create_batches_multiple_columns(self) -> None:
214214 group_cols = ["tags" ]
215215 batch_size = 1 # Small batch size to force multiple chunks
216216 o2m = False
217- result = list (_recursive_create_batches (current_data , group_cols , header , batch_size , o2m ))
217+ result = list (
218+ _recursive_create_batches (current_data , group_cols , header , batch_size , o2m )
219+ )
218220 assert len (result ) >= 1
219221
220222
@@ -226,7 +228,7 @@ def test_get_model_fields_success(self, mock_conf_lib: Mock) -> None:
226228 """Test getting model fields successfully."""
227229 mock_model = Mock ()
228230 mock_model ._fields = {"id" : {"type" : "integer" }}
229-
231+
230232 result = _get_model_fields (mock_model )
231233 assert result is not None
232234 assert "id" in result
@@ -235,8 +237,10 @@ def test_get_model_fields_success(self, mock_conf_lib: Mock) -> None:
235237 def test_get_model_fields_exception (self , mock_conf_lib : Mock ) -> None :
236238 """Test getting model fields with exception."""
237239 mock_model = Mock ()
238- del mock_model ._fields # Remove the _fields attribute to trigger the exception path
239-
240+ del (
241+ mock_model ._fields
242+ ) # Remove the _fields attribute to trigger the exception path
243+
240244 result = _get_model_fields (mock_model )
241245 assert result is None
242246
@@ -249,9 +253,7 @@ def test_convert_external_id_field(self) -> None:
249253 # Create a mock model
250254 mock_model = Mock ()
251255 result = _convert_external_id_field (
252- model = mock_model ,
253- field_name = "parent_id/id" ,
254- field_value = "module.ref1"
256+ model = mock_model , field_name = "parent_id/id" , field_value = "module.ref1"
255257 )
256258 # The function returns a tuple (base_field_name, converted_value)
257259 base_field_name , converted_value = result
@@ -265,7 +267,7 @@ def test_convert_external_id_field_special_chars(self) -> None:
265267 result = _convert_external_id_field (
266268 model = mock_model ,
267269 field_name = "parent_id/id" ,
268- field_value = "module.name-with.special/chars"
270+ field_value = "module.name-with.special/chars" ,
269271 )
270272 # The function returns a tuple (base_field_name, converted_value)
271273 base_field_name , converted_value = result
@@ -279,19 +281,16 @@ class TestHandleCreateError:
279281 def test_handle_create_error_connection (self ) -> None :
280282 """Test handling create error with connection error."""
281283 # Mock a connection object and batch
282- mock_connection = Mock ()
283- batch = [["1" , "test" ]]
284- context : dict [str , Any ] = {}
285- fail_file = "test.csv.fail"
286-
284+ Mock ()
285+
287286 # Test the function with correct signature
288287 i = 0
289288 create_error = Exception ("Connection error" )
290289 line = ["1" , "test" ]
291290 error_summary = "Initial error"
292-
291+
293292 # This function has complex signature, test by calling it
294- with patch ("odoo_data_flow.import_threaded.log" ) as mock_log :
293+ with patch ("odoo_data_flow.import_threaded.log" ):
295294 error_message , failed_line , new_error_summary = _handle_create_error (
296295 i , create_error , line , error_summary
297296 )
@@ -311,15 +310,21 @@ def test_create_batch_individually_success(self) -> None:
311310 uid_index = 0
312311 context : dict [str , Any ] = {}
313312 ignore_list : list [str ] = []
314-
313+
315314 # Mock the load method to return success
316315 mock_model .load .return_value = [[1 ], []] # Success IDs, errors
317-
318- with patch ("odoo_data_flow.import_threaded._handle_create_error" ) as mock_handle_error :
319- mock_handle_error .return_value = {"id_map" : {}, "failed_lines" : [], "connection_failure" : False }
320-
316+
317+ with patch (
318+ "odoo_data_flow.import_threaded._handle_create_error"
319+ ) as mock_handle_error :
320+ mock_handle_error .return_value = {
321+ "id_map" : {},
322+ "failed_lines" : [],
323+ "connection_failure" : False ,
324+ }
325+
321326 result = _create_batch_individually (
322327 mock_model , batch_lines , batch_header , uid_index , context , ignore_list
323328 )
324329 assert "id_map" in result
325- assert "failed_lines" in result
330+ assert "failed_lines" in result
0 commit comments