1+ """Focused tests for import_threaded to improve coverage."""
2+
3+ import csv
4+ import io
5+ import tempfile
6+ from pathlib import Path
7+ from typing import Any
8+ from unittest .mock import Mock , patch
9+
10+ import pytest
11+
12+ from odoo_data_flow .import_threaded import (
13+ _convert_external_id_field ,
14+ _create_batch_individually ,
15+ _execute_load_batch ,
16+ _filter_ignored_columns ,
17+ _format_odoo_error ,
18+ _get_model_fields ,
19+ _handle_create_error ,
20+ _parse_csv_data ,
21+ _prepare_pass_2_data ,
22+ _read_data_file ,
23+ _recursive_create_batches ,
24+ _setup_fail_file ,
25+ RPCThreadImport ,
26+ import_data ,
27+ )
28+ from odoo_data_flow .enums import PreflightMode
29+
30+
31+ class TestFormatOdooError :
32+ """Test _format_odoo_error function."""
33+
34+ def test_format_odoo_error_not_string (self ) -> None :
35+ """Test when error is not a string."""
36+ result = _format_odoo_error (123 )
37+ assert result == "123"
38+
39+ def test_format_odoo_error_non_parsable_string (self ) -> None :
40+ """Test when error is a non-parsable string."""
41+ result = _format_odoo_error ("Just a string" )
42+ assert result == "Just a string"
43+
44+ def test_format_odoo_error_parsable_with_message (self ) -> None :
45+ """Test when error is a parsable dict with message."""
46+ error = "{'data': {'message': 'Test error message'}}"
47+ result = _format_odoo_error (error )
48+ assert result == "Test error message"
49+
50+ def test_format_odoo_error_parsable_fallback (self ) -> None :
51+ """Test when error dict doesn't have the expected structure."""
52+ error = "{'other': 'data'}"
53+ result = _format_odoo_error (error )
54+ assert result == "{'other': 'data'}"


class TestParseCSVData:
    """Test _parse_csv_data function."""

    def test_parse_csv_data_simple(self) -> None:
        """Test parsing basic CSV data."""
        # Create a string buffer to simulate a file
        csv_content = "id;name\n1;Test\n2;Another"
        f = io.StringIO(csv_content)
        header, data = _parse_csv_data(f, ";", 0)
        assert len(data) == 2
        assert data[0][0] == "1"  # id
        assert data[0][1] == "Test"  # name

    def test_parse_csv_data_empty(self) -> None:
        """Test parsing empty CSV data."""
        f = io.StringIO("")
        header, data = _parse_csv_data(f, ";", 0)
        assert data == []
        assert header == []


class TestReadDataFile:
    """Test _read_data_file function."""

    def test_read_data_file_success(self) -> None:
        """Test reading a CSV file successfully."""
        with tempfile.NamedTemporaryFile(mode="w", suffix=".csv", delete=False) as f:
            f.write("id;name\n1;Test\n2;Another\n")
            f.flush()
            filepath = f.name

        header, data = _read_data_file(filepath, ";", "utf-8", 0)
        assert len(data) == 2
        assert data[0][0] == "1"  # id
        assert data[0][1] == "Test"  # name
        os.unlink(filepath)

    def test_read_data_file_not_found(self) -> None:
        """Test reading a non-existent file."""
        header, data = _read_data_file("/nonexistent.csv", ";", "utf-8", 0)
        assert data == []
        assert header == []

    def test_read_data_file_no_id_column(self) -> None:
        """Test reading a file without an id column."""
        with tempfile.NamedTemporaryFile(mode="w", suffix=".csv", delete=False) as f:
            f.write("name;value\nTest;1\nAnother;2\n")
            f.flush()
            filepath = f.name

        # Reading a file without an 'id' column should raise ValueError
        with pytest.raises(
            ValueError, match="Source file must contain an 'id' column"
        ):
            header, data = _read_data_file(filepath, ";", "utf-8", 0)
        os.unlink(filepath)
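
    # Sketch of an additional case: assumes the third positional argument of
    # _read_data_file is used as the file encoding, mirroring the "utf-8"
    # calls above; adjust if the implementation handles encodings differently.
    def test_read_data_file_latin1_encoding(self) -> None:
        """Sketch: a latin-1 encoded file should decode with encoding='latin-1'."""
        with tempfile.NamedTemporaryFile(
            mode="w", suffix=".csv", delete=False, encoding="latin-1"
        ) as f:
            f.write("id;name\n1;Caf\xe9\n")
            f.flush()
            filepath = f.name

        header, data = _read_data_file(filepath, ";", "latin-1", 0)
        assert data[0][1] == "Caf\xe9"  # "é" round-trips through latin-1
        os.unlink(filepath)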


class TestFilterIgnoredColumns:
    """Test _filter_ignored_columns function."""

    def test_filter_ignored_columns(self) -> None:
        """Test filtering ignored columns."""
        header = ["id", "name", "to_ignore", "value"]
        data = [
            ["1", "Test", "ignore_value", "val1"],
            ["2", "Test2", "ignore_value2", "val2"],
        ]
        ignore_list = ["to_ignore"]
        new_header, new_data = _filter_ignored_columns(ignore_list, header, data)
        assert "to_ignore" not in new_header
        assert "id" in new_header
        assert "name" in new_header
        # Each row should have the ignored column removed
        assert len(new_data[0]) == 3  # id, name, value

    def test_filter_ignored_columns_none(self) -> None:
        """Test filtering with empty ignored list."""
        header = ["id", "name", "value"]
        data = [
            ["1", "Test", "val1"],
            ["2", "Test2", "val2"],
        ]
        ignore_list: list[str] = []
        new_header, new_data = _filter_ignored_columns(ignore_list, header, data)
        assert new_header == header
        assert new_data == data
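
    # Sketch of an additional case: assumes _filter_ignored_columns drops row
    # cells positionally, consistent with the length check in the test above.
    def test_filter_ignored_columns_preserves_row_values(self) -> None:
        """Sketch: remaining cells keep their original values and order."""
        header = ["id", "to_ignore", "name"]
        data = [["1", "drop-me", "Test"]]
        new_header, new_data = _filter_ignored_columns(["to_ignore"], header, data)
        assert new_header == ["id", "name"]
        assert new_data == [["1", "Test"]]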


class TestSetupFailFile:
    """Test _setup_fail_file function."""

    def test_setup_fail_file_success(self) -> None:
        """Test setting up fail file."""
        with tempfile.TemporaryDirectory() as tmpdir:
            fail_filename = Path(tmpdir) / "fail.csv"
            writer, handle = _setup_fail_file(
                str(fail_filename), ["id", "name"], ";", "utf-8"
            )
            assert writer is not None
            assert handle is not None
            handle.close()

    def test_setup_fail_file_os_error(self) -> None:
        """Test setting up fail file with OS error."""
        writer, handle = _setup_fail_file(
            "/root/nonexistent/fail.csv", ["id", "name"], ";", "utf-8"
        )
        assert writer is None
        assert handle is None
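
    # Sketch of an additional case: assumes _setup_fail_file opens the fail
    # file eagerly (which the non-None handle above suggests); adjust if file
    # creation is deferred until the first failed row is written.
    def test_setup_fail_file_creates_file(self) -> None:
        """Sketch: the fail file should exist on disk after setup."""
        with tempfile.TemporaryDirectory() as tmpdir:
            fail_filename = Path(tmpdir) / "fail.csv"
            writer, handle = _setup_fail_file(
                str(fail_filename), ["id", "name"], ";", "utf-8"
            )
            assert handle is not None
            handle.close()
            assert fail_filename.exists()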


class TestPreparePass2Data:
    """Test _prepare_pass_2_data function."""

    def test_prepare_pass_2_data(self) -> None:
        """Test preparing data for pass 2."""
        all_data = [
            ["1", "ref1", "cat1,cat2"],
            ["2", "ref2", "cat2"],
        ]
        header = ["id", "name/id", "category_ids/id"]
        unique_id_field_index = 0
        id_map = {"1": 100, "2": 200}
        deferred_fields = ["name/id", "category_ids/id"]
        result = _prepare_pass_2_data(
            all_data, header, unique_id_field_index, id_map, deferred_fields
        )

        # Should return a list of (db_id, update_vals) tuples
        assert isinstance(result, list)
        if result:
            db_id, update_vals = result[0]
            assert isinstance(db_id, int)
            assert isinstance(update_vals, dict)


class TestRecursiveCreateBatches:
    """Test _recursive_create_batches function."""

    def test_recursive_create_batches_single_column(self) -> None:
        """Test recursive batch creation with single column."""
        current_data = [
            ["1", "Test", "A"],
            ["2", "Test2", "B"],
        ]
        header = ["id", "name", "tags"]
        group_cols: list[str] = []
        batch_size = 2
        o2m = False
        result = list(
            _recursive_create_batches(
                current_data, group_cols, header, batch_size, o2m
            )
        )
        assert len(result) >= 1

    def test_recursive_create_batches_multiple_columns(self) -> None:
        """Test recursive batch creation with multiple columns."""
        current_data = [
            ["1", "Test", "A"],
            ["2", "Test2", "A"],  # Same tag for grouping
            ["3", "Test3", "B"],
        ]
        header = ["id", "name", "tags"]
        group_cols = ["tags"]
        batch_size = 1  # Small batch size to force multiple chunks
        o2m = False
        result = list(
            _recursive_create_batches(
                current_data, group_cols, header, batch_size, o2m
            )
        )
        assert len(result) >= 1


class TestGetModelFields:
    """Test _get_model_fields function."""

    @patch("odoo_data_flow.import_threaded.conf_lib")
    def test_get_model_fields_success(self, mock_conf_lib: Mock) -> None:
        """Test getting model fields successfully."""
        mock_model = Mock()
        mock_model._fields = {"id": {"type": "integer"}}

        result = _get_model_fields(mock_model)
        assert result is not None
        assert "id" in result

    @patch("odoo_data_flow.import_threaded.conf_lib")
    def test_get_model_fields_exception(self, mock_conf_lib: Mock) -> None:
        """Test getting model fields with exception."""
        mock_model = Mock()
        # Remove the _fields attribute to trigger the exception path
        del mock_model._fields

        result = _get_model_fields(mock_model)
        assert result is None


class TestConvertExternalIdField:
    """Test _convert_external_id_field function."""

    def test_convert_external_id_field(self) -> None:
        """Test converting external ID field."""
        # Create a mock model
        mock_model = Mock()
        result = _convert_external_id_field(
            model=mock_model,
            field_name="parent_id/id",
            field_value="module.ref1",
        )
        # The function returns a tuple (base_field_name, converted_value)
        base_field_name, converted_value = result
        assert base_field_name == "parent_id"
        # Since we're mocking, the converted value will depend on the mock behavior

    def test_convert_external_id_field_special_chars(self) -> None:
        """Test converting external ID field with special characters."""
        # Create a mock model
        mock_model = Mock()
        result = _convert_external_id_field(
            model=mock_model,
            field_name="parent_id/id",
            field_value="module.name-with.special/chars",
        )
        # The function returns a tuple (base_field_name, converted_value)
        base_field_name, converted_value = result
        assert base_field_name == "parent_id"
        # Since we're mocking, the converted value will depend on the mock behavior


class TestHandleCreateError:
    """Test _handle_create_error function."""

    def test_handle_create_error_connection(self) -> None:
        """Test handling a create error caused by a connection error."""
        i = 0
        create_error = Exception("Connection error")
        line = ["1", "test"]
        error_summary = "Initial error"

        # The function returns (error_message, failed_line, new_error_summary)
        with patch("odoo_data_flow.import_threaded.log"):
            error_message, failed_line, new_error_summary = _handle_create_error(
                i, create_error, line, error_summary
            )
            assert isinstance(error_message, str)
            assert isinstance(failed_line, list)


class TestCreateBatchIndividually:
    """Test _create_batch_individually function."""

    def test_create_batch_individually_success(self) -> None:
        """Test creating batch individually with success."""
        # Mock objects for the function
        mock_model = Mock()
        batch_lines = [["1", "Test"]]
        batch_header = ["id", "name"]
        uid_index = 0
        context: dict[str, Any] = {}
        ignore_list: list[str] = []

        # Mock the load method to return success
        mock_model.load.return_value = [[1], []]  # Success IDs, errors

        with patch(
            "odoo_data_flow.import_threaded._handle_create_error"
        ) as mock_handle_error:
            mock_handle_error.return_value = {
                "id_map": {},
                "failed_lines": [],
                "connection_failure": False,
            }

            result = _create_batch_individually(
                mock_model,
                batch_lines,
                batch_header,
                uid_index,
                context,
                ignore_list,
            )
            assert "id_map" in result
            assert "failed_lines" in result