Skip to content

Commit 55c1fbe

Browse files
bosdbosd
authored andcommitted
[IMP]: Coverage of rpc_thread
1 parent c041e05 commit 55c1fbe

File tree

1 file changed

+72
-0
lines changed

1 file changed

+72
-0
lines changed

tests/test_rpc_thread.py

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
"""Test the RpcThread class."""
2+
3+
from unittest.mock import MagicMock, patch
4+
5+
import pytest
6+
7+
from odoo_data_flow.lib.internal.rpc_thread import RpcThread
8+
9+
10+
def test_rpc_thread_invalid_max_connection() -> None:
11+
"""Test invalid max connection.
12+
13+
Tests that initializing RpcThread with a non-positive max_connection
14+
raises a ValueError.
15+
"""
16+
with pytest.raises(ValueError, match="max_connection must be a positive integer"):
17+
RpcThread(0)
18+
19+
with pytest.raises(ValueError, match="max_connection must be a positive integer"):
20+
RpcThread(-1)
21+
22+
23+
@patch("odoo_data_flow.lib.internal.rpc_thread.log.error")
24+
def test_rpc_thread_wait_handles_exception(mock_log_error: MagicMock) -> None:
25+
"""Test Wait handle exception.
26+
27+
Tests that the wait() method correctly catches and logs exceptions
28+
from worker threads.
29+
"""
30+
# 1. Setup
31+
rpc_thread = RpcThread(max_connection=1)
32+
33+
def failing_function() -> None:
34+
"""A simple function that always raises an error."""
35+
raise ValueError("This is a test failure.")
36+
37+
# 2. Action
38+
# Spawn a thread that will execute the failing function
39+
rpc_thread.spawn_thread(failing_function, args=[], kwargs={})
40+
# The wait method should catch the exception and log it
41+
rpc_thread.wait()
42+
43+
# 3. Assertions
44+
mock_log_error.assert_called_once()
45+
# Check that the log message contains the exception's message
46+
log_message = mock_log_error.call_args[0][0]
47+
assert "A task in a worker thread failed" in log_message
48+
assert "This is a test failure" in log_message
49+
50+
51+
def test_rpc_thread_thread_number() -> None:
52+
"""Test the thread number.
53+
54+
Tests that the thread_number() method returns the correct count of
55+
submitted tasks.
56+
"""
57+
# 1. Setup
58+
rpc_thread = RpcThread(max_connection=2)
59+
60+
def dummy_function() -> None:
61+
pass
62+
63+
# 2. Action
64+
rpc_thread.spawn_thread(dummy_function, [], {})
65+
rpc_thread.spawn_thread(dummy_function, [], {})
66+
rpc_thread.spawn_thread(dummy_function, [], {})
67+
68+
# 3. Assertions
69+
assert rpc_thread.thread_number() == 3
70+
71+
# Clean up the threads
72+
rpc_thread.wait()

0 commit comments

Comments
 (0)