Skip to content

Commit ab37558

Browse files
bosdbosd
authored andcommitted
[IMP]: Transform coverage
1 parent 4df9cdd commit ab37558

File tree

1 file changed

+207
-36
lines changed

1 file changed

+207
-36
lines changed

tests/test_transform.py

Lines changed: 207 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
1-
"""Test the trasnform functions."""
1+
"""Test the core Processor class and its subclasses."""
22

33
from pathlib import Path
44
from typing import Any, Callable
5+
from unittest.mock import MagicMock, patch
56

67
import pytest
78

@@ -10,17 +11,21 @@
1011
MapperRepr,
1112
Processor,
1213
ProductProcessorV9,
14+
ProductProcessorV10,
1315
)
1416

1517

16-
def test_mapper_repr() -> None:
17-
"""Tests the __repr__ method of the MapperRepr class."""
18-
mapper_repr = MapperRepr("mapper.val('test')", lambda: "value")
18+
def test_mapper_repr_and_call() -> None:
19+
"""Tests the __repr__ and __call__ methods of the MapperRepr class."""
20+
# Test __repr__
21+
mapper_repr = MapperRepr("mapper.val('test')", lambda x: x.upper())
1922
assert repr(mapper_repr) == "mapper.val('test')"
23+
# Test __call__
24+
assert mapper_repr("hello") == "HELLO"
2025

2126

2227
def test_processor_init_fails_without_args() -> None:
23-
"""Tests that the Processor raises a ValueError if initialized without args."""
28+
"""Tests that the Processor raises a ValueError if initialized with no args."""
2429
with pytest.raises(
2530
ValueError, match="must be initialized with either a 'filename' or both"
2631
):
@@ -38,13 +43,70 @@ def test_read_file_xml_syntax_error(tmp_path: Path) -> None:
3843
assert processor.data == []
3944

4045

46+
@patch("odoo_data_flow.lib.transform.etree.parse")
47+
def test_read_file_xml_generic_exception(mock_parse: MagicMock, tmp_path: Path) -> None:
48+
"""Tests that a generic exception during XML parsing is handled."""
49+
mock_parse.side_effect = Exception("Generic XML read error")
50+
xml_file = tmp_path / "any.xml"
51+
xml_file.touch()
52+
53+
processor = Processor(filename=str(xml_file), xml_root_tag="./record")
54+
assert processor.header == []
55+
assert processor.data == []
56+
57+
4158
def test_read_file_csv_not_found() -> None:
4259
"""Tests that a non-existent CSV file is handled correctly."""
4360
processor = Processor(filename="non_existent_file.csv")
4461
assert processor.header == []
4562
assert processor.data == []
4663

4764

65+
@patch("odoo_data_flow.lib.transform.csv.reader")
66+
def test_read_file_csv_generic_exception(
67+
mock_reader: MagicMock, tmp_path: Path
68+
) -> None:
69+
"""Tests that a generic exception during CSV reading is handled."""
70+
mock_reader.side_effect = Exception("Generic CSV read error")
71+
csv_file = tmp_path / "any.csv"
72+
csv_file.touch()
73+
74+
processor = Processor(filename=str(csv_file))
75+
assert processor.header == []
76+
assert processor.data == []
77+
78+
79+
@patch("odoo_data_flow.lib.transform.log.warning")
80+
def test_check_failure(mock_log_warning: MagicMock) -> None:
81+
"""Tests that the check method logs a warning when a check fails."""
82+
processor = Processor(header=[], data=[])
83+
84+
def failing_check(h: list[str], d: list[list[Any]]) -> bool:
85+
return False
86+
87+
result = processor.check(failing_check, message="Custom fail message")
88+
89+
assert result is False
90+
mock_log_warning.assert_called_once()
91+
assert "Custom fail message" in mock_log_warning.call_args[0][0]
92+
93+
94+
def test_join_file_success(tmp_path: Path) -> None:
95+
"""Tests that join_file successfully merges data from two files."""
96+
master_file = tmp_path / "master.csv"
97+
master_file.write_text("id,name\n1,master_record")
98+
child_file = tmp_path / "child.csv"
99+
child_file.write_text("child_id,value\n1,child_value")
100+
101+
processor = Processor(filename=str(master_file), separator=",")
102+
processor.join_file(
103+
str(child_file), master_key="id", child_key="child_id", separator=","
104+
)
105+
106+
assert processor.header == ["id", "name", "child_child_id", "child_value"]
107+
assert processor.data == [["1", "master_record", "1", "child_value"]]
108+
109+
48110
def test_join_file_missing_key(tmp_path: Path) -> None:
49111
"""Tests that join_file handles a missing join key gracefully."""
50112
master_file = tmp_path / "master.csv"
@@ -55,13 +117,15 @@ def test_join_file_missing_key(tmp_path: Path) -> None:
55117
processor = Processor(filename=str(master_file), separator=",")
56118
original_header_len = len(processor.header)
57119

