Skip to content

Commit dc15e2b

Browse files
Merge pull request #785 from benjchristensen/operator-zip
Reimplement Zip Operator Using Lift [Preview]
2 parents 267d569 + 3d5474f commit dc15e2b

File tree

10 files changed

+1080
-294
lines changed

10 files changed

+1080
-294
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 24 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@
9797
import rx.operators.OperationToObservableFuture;
9898
import rx.operators.OperationUsing;
9999
import rx.operators.OperationWindow;
100-
import rx.operators.OperationZip;
100+
import rx.operators.OperatorZip;
101101
import rx.operators.OperatorCast;
102102
import rx.operators.OperatorFromIterable;
103103
import rx.operators.OperatorGroupBy;
@@ -108,6 +108,7 @@
108108
import rx.operators.OperatorTimestamp;
109109
import rx.operators.OperatorToObservableList;
110110
import rx.operators.OperatorToObservableSortedList;
111+
import rx.operators.OperatorZipIterable;
111112
import rx.plugins.RxJavaObservableExecutionHook;
112113
import rx.plugins.RxJavaPlugins;
113114
import rx.schedulers.Schedulers;
@@ -1645,11 +1646,9 @@ public final static Observable<Long> interval(long interval, TimeUnit unit, Sche
16451646
* the type of that item
16461647
* @return an Observable that emits {@code value} as a single item and then completes
16471648
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#wiki-just">RxJava Wiki: just()</a>
1648-
* @deprecated Use {@link #from(T)}
16491649
*/
1650-
@Deprecated
16511650
public final static <T> Observable<T> just(T value) {
1652-
return from(Arrays.asList((value)));
1651+
return from(Arrays.asList(value));
16531652
}
16541653

16551654
/**
@@ -3058,7 +3057,11 @@ public final static <R> Observable<R> when(Plan0<R> p1, Plan0<R> p2, Plan0<R> p3
30583057
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#wiki-zip">RxJava Wiki: zip()</a>
30593058
*/
30603059
public final static <R> Observable<R> zip(Iterable<? extends Observable<?>> ws, FuncN<? extends R> zipFunction) {
3061-
return create(OperationZip.zip(ws, zipFunction));
3060+
List<Observable<?>> os = new ArrayList<Observable<?>>();
3061+
for (Observable<?> o : ws) {
3062+
os.add(o);
3063+
}
3064+
return Observable.just(os.toArray(new Observable<?>[os.size()])).lift(new OperatorZip<R>(zipFunction));
30623065
}
30633066

30643067
/**
@@ -3087,12 +3090,14 @@ public final static <R> Observable<R> zip(Iterable<? extends Observable<?>> ws,
30873090
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#wiki-zip">RxJava Wiki: zip()</a>
30883091
*/
30893092
public final static <R> Observable<R> zip(Observable<? extends Observable<?>> ws, final FuncN<? extends R> zipFunction) {
3090-
return ws.toList().mergeMap(new Func1<List<? extends Observable<?>>, Observable<? extends R>>() {
3093+
return ws.toList().map(new Func1<List<? extends Observable<?>>, Observable<?>[]>() {
3094+
30913095
@Override
3092-
public final Observable<R> call(List<? extends Observable<?>> wsList) {
3093-
return create(OperationZip.zip(wsList, zipFunction));
3096+
public Observable<?>[] call(List<? extends Observable<?>> o) {
3097+
return o.toArray(new Observable<?>[o.size()]);
30943098
}
3095-
});
3099+
3100+
}).lift(new OperatorZip<R>(zipFunction));
30963101
}
30973102

30983103
/**
@@ -3118,8 +3123,8 @@ public final Observable<R> call(List<? extends Observable<?>> wsList) {
31183123
* @return an Observable that emits the zipped results
31193124
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#wiki-zip">RxJava Wiki: zip()</a>
31203125
*/
3121-
public final static <T1, T2, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Func2<? super T1, ? super T2, ? extends R> zipFunction) {
3122-
return create(OperationZip.zip(o1, o2, zipFunction));
3126+
public final static <T1, T2, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, final Func2<? super T1, ? super T2, ? extends R> zipFunction) {
3127+
return just(new Observable<?>[] { o1, o2 }).lift(new OperatorZip<R>(zipFunction));
31233128
}
31243129

31253130
/**
@@ -3149,7 +3154,7 @@ public final static <T1, T2, R> Observable<R> zip(Observable<? extends T1> o1, O
31493154
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#wiki-zip">RxJava Wiki: zip()</a>
31503155
*/
31513156
public final static <T1, T2, T3, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Func3<? super T1, ? super T2, ? super T3, ? extends R> zipFunction) {
3152-
return create(OperationZip.zip(o1, o2, o3, zipFunction));
3157+
return just(new Observable<?>[] { o1, o2, o3 }).lift(new OperatorZip<R>(zipFunction));
31533158
}
31543159

31553160
/**
@@ -3181,7 +3186,7 @@ public final static <T1, T2, T3, R> Observable<R> zip(Observable<? extends T1> o
31813186
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#wiki-zip">RxJava Wiki: zip()</a>
31823187
*/
31833188
public final static <T1, T2, T3, T4, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Func4<? super T1, ? super T2, ? super T3, ? super T4, ? extends R> zipFunction) {
3184-
return create(OperationZip.zip(o1, o2, o3, o4, zipFunction));
3189+
return just(new Observable<?>[] { o1, o2, o3, o4 }).lift(new OperatorZip<R>(zipFunction));
31853190
}
31863191

