Skip to content

Commit a3a7e8a

Browse files
Merge branch 'OperatorCombineLatest' of github.com:akarnokd/RxJava into merge-prs
Conflicts: rxjava-core/src/main/java/rx/Observable.java
2 parents 923ea73 + 817fb3a commit a3a7e8a

File tree

5 files changed

+1054
-815
lines changed

5 files changed

+1054
-815
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 33 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@
4848
import rx.observers.SafeSubscriber;
4949
import rx.operators.OnSubscribeFromIterable;
5050
import rx.operators.OnSubscribeRange;
51-
import rx.operators.OperationCombineLatest;
5251
import rx.operators.OperationDebounce;
5352
import rx.operators.OperationDefaultIfEmpty;
5453
import rx.operators.OperationDefer;
@@ -97,6 +96,7 @@
9796
import rx.operators.OperatorBufferWithTime;
9897
import rx.operators.OperatorCache;
9998
import rx.operators.OperatorCast;
99+
import rx.operators.OperatorCombineLatest;
100100
import rx.operators.OperatorConcat;
101101
import rx.operators.OperatorDoOnEach;
102102
import rx.operators.OperatorElementAt;
@@ -516,8 +516,8 @@ public final static <T> Observable<T> amb(Observable<? extends T> o1, Observable
516516
* Observables by means of the given aggregation function
517517
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#wiki-combinelatest">RxJava Wiki: combineLatest()</a>
518518
*/
519-
public final static <T1, T2, R> Observable<R> combineLatest(Observable<? extends T1> o1, Observable<? extends T2> o2, Func2<? super T1, ? super T2, ? extends R> combineFunction) {
520-
return create(OperationCombineLatest.combineLatest(o1, o2, combineFunction));
519+
public static final <T1, T2, R> Observable<R> combineLatest(Observable<? extends T1> o1, Observable<? extends T2> o2, Func2<? super T1, ? super T2, ? extends R> combineFunction) {
520+
return combineLatest(Arrays.asList(o1, o2), Functions.fromFunc(combineFunction));
521521
}
522522

523523
/**
@@ -539,8 +539,8 @@ public final static <T1, T2, R> Observable<R> combineLatest(Observable<? extends
539539
* Observables by means of the given aggregation function
540540
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#wiki-combinelatest">RxJava Wiki: combineLatest()</a>
541541
*/
542-
public final static <T1, T2, T3, R> Observable<R> combineLatest(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Func3<? super T1, ? super T2, ? super T3, ? extends R> combineFunction) {
543-
return create(OperationCombineLatest.combineLatest(o1, o2, o3, combineFunction));
542+
public static final <T1, T2, T3, R> Observable<R> combineLatest(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Func3<? super T1, ? super T2, ? super T3, ? extends R> combineFunction) {
543+
return combineLatest(Arrays.asList(o1, o2, o3), Functions.fromFunc(combineFunction));
544544
}
545545

546546
/**
@@ -564,9 +564,9 @@ public final static <T1, T2, T3, R> Observable<R> combineLatest(Observable<? ext
564564
* Observables by means of the given aggregation function
565565
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#wiki-combinelatest">RxJava Wiki: combineLatest()</a>
566566
*/
567-
public final static <T1, T2, T3, T4, R> Observable<R> combineLatest(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4,
567+
public static final <T1, T2, T3, T4, R> Observable<R> combineLatest(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4,
568568
Func4<? super T1, ? super T2, ? super T3, ? super T4, ? extends R> combineFunction) {
569-
return create(OperationCombineLatest.combineLatest(o1, o2, o3, o4, combineFunction));
569+
return combineLatest(Arrays.asList(o1, o2, o3, o4), Functions.fromFunc(combineFunction));
570570
}
571571

572572
/**
@@ -592,9 +592,9 @@ public final static <T1, T2, T3, T4, R> Observable<R> combineLatest(Observable<?
592592
* Observables by means of the given aggregation function
593593
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#wiki-combinelatest">RxJava Wiki: combineLatest()</a>
594594
*/
595-
public final static <T1, T2, T3, T4, T5, R> Observable<R> combineLatest(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5,
595+
public static final <T1, T2, T3, T4, T5, R> Observable<R> combineLatest(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5,
596596
Func5<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? extends R> combineFunction) {
597-
return create(OperationCombineLatest.combineLatest(o1, o2, o3, o4, o5, combineFunction));
597+
return combineLatest(Arrays.asList(o1, o2, o3, o4, o5), Functions.fromFunc(combineFunction));
598598
}
599599

600600
/**
@@ -622,9 +622,9 @@ public final static <T1, T2, T3, T4, T5, R> Observable<R> combineLatest(Observab
622622
* Observables by means of the given aggregation function
623623
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#wiki-combinelatest">RxJava Wiki: combineLatest()</a>
624624
*/
625-
public final static <T1, T2, T3, T4, T5, T6, R> Observable<R> combineLatest(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6,
625+
public static final <T1, T2, T3, T4, T5, T6, R> Observable<R> combineLatest(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6,
626626
Func6<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? extends R> combineFunction) {
627-
return create(OperationCombineLatest.combineLatest(o1, o2, o3, o4, o5, o6, combineFunction));
627+
return combineLatest(Arrays.asList(o1, o2, o3, o4, o5, o6), Functions.fromFunc(combineFunction));
628628
}
629629

630630
/**
@@ -654,9 +654,9 @@ public final static <T1, T2, T3, T4, T5, T6, R> Observable<R> combineLatest(Obse
654654
* Observables by means of the given aggregation function
655655
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#wiki-combinelatest">RxJava Wiki: combineLatest()</a>
656656
*/
657-
public final static <T1, T2, T3, T4, T5, T6, T7, R> Observable<R> combineLatest(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6, Observable<? extends T7> o7,
657+
public static final <T1, T2, T3, T4, T5, T6, T7, R> Observable<R> combineLatest(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6, Observable<? extends T7> o7,
658658
Func7<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? super T7, ? extends R> combineFunction) {
659-
return create(OperationCombineLatest.combineLatest(o1, o2, o3, o4, o5, o6, o7, combineFunction));
659+
return combineLatest(Arrays.asList(o1, o2, o3, o4, o5, o6, o7), Functions.fromFunc(combineFunction));
660660
}
661661

662662
/**
@@ -688,9 +688,9 @@ public final static <T1, T2, T3, T4, T5, T6, T7, R> Observable<R> combineLatest(
688688
* Observables by means of the given aggregation function
689689
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#wiki-combinelatest">RxJava Wiki: combineLatest()</a>
690690
*/
691-
public final static <T1, T2, T3, T4, T5, T6, T7, T8, R> Observable<R> combineLatest(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6, Observable<? extends T7> o7, Observable<? extends T8> o8,
691+
public static final <T1, T2, T3, T4, T5, T6, T7, T8, R> Observable<R> combineLatest(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6, Observable<? extends T7> o7, Observable<? extends T8> o8,
692692
Func8<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? super T7, ? super T8, ? extends R> combineFunction) {
693-
return create(OperationCombineLatest.combineLatest(o1, o2, o3, o4, o5, o6, o7, o8, combineFunction));
693+
return combineLatest(Arrays.asList(o1, o2, o3, o4, o5, o6, o7, o8), Functions.fromFunc(combineFunction));
694694
}
695695

696696
/**
@@ -724,10 +724,26 @@ public final static <T1, T2, T3, T4, T5, T6, T7, T8, R> Observable<R> combineLat
724724
* Observables by means of the given aggregation function
725725
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#wiki-combinelatest">RxJava Wiki: combineLatest()</a>
726726
*/
727-
public final static <T1, T2, T3, T4, T5, T6, T7, T8, T9, R> Observable<R> combineLatest(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6, Observable<? extends T7> o7, Observable<? extends T8> o8,
727+
public static final <T1, T2, T3, T4, T5, T6, T7, T8, T9, R> Observable<R> combineLatest(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6, Observable<? extends T7> o7, Observable<? extends T8> o8,
728728
Observable<? extends T9> o9,
729729
Func9<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? super T7, ? super T8, ? super T9, ? extends R> combineFunction) {
730-
return create(OperationCombineLatest.combineLatest(o1, o2, o3, o4, o5, o6, o7, o8, o9, combineFunction));
730+
return combineLatest(Arrays.asList(o1, o2, o3, o4, o5, o6, o7, o8, o9), Functions.fromFunc(combineFunction));
731+
}
732+
/**
733+
* Combines nine source Observables by emitting an item that aggregates the latest values of each of the
734+
* source Observables each time an item is received from any of the source Observables, where this
735+
* aggregation is defined by a specified function.
736+
* @param <T> the common base type of source values
737+
* @param <R> the result type
738+
* @param sources the list of observable sources
739+
* @param combineFunction
740+
* the aggregation function used to combine the items emitted by the source Observables
741+
* @return an Observable that emits items that are the result of combining the items emitted by the source
742+
* Observables by means of the given aggregation function
743+
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#wiki-combinelatest">RxJava Wiki: combineLatest()</a>
744+
*/
745+
public static final <T, R> Observable<R> combineLatest(List<? extends Observable<? extends T>> sources, FuncN<? extends R> combineFunction) {
746+
return create(new OperatorCombineLatest<T, R>(sources, combineFunction));
731747
}
732748

733749
/**

0 commit comments

Comments
 (0)