Skip to content

Commit 35d8005

Browse files
committed
Reimplement the 'SequenceEqual' operator using other operators
1 parent 6a4a3d0 commit 35d8005

File tree

4 files changed

+229
-21
lines changed

4 files changed

+229
-21
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@
7676
import rx.operators.OperationRetry;
7777
import rx.operators.OperationSample;
7878
import rx.operators.OperationScan;
79+
import rx.operators.OperationSequenceEqual;
7980
import rx.operators.OperationSkip;
8081
import rx.operators.OperationSkipLast;
8182
import rx.operators.OperationSkipUntil;
@@ -2298,16 +2299,16 @@ public static <T> Observable<T> from(Future<? extends T> future, long timeout, T
22982299
}
22992300

23002301
/**
2301-
* Returns an Observable that emits Boolean values that indicate whether the
2302-
* pairs of items emitted by two source Observables are equal.
2302+
* Returns an Observable that emits a Boolean value that indicate
2303+
* whether two sequences are equal by comparing the elements pairwise.
23032304
* <p>
23042305
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/sequenceEqual.png">
23052306
*
23062307
* @param first the first Observable to compare
23072308
* @param second the second Observable to compare
23082309
* @param <T> the type of items emitted by each Observable
2309-
* @return an Observable that emits Booleans that indicate whether the
2310-
* corresponding items emitted by the source Observables are equal
2310+
* @return an Observable that emits a Boolean value that indicate
2311+
* whether two sequences are equal by comparing the elements pairwise.
23112312
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#sequenceequal">RxJava Wiki: sequenceEqual()</a>
23122313
*/
23132314
public static <T> Observable<Boolean> sequenceEqual(Observable<? extends T> first, Observable<? extends T> second) {
@@ -2320,9 +2321,9 @@ public Boolean call(T first, T second) {
23202321
}
23212322

23222323
/**
2323-
* Returns an Observable that emits Boolean values that indicate whether the
2324-
* pairs of items emitted by two source Observables are equal based on the
2325-
* results of a specified equality function.
2324+
* Returns an Observable that emits a Boolean value that indicate
2325+
* whether two sequences are equal by comparing the elements pairwise
2326+
* based on the results of a specified equality function.
23262327
* <p>
23272328
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/sequenceEqual.png">
23282329
*
@@ -2331,12 +2332,12 @@ public Boolean call(T first, T second) {
23312332
* @param equality a function used to compare items emitted by both
23322333
* Observables
23332334
* @param <T> the type of items emitted by each Observable
2334-
* @return an Observable that emits Booleans that indicate whether the
2335-
* corresponding items emitted by the source Observables are equal
2335+
* @return an Observable that emits a Boolean value that indicate
2336+
* whether two sequences are equal by comparing the elements pairwise.
23362337
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#sequenceequal">RxJava Wiki: sequenceEqual()</a>
23372338
*/
23382339
public static <T> Observable<Boolean> sequenceEqual(Observable<? extends T> first, Observable<? extends T> second, Func2<? super T, ? super T, Boolean> equality) {
2339-
return zip(first, second, equality);
2340+
return OperationSequenceEqual.sequenceEqual(first, second, equality);
23402341
}
23412342

23422343
/**
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import static rx.Observable.concat;
19+
import static rx.Observable.from;
20+
import static rx.Observable.zip;
21+
import rx.Notification;
22+
import rx.Observable;
23+
import rx.util.functions.Func1;
24+
import rx.util.functions.Func2;
25+
import rx.util.functions.Functions;
26+
27+
/**
28+
* Returns an Observable that emits a Boolean value that indicate whether two
29+
* sequences are equal by comparing the elements pairwise.
30+
*/
31+
public class OperationSequenceEqual {
32+
33+
public static <T> Observable<Boolean> sequenceEqual(
34+
Observable<? extends T> first, Observable<? extends T> second,
35+
final Func2<? super T, ? super T, Boolean> equality) {
36+
Observable<Notification<T>> firstObservable = concat(
37+
first.map(new Func1<T, Notification<T>>() {
38+
39+
@Override
40+
public Notification<T> call(T t1) {
41+
return new Notification<T>(t1);
42+
}
43+
44+
}), from(new Notification<T>()));
45+
46+
Observable<Notification<T>> secondObservable = concat(
47+
second.map(new Func1<T, Notification<T>>() {
48+
49+
@Override
50+
public Notification<T> call(T t1) {
51+
return new Notification<T>(t1);
52+
}
53+
54+
}), from(new Notification<T>()));
55+
56+
return zip(firstObservable, secondObservable,
57+
new Func2<Notification<T>, Notification<T>, Boolean>() {
58+
59+
@Override
60+
public Boolean call(Notification<T> t1, Notification<T> t2) {
61+
if (t1.isOnCompleted() && t2.isOnCompleted()) {
62+
return true;
63+
}
64+
if (t1.isOnCompleted() || t2.isOnCompleted()) {
65+
return false;
66+
}
67+
// Now t1 and t2 must be 'onNext'.
68+
return equality.call(t1.getValue(), t2.getValue());
69+
}
70+
71+
}).all(Functions.<Boolean> identity());
72+
}
73+
}

rxjava-core/src/test/java/rx/ObservableTests.java

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -275,17 +275,6 @@ public Integer call(Integer t1, Integer t2) {
275275
verify(w).onNext(60);
276276
}
277277

278-
@Test
279-
public void testSequenceEqual() {
280-
Observable<Integer> first = Observable.from(1, 2, 3);
281-
Observable<Integer> second = Observable.from(1, 2, 4);
282-
@SuppressWarnings("unchecked")
283-
Observer<Boolean> result = mock(Observer.class);
284-
Observable.sequenceEqual(first, second).subscribe(result);
285-
verify(result, times(2)).onNext(true);
286-
verify(result, times(1)).onNext(false);
287-
}
288-
289278
@Test
290279
public void testOnSubscribeFails() {
291280
@SuppressWarnings("unchecked")
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import static org.mockito.Matchers.isA;
19+
import static org.mockito.Mockito.inOrder;
20+
import static org.mockito.Mockito.mock;
21+
import static org.mockito.Mockito.times;
22+
23+
import org.junit.Test;
24+
import org.mockito.InOrder;
25+
26+
import rx.Observable;
27+
import rx.Observer;
28+
29+
public class OperationSequenceEqualTests {
30+
31+
@Test
32+
public void test1() {
33+
Observable<Boolean> observable = Observable.sequenceEqual(
34+
Observable.from("one", "two", "three"),
35+
Observable.from("one", "two", "three"));
36+
verifyResult(observable, true);
37+
}
38+
39+
@Test
40+
public void test2() {
41+
Observable<Boolean> observable = Observable.sequenceEqual(
42+
Observable.from("one", "two", "three"),
43+
Observable.from("one", "two", "three", "four"));
44+
verifyResult(observable, false);
45+
}
46+
47+
@Test
48+
public void test3() {
49+
Observable<Boolean> observable = Observable.sequenceEqual(
50+
Observable.from("one", "two", "three", "four"),
51+
Observable.from("one", "two", "three"));
52+
verifyResult(observable, false);
53+
}
54+
55+
@Test
56+
public void testWithError1() {
57+
Observable<Boolean> observable = Observable.sequenceEqual(
58+
Observable.concat(Observable.from("one"),
59+
Observable.<String> error(new TestException())),
60+
Observable.from("one", "two", "three"));
61+
verifyError(observable);
62+
}
63+
64+
@Test
65+
public void testWithError2() {
66+
Observable<Boolean> observable = Observable.sequenceEqual(
67+
Observable.from("one", "two", "three"),
68+
Observable.concat(Observable.from("one"),
69+
Observable.<String> error(new TestException())));
70+
verifyError(observable);
71+
}
72+
73+
@Test
74+
public void testWithError3() {
75+
Observable<Boolean> observable = Observable.sequenceEqual(
76+
Observable.concat(Observable.from("one"),
77+
Observable.<String> error(new TestException())),
78+
Observable.concat(Observable.from("one"),
79+
Observable.<String> error(new TestException())));
80+
verifyError(observable);
81+
}
82+
83+
@Test
84+
public void testWithEmpty1() {
85+
Observable<Boolean> observable = Observable.sequenceEqual(
86+
Observable.<String> empty(),
87+
Observable.from("one", "two", "three"));
88+
verifyResult(observable, false);
89+
}
90+
91+
@Test
92+
public void testWithEmpty2() {
93+
Observable<Boolean> observable = Observable.sequenceEqual(
94+
Observable.from("one", "two", "three"),
95+
Observable.<String> empty());
96+
verifyResult(observable, false);
97+
}
98+
99+
@Test
100+
public void testWithEmpty3() {
101+
Observable<Boolean> observable = Observable.sequenceEqual(
102+
Observable.<String> empty(), Observable.<String> empty());
103+
verifyResult(observable, true);
104+
}
105+
106+
@Test
107+
public void testWithEqualityError() {
108+
Observable<Boolean> observable = Observable.sequenceEqual(
109+
Observable.from((String) null), Observable.from("one"));
110+
111+
@SuppressWarnings("unchecked")
112+
Observer<Boolean> observer = mock(Observer.class);
113+
observable.subscribe(observer);
114+
115+
InOrder inOrder = inOrder(observer);
116+
inOrder.verify(observer, times(1)).onError(
117+
isA(NullPointerException.class));
118+
inOrder.verifyNoMoreInteractions();
119+
}
120+
121+
private void verifyResult(Observable<Boolean> observable, boolean result) {
122+
@SuppressWarnings("unchecked")
123+
Observer<Boolean> observer = mock(Observer.class);
124+
observable.subscribe(observer);
125+
126+
InOrder inOrder = inOrder(observer);
127+
inOrder.verify(observer, times(1)).onNext(result);
128+
inOrder.verify(observer).onCompleted();
129+
inOrder.verifyNoMoreInteractions();
130+
}
131+
132+
private void verifyError(Observable<Boolean> observable) {
133+
@SuppressWarnings("unchecked")
134+
Observer<Boolean> observer = mock(Observer.class);
135+
observable.subscribe(observer);
136+
137+
InOrder inOrder = inOrder(observer);
138+
inOrder.verify(observer, times(1)).onError(isA(TestException.class));
139+
inOrder.verifyNoMoreInteractions();
140+
}
141+
142+
private class TestException extends RuntimeException {
143+
private static final long serialVersionUID = 1L;
144+
}
145+
}

0 commit comments

Comments
 (0)