Skip to content

Commit 0da0847

Browse files
authored
Merge pull request #16698 from tapankavasthi/dev/tavasthi-bael-7998
BAEL-7998: RxJava Observable concat() vs merge()
2 parents 450439e + ffe6510 commit 0da0847

File tree

1 file changed

+129
-0
lines changed

1 file changed

+129
-0
lines changed
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
package com.baeldung.rxjava.combine;
2+
3+
import static org.junit.jupiter.api.Assertions.assertTrue;
4+
5+
import java.util.Arrays;
6+
import java.util.List;
7+
import java.util.concurrent.TimeUnit;
8+
9+
import org.junit.Test;
10+
11+
import org.slf4j.Logger;
12+
import org.slf4j.LoggerFactory;
13+
14+
import rx.Observable;
15+
import rx.observers.TestSubscriber;
16+
17+
public class ObservableMergeVsConcatUnitTest {
18+
19+
private static final Logger logger = LoggerFactory.getLogger(ObservableMergeVsConcatUnitTest.class);
20+
21+
@Test
22+
public void givenTwoSynchronousObservables_whenConcatenated_thenAllValuesEmittedInOrder() {
23+
Observable<Integer> observable1 = Observable.just(1, 2, 3);
24+
Observable<Integer> observable2 = Observable.just(4, 5, 6);
25+
Observable<Integer> observable3 = Observable.just(7, 8, 9);
26+
27+
TestSubscriber<Integer> testSubscriberForConcat = new TestSubscriber<>();
28+
29+
Observable.concat(observable1, observable2, observable3)
30+
.subscribe(testSubscriberForConcat);
31+
32+
testSubscriberForConcat.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9);
33+
}
34+
35+
@Test
36+
public void givenTwoSynchronousObservables_whenMerged_thenAllValuesEmitted() {
37+
Observable<Integer> observable1 = Observable.just(1, 2, 3);
38+
Observable<Integer> observable2 = Observable.just(4, 5, 6);
39+
Observable<Integer> observable3 = Observable.just(7, 8, 9);
40+
41+
TestSubscriber<Integer> testSubscriberForMerge = new TestSubscriber<>();
42+
43+
Observable.merge(observable1, observable2, observable3)
44+
.subscribe(testSubscriberForMerge);
45+
46+
testSubscriberForMerge.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9);
47+
}
48+
49+
@Test
50+
public void givenTwoAsynchronousObservables_whenConcatenated_thenAllValuesEmittedInOrder() {
51+
Observable<Integer> observable1 = Observable.interval(100, TimeUnit.MILLISECONDS)
52+
.map(i -> i.intValue() + 1)
53+
.take(3);
54+
55+
Observable<Integer> observable2 = Observable.interval(30, TimeUnit.MILLISECONDS)
56+
.map(i -> i.intValue() + 4)
57+
.take(7);
58+
59+
TestSubscriber<Integer> testSubscriberForConcat = new TestSubscriber<>();
60+
61+
Observable.concat(observable1, observable2)
62+
.subscribe(testSubscriberForConcat);
63+
64+
testSubscriberForConcat.awaitTerminalEvent();
65+
66+
testSubscriberForConcat.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
67+
}
68+
69+
@Test
70+
public void givenTwoAsynchronousObservables_whenMerged_thenAllValuesEmittedInterleaved() {
71+
Observable<Integer> observable1 = Observable.interval(100, TimeUnit.MILLISECONDS)
72+
.map(i -> i.intValue() + 1)
73+
.take(3);
74+
Observable<Integer> observable2 = Observable.interval(30, TimeUnit.MILLISECONDS)
75+
.map(i -> i.intValue() + 4)
76+
.take(7);
77+
TestSubscriber<Integer> testSubscriberForMerge = new TestSubscriber<>();
78+
79+
Observable.merge(observable1, observable2)
80+
.subscribe(testSubscriberForMerge);
81+
82+
testSubscriberForMerge.awaitTerminalEvent();
83+
84+
testSubscriberForMerge.assertValues(4, 5, 6, 1, 7, 8, 9, 2, 10, 3);
85+
}
86+
87+
@Test
88+
public void givenTwoAsynchronousObservablesWithSimilarDelays_whenConcat_thenAllValuesEmittedInOrder() {
89+
Observable<Integer> observable1 = Observable.interval(100, TimeUnit.MILLISECONDS)
90+
.map(i -> i.intValue() + 1)
91+
.take(3);
92+
93+
Observable<Integer> observable2 = Observable.interval(100, TimeUnit.MILLISECONDS)
94+
.map(i -> i.intValue() + 4)
95+
.take(3);
96+
97+
TestSubscriber<Integer> testSubscriberForConcat = new TestSubscriber<>();
98+
99+
Observable.concat(observable1, observable2)
100+
.subscribe(testSubscriberForConcat);
101+
102+
testSubscriberForConcat.awaitTerminalEvent();
103+
104+
testSubscriberForConcat.assertValues(1, 2, 3, 4, 5, 6);
105+
}
106+
107+
@Test
108+
public void givenTwoAsynchronousObservablesWithSimilarDelays_whenMerged_thenAllValuesEmittedRegardlessOfOrder() {
109+
Observable<Integer> observable1 = Observable.interval(100, TimeUnit.MILLISECONDS)
110+
.map(i -> i.intValue() + 1)
111+
.take(3);
112+
113+
Observable<Integer> observable2 = Observable.interval(100, TimeUnit.MILLISECONDS)
114+
.map(i -> i.intValue() + 4)
115+
.take(3);
116+
117+
TestSubscriber<Integer> testSubscriberForMerge = new TestSubscriber<>();
118+
119+
Observable.merge(observable1, observable2)
120+
.subscribe(testSubscriberForMerge);
121+
122+
testSubscriberForMerge.awaitTerminalEvent();
123+
124+
List<Integer> actual = testSubscriberForMerge.getOnNextEvents();
125+
List<Integer> expected = Arrays.asList(1, 2, 3, 4, 5, 6);
126+
assertTrue(actual.containsAll(expected) && expected.containsAll(actual));
127+
logger.info("actual emissions: {}", actual);
128+
}
129+
}

0 commit comments

Comments
 (0)