Skip to content

Commit 1443aec

Browse files
Merge branch 'OperatorSequenceEquals' of github.com:akarnokd/RxJava into merge-prs
Conflicts: rxjava-core/src/main/java/rx/Observable.java
2 parents a4f2bab + 7eb21ab commit 1443aec

File tree

3 files changed

+27
-29
lines changed

3 files changed

+27
-29
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@
4949
import rx.operators.OnSubscribeFromIterable;
5050
import rx.operators.OnSubscribeRange;
5151
import rx.operators.OperationDelay;
52-
import rx.operators.OperationSequenceEqual;
5352
import rx.operators.OperationSkip;
5453
import rx.operators.OperationSkipUntil;
5554
import rx.operators.OperationSwitch;
@@ -108,6 +107,7 @@
108107
import rx.operators.OperatorSampleWithObservable;
109108
import rx.operators.OperatorSampleWithTime;
110109
import rx.operators.OperatorScan;
110+
import rx.operators.OperatorSequenceEqual;
111111
import rx.operators.OperatorSerialize;
112112
import rx.operators.OperatorSingle;
113113
import rx.operators.OperatorSkip;
@@ -2465,7 +2465,7 @@ public final Boolean call(T first, T second) {
24652465
* @see <a href="https://github.com/Netflix/RxJava/wiki/Conditional-and-Boolean-Operators#wiki-sequenceequal">RxJava Wiki: sequenceEqual()</a>
24662466
*/
24672467
public final static <T> Observable<Boolean> sequenceEqual(Observable<? extends T> first, Observable<? extends T> second, Func2<? super T, ? super T, Boolean> equality) {
2468-
return OperationSequenceEqual.sequenceEqual(first, second, equality);
2468+
return OperatorSequenceEqual.sequenceEqual(first, second, equality);
24692469
}
24702470

24712471
/**

rxjava-core/src/main/java/rx/operators/OperationSequenceEqual.java renamed to rxjava-core/src/main/java/rx/operators/OperatorSequenceEqual.java

Lines changed: 24 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import static rx.Observable.concat;
1919
import static rx.Observable.from;
2020
import static rx.Observable.zip;
21-
import rx.Notification;
2221
import rx.Observable;
2322
import rx.functions.Func1;
2423
import rx.functions.Func2;
@@ -28,44 +27,43 @@
2827
* Returns an Observable that emits a Boolean value that indicate whether two
2928
* sequences are equal by comparing the elements pairwise.
3029
*/
31-
public class OperationSequenceEqual {
32-
33-
public static <T> Observable<Boolean> sequenceEqual(
34-
Observable<? extends T> first, Observable<? extends T> second,
35-
final Func2<? super T, ? super T, Boolean> equality) {
36-
Observable<Notification<T>> firstObservable = concat(
37-
first.map(new Func1<T, Notification<T>>() {
30+
public final class OperatorSequenceEqual {
31+
private OperatorSequenceEqual() { throw new IllegalStateException("No instances!"); }
32+
/** NotificationLite doesn't work as zip uses it. */
33+
private static final Object LOCAL_ONCOMPLETED = new Object();
34+
static <T> Observable<Object> materializeLite(Observable<T> source) {
35+
return concat(
36+
source.map(new Func1<T, Object>() {
3837

3938
@Override
40-
public Notification<T> call(T t1) {
41-
return Notification.createOnNext(t1);
39+
public Object call(T t1) {
40+
return t1;
4241
}
4342

44-
}), from(Notification.<T>createOnCompleted()));
45-
46-
Observable<Notification<T>> secondObservable = concat(
47-
second.map(new Func1<T, Notification<T>>() {
48-
49-
@Override
50-
public Notification<T> call(T t1) {
51-
return Notification.createOnNext(t1);
52-
}
53-
54-
}), from(Notification.<T>createOnCompleted()));
43+
}), from(LOCAL_ONCOMPLETED));
44+
}
45+
public static <T> Observable<Boolean> sequenceEqual(
46+
Observable<? extends T> first, Observable<? extends T> second,
47+
final Func2<? super T, ? super T, Boolean> equality) {
48+
Observable<Object> firstObservable = materializeLite(first);
49+
Observable<Object> secondObservable = materializeLite(second);
5550

5651
return zip(firstObservable, secondObservable,
57-
new Func2<Notification<T>, Notification<T>, Boolean>() {
52+
new Func2<Object, Object, Boolean>() {
5853

5954
@Override
60-
public Boolean call(Notification<T> t1, Notification<T> t2) {
61-
if (t1.isOnCompleted() && t2.isOnCompleted()) {
55+
@SuppressWarnings("unchecked")
56+
public Boolean call(Object t1, Object t2) {
57+
boolean c1 = t1 == LOCAL_ONCOMPLETED;
58+
boolean c2 = t2 == LOCAL_ONCOMPLETED;
59+
if (c1 && c2) {
6260
return true;
6361
}
64-
if (t1.isOnCompleted() || t2.isOnCompleted()) {
62+
if (c1 || c2) {
6563
return false;
6664
}
6765
// Now t1 and t2 must be 'onNext'.
68-
return equality.call(t1.getValue(), t2.getValue());
66+
return equality.call((T)t1, (T)t2);
6967
}
7068

7169
}).all(Functions.<Boolean> identity());

rxjava-core/src/test/java/rx/operators/OperationSequenceEqualTests.java renamed to rxjava-core/src/test/java/rx/operators/OperatorSequenceEqualTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
import rx.Observer;
2828
import rx.functions.Func2;
2929

30-
public class OperationSequenceEqualTests {
30+
public class OperatorSequenceEqualTest {
3131

3232
@Test
3333
public void test1() {

0 commit comments

Comments
 (0)