Skip to content

Commit c9f2c13

Browse files
Merge pull request #290 from abersnaze/issue103
Issue 103
2 parents ea84006 + ea16550 commit c9f2c13

File tree

2 files changed

+142
-0
lines changed

2 files changed

+142
-0
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import java.util.ArrayList;
2323
import java.util.Arrays;
24+
import java.util.Collection;
2425
import java.util.List;
2526
import java.util.Map;
2627
import java.util.concurrent.CountDownLatch;
@@ -2688,6 +2689,119 @@ public R call(T0 t0, T1 t1, T2 t2, T3 t3) {
26882689
});
26892690
}
26902691

2692+
/**
2693+
* Returns an Observable that emits the results of a function of your choosing applied to
2694+
* combinations of four items emitted, in sequence, by four other Observables.
2695+
* <p>
2696+
* <code>zip</code> applies this function in strict sequence, so the first item emitted by the
2697+
* new Observable will be the result of the function applied to the first item emitted by
2698+
* all of the Observalbes; the second item emitted by the new Observable will be the result of
2699+
* the function applied to the second item emitted by each of those Observables; and so forth.
2700+
* <p>
2701+
* The resulting <code>Observable<R></code> returned from <code>zip</code> will invoke
2702+
* <code>onNext</code> as many times as the number of <code>onNext</code> invokations of the
2703+
* source Observable that emits the fewest items.
2704+
* <p>
2705+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/zip.png">
2706+
*
2707+
* @param ws
2708+
* An Observable of source Observables
2709+
* @param reduceFunction
2710+
* a function that, when applied to an item emitted by each of the source
2711+
* Observables, results in an item that will be emitted by the resulting Observable
2712+
* @return an Observable that emits the zipped results
2713+
*/
2714+
public static <R> Observable<R> zip(Observable<Observable<?>> ws, final FuncN<R> reduceFunction) {
2715+
return ws.toList().mapMany(new Func1<List<Observable<?>>, Observable<R>>() {
2716+
@Override
2717+
public Observable<R> call(List<Observable<?>> wsList) {
2718+
return create(OperationZip.zip(wsList, reduceFunction));
2719+
}
2720+
});
2721+
}
2722+
2723+
/**
2724+
* Returns an Observable that emits the results of a function of your choosing applied to
2725+
* combinations of four items emitted, in sequence, by four other Observables.
2726+
* <p>
2727+
* <code>zip</code> applies this function in strict sequence, so the first item emitted by the
2728+
* new Observable will be the result of the function applied to the first item emitted by
2729+
* all of the Observalbes; the second item emitted by the new Observable will be the result of
2730+
* the function applied to the second item emitted by each of those Observables; and so forth.
2731+
* <p>
2732+
* The resulting <code>Observable<R></code> returned from <code>zip</code> will invoke
2733+
* <code>onNext</code> as many times as the number of <code>onNext</code> invocations of the
2734+
* source Observable that emits the fewest items.
2735+
* <p>
2736+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/zip.png">
2737+
*
2738+
* @param ws
2739+
* An Observable of source Observables
2740+
* @param function
2741+
* a function that, when applied to an item emitted by each of the source
2742+
* Observables, results in an item that will be emitted by the resulting Observable
2743+
* @return an Observable that emits the zipped results
2744+
*/
2745+
public static <R> Observable<R> zip(Observable<Observable<?>> ws, final Object function) {
2746+
@SuppressWarnings({ "unchecked" })
2747+
final FuncN<R> _f = Functions.from(function);
2748+
return zip(ws, _f);
2749+
}
2750+
2751+
/**
2752+
* Returns an Observable that emits the results of a function of your choosing applied to
2753+
* combinations of four items emitted, in sequence, by four other Observables.
2754+
* <p>
2755+
* <code>zip</code> applies this function in strict sequence, so the first item emitted by the
2756+
* new Observable will be the result of the function applied to the first item emitted by
2757+
* all of the Observalbes; the second item emitted by the new Observable will be the result of
2758+
* the function applied to the second item emitted by each of those Observables; and so forth.
2759+
* <p>
2760+
* The resulting <code>Observable<R></code> returned from <code>zip</code> will invoke
2761+
* <code>onNext</code> as many times as the number of <code>onNext</code> invokations of the
2762+
* source Observable that emits the fewest items.
2763+
* <p>
2764+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/zip.png">
2765+
*
2766+
* @param ws
2767+
* A collection of source Observables
2768+
* @param reduceFunction
2769+
* a function that, when applied to an item emitted by each of the source
2770+
* Observables, results in an item that will be emitted by the resulting Observable
2771+
* @return an Observable that emits the zipped results
2772+
*/
2773+
public static <R> Observable<R> zip(Collection<Observable<?>> ws, FuncN<R> reduceFunction) {
2774+
return create(OperationZip.zip(ws, reduceFunction));
2775+
}
2776+
2777+
/**
2778+
* Returns an Observable that emits the results of a function of your choosing applied to
2779+
* combinations of four items emitted, in sequence, by four other Observables.
2780+
* <p>
2781+
* <code>zip</code> applies this function in strict sequence, so the first item emitted by the
2782+
* new Observable will be the result of the function applied to the first item emitted by
2783+
* all of the Observalbes; the second item emitted by the new Observable will be the result of
2784+
* the function applied to the second item emitted by each of those Observables; and so forth.
2785+
* <p>
2786+
* The resulting <code>Observable<R></code> returned from <code>zip</code> will invoke
2787+
* <code>onNext</code> as many times as the number of <code>onNext</code> invocations of the
2788+
* source Observable that emits the fewest items.
2789+
* <p>
2790+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/zip.png">
2791+
*
2792+
* @param ws
2793+
* A collection of source Observables
2794+
* @param function
2795+
* a function that, when applied to an item emitted by each of the source
2796+
* Observables, results in an item that will be emitted by the resulting Observable
2797+
* @return an Observable that emits the zipped results
2798+
*/
2799+
public static <R> Observable<R> zip(Collection<Observable<?>> ws, final Object function) {
2800+
@SuppressWarnings({ "unchecked" })
2801+
final FuncN<R> _f = Functions.from(function);
2802+
return zip(ws, _f);
2803+
}
2804+
26912805
/**
26922806
* Filters an Observable by discarding any items it emits that do not meet some test.
26932807
* <p>

rxjava-core/src/main/java/rx/operators/OperationZip.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static org.mockito.Mockito.*;
2020

2121
import java.util.Arrays;
22+
import java.util.Collection;
2223
import java.util.concurrent.ConcurrentHashMap;
2324
import java.util.concurrent.ConcurrentLinkedQueue;
2425
import java.util.concurrent.atomic.AtomicBoolean;
@@ -77,6 +78,16 @@ public static <T0, T1, T2, T3, R> Func1<Observer<R>, Subscription> zip(Observabl
7778
return a;
7879
}
7980

81+
@SuppressWarnings("unchecked")
82+
public static <R> Func1<Observer<R>, Subscription> zip(Collection<Observable<?>> ws, FuncN<R> zipFunction) {
83+
Aggregator a = new Aggregator(zipFunction);
84+
for (Observable w : ws) {
85+
ZipObserver zipObserver = new ZipObserver(a, w);
86+
a.addObserver(zipObserver);
87+
}
88+
return a;
89+
}
90+
8091
/*
8192
* ThreadSafe
8293
*/
@@ -284,6 +295,23 @@ private void stop() {
284295
}
285296

286297
public static class UnitTest {
298+
299+
@SuppressWarnings("unchecked")
300+
@Test
301+
public void testCollectionSizeDifferentThanFunction() {
302+
FuncN<String> zipr = Functions.from(getConcatStringIntegerIntArrayZipr());
303+
304+
/* define a Observer to receive aggregated events */
305+
Observer<String> aObserver = mock(Observer.class);
306+
307+
Collection ws = java.util.Collections.singleton(Observable.from("one", "two"));
308+
Observable<String> w = Observable.create(zip(ws, zipr));
309+
w.subscribe(aObserver);
310+
311+
verify(aObserver, times(1)).onError(any(Exception.class));
312+
verify(aObserver, never()).onCompleted();
313+
verify(aObserver, never()).onNext(any(String.class));
314+
}
287315

288316
@SuppressWarnings("unchecked")
289317
/* mock calls don't do generics */

0 commit comments

Comments
 (0)