Skip to content

Commit 5f33c27

Browse files
committed
Extra import_threaded_coverage
1 parent 8e3cd1b commit 5f33c27

File tree

1 file changed

+396
-0
lines changed

1 file changed

+396
-0
lines changed
Lines changed: 396 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,396 @@
1+
"""Additional tests to improve coverage for uncovered lines in import_threaded.py."""
2+
3+
from io import StringIO
4+
from typing import Any
5+
from unittest.mock import MagicMock, patch
6+
7+
import pytest
8+
from rich.progress import Progress
9+
10+
from odoo_data_flow.import_threaded import (
11+
_create_batch_individually,
12+
_execute_load_batch,
13+
_format_odoo_error,
14+
_get_model_fields,
15+
_handle_create_error,
16+
_handle_fallback_create,
17+
_handle_tuple_index_error,
18+
_orchestrate_pass_1,
19+
_parse_csv_data,
20+
_read_data_file,
21+
_safe_convert_field_value,
22+
_setup_fail_file,
23+
import_data,
24+
)
25+
26+
27+
def test_parse_csv_data_insufficient_lines() -> None:
28+
"""Test _parse_csv_data when there are not enough lines after skipping."""
29+
from io import StringIO
30+
31+
f = StringIO("") # Empty file
32+
header, data = _parse_csv_data(f, ",", 0) # Should return empty lists
33+
assert header == []
34+
assert data == []
35+
36+
37+
def test_read_data_file_unicode_decode_error() -> None:
38+
"""Test _read_data_file when UnicodeDecodeError occurs and all fallbacks fail."""
39+
# Test when all encodings fail with UnicodeDecodeError
40+
with patch("builtins.open") as mock_open:
41+
def open_side_effect(*args: Any, **kwargs: Any) -> Any:
42+
# Always raise UnicodeDecodeError regardless of encoding
43+
raise UnicodeDecodeError("utf-8", b"test", 0, 1, "fake error")
44+
45+
mock_open.side_effect = open_side_effect
46+
47+
header, data = _read_data_file("dummy.csv", ",", "utf-8", 0)
48+
assert header == []
49+
assert data == []
50+
51+
52+
def test_safe_convert_field_value_edge_cases() -> None:
53+
"""Test _safe_convert_field_value with various edge cases."""
54+
# Test with /id suffixed fields
55+
result = _safe_convert_field_value("parent_id/id", "some_value", "char")
56+
assert result == "some_value"
57+
58+
# Test with positive field type and negative value
59+
result = _safe_convert_field_value("field", "-5", "positive")
60+
assert result == -5 # Should be converted to int since it's numeric
61+
62+
# Test with negative field type and positive value
63+
result = _safe_convert_field_value("field", "5", "negative")
64+
assert result == 5 # Should be converted to int since it's numeric
65+
66+
# Test with empty value for numeric fields
67+
result = _safe_convert_field_value("field", "", "integer")
68+
assert result == 0
69+
70+
result = _safe_convert_field_value("field", "", "float")
71+
assert result == 0
72+
73+
# Test with invalid float values
74+
result = _safe_convert_field_value("field", "not_a_number", "float")
75+
assert result == "not_a_number"
76+
77+
# Test with non-integer float values (should remain as string)
78+
result = _safe_convert_field_value("field", "1.5", "integer")
79+
assert result == "1.5" # Should remain as string since it's not an integer
80+
81+
82+
def test_handle_create_error_various_errors() -> None:
83+
"""Test _handle_create_error with various error types."""
84+
# Test constraint violation error
85+
error = Exception("constraint violation")
86+
error_str, failed_line, summary = _handle_create_error(
87+
0, error, ["test", "data"], "test summary"
88+
)
89+
assert "Constraint violation" in error_str
90+
91+
# Test database connection pool exhaustion errors
92+
error = Exception("connection pool is full")
93+
error_str, failed_line, summary = _handle_create_error(
94+
0, error, ["test", "data"], "test summary"
95+
)
96+
assert "Database connection pool exhaustion" in error_str
97+
98+
# Test database serialization errors
99+
error = Exception("could not serialize access")
100+
error_str, failed_line, summary = _handle_create_error(
101+
0, error, ["test", "data"], "test summary"
102+
)
103+
assert "Database serialization error" in error_str
104+
105+
# Test tuple index out of range errors
106+
error = Exception("tuple index out of range")
107+
error_str, failed_line, summary = _handle_create_error(
108+
0, error, ["test", "data"], "test summary"
109+
)
110+
assert "Tuple unpacking error" in error_str
111+
112+
113+
def test_handle_tuple_index_error() -> None:
114+
"""Test _handle_tuple_index_error function."""
115+
# Use None as progress to avoid console issues
116+
failed_lines: list[list[Any]] = []
117+
118+
# Test the function with progress=None to avoid rich console issues in tests
119+
from typing import Any
120+
progress_console: Any = None
121+
122+
_handle_tuple_index_error(
123+
progress_console, "source_id_123", ["id", "name"], failed_lines
124+
)
125+
126+
# The function should add an entry to failed_lines
127+
assert len(failed_lines) == 1
128+
assert "source_id_123" in str(failed_lines[0])
129+
130+
131+
def test_create_batch_individually_tuple_index_out_of_range() -> None:
132+
"""Test _create_batch_individually with tuple index out of range."""
133+
mock_model = MagicMock()
134+
mock_model.browse().env.ref.return_value = None # No existing record
135+
136+
# Mock create method to raise IndexError
137+
mock_model.create.side_effect = IndexError("tuple index out of range")
138+
139+
batch_header = ["id", "name", "value"]
140+
batch_lines = [["rec1", "Name", "Value"]]
141+
142+
result = _create_batch_individually(mock_model, batch_lines, batch_header, 0, {}, [])
143+
144+
# Should handle the error and return failed lines
145+
assert len(result["failed_lines"]) == 1
146+
error_msg = result["failed_lines"][0][-1].lower()
147+
assert "tuple index" in error_msg or "range" in error_msg
148+
149+
150+
def test_handle_fallback_create_with_progress() -> None:
151+
"""Test _handle_fallback_create function."""
152+
mock_model = MagicMock()
153+
current_chunk = [["rec1", "A"], ["rec2", "B"]]
154+
batch_header = ["id", "name"]
155+
uid_index = 0
156+
context: dict[str, Any] = {}
157+
ignore_list: list[str] = []
158+
progress = MagicMock()
159+
aggregated_id_map: dict[str, int] = {}
160+
aggregated_failed_lines: list[list[Any]] = []
161+
batch_number = 1
162+
163+
with patch("odoo_data_flow.import_threaded._create_batch_individually") as mock_create_ind:
164+
mock_create_ind.return_value = {
165+
"id_map": {"rec1": 1, "rec2": 2},
166+
"failed_lines": [],
167+
"error_summary": "test"
168+
}
169+
170+
_handle_fallback_create(
171+
mock_model,
172+
current_chunk,
173+
batch_header,
174+
uid_index,
175+
context,
176+
ignore_list,
177+
progress,
178+
aggregated_id_map,
179+
aggregated_failed_lines,
180+
batch_number,
181+
error_message="test error"
182+
)
183+
184+
assert aggregated_id_map == {"rec1": 1, "rec2": 2}
185+
186+
187+
def test_execute_load_batch_force_create_with_progress() -> None:
188+
"""Test _execute_load_batch with force_create enabled."""
189+
mock_model = MagicMock()
190+
thread_state = {
191+
"model": mock_model,
192+
"progress": MagicMock(),
193+
"unique_id_field_index": 0,
194+
"force_create": True, # Enable force create
195+
"ignore_list": [],
196+
}
197+
batch_header = ["id", "name"]
198+
batch_lines = [["rec1", "A"], ["rec2", "B"]]
199+
200+
with patch("odoo_data_flow.import_threaded._create_batch_individually") as mock_create:
201+
mock_create.return_value = {
202+
"id_map": {"rec1": 1, "rec2": 2},
203+
"failed_lines": [],
204+
"error_summary": "test",
205+
"success": True
206+
}
207+
208+
result = _execute_load_batch(thread_state, batch_lines, batch_header, 1)
209+
210+
assert result["success"] is True
211+
assert result["id_map"] == {"rec1": 1, "rec2": 2}
212+
mock_create.assert_called_once()
213+
214+
215+
@patch("builtins.open")
216+
def test_read_data_file_os_error(mock_open: MagicMock) -> None:
217+
"""Test _read_data_file with OSError (not UnicodeDecodeError)."""
218+
mock_open.side_effect = OSError("File access error")
219+
220+
header, data = _read_data_file("nonexistent.txt", ",", "utf-8", 0)
221+
assert header == []
222+
assert data == []
223+
224+
225+
def test_read_data_file_all_fallbacks_fail() -> None:
226+
"""Test _read_data_file when all fallback encodings fail."""
227+
with patch("builtins.open") as mock_open:
228+
def open_side_effect(*args: Any, **kwargs: Any) -> Any:
229+
# Always raise UnicodeDecodeError regardless of encoding
230+
raise UnicodeDecodeError("utf-8", b"test", 0, 1, "fake error")
231+
232+
mock_open.side_effect = open_side_effect
233+
234+
header, data = _read_data_file("dummy.csv", ",", "utf-8", 0)
235+
assert header == []
236+
assert data == []
237+
238+
239+
def test_setup_fail_file_with_error_reason_column() -> None:
240+
"""Test _setup_fail_file when _ERROR_REASON is already in header."""
241+
with patch("builtins.open") as mock_open:
242+
mock_file = MagicMock()
243+
mock_open.return_value.__enter__.return_value = mock_file
244+
245+
header = ["id", "_ERROR_REASON", "name"]
246+
writer, handle = _setup_fail_file("fail.csv", header, ",", "utf-8")
247+
248+
# Should not add _ERROR_REASON again since it's already in header
249+
# Just verify it doesn't crash
250+
assert writer is not None
251+
assert handle is not None
252+
253+
254+
def test_recursive_create_batches_no_id_column() -> None:
255+
"""Test _recursive_create_batches when no 'id' column exists."""
256+
from odoo_data_flow.import_threaded import _recursive_create_batches
257+
258+
header = ["name", "age"] # No 'id' column
259+
data = [["Alice", "25"], ["Bob", "30"]]
260+
261+
batches = list(_recursive_create_batches(data, [], header, 10, True)) # o2m=True
262+
263+
# Should handle the case where no 'id' column exists
264+
assert len(batches) >= 0 # This should not crash
265+
266+
267+
def test_orchestrate_pass_1_force_create() -> None:
268+
"""Test _orchestrate_pass_1 with force_create enabled."""
269+
mock_model = MagicMock()
270+
header = ["id", "name"]
271+
all_data = [["rec1", "A"], ["rec2", "B"]]
272+
unique_id_field = "id"
273+
deferred_fields: list[str] = []
274+
ignore: list[str] = []
275+
context: dict[str, Any] = {}
276+
fail_writer = None
277+
fail_handle = None
278+
max_connection = 1
279+
batch_size = 10
280+
o2m = False
281+
split_by_cols = None
282+
283+
with Progress() as progress:
284+
result = _orchestrate_pass_1(
285+
progress,
286+
mock_model,
287+
"res.partner",
288+
header,
289+
all_data,
290+
unique_id_field,
291+
deferred_fields,
292+
ignore,
293+
context,
294+
fail_writer,
295+
fail_handle,
296+
max_connection,
297+
batch_size,
298+
o2m,
299+
split_by_cols,
300+
force_create=True # Enable force create
301+
)
302+
303+
# Should return a result dict
304+
assert isinstance(result, dict)
305+
306+
307+
def test_import_data_connection_dict() -> None:
308+
"""Test import_data with connection config as dict."""
309+
mock_connection = MagicMock()
310+
mock_model = MagicMock()
311+
312+
with patch("odoo_data_flow.import_threaded._read_data_file", return_value=(["id"], [["1"]])):
313+
with patch("odoo_data_flow.import_threaded.conf_lib.get_connection_from_dict", return_value=mock_connection):
314+
mock_connection.get_model.return_value = mock_model
315+
316+
# Mock the _run_threaded_pass function
317+
with patch("odoo_data_flow.import_threaded._run_threaded_pass") as mock_run_pass:
318+
mock_run_pass.return_value = (
319+
{"id_map": {"1": 1}, "failed_lines": []}, # results dict
320+
False, # aborted = False
321+
)
322+
323+
result, stats = import_data(
324+
config={"host": "localhost"}, # Dict config instead of file
325+
model="res.partner",
326+
unique_id_field="id",
327+
file_csv="dummy.csv",
328+
)
329+
330+
# Should succeed
331+
assert result is True
332+
333+
334+
@patch("odoo_data_flow.import_threaded._read_data_file", return_value=(["id"], [["1"]]))
335+
@patch("odoo_data_flow.import_threaded.conf_lib.get_connection_from_config")
336+
def test_import_data_connection_failure(
337+
mock_get_conn: MagicMock, mock_read_file: MagicMock
338+
) -> None:
339+
"""Test import_data when connection fails."""
340+
mock_get_conn.side_effect = Exception("Connection failed")
341+
342+
result, stats = import_data(
343+
config="dummy.conf",
344+
model="res.partner",
345+
unique_id_field="id",
346+
file_csv="dummy.csv",
347+
)
348+
349+
# Should fail gracefully
350+
assert result is False
351+
assert stats == {}
352+
353+
354+
@patch("odoo_data_flow.import_threaded._read_data_file", return_value=([], []))
355+
def test_import_data_no_header(mock_read_file: MagicMock) -> None:
356+
"""Test import_data when there's no header in the CSV."""
357+
result, stats = import_data(
358+
config="dummy.conf",
359+
model="res.partner",
360+
unique_id_field="id",
361+
file_csv="dummy.csv",
362+
)
363+
364+
# Should fail gracefully
365+
assert result is False
366+
assert stats == {}
367+
368+
369+
@patch("odoo_data_flow.import_threaded.conf_lib.get_connection_from_config")
370+
def test_get_model_fields_callable_method(mock_get_conn: MagicMock) -> None:
371+
"""Test _get_model_fields when _fields is a callable method."""
372+
mock_model = MagicMock()
373+
mock_model._fields = lambda: {"field1": {"type": "char"}}
374+
375+
result = _get_model_fields(mock_model)
376+
assert result == {"field1": {"type": "char"}}
377+
378+
379+
@patch("odoo_data_flow.import_threaded.conf_lib.get_connection_from_config")
380+
def test_get_model_fields_callable_method_exception(mock_get_conn: MagicMock) -> None:
381+
"""Test _get_model_fields when _fields callable raises exception."""
382+
mock_model = MagicMock()
383+
mock_model._fields = MagicMock(side_effect=Exception("Error"))
384+
385+
result = _get_model_fields(mock_model)
386+
assert result is None
387+
388+
389+
@patch("odoo_data_flow.import_threaded.conf_lib.get_connection_from_config")
390+
def test_get_model_fields_callable_method_non_dict(mock_get_conn: MagicMock) -> None:
391+
"""Test _get_model_fields when _fields callable returns non-dict."""
392+
mock_model = MagicMock()
393+
mock_model._fields = MagicMock(return_value="not a dict")
394+
395+
result = _get_model_fields(mock_model)
396+
assert result is None

0 commit comments

Comments
 (0)