31873192
/**
@@ -3215,7 +3220,7 @@ public final static <T1, T2, T3, T4, R> Observable<R> zip(Observable<? extends T
32153220
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#wiki-zip">RxJava Wiki: zip()</a>
32163221
*/
32173222
public final static <T1, T2, T3, T4, T5, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Func5<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? extends R> zipFunction) {
3218-
return create(OperationZip.zip(o1, o2, o3, o4, o5, zipFunction));
3223+
return just(new Observable<?>[] { o1, o2, o3, o4, o5 }).lift(new OperatorZip<R>(zipFunction));
32193224
}
32203225

32213226
/**
@@ -3251,7 +3256,7 @@ public final static <T1, T2, T3, T4, T5, R> Observable<R> zip(Observable<? exten
32513256
*/
32523257
public final static <T1, T2, T3, T4, T5, T6, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6,
32533258
Func6<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? extends R> zipFunction) {
3254-
return create(OperationZip.zip(o1, o2, o3, o4, o5, o6, zipFunction));
3259+
return just(new Observable<?>[] { o1, o2, o3, o4, o5, o6 }).lift(new OperatorZip<R>(zipFunction));
32553260
}
32563261

32573262
/**
@@ -3289,7 +3294,7 @@ public final static <T1, T2, T3, T4, T5, T6, R> Observable<R> zip(Observable<? e
32893294
*/
32903295
public final static <T1, T2, T3, T4, T5, T6, T7, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6, Observable<? extends T7> o7,
32913296
Func7<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? super T7, ? extends R> zipFunction) {
3292-
return create(OperationZip.zip(o1, o2, o3, o4, o5, o6, o7, zipFunction));
3297+
return just(new Observable<?>[] { o1, o2, o3, o4, o5, o6, o7 }).lift(new OperatorZip<R>(zipFunction));
32933298
}
32943299

32953300
/**
@@ -3329,7 +3334,7 @@ public final static <T1, T2, T3, T4, T5, T6, T7, R> Observable<R> zip(Observable
33293334
*/
33303335
public final static <T1, T2, T3, T4, T5, T6, T7, T8, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6, Observable<? extends T7> o7, Observable<? extends T8> o8,
33313336
Func8<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? super T7, ? super T8, ? extends R> zipFunction) {
3332-
return create(OperationZip.zip(o1, o2, o3, o4, o5, o6, o7, o8, zipFunction));
3337+
return just(new Observable<?>[] { o1, o2, o3, o4, o5, o6, o7, o8 }).lift(new OperatorZip<R>(zipFunction));
33333338
}
33343339

33353340
/**
@@ -3371,7 +3376,7 @@ public final static <T1, T2, T3, T4, T5, T6, T7, T8, R> Observable<R> zip(Observ
33713376
*/
33723377
public final static <T1, T2, T3, T4, T5, T6, T7, T8, T9, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6, Observable<? extends T7> o7, Observable<? extends T8> o8,
33733378
Observable<? extends T9> o9, Func9<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? super T7, ? super T8, ? super T9, ? extends R> zipFunction) {
3374-
return create(OperationZip.zip(o1, o2, o3, o4, o5, o6, o7, o8, o9, zipFunction));
3379+
return just(new Observable<?>[] { o1, o2, o3, o4, o5, o6, o7, o8, o9 }).lift(new OperatorZip<R>(zipFunction));
33753380
}
33763381

33773382
/**
@@ -8403,7 +8408,7 @@ public final <U> Observable<Observable<T>> window(Observable<U> boundary) {
84038408
* @return an Observable that pairs up values from the source Observable and the {@code other} Iterable sequence and emits the results of {@code zipFunction} applied to these pairs
84048409
*/
84058410
public final <T2, R> Observable<R> zip(Iterable<? extends T2> other, Func2<? super T, ? super T2, ? extends R> zipFunction) {
8406-
return create(OperationZip.zipIterable(this, other, zipFunction));
8411+
return lift(new OperatorZipIterable<T, T2, R>(other, zipFunction));
84078412
}
84088413

84098414
/**

rxjava-core/src/main/java/rx/Subscriber.java

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -47,27 +47,6 @@ protected Subscriber(Subscriber<?> op) {
4747
this(op.cs);
4848
}
4949

50-
public static <T> Subscriber<T> from(final Observer<? super T> o) {
51-
return new Subscriber<T>() {
52-
53-
@Override
54-
public void onCompleted() {
55-
o.onCompleted();
56-
}
57-
58-
@Override
59-
public void onError(Throwable e) {
60-
o.onError(e);
61-
}
62-
63-
@Override
64-
public void onNext(T t) {
65-
o.onNext(t);
66-
}
67-
68-
};
69-
}
70-
7150
/**
7251
* Used to register an unsubscribe callback.
7352
*/
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
package rx.observers;
2+
3+
import rx.Observer;
4+
import rx.Subscriber;
5+
import rx.util.OnErrorNotImplementedException;
6+
import rx.util.functions.Action0;
7+
import rx.util.functions.Action1;
8+
9+
public class Subscribers {
10+
11+
public static <T> Subscriber<T> from(final Observer<? super T> o) {
12+
return new Subscriber<T>() {
13+
14+
@Override
15+
public void onCompleted() {
16+
o.onCompleted();
17+
}
18+
19+
@Override
20+
public void onError(Throwable e) {
21+
o.onError(e);
22+
}
23+
24+
@Override
25+
public void onNext(T t) {
26+
o.onNext(t);
27+
}
28+
29+
};
30+
}
31+
32+
/**
33+
* Create an empty Subscriber that ignores all events.
34+
*/
35+
public static final <T> Subscriber<T> create() {
36+
return new Subscriber<T>() {
37+
38+
@Override
39+
public final void onCompleted() {
40+
// do nothing
41+
}
42+
43+
@Override
44+
public final void onError(Throwable e) {
45+
throw new OnErrorNotImplementedException(e);
46+
}
47+
48+
@Override
49+
public final void onNext(T args) {
50+
// do nothing
51+
}
52+
53+
};
54+
}
55+
56+
/**
57+
* Create an Subscriber that receives `onNext` and ignores `onError` and `onCompleted`.
58+
*/
59+
public static final <T> Subscriber<T> create(final Action1<? super T> onNext) {
60+
if (onNext == null) {
61+
throw new IllegalArgumentException("onNext can not be null");
62+
}
63+
64+
return new Subscriber<T>() {
65+
66+
@Override
67+
public final void onCompleted() {
68+
// do nothing
69+
}
70+
71+
@Override
72+
public final void onError(Throwable e) {
73+
throw new OnErrorNotImplementedException(e);
74+
}
75+
76+
@Override
77+
public final void onNext(T args) {
78+
onNext.call(args);
79+
}
80+
81+
};
82+
}
83+
84+
/**
85+
* Create an Subscriber that receives `onNext` and `onError` and ignores `onCompleted`.
86+
*
87+
*/
88+
public static final <T> Subscriber<T> create(final Action1<? super T> onNext, final Action1<Throwable> onError) {
89+
if (onNext == null) {
90+
throw new IllegalArgumentException("onNext can not be null");
91+
}
92+
if (onError == null) {
93+
throw new IllegalArgumentException("onError can not be null");
94+
}
95+
96+
return new Subscriber<T>() {
97+
98+
@Override
99+
public final void onCompleted() {
100+
// do nothing
101+
}
102+
103+
@Override
104+
public final void onError(Throwable e) {
105+
onError.call(e);
106+
}
107+
108+
@Override
109+
public final void onNext(T args) {
110+
onNext.call(args);
111+
}
112+
113+
};
114+
}
115+
116+
/**
117+
* Create an Subscriber that receives `onNext`, `onError` and `onCompleted`.
118+
*
119+
*/
120+
public static final <T> Subscriber<T> create(final Action1<? super T> onNext, final Action1<Throwable> onError, final Action0 onComplete) {
121+
if (onNext == null) {
122+
throw new IllegalArgumentException("onNext can not be null");
123+
}
124+
if (onError == null) {
125+
throw new IllegalArgumentException("onError can not be null");
126+
}
127+
if (onComplete == null) {
128+
throw new IllegalArgumentException("onComplete can not be null");
129+
}
130+
131+
return new Subscriber<T>() {
132+
133+
@Override
134+
public final void onCompleted() {
135+
onComplete.call();
136+
}
137+
138+
@Override
139+
public final void onError(Throwable e) {
140+
onError.call(e);
141+
}
142+
143+
@Override
144+
public final void onNext(T args) {
145+
onNext.call(args);
146+
}
147+
148+
};
149+
}
150+
151+
}

rxjava-core/src/main/java/rx/observers/TestObserver.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package rx.observers;
1717

1818
import java.util.ArrayList;
19+
import java.util.Arrays;
1920
import java.util.Collections;
2021
import java.util.List;
2122

@@ -73,6 +74,14 @@ public List<T> getOnNextEvents() {
7374
return Collections.unmodifiableList(onNextEvents);
7475
}
7576

77+
public List<Object> getEvents() {
78+
ArrayList<Object> events = new ArrayList<Object>();
79+
events.add(onNextEvents);
80+
events.add(onErrorEvents);
81+
events.add(onCompletedEvents);
82+
return Collections.unmodifiableList(events);
83+
}
84+
7685
public void assertReceivedOnNext(List<T> items) {
7786
if (onNextEvents.size() != items.size()) {
7887
throw new AssertionError("Number of items does not match. Provided: " + items.size() + " Actual: " + onNextEvents.size());

rxjava-core/src/main/java/rx/observers/TestSubscriber.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
*/
2727
public class TestSubscriber<T> extends Subscriber<T> {
2828

29-
private final Subscriber<Object> EMPTY = Subscriber.from(new EmptyObserver<Object>());
29+
private final Subscriber<Object> EMPTY = Subscribers.create();
3030

3131
private final TestObserver<T> testObserver;
3232

0 commit comments

Comments
 (0)