Skip to content

Commit 84309b6

Browse files
Reimplement Zip Operator Using Lift
- Use new lift operator implement and non-blocking synchronization approach. - I have had the concurrency model reviewed by some colleagues and all unit tests are passing but further review is justified and welcome.
1 parent 267d569 commit 84309b6

File tree

5 files changed

+479
-40
lines changed

5 files changed

+479
-40
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 25 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@
9797
import rx.operators.OperationToObservableFuture;
9898
import rx.operators.OperationUsing;
9999
import rx.operators.OperationWindow;
100-
import rx.operators.OperationZip;
100+
import rx.operators.OperatorZip;
101101
import rx.operators.OperatorCast;
102102
import rx.operators.OperatorFromIterable;
103103
import rx.operators.OperatorGroupBy;
@@ -1645,11 +1645,9 @@ public final static Observable<Long> interval(long interval, TimeUnit unit, Sche
16451645
* the type of that item
16461646
* @return an Observable that emits {@code value} as a single item and then completes
16471647
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#wiki-just">RxJava Wiki: just()</a>
1648-
* @deprecated Use {@link #from(T)}
16491648
*/
1650-
@Deprecated
16511649
public final static <T> Observable<T> just(T value) {
1652-
return from(Arrays.asList((value)));
1650+
return from(Arrays.asList(value));
16531651
}
16541652

16551653
/**
@@ -3058,7 +3056,11 @@ public final static <R> Observable<R> when(Plan0<R> p1, Plan0<R> p2, Plan0<R> p3
30583056
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#wiki-zip">RxJava Wiki: zip()</a>
30593057
*/
30603058
public final static <R> Observable<R> zip(Iterable<? extends Observable<?>> ws, FuncN<? extends R> zipFunction) {
3061-
return create(OperationZip.zip(ws, zipFunction));
3059+
List<Observable<?>> os = new ArrayList<Observable<?>>();
3060+
for (Observable<?> o : ws) {
3061+
os.add(o);
3062+
}
3063+
return Observable.just(os.toArray(new Observable<?>[os.size()])).lift(new OperatorZip<R>(zipFunction));
30623064
}
30633065

30643066
/**
@@ -3087,12 +3089,14 @@ public final static <R> Observable<R> zip(Iterable<? extends Observable<?>> ws,
30873089
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#wiki-zip">RxJava Wiki: zip()</a>
30883090
*/
30893091
public final static <R> Observable<R> zip(Observable<? extends Observable<?>> ws, final FuncN<? extends R> zipFunction) {
3090-
return ws.toList().mergeMap(new Func1<List<? extends Observable<?>>, Observable<? extends R>>() {
3092+
return ws.toList().map(new Func1<List<? extends Observable<?>>, Observable<?>[]>() {
3093+
30913094
@Override
3092-
public final Observable<R> call(List<? extends Observable<?>> wsList) {
3093-
return create(OperationZip.zip(wsList, zipFunction));
3095+
public Observable<?>[] call(List<? extends Observable<?>> o) {
3096+
return o.toArray(new Observable<?>[o.size()]);
30943097
}
3095-
});
3098+
3099+
}).lift(new OperatorZip<R>(zipFunction));
30963100
}
30973101

30983102
/**
@@ -3118,8 +3122,8 @@ public final Observable<R> call(List<? extends Observable<?>> wsList) {
31183122
* @return an Observable that emits the zipped results
31193123
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#wiki-zip">RxJava Wiki: zip()</a>
31203124
*/
3121-
public final static <T1, T2, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Func2<? super T1, ? super T2, ? extends R> zipFunction) {
3122-
return create(OperationZip.zip(o1, o2, zipFunction));
3125+
public final static <T1, T2, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, final Func2<? super T1, ? super T2, ? extends R> zipFunction) {
3126+
return just(new Observable<?>[] { o1, o2 }).lift(new OperatorZip<R>(zipFunction));
31233127
}
31243128

31253129
/**
@@ -3149,7 +3153,7 @@ public final static <T1, T2, R> Observable<R> zip(Observable<? extends T1> o1, O
31493153
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#wiki-zip">RxJava Wiki: zip()</a>
31503154
*/
31513155
public final static <T1, T2, T3, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Func3<? super T1, ? super T2, ? super T3, ? extends R> zipFunction) {
3152-
return create(OperationZip.zip(o1, o2, o3, zipFunction));
3156+
return just(new Observable<?>[] { o1, o2, o3 }).lift(new OperatorZip<R>(zipFunction));
31533157
}
31543158

31553159
/**
@@ -3181,7 +3185,7 @@ public final static <T1, T2, T3, R> Observable<R> zip(Observable<? extends T1> o
31813185
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#wiki-zip">RxJava Wiki: zip()</a>
31823186
*/
31833187
public final static <T1, T2, T3, T4, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Func4<? super T1, ? super T2, ? super T3, ? super T4, ? extends R> zipFunction) {
3184-
return create(OperationZip.zip(o1, o2, o3, o4, zipFunction));
3188+
return just(new Observable<?>[] { o1, o2, o3, o4 }).lift(new OperatorZip<R>(zipFunction));
31853189
}
31863190