58-
# Attempt to join on a key that doesn't exist in the master file
59-
processor.join_file(
60-
str(child_file),
61-
master_key="non_existent_key",
62-
child_key="child_id",
63-
separator=",",
64-
)
120+
with patch("odoo_data_flow.lib.transform.log.error") as mock_log_error:
121+
processor.join_file(
122+
str(child_file),
123+
master_key="non_existent_key",
124+
child_key="child_id",
125+
separator=",",
126+
)
127+
mock_log_error.assert_called_once()
128+
assert "Join key error" in mock_log_error.call_args[0][0]
65129

66130
# The header and data should remain unchanged because the join failed
67131
assert len(processor.header) == original_header_len
@@ -81,24 +145,6 @@ def test_process_with_legacy_mapper() -> None:
81145
assert list(processed_data) == [["a"]]
82146

83147

84-
def test_process_returns_set() -> None:
85-
"""Tests that process correctly returns a set when t='set'."""
86-
header = ["col1"]
87-
# Include duplicate data
88-
data = [["A"], ["B"], ["A"]]
89-
processor = Processor(header=header, data=data)
90-
mapping = {"new_col": mapper.val("col1")}
91-
92-
# Process with t='set' to get unique records
93-
head, processed_data = processor.process(mapping, filename_out="", t="set")
94-
95-
assert isinstance(processed_data, set)
96-
# The set should only contain unique values
97-
assert len(processed_data) == 2
98-
assert ("A",) in processed_data
99-
assert ("B",) in processed_data
100-
101-
102148
def test_v9_extract_attribute_value_data_malformed_mapping() -> None:
103149
"""Tests that _extract_attribute_value_data handles a malformed mapping.
104150
@@ -107,18 +153,143 @@ def test_v9_extract_attribute_value_data_malformed_mapping() -> None:
107153
"""
108154
processor = ProductProcessorV9(header=["col1"], data=[["val1"]])
109155

110-
# Create a malformed mapping where the 'name' mapper returns a string,
111-
# not a dict
112-
# The lambda is defined to accept an optional state to handle the fallback
113-
# logic.
114-
# Explicitly type the dictionary to satisfy mypy.
115156
malformed_mapping: dict[str, Callable[..., Any]] = {
116157
"name": mapper.val("col1"),
117158
"attribute_id/id": lambda line, state=None: "some_id",
118159
}
119160

