Skip to content

Commit 5a35bb2

Browse files
committed
Add zip of a collection of Observables
1 parent 0da939b commit 5a35bb2

File tree

2 files changed

+66
-0
lines changed

2 files changed

+66
-0
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import java.util.ArrayList;
2323
import java.util.Arrays;
24+
import java.util.Collection;
2425
import java.util.List;
2526
import java.util.Map;
2627
import java.util.concurrent.CountDownLatch;
@@ -2630,6 +2631,60 @@ public R call(T0 t0, T1 t1, T2 t2, T3 t3) {
26302631
});
26312632
}
26322633

2634+
/**
2635+
* Returns an Observable that emits the results of a function of your choosing applied to
2636+
* combinations of four items emitted, in sequence, by four other Observables.
2637+
* <p>
2638+
* <code>zip</code> applies this function in strict sequence, so the first item emitted by the
2639+
* new Observable will be the result of the function applied to the first item emitted by
2640+
* all of the Observalbes; the second item emitted by the new Observable will be the result of
2641+
* the function applied to the second item emitted by each of those Observables; and so forth.
2642+
* <p>
2643+
* The resulting <code>Observable<R></code> returned from <code>zip</code> will invoke
2644+
* <code>onNext</code> as many times as the number of <code>onNext</code> invokations of the
2645+
* source Observable that emits the fewest items.
2646+
* <p>
2647+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/zip.png">
2648+
*
2649+
* @param ws
2650+
* A collection of source Observable
2651+
* @param reduceFunction
2652+
* a function that, when applied to an item emitted by each of the source
2653+
* Observables, results in an item that will be emitted by the resulting Observable
2654+
* @return an Observable that emits the zipped results
2655+
*/
2656+
public static <R> Observable<R> zip(Collection<Observable<?>> ws, FuncN<R> reduceFunction) {
2657+
return create(OperationZip.zip(ws, reduceFunction));
2658+
}
2659+
2660+
/**
2661+
* Returns an Observable that emits the results of a function of your choosing applied to
2662+
* combinations of four items emitted, in sequence, by four other Observables.
2663+
* <p>
2664+
* <code>zip</code> applies this function in strict sequence, so the first item emitted by the
2665+
* new Observable will be the result of the function applied to the first item emitted by
2666+
* all of the Observalbes; the second item emitted by the new Observable will be the result of
2667+
* the function applied to the second item emitted by each of those Observables; and so forth.
2668+
* <p>
2669+
* The resulting <code>Observable<R></code> returned from <code>zip</code> will invoke
2670+
* <code>onNext</code> as many times as the number of <code>onNext</code> invocations of the
2671+
* source Observable that emits the fewest items.
2672+
* <p>
2673+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/zip.png">
2674+
*
2675+
* @param ws
2676+
* A collection of source Observable
2677+
* @param function
2678+
* a function that, when applied to an item emitted by each of the source
2679+
* Observables, results in an item that will be emitted by the resulting Observable
2680+
* @return an Observable that emits the zipped results
2681+
*/
2682+
public static <R> Observable<R> zip(Collection<Observable<?>> ws, final Object function) {
2683+
@SuppressWarnings({ "unchecked" })
2684+
final FuncN<R> _f = Functions.from(function);
2685+
return zip(ws, _f);
2686+
}
2687+
26332688
/**
26342689
* Filters an Observable by discarding any items it emits that do not meet some test.
26352690
* <p>

rxjava-core/src/main/java/rx/operators/OperationZip.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static org.mockito.Mockito.*;
2020

2121
import java.util.Arrays;
22+
import java.util.Collection;
2223
import java.util.concurrent.ConcurrentHashMap;
2324
import java.util.concurrent.ConcurrentLinkedQueue;
2425
import java.util.concurrent.atomic.AtomicBoolean;
@@ -63,6 +64,16 @@ public static <T0, T1, T2, T3, R> Func1<Observer<R>, Subscription> zip(Observabl
6364
return a;
6465
}
6566

67+
@SuppressWarnings("unchecked")
68+
public static <R> Func1<Observer<R>, Subscription> zip(Collection<Observable<?>> ws, FuncN<R> zipFunction) {
69+
Aggregator a = new Aggregator(zipFunction);
70+
for (Observable<?> w : ws) {
71+
ZipObserver zipObserver = new ZipObserver(a, w);
72+
a.addObserver(zipObserver);
73+
}
74+
return a;
75+
}
76+
6677
/*
6778
* ThreadSafe
6879
*/

0 commit comments

Comments
 (0)