31873191
/**
@@ -3215,7 +3219,7 @@ public final static <T1, T2, T3, T4, R> Observable<R> zip(Observable<? extends T
32153219
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#wiki-zip">RxJava Wiki: zip()</a>
32163220
*/
32173221
public final static <T1, T2, T3, T4, T5, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Func5<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? extends R> zipFunction) {
3218-
return create(OperationZip.zip(o1, o2, o3, o4, o5, zipFunction));
3222+
return just(new Observable<?>[] { o1, o2, o3, o4, o5 }).lift(new OperatorZip<R>(zipFunction));
32193223
}
32203224

32213225
/**
@@ -3251,7 +3255,7 @@ public final static <T1, T2, T3, T4, T5, R> Observable<R> zip(Observable<? exten
32513255
*/
32523256
public final static <T1, T2, T3, T4, T5, T6, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6,
32533257
Func6<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? extends R> zipFunction) {
3254-
return create(OperationZip.zip(o1, o2, o3, o4, o5, o6, zipFunction));
3258+
return just(new Observable<?>[] { o1, o2, o3, o4, o5, o6 }).lift(new OperatorZip<R>(zipFunction));
32553259
}
32563260

32573261
/**
@@ -3289,7 +3293,7 @@ public final static <T1, T2, T3, T4, T5, T6, R> Observable<R> zip(Observable<? e
32893293
*/
32903294
public final static <T1, T2, T3, T4, T5, T6, T7, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6, Observable<? extends T7> o7,
32913295
Func7<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? super T7, ? extends R> zipFunction) {
3292-
return create(OperationZip.zip(o1, o2, o3, o4, o5, o6, o7, zipFunction));
3296+
return just(new Observable<?>[] { o1, o2, o3, o4, o5, o6, o7 }).lift(new OperatorZip<R>(zipFunction));
32933297
}
32943298

32953299
/**
@@ -3329,7 +3333,7 @@ public final static <T1, T2, T3, T4, T5, T6, T7, R> Observable<R> zip(Observable
33293333
*/
33303334
public final static <T1, T2, T3, T4, T5, T6, T7, T8, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6, Observable<? extends T7> o7, Observable<? extends T8> o8,
33313335
Func8<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? super T7, ? super T8, ? extends R> zipFunction) {
3332-
return create(OperationZip.zip(o1, o2, o3, o4, o5, o6, o7, o8, zipFunction));
3336+
return just(new Observable<?>[] { o1, o2, o3, o4, o5, o6, o7, o8 }).lift(new OperatorZip<R>(zipFunction));
33333337
}
33343338

33353339
/**
@@ -3371,7 +3375,7 @@ public final static <T1, T2, T3, T4, T5, T6, T7, T8, R> Observable<R> zip(Observ
33713375
*/
33723376
public final static <T1, T2, T3, T4, T5, T6, T7, T8, T9, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6, Observable<? extends T7> o7, Observable<? extends T8> o8,
33733377
Observable<? extends T9> o9, Func9<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? super T7, ? super T8, ? super T9, ? extends R> zipFunction) {
3374-
return create(OperationZip.zip(o1, o2, o3, o4, o5, o6, o7, o8, o9, zipFunction));
3378+
return just(new Observable<?>[] { o1, o2, o3, o4, o5, o6, o7, o8, o9 }).lift(new OperatorZip<R>(zipFunction));
33753379
}
33763380

33773381
/**
@@ -8403,7 +8407,9 @@ public final <U> Observable<Observable<T>> window(Observable<U> boundary) {
84038407
* @return an Observable that pairs up values from the source Observable and the {@code other} Iterable sequence and emits the results of {@code zipFunction} applied to these pairs
84048408
*/
84058409
public final <T2, R> Observable<R> zip(Iterable<? extends T2> other, Func2<? super T, ? super T2, ? extends R> zipFunction) {
8406-
return create(OperationZip.zipIterable(this, other, zipFunction));
8410+
// return create(OperatorZip.zipIterable(this, other, zipFunction));
8411+
// TODO fix this
8412+
return null;
84078413
}
84088414