120-
# This should run without error and simply return an empty set
121161
result = processor._extract_attribute_value_data(
122162
malformed_mapping, ["Color"], [{"col1": "val1"}]
123163
)
124164
assert result == set()
165+
166+
167+
def test_process_returns_set() -> None:
168+
"""Tests that process correctly returns a set when t='set'."""
169+
processor = Processor(header=["col1"], data=[["A"], ["B"], ["A"]])
170+
_head, processed_data = processor.process(
171+
{"new_col": mapper.val("col1")}, filename_out="", t="set"
172+
)
173+
assert isinstance(processed_data, set)
174+
assert len(processed_data) == 2
175+
assert ("A",) in processed_data
176+
assert ("B",) in processed_data
177+
178+
179+
@patch("odoo_data_flow.lib.transform.log.info")
180+
def test_process_dry_run(mock_log_info: MagicMock) -> None:
181+
"""Tests that dry_run mode prints to log and does not write files."""
182+
processor = Processor(header=["col1"], data=[["A"]])
183+
mapping = {"new_col": mapper.val("col1")}
184+
processor.process(mapping, "file.csv", dry_run=True)
185+
186+
# Assert that no file was added to the write queue
187+
assert not processor.file_to_write
188+
# Assert that the dry run log messages were printed
189+
assert any("DRY RUN MODE" in call[0][0] for call in mock_log_info.call_args_list)
190+
191+
192+
@patch("odoo_data_flow.lib.transform.write_file")
193+
def test_write_to_file_append_and_no_fail(mock_write_file: MagicMock) -> None:
194+
"""Tests write_to_file with append=True and fail=False."""
195+
processor = Processor(header=["id"], data=[["1"]])
196+
processor.process({"id": mapper.val("id")}, "file1.csv", params={"model": "model1"})
197+
processor.process({"id": mapper.val("id")}, "file2.csv", params={"model": "model2"})
198+
199+
processor.write_to_file("script.sh", fail=False, append=True)
200+
201+
assert mock_write_file.call_count == 2
202+
assert mock_write_file.call_args_list[0].kwargs["init"] is False
203+
assert mock_write_file.call_args_list[1].kwargs["init"] is False
204+
205+
206+
def test_v10_process_attribute_value_data() -> None:
207+
"""Tests the attribute value data processing for the V10+ workflow."""
208+
header = ["Color", "Size"]
209+
data = [["Blue", "L"], ["Red", "L"], ["Blue", "M"]]
210+
processor = ProductProcessorV10(header=header, data=data)
211+
212+
processor.process_attribute_value_data(
213+
attribute_list=["Color", "Size"],
214+
attribute_value_prefix="val_prefix",
215+
attribute_prefix="attr_prefix",
216+
filename_out="product.attribute.value.csv",
217+
import_args={},
218+
)
219+
220+
assert "product.attribute.value.csv" in processor.file_to_write
221+
result = processor.file_to_write["product.attribute.value.csv"]
222+
assert result["header"] == ["id", "name", "attribute_id/id"]
223+
224+
# We expect 4 unique values: Blue, L, Red, M
225+
assert len(result["data"]) == 4
226+
# Check for one of the generated rows to ensure correctness
227+
expected_row = ["val_prefix.Color_Blue", "Blue", "attr_prefix.Color"]
228+
assert any(row == expected_row for row in result["data"])
229+
230+
231+
def test_v9_extract_attribute_value_data_legacy_mapper() -> None:
232+
"""Tests that _extract_attribute_value_data handles legacy mappers."""
233+
processor = ProductProcessorV9(header=["col1"], data=[["val1"]])
234+
235+
# This mapping uses a legacy 1-argument lambda
236+
legacy_mapping: dict[str, Callable[..., Any]] = {
237+
"name": lambda line: {"Color": line["col1"]},
238+
"attribute_id/id": lambda line: "some_id",
239+
}
240+
241+
result = processor._extract_attribute_value_data(
242+
legacy_mapping, ["Color"], [{"col1": "val1"}]
243+
)
244+
# The result should contain a tuple with the resolved value 'val1'
245+
assert ("val1", "some_id") in result
246+
247+
248+
def test_v9_process_attribute_mapping_with_custom_id_gen(tmp_path: Path) -> None:
249+
"""Tests the full process_attribute_mapping method from ProductProcessorV9.
250+
251+
This test uses a custom ID generation function.
252+
"""
253+
header = ["template_id", "Color", "Size"]
254+
data = [
255+
["TPL1", "Blue", "L"],
256+
["TPL2", "Red", "M"],
257+
["TPL1", "Green", "L"],
258+
]
259+
processor = ProductProcessorV9(header=header, data=data)
260+
attributes = ["Color", "Size"]
261+
prefix = "test_prefix"
262+
output_path = str(tmp_path) + "/"
263+
264+
value_mapping = {
265+
"id": mapper.m2m_attribute_value(prefix, *attributes),
266+
"name": mapper.val_att(attributes),
267+
"attribute_id/id": mapper.m2o_att_name(prefix, attributes),
268+
}
269+
line_mapping = {
270+
"product_tmpl_id/id": mapper.m2o_map("tmpl_", "template_id"),
271+
"attribute_id/id": mapper.m2o_att_name(prefix, attributes),
272+
"value_ids/id": mapper.m2o_att(prefix, attributes),
273+
}
274+
275+
processor.process_attribute_mapping(
276+
value_mapping, line_mapping, attributes, prefix, output_path, {}
277+
)
278+
279+
def custom_id_gen(tmpl_id: str, vals: dict[str, Any]) -> str:
280+
return f"custom_line_id_for_{tmpl_id}"
281+
282+
processor.process_attribute_mapping(
283+
value_mapping,
284+
line_mapping,
285+
attributes,
286+
prefix,
287+
output_path,
288+
{},
289+
id_gen_fun=custom_id_gen,
290+
)
291+
292+
# Assert that all three files were added to the write queue
293+
assert len(processor.file_to_write) == 3
294+
line_file_data = processor.file_to_write[output_path + "product.attribute.line.csv"]
295+
assert line_file_data["data"][0][0] == "custom_line_id_for_tmpl_.TPL1"

0 commit comments

Comments
 (0)