Skip to content

Commit 1500859

Browse files
author
jmhofer
committed
improved generics for combineLatest (PECS principle)
1 parent d5259ca commit 1500859

File tree

2 files changed

+17
-17
lines changed

2 files changed

+17
-17
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2749,21 +2749,21 @@ public R call(T0 t0, T1 t1, T2 t2, T3 t3) {
27492749
* The aggregation function used to combine the source observable values.
27502750
* @return A function from an observer to a subscription. This can be used to create an observable from.
27512751
*/
2752-
public static <R, T0, T1> Observable<R> combineLatest(Observable<T0> w0, Observable<T1> w1, Func2<T0, T1, R> combineFunction) {
2752+
public static <R, T0, T1> Observable<R> combineLatest(Observable<? super T0> w0, Observable<? super T1> w1, Func2<? super T0, ? super T1, ? extends R> combineFunction) {
27532753
return create(OperationCombineLatest.combineLatest(w0, w1, combineFunction));
27542754
}
27552755

27562756
/**
27572757
* @see #combineLatest(Observable, Observable, Func2)
27582758
*/
2759-
public static <R, T0, T1, T2> Observable<R> combineLatest(Observable<T0> w0, Observable<T1> w1, Observable<T2> w2, Func3<T0, T1, T2, R> combineFunction) {
2759+
public static <R, T0, T1, T2> Observable<R> combineLatest(Observable<? super T0> w0, Observable<? super T1> w1, Observable<? super T2> w2, Func3<? super T0, ? super T1, ? super T2, ? extends R> combineFunction) {
27602760
return create(OperationCombineLatest.combineLatest(w0, w1, w2, combineFunction));
27612761
}
27622762

27632763
/**
27642764
* @see #combineLatest(Observable, Observable, Func2)
27652765
*/
2766-
public static <R, T0, T1, T2, T3> Observable<R> combineLatest(Observable<T0> w0, Observable<T1> w1, Observable<T2> w2, Observable<T3> w3, Func4<T0, T1, T2, T3, R> combineFunction) {
2766+
public static <R, T0, T1, T2, T3> Observable<R> combineLatest(Observable<? super T0> w0, Observable<? super T1> w1, Observable<? super T2> w2, Observable<? super T3> w3, Func4<? super T0, ? super T1, ? super T2, ? super T3, ? extends R> combineFunction) {
27672767
return create(OperationCombineLatest.combineLatest(w0, w1, w2, w3, combineFunction));
27682768
}
27692769

rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public class OperationCombineLatest {
5454
* The aggregation function used to combine the source observable values.
5555
* @return A function from an observer to a subscription. This can be used to create an observable from.
5656
*/
57-
public static <T0, T1, R> Func1<Observer<R>, Subscription> combineLatest(Observable<T0> w0, Observable<T1> w1, Func2<T0, T1, R> combineLatestFunction) {
57+
public static <T0, T1, R> Func1<Observer<? super R>, Subscription> combineLatest(Observable<? super T0> w0, Observable<? super T1> w1, Func2<? super T0, ? super T1, ? extends R> combineLatestFunction) {
5858
Aggregator<R> a = new Aggregator<R>(Functions.fromFunc(combineLatestFunction));
5959
a.addObserver(new CombineObserver<R, T0>(a, w0));
6060
a.addObserver(new CombineObserver<R, T1>(a, w1));
@@ -64,7 +64,7 @@ public static <T0, T1, R> Func1<Observer<R>, Subscription> combineLatest(Observa
6464
/**
6565
* @see #combineLatest(Observable w0, Observable w1, Func2 combineLatestFunction)
6666
*/
67-
public static <T0, T1, T2, R> Func1<Observer<R>, Subscription> combineLatest(Observable<T0> w0, Observable<T1> w1, Observable<T2> w2, Func3<T0, T1, T2, R> combineLatestFunction) {
67+
public static <T0, T1, T2, R> Func1<Observer<? super R>, Subscription> combineLatest(Observable<? super T0> w0, Observable<? super T1> w1, Observable<? super T2> w2, Func3<? super T0, ? super T1, ? super T2, ? extends R> combineLatestFunction) {
6868
Aggregator<R> a = new Aggregator<R>(Functions.fromFunc(combineLatestFunction));
6969
a.addObserver(new CombineObserver<R, T0>(a, w0));
7070
a.addObserver(new CombineObserver<R, T1>(a, w1));
@@ -75,7 +75,7 @@ public static <T0, T1, T2, R> Func1<Observer<R>, Subscription> combineLatest(Obs
7575
/**
7676
* @see #combineLatest(Observable w0, Observable w1, Func2 combineLatestFunction)
7777
*/
78-
public static <T0, T1, T2, T3, R> Func1<Observer<R>, Subscription> combineLatest(Observable<T0> w0, Observable<T1> w1, Observable<T2> w2, Observable<T3> w3, Func4<T0, T1, T2, T3, R> combineLatestFunction) {
78+
public static <T0, T1, T2, T3, R> Func1<Observer<? super R>, Subscription> combineLatest(Observable<? super T0> w0, Observable<? super T1> w1, Observable<? super T2> w2, Observable<? super T3> w3, Func4<? super T0, ? super T1, ? super T2, ? super T3, ? extends R> combineLatestFunction) {
7979
Aggregator<R> a = new Aggregator<R>(Functions.fromFunc(combineLatestFunction));
8080
a.addObserver(new CombineObserver<R, T0>(a, w0));
8181
a.addObserver(new CombineObserver<R, T1>(a, w1));
@@ -85,11 +85,11 @@ public static <T0, T1, T2, T3, R> Func1<Observer<R>, Subscription> combineLatest
8585
}
8686

8787
private static class CombineObserver<R, T> implements Observer<T> {
88-
final Observable<T> w;
89-
final Aggregator<R> a;
88+
final Observable<? super T> w;
89+
final Aggregator<? super R> a;
9090
private Subscription subscription;
9191

92-
public CombineObserver(Aggregator<R> a, Observable<T> w) {
92+
public CombineObserver(Aggregator<? super R> a, Observable<? super T> w) {
9393
this.a = a;
9494
this.w = w;
9595
}
@@ -122,11 +122,11 @@ public void onNext(T args) {
122122
* whenever we have received an event from one of the observables, as soon as each Observable has received
123123
* at least one event.
124124
*/
125-
private static class Aggregator<R> implements Func1<Observer<R>, Subscription> {
125+
private static class Aggregator<R> implements Func1<Observer<? super R>, Subscription> {
126126

127-
private Observer<R> observer;
127+
private Observer<? super R> observer;
128128

129-
private final FuncN<R> combineLatestFunction;
129+
private final FuncN<? extends R> combineLatestFunction;
130130
private final AtomicBoolean running = new AtomicBoolean(true);
131131

132132
// Stores how many observers have already completed
@@ -135,15 +135,15 @@ private static class Aggregator<R> implements Func1<Observer<R>, Subscription> {
135135
/**
136136
* The latest value from each observer.
137137
*/
138-
private final Map<CombineObserver<R, ?>, Object> latestValue = new ConcurrentHashMap<CombineObserver<R, ?>, Object>();
138+
private final Map<CombineObserver<? extends R, ?>, Object> latestValue = new ConcurrentHashMap<CombineObserver<? extends R, ?>, Object>();
139139

140140
/**
141141
* Ordered list of observers to combine.
142142
* No synchronization is necessary as these can not be added or changed asynchronously.
143143
*/
144144
private final List<CombineObserver<R, ?>> observers = new LinkedList<CombineObserver<R, ?>>();
145145

146-
public Aggregator(FuncN<R> combineLatestFunction) {
146+
public Aggregator(FuncN<? extends R> combineLatestFunction) {
147147
this.combineLatestFunction = combineLatestFunction;
148148
}
149149

@@ -161,7 +161,7 @@ <T> void addObserver(CombineObserver<R, T> w) {
161161
*
162162
* @param w The observer that has completed.
163163
*/
164-
<T> void complete(CombineObserver<R, T> w) {
164+
<T> void complete(CombineObserver<? extends R, ? super T> w) {
165165
int completed = numCompleted.incrementAndGet();
166166
// if all CombineObservers are completed, we mark the whole thing as completed
167167
if (completed == observers.size()) {
@@ -191,7 +191,7 @@ void error(Exception e) {
191191
* @param w
192192
* @param arg
193193
*/
194-
<T> void next(CombineObserver<R, T> w, T arg) {
194+
<T> void next(CombineObserver<? extends R, ? super T> w, T arg) {
195195
if (observer == null) {
196196
throw new RuntimeException("This shouldn't be running if an Observer isn't registered");
197197
}
@@ -224,7 +224,7 @@ <T> void next(CombineObserver<R, T> w, T arg) {
224224
}
225225

226226
@Override
227-
public Subscription call(Observer<R> observer) {
227+
public Subscription call(Observer<? super R> observer) {
228228
if (this.observer != null) {
229229
throw new IllegalStateException("Only one Observer can subscribe to this Observable.");
230230
}

0 commit comments

Comments
 (0)