Skip to content

Commit aad7066

Browse files
committed
Add more tests
1 parent 9af8dfb commit aad7066

12 files changed

+2276
-845
lines changed

reactivex/observable/mixins/utility.py

Lines changed: 462 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
"""Tests for CombinationMixin fluent API methods.
2+
3+
This module tests the combination operators fluent syntax from CombinationMixin,
4+
ensuring they produce identical results to the pipe-based functional syntax.
5+
"""
6+
7+
import reactivex as rx
8+
from reactivex import Observable, operators as ops
9+
10+
11+
class TestMergeMethodChaining:
12+
"""Tests for merge() method."""
13+
14+
def test_merge_equivalence(self) -> None:
15+
"""Verify merge fluent and functional styles are equivalent."""
16+
source1: Observable[int] = rx.of(1, 2, 3)
17+
source2: Observable[int] = rx.of(4, 5, 6)
18+
19+
fluent_result: Observable[int] = source1.merge(source2)
20+
pipe_result: Observable[int] = source1.pipe(ops.merge(source2))
21+
22+
fluent_values: list[int] = []
23+
pipe_values: list[int] = []
24+
25+
fluent_result.subscribe(on_next=fluent_values.append)
26+
pipe_result.subscribe(on_next=pipe_values.append)
27+
28+
# Merge is concurrent, but in sync mode results should be same
29+
assert sorted(fluent_values) == sorted(pipe_values)
30+
31+
32+
class TestConcatMethodChaining:
33+
"""Tests for concat() method."""
34+
35+
def test_concat_equivalence(self) -> None:
36+
"""Verify concat fluent and functional styles are equivalent."""
37+
source1: Observable[int] = rx.of(1, 2, 3)
38+
source2: Observable[int] = rx.of(4, 5, 6)
39+
40+
fluent_result: Observable[int] = source1.concat(source2)
41+
pipe_result: Observable[int] = source1.pipe(ops.concat(source2))
42+
43+
fluent_values: list[int] = []
44+
pipe_values: list[int] = []
45+
46+
fluent_result.subscribe(on_next=fluent_values.append)
47+
pipe_result.subscribe(on_next=pipe_values.append)
48+
49+
assert fluent_values == pipe_values == [1, 2, 3, 4, 5, 6]
50+
51+
52+
class TestStartWithMethodChaining:
53+
"""Tests for start_with() method."""
54+
55+
def test_start_with_equivalence(self) -> None:
56+
"""Verify start_with fluent and functional styles are equivalent."""
57+
source: Observable[int] = rx.of(4, 5, 6)
58+
59+
fluent_result: Observable[int] = source.start_with(1, 2, 3)
60+
pipe_result: Observable[int] = source.pipe(ops.start_with(1, 2, 3))
61+
62+
fluent_values: list[int] = []
63+
pipe_values: list[int] = []
64+
65+
fluent_result.subscribe(on_next=fluent_values.append)
66+
pipe_result.subscribe(on_next=pipe_values.append)
67+
68+
assert fluent_values == pipe_values == [1, 2, 3, 4, 5, 6]
69+
70+
71+
class TestForkJoinMethodChaining:
72+
"""Tests for fork_join() method."""
73+
74+
def test_fork_join_equivalence(self) -> None:
75+
"""Verify fork_join fluent and functional styles are equivalent."""
76+
source1: Observable[int] = rx.of(1, 2, 3)
77+
source2: Observable[int] = rx.of(4, 5, 6)
78+
source3: Observable[int] = rx.of(7, 8, 9)
79+
80+
fluent_result: Observable[tuple[int, ...]] = source1.fork_join(source2, source3)
81+
pipe_result: Observable[tuple[int, ...]] = source1.pipe(
82+
ops.fork_join(source2, source3)
83+
)
84+
85+
fluent_values: list[tuple[int, ...]] = []
86+
pipe_values: list[tuple[int, ...]] = []
87+
88+
fluent_result.subscribe(on_next=fluent_values.append)
89+
pipe_result.subscribe(on_next=pipe_values.append)
90+
91+
assert fluent_values == pipe_values == [(3, 6, 9)]
92+
93+
94+
class TestAmbMethodChaining:
95+
"""Tests for amb() method."""
96+
97+
def test_amb_equivalence(self) -> None:
98+
"""Verify amb fluent and functional styles are equivalent."""
99+
source1: Observable[int] = rx.of(1, 2, 3)
100+
source2: Observable[int] = rx.of(4, 5, 6)
101+
102+
fluent_result: Observable[int] = source1.amb(source2)
103+
pipe_result: Observable[int] = source1.pipe(ops.amb(source2))
104+
105+
fluent_values: list[int] = []
106+
pipe_values: list[int] = []
107+
108+
fluent_result.subscribe(on_next=fluent_values.append)
109+
pipe_result.subscribe(on_next=pipe_values.append)
110+
111+
# amb picks the first observable that emits
112+
assert fluent_values == pipe_values == [1, 2, 3]
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
"""Tests for ConditionalMixin fluent API methods.
2+
3+
This module tests the conditional operators fluent syntax from ConditionalMixin,
4+
ensuring they produce identical results to the pipe-based functional syntax.
5+
"""
6+
7+
import reactivex as rx
8+
from reactivex import Observable, operators as ops
9+
10+
11+
class TestDefaultIfEmptyMethodChaining:
12+
"""Tests for default_if_empty() method."""
13+
14+
def test_default_if_empty_equivalence(self) -> None:
15+
"""Verify default_if_empty fluent and functional styles are equivalent."""
16+
source: Observable[int] = rx.empty()
17+
18+
fluent_result: Observable[int] = source.default_if_empty(42)
19+
pipe_result: Observable[int] = source.pipe(ops.default_if_empty(42))
20+
21+
fluent_values: list[int] = []
22+
pipe_values: list[int] = []
23+
24+
fluent_result.subscribe(on_next=fluent_values.append)
25+
pipe_result.subscribe(on_next=pipe_values.append)
26+
27+
assert fluent_values == pipe_values == [42]
28+
29+
def test_default_if_empty_with_values(self) -> None:
30+
"""Test default_if_empty with non-empty source."""
31+
source: Observable[int] = rx.of(1, 2, 3)
32+
33+
result: Observable[int] = source.default_if_empty(99)
34+
35+
values: list[int] = []
36+
result.subscribe(on_next=values.append)
37+
38+
# Should return original values, not default
39+
assert values == [1, 2, 3]
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
"""Tests for ErrorHandlingMixin fluent API methods.
2+
3+
This module tests the error handling operators fluent syntax from ErrorHandlingMixin,
4+
ensuring they produce identical results to the pipe-based functional syntax.
5+
"""
6+
7+
import reactivex as rx
8+
from reactivex import Observable, operators as ops
9+
10+
11+
class TestCatchMethodChaining:
12+
"""Tests for catch() method."""
13+
14+
def test_catch_equivalence(self) -> None:
15+
"""Verify catch fluent and functional styles are equivalent."""
16+
source: Observable[int] = rx.throw(Exception("Error"))
17+
fallback: Observable[int] = rx.of(99)
18+
19+
fluent_result: Observable[int] = source.catch(fallback)
20+
pipe_result: Observable[int] = source.pipe(ops.catch(fallback))
21+
22+
fluent_values: list[int] = []
23+
pipe_values: list[int] = []
24+
25+
fluent_result.subscribe(on_next=fluent_values.append, on_error=lambda e: None)
26+
pipe_result.subscribe(on_next=pipe_values.append, on_error=lambda e: None)
27+
28+
assert fluent_values == pipe_values == [99]
29+
30+
def test_catch_with_normal_completion(self) -> None:
31+
"""Test catch when source completes normally."""
32+
source: Observable[int] = rx.of(1, 2, 3)
33+
fallback: Observable[int] = rx.of(99)
34+
35+
result: Observable[int] = source.catch(fallback)
36+
37+
values: list[int] = []
38+
result.subscribe(on_next=values.append, on_error=lambda e: None)
39+
40+
# Should return original values since no error
41+
assert values == [1, 2, 3]

0 commit comments

Comments
 (0)