|
18 | 18 | import static rx.Observable.concat;
|
19 | 19 | import static rx.Observable.from;
|
20 | 20 | import static rx.Observable.zip;
|
21 |
| -import rx.Notification; |
22 | 21 | import rx.Observable;
|
23 | 22 | import rx.functions.Func1;
|
24 | 23 | import rx.functions.Func2;
|
|
28 | 27 | * Returns an Observable that emits a Boolean value that indicate whether two
|
29 | 28 | * sequences are equal by comparing the elements pairwise.
|
30 | 29 | */
|
31 |
| -public class OperationSequenceEqual { |
32 |
| - |
33 |
| - public static <T> Observable<Boolean> sequenceEqual( |
34 |
| - Observable<? extends T> first, Observable<? extends T> second, |
35 |
| - final Func2<? super T, ? super T, Boolean> equality) { |
36 |
| - Observable<Notification<T>> firstObservable = concat( |
37 |
| - first.map(new Func1<T, Notification<T>>() { |
| 30 | +public final class OperatorSequenceEqual { |
| 31 | + private OperatorSequenceEqual() { throw new IllegalStateException("No instances!"); } |
| 32 | + /** NotificationLite doesn't work as zip uses it. */ |
| 33 | + private static final Object LOCAL_ONCOMPLETED = new Object(); |
| 34 | + static <T> Observable<Object> materializeLite(Observable<T> source) { |
| 35 | + return concat( |
| 36 | + source.map(new Func1<T, Object>() { |
38 | 37 |
|
39 | 38 | @Override
|
40 |
| - public Notification<T> call(T t1) { |
41 |
| - return Notification.createOnNext(t1); |
| 39 | + public Object call(T t1) { |
| 40 | + return t1; |
42 | 41 | }
|
43 | 42 |
|
44 |
| - }), from(Notification.<T>createOnCompleted())); |
45 |
| - |
46 |
| - Observable<Notification<T>> secondObservable = concat( |
47 |
| - second.map(new Func1<T, Notification<T>>() { |
48 |
| - |
49 |
| - @Override |
50 |
| - public Notification<T> call(T t1) { |
51 |
| - return Notification.createOnNext(t1); |
52 |
| - } |
53 |
| - |
54 |
| - }), from(Notification.<T>createOnCompleted())); |
| 43 | + }), from(LOCAL_ONCOMPLETED)); |
| 44 | + } |
| 45 | + public static <T> Observable<Boolean> sequenceEqual( |
| 46 | + Observable<? extends T> first, Observable<? extends T> second, |
| 47 | + final Func2<? super T, ? super T, Boolean> equality) { |
| 48 | + Observable<Object> firstObservable = materializeLite(first); |
| 49 | + Observable<Object> secondObservable = materializeLite(second); |
55 | 50 |
|
56 | 51 | return zip(firstObservable, secondObservable,
|
57 |
| - new Func2<Notification<T>, Notification<T>, Boolean>() { |
| 52 | + new Func2<Object, Object, Boolean>() { |
58 | 53 |
|
59 | 54 | @Override
|
60 |
| - public Boolean call(Notification<T> t1, Notification<T> t2) { |
61 |
| - if (t1.isOnCompleted() && t2.isOnCompleted()) { |
| 55 | + @SuppressWarnings("unchecked") |
| 56 | + public Boolean call(Object t1, Object t2) { |
| 57 | + boolean c1 = t1 == LOCAL_ONCOMPLETED; |
| 58 | + boolean c2 = t2 == LOCAL_ONCOMPLETED; |
| 59 | + if (c1 && c2) { |
62 | 60 | return true;
|
63 | 61 | }
|
64 |
| - if (t1.isOnCompleted() || t2.isOnCompleted()) { |
| 62 | + if (c1 || c2) { |
65 | 63 | return false;
|
66 | 64 | }
|
67 | 65 | // Now t1 and t2 must be 'onNext'.
|
68 |
| - return equality.call(t1.getValue(), t2.getValue()); |
| 66 | + return equality.call((T)t1, (T)t2); |
69 | 67 | }
|
70 | 68 |
|
71 | 69 | }).all(Functions.<Boolean> identity());
|
|
0 commit comments