Skip to content

Commit cf28bce

Browse files
OperatorZipIterable
1 parent 9b3fca1 commit cf28bce

File tree

4 files changed

+413
-262
lines changed

4 files changed

+413
-262
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@
108108
import rx.operators.OperatorTimestamp;
109109
import rx.operators.OperatorToObservableList;
110110
import rx.operators.OperatorToObservableSortedList;
111+
import rx.operators.OperatorZipIterable;
111112
import rx.plugins.RxJavaObservableExecutionHook;
112113
import rx.plugins.RxJavaPlugins;
113114
import rx.schedulers.Schedulers;
@@ -8407,9 +8408,7 @@ public final <U> Observable<Observable<T>> window(Observable<U> boundary) {
84078408
* @return an Observable that pairs up values from the source Observable and the {@code other} Iterable sequence and emits the results of {@code zipFunction} applied to these pairs
84088409
*/
84098410
public final <T2, R> Observable<R> zip(Iterable<? extends T2> other, Func2<? super T, ? super T2, ? extends R> zipFunction) {
8410-
// return create(OperatorZip.zipIterable(this, other, zipFunction));
8411-
// TODO fix this
8412-
return null;
8411+
return lift(new OperatorZipIterable<T, T2, R>(other, zipFunction));
84138412
}
84148413

84158414
/**
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import java.util.Iterator;
19+
20+
import rx.Subscriber;
21+
import rx.observers.Subscribers;
22+
import rx.util.functions.Func2;
23+
24+
public final class OperatorZipIterable<T1, T2, R> implements Operator<R, T1> {
25+
26+
final Iterable<? extends T2> iterable;
27+
final Func2<? super T1, ? super T2, ? extends R> zipFunction;
28+
29+
public OperatorZipIterable(Iterable<? extends T2> iterable, Func2<? super T1, ? super T2, ? extends R> zipFunction) {
30+
this.iterable = iterable;
31+
this.zipFunction = zipFunction;
32+
}
33+
34+
@Override
35+
public Subscriber<? super T1> call(final Subscriber<? super R> subscriber) {
36+
final Iterator<? extends T2> iterator = iterable.iterator();
37+
try {
38+
if (!iterator.hasNext()) {
39+
subscriber.onCompleted();
40+
return Subscribers.create();
41+
}
42+
} catch (Throwable e) {
43+
subscriber.onError(e);
44+
}
45+
return new Subscriber<T1>() {
46+
47+
@Override
48+
public void onCompleted() {
49+
subscriber.onCompleted();
50+
}
51+
52+
@Override
53+
public void onError(Throwable e) {
54+
subscriber.onError(e);
55+
}
56+
57+
@Override
58+
public void onNext(T1 t) {
59+
try {
60+
subscriber.onNext(zipFunction.call(t, iterator.next()));
61+
if (!iterator.hasNext()) {
62+
onCompleted();
63+
}
64+
} catch (Throwable e) {
65+
onError(e);
66+
}
67+
}
68+
69+
};
70+
}
71+
72+
}

0 commit comments

Comments
 (0)