84098415
/**

rxjava-core/src/main/java/rx/observers/TestObserver.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package rx.observers;
1717

1818
import java.util.ArrayList;
19+
import java.util.Arrays;
1920
import java.util.Collections;
2021
import java.util.List;
2122

@@ -73,6 +74,14 @@ public List<T> getOnNextEvents() {
7374
return Collections.unmodifiableList(onNextEvents);
7475
}
7576

77+
public List<Object> getEvents() {
78+
ArrayList<Object> events = new ArrayList<Object>();
79+
events.add(onNextEvents);
80+
events.add(onErrorEvents);
81+
events.add(onCompletedEvents);
82+
return Collections.unmodifiableList(events);
83+
}
84+
7685
public void assertReceivedOnNext(List<T> items) {
7786
if (onNextEvents.size() != items.size()) {
7887
throw new AssertionError("Number of items does not match. Provided: " + items.size() + " Actual: " + onNextEvents.size());
Lines changed: 236 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,236 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import java.util.concurrent.ConcurrentLinkedQueue;
19+
import java.util.concurrent.atomic.AtomicLong;
20+
21+
import rx.Notification;
22+
import rx.Observable;
23+
import rx.Observer;
24+
import rx.Subscriber;
25+
import rx.subscriptions.CompositeSubscription;
26+
import rx.util.functions.Func2;
27+
import rx.util.functions.Func3;
28+
import rx.util.functions.Func4;
29+
import rx.util.functions.Func5;
30+
import rx.util.functions.Func6;
31+
import rx.util.functions.Func7;
32+
import rx.util.functions.Func8;
33+
import rx.util.functions.Func9;
34+
import rx.util.functions.FuncN;
35+
import rx.util.functions.Functions;
36+
37+
/**
38+
* Returns an Observable that emits the results of a function applied to sets of items emitted, in
39+
* sequence, by two or more other Observables.
40+
* <p>
41+
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/zip.png">
42+
* <p>
43+
* The zip operation applies this function in strict sequence, so the first item emitted by the new
44+
* Observable will be the result of the function applied to the first item emitted by each zipped
45+
* Observable; the second item emitted by the new Observable will be the result of the function
46+
* applied to the second item emitted by each zipped Observable; and so forth.
47+
* <p>
48+
* The resulting Observable returned from zip will invoke <code>onNext</code> as many times as the
49+
* number of <code>onNext</code> invocations of the source Observable that emits the fewest items.
50+
*/
51+
public final class OperatorZip<R> implements Operator<R, Observable<?>[]> {
52+
/*
53+
* Raw types are used so we can use a single implementation for all arities such as zip(t1, t2) and zip(t1, t2, t3) etc.
54+
* The types will be cast on the edges so usage will be the type-safe but the internals are not.
55+
*/
56+
57+
final FuncN<? extends R> zipFunction;
58+
59+
public OperatorZip(FuncN<? extends R> f) {
60+
this.zipFunction = f;
61+
}
62+
63+
@SuppressWarnings({ "unchecked", "rawtypes" })
64+
public OperatorZip(Func2 f) {
65+
this.zipFunction = Functions.fromFunc(f);
66+
}
67+
68+
@SuppressWarnings({ "unchecked", "rawtypes" })
69+
public OperatorZip(Func3 f) {
70+
this.zipFunction = Functions.fromFunc(f);
71+
}
72+
73+
@SuppressWarnings({ "unchecked", "rawtypes" })
74+
public OperatorZip(Func4 f) {
75+
this.zipFunction = Functions.fromFunc(f);
76+
}
77+
78+
@SuppressWarnings({ "unchecked", "rawtypes" })
79+
public OperatorZip(Func5 f) {
80+
this.zipFunction = Functions.fromFunc(f);
81+
}
82+
83+
@SuppressWarnings({ "unchecked", "rawtypes" })
84+
public OperatorZip(Func6 f) {
85+
this.zipFunction = Functions.fromFunc(f);
86+
}
87+
88+
@SuppressWarnings({ "unchecked", "rawtypes" })
89+
public OperatorZip(Func7 f) {
90+
this.zipFunction = Functions.fromFunc(f);
91+
}
92+
93+
@SuppressWarnings({ "unchecked", "rawtypes" })
94+
public OperatorZip(Func8 f) {
95+
this.zipFunction = Functions.fromFunc(f);
96+
}
97+
98+
@SuppressWarnings({ "unchecked", "rawtypes" })
99+
public OperatorZip(Func9 f) {
100+
this.zipFunction = Functions.fromFunc(f);
101+
}
102+
103+
@SuppressWarnings("rawtypes")
104+
@Override
105+
public Subscriber<? super Observable[]> call(final Subscriber<? super R> observer) {
106+
return new Subscriber<Observable[]>(observer) {
107+
108+
@Override
109+
public void onCompleted() {
110+
// we only complete once a child Observable completes or errors
111+
}
112+
113+
@Override
114+
public void onError(Throwable e) {
115+
observer.onError(e);
116+
}
117+
118+
@Override
119+
public void onNext(Observable[] observables) {
120+
new Zip<R>(observables, observer, zipFunction).zip();
121+
}
122+
123+
};
124+
}
125+
126+
private static class Zip<R> {
127+
@SuppressWarnings("rawtypes")
128+
final Observable[] os;
129+
final Object[] observers;
130+
final Observer<? super R> observer;
131+
final FuncN<? extends R> zipFunction;
132+
final CompositeSubscription childSubscription = new CompositeSubscription();
133+
134+
@SuppressWarnings("rawtypes")
135+
public Zip(Observable[] os, final Subscriber<? super R> observer, FuncN<? extends R> zipFunction) {
136+
this.os = os;
137+
this.observer = observer;
138+
this.zipFunction = zipFunction;
139+
observers = new Object[os.length];
140+
for (int i = 0; i < os.length; i++) {
141+
InnerObserver io = new InnerObserver();
142+
observers[i] = io;
143+
childSubscription.add(io);
144+
}
145+
146+
observer.add(childSubscription);
147+
}
148+
149+
@SuppressWarnings("unchecked")
150+
public void zip() {
151+
for (int i = 0; i < os.length; i++) {
152+
os[i].subscribe((InnerObserver) observers[i]);
153+
}
154+
}
155+
156+
final AtomicLong counter = new AtomicLong(0);
157+
158+
/**
159+
* check if we have values for each and emit if we do
160+
*
161+
* This will only allow one thread at a time to do the work, but ensures via `counter` increment/decrement
162+
* that there is always once who acts on each `tick`. Same concept as used in OperationObserveOn.
163+
*
164+
*/
165+
@SuppressWarnings("unchecked")
166+
void tick() {
167+
if (counter.getAndIncrement() == 0) {
168+
do {
169+
Object[] vs = new Object[observers.length];
170+
boolean allHaveValues = true;
171+
for (int i = 0; i < observers.length; i++) {
172+
vs[i] = ((InnerObserver) observers[i]).items.peek();
173+
if (vs[i] instanceof Notification) {
174+
observer.onCompleted();
175+
// we need to unsubscribe from all children since children are independently subscribed
176+
childSubscription.unsubscribe();
177+
return;
178+
}
179+
if (vs[i] == null) {
180+
allHaveValues = false;
181+
// we continue as there may be an onCompleted on one of the others
182+
continue;
183+
}
184+
}
185+
if (allHaveValues) {
186+
// all have something so emit
187+
observer.onNext(zipFunction.call(vs));
188+
// now remove them
189+
for (int i = 0; i < observers.length; i++) {
190+
((InnerObserver) observers[i]).items.poll();
191+
// eagerly check if the next item on this queue is an onComplete
192+
if (((InnerObserver) observers[i]).items.peek() instanceof Notification) {
193+
// it is an onComplete so shut down
194+
observer.onCompleted();
195+
// we need to unsubscribe from all children since children are independently subscribed
196+
childSubscription.unsubscribe();
197+
return;
198+
}
199+
}
200+
}
201+
} while (counter.decrementAndGet() > 0);
202+
}
203+
204+
}
205+
206+
// used to observe each Observable we are zipping together
207+
// it collects all items in an internal queue
208+
@SuppressWarnings("rawtypes")
209+
final class InnerObserver extends Subscriber {
210+
// Concurrent* since we need to read it from across threads
211+
final ConcurrentLinkedQueue items = new ConcurrentLinkedQueue();
212+
213+
@SuppressWarnings("unchecked")
214+
@Override
215+
public void onCompleted() {
216+
items.add(Notification.createOnCompleted());
217+
tick();
218+
}
219+
220+
@Override
221+
public void onError(Throwable e) {
222+
// emit error and shut down
223+
observer.onError(e);
224+
}
225+
226+
@SuppressWarnings("unchecked")
227+
@Override
228+
public void onNext(Object t) {
229+
// TODO use a placeholder for NULL, such as Notification<T>(null)
230+
items.add(t);
231+
tick();
232+
}
233+
};
234+
}
235+
236+
}

0 commit comments

Comments
 (0)