Skip to content

Commit eba4857

Browse files
author
jmhofer
committed
generalized everything in Observable that deals with covariance of observables
1 parent a127b06 commit eba4857

16 files changed

+243
-117
lines changed

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/RxImplicits.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,7 @@ class UnitTestSuite extends JUnitSuite {
248248

249249
@Test def testFlattenMerge {
250250
val observable = Observable.from(Observable.from(1, 2, 3))
251-
val merged = Observable.merge(observable)
251+
val merged = Observable.merge[Int](observable)
252252
assertSubscribeReceives(merged)(1, 2, 3)
253253
}
254254

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 69 additions & 28 deletions
Large diffs are not rendered by default.

rxjava-core/src/main/java/rx/operators/OperationBuffer.java

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public final class OperationBuffer {
6969
* @return
7070
* the {@link Func1} object representing the specified buffer operation.
7171
*/
72-
public static <T> Func1<Observer<? super List<T>>, Subscription> buffer(final Observable<T> source, final Func0<? extends Observable<BufferClosing>> bufferClosingSelector) {
72+
public static <T> Func1<Observer<? super List<T>>, Subscription> buffer(final Observable<? extends T> source, final Func0<? extends Observable<? extends BufferClosing>> bufferClosingSelector) {
7373
return new Func1<Observer<? super List<T>>, Subscription>() {
7474
@Override
7575
public Subscription call(final Observer<? super List<T>> observer) {
@@ -107,7 +107,7 @@ public Subscription call(final Observer<? super List<T>> observer) {
107107
* @return
108108
* the {@link Func1} object representing the specified buffer operation.
109109
*/
110-
public static <T> Func1<Observer<? super List<T>>, Subscription> buffer(final Observable<T> source, final Observable<BufferOpening> bufferOpenings, final Func1<? super BufferOpening, ? extends Observable<BufferClosing>> bufferClosingSelector) {
110+
public static <T> Func1<Observer<? super List<T>>, Subscription> buffer(final Observable<? extends T> source, final Observable<? extends BufferOpening> bufferOpenings, final Func1<? super BufferOpening, ? extends Observable<? extends BufferClosing>> bufferClosingSelector) {
111111
return new Func1<Observer<? super List<T>>, Subscription>() {
112112
@Override
113113
public Subscription call(final Observer<? super List<T>> observer) {
@@ -135,7 +135,7 @@ public Subscription call(final Observer<? super List<T>> observer) {
135135
* @return
136136
* the {@link Func1} object representing the specified buffer operation.
137137
*/
138-
public static <T> Func1<Observer<? super List<T>>, Subscription> buffer(Observable<T> source, int count) {
138+
public static <T> Func1<Observer<? super List<T>>, Subscription> buffer(Observable<? extends T> source, int count) {
139139
return buffer(source, count, count);
140140
}
141141

@@ -162,7 +162,7 @@ public static <T> Func1<Observer<? super List<T>>, Subscription> buffer(Observab
162162
* @return
163163
* the {@link Func1} object representing the specified buffer operation.
164164
*/
165-
public static <T> Func1<Observer<? super List<T>>, Subscription> buffer(final Observable<T> source, final int count, final int skip) {
165+
public static <T> Func1<Observer<? super List<T>>, Subscription> buffer(final Observable<? extends T> source, final int count, final int skip) {
166166
return new Func1<Observer<? super List<T>>, Subscription>() {
167167
@Override
168168
public Subscription call(final Observer<? super List<T>> observer) {
@@ -192,7 +192,7 @@ public Subscription call(final Observer<? super List<T>> observer) {
192192
* @return
193193
* the {@link Func1} object representing the specified buffer operation.
194194
*/
195-
public static <T> Func1<Observer<? super List<T>>, Subscription> buffer(Observable<T> source, long timespan, TimeUnit unit) {
195+
public static <T> Func1<Observer<? super List<T>>, Subscription> buffer(Observable<? extends T> source, long timespan, TimeUnit unit) {
196196
return buffer(source, timespan, unit, Schedulers.threadPoolForComputation());
197197
}
198198

@@ -217,7 +217,7 @@ public static <T> Func1<Observer<? super List<T>>, Subscription> buffer(Observab
217217
* @return
218218
* the {@link Func1} object representing the specified buffer operation.
219219
*/
220-
public static <T> Func1<Observer<? super List<T>>, Subscription> buffer(final Observable<T> source, final long timespan, final TimeUnit unit, final Scheduler scheduler) {
220+
public static <T> Func1<Observer<? super List<T>>, Subscription> buffer(final Observable<? extends T> source, final long timespan, final TimeUnit unit, final Scheduler scheduler) {
221221
return new Func1<Observer<? super List<T>>, Subscription>() {
222222
@Override
223223
public Subscription call(final Observer<? super List<T>> observer) {
@@ -250,7 +250,7 @@ public Subscription call(final Observer<? super List<T>> observer) {
250250
* @return
251251
* the {@link Func1} object representing the specified buffer operation.
252252
*/
253-
public static <T> Func1<Observer<? super List<T>>, Subscription> buffer(Observable<T> source, long timespan, TimeUnit unit, int count) {
253+
public static <T> Func1<Observer<? super List<T>>, Subscription> buffer(Observable<? extends T> source, long timespan, TimeUnit unit, int count) {
254254
return buffer(source, timespan, unit, count, Schedulers.threadPoolForComputation());
255255
}
256256

@@ -278,7 +278,7 @@ public static <T> Func1<Observer<? super List<T>>, Subscription> buffer(Observab
278278
* @return
279279
* the {@link Func1} object representing the specified buffer operation.
280280
*/
281-
public static <T> Func1<Observer<? super List<T>>, Subscription> buffer(final Observable<T> source, final long timespan, final TimeUnit unit, final int count, final Scheduler scheduler) {
281+
public static <T> Func1<Observer<? super List<T>>, Subscription> buffer(final Observable<? extends T> source, final long timespan, final TimeUnit unit, final int count, final Scheduler scheduler) {
282282
return new Func1<Observer<? super List<T>>, Subscription>() {
283283
@Override
284284
public Subscription call(final Observer<? super List<T>> observer) {
@@ -311,7 +311,7 @@ public Subscription call(final Observer<? super List<T>> observer) {
311311
* @return
312312
* the {@link Func1} object representing the specified buffer operation.
313313
*/
314-
public static <T> Func1<Observer<? super List<T>>, Subscription> buffer(Observable<T> source, long timespan, long timeshift, TimeUnit unit) {
314+
public static <T> Func1<Observer<? super List<T>>, Subscription> buffer(Observable<? extends T> source, long timespan, long timeshift, TimeUnit unit) {
315315
return buffer(source, timespan, timeshift, unit, Schedulers.threadPoolForComputation());
316316
}
317317

@@ -339,7 +339,7 @@ public static <T> Func1<Observer<? super List<T>>, Subscription> buffer(Observab
339339
* @return
340340
* the {@link Func1} object representing the specified buffer operation.
341341
*/
342-
public static <T> Func1<Observer<? super List<T>>, Subscription> buffer(final Observable<T> source, final long timespan, final long timeshift, final TimeUnit unit, final Scheduler scheduler) {
342+
public static <T> Func1<Observer<? super List<T>>, Subscription> buffer(final Observable<? extends T> source, final long timespan, final long timeshift, final TimeUnit unit, final Scheduler scheduler) {
343343
return new Func1<Observer<? super List<T>>, Subscription>() {
344344
@Override
345345
public Subscription call(final Observer<? super List<T>> observer) {
@@ -444,10 +444,10 @@ public void stop() {
444444
private static class ObservableBasedSingleBufferCreator<T> implements BufferCreator<T> {
445445

446446
private final SafeObservableSubscription subscription = new SafeObservableSubscription();
447-
private final Func0<? extends Observable<BufferClosing>> bufferClosingSelector;
447+
private final Func0<? extends Observable<? extends BufferClosing>> bufferClosingSelector;
448448
private final NonOverlappingBuffers<T> buffers;
449449

450-
public ObservableBasedSingleBufferCreator(NonOverlappingBuffers<T> buffers, Func0<? extends Observable<BufferClosing>> bufferClosingSelector) {
450+
public ObservableBasedSingleBufferCreator(NonOverlappingBuffers<T> buffers, Func0<? extends Observable<? extends BufferClosing>> bufferClosingSelector) {
451451
this.buffers = buffers;
452452
this.bufferClosingSelector = bufferClosingSelector;
453453

@@ -456,7 +456,7 @@ public ObservableBasedSingleBufferCreator(NonOverlappingBuffers<T> buffers, Func
456456
}
457457

458458
private void listenForBufferEnd() {
459-
Observable<BufferClosing> closingObservable = bufferClosingSelector.call();
459+
Observable<? extends BufferClosing> closingObservable = bufferClosingSelector.call();
460460
closingObservable.subscribe(new Action1<BufferClosing>() {
461461
@Override
462462
public void call(BufferClosing closing) {
@@ -489,12 +489,12 @@ private static class ObservableBasedMultiBufferCreator<T> implements BufferCreat
489489

490490
private final SafeObservableSubscription subscription = new SafeObservableSubscription();
491491

492-
public ObservableBasedMultiBufferCreator(final OverlappingBuffers<T> buffers, Observable<BufferOpening> bufferOpenings, final Func1<? super BufferOpening, ? extends Observable<BufferClosing>> bufferClosingSelector) {
492+
public ObservableBasedMultiBufferCreator(final OverlappingBuffers<T> buffers, Observable<? extends BufferOpening> bufferOpenings, final Func1<? super BufferOpening, ? extends Observable<? extends BufferClosing>> bufferClosingSelector) {
493493
subscription.wrap(bufferOpenings.subscribe(new Action1<BufferOpening>() {
494494
@Override
495495
public void call(BufferOpening opening) {
496496
final Buffer<T> buffer = buffers.createBuffer();
497-
Observable<BufferClosing> closingObservable = bufferClosingSelector.call(opening);
497+
Observable<? extends BufferClosing> closingObservable = bufferClosingSelector.call(opening);
498498

499499
closingObservable.subscribe(new Action1<BufferClosing>() {
500500
@Override

rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java

Lines changed: 92 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,11 @@
3838
import rx.util.functions.Func2;
3939
import rx.util.functions.Func3;
4040
import rx.util.functions.Func4;
41+
import rx.util.functions.Func5;
42+
import rx.util.functions.Func6;
43+
import rx.util.functions.Func7;
44+
import rx.util.functions.Func8;
45+
import rx.util.functions.Func9;
4146
import rx.util.functions.FuncN;
4247
import rx.util.functions.Functions;
4348

@@ -62,7 +67,7 @@ public class OperationCombineLatest {
6267
* The aggregation function used to combine the source observable values.
6368
* @return A function from an observer to a subscription. This can be used to create an observable from.
6469
*/
65-
public static <T0, T1, R> Func1<Observer<? super R>, Subscription> combineLatest(Observable<T0> w0, Observable<T1> w1, Func2<? super T0, ? super T1, ? extends R> combineLatestFunction) {
70+
public static <T0, T1, R> Func1<Observer<? super R>, Subscription> combineLatest(Observable<? extends T0> w0, Observable<T1> w1, Func2<? super T0, ? super T1, ? extends R> combineLatestFunction) {
6671
Aggregator<R> a = new Aggregator<R>(Functions.fromFunc(combineLatestFunction));
6772
a.addObserver(new CombineObserver<R, T0>(a, w0));
6873
a.addObserver(new CombineObserver<R, T1>(a, w1));
@@ -72,7 +77,8 @@ public static <T0, T1, R> Func1<Observer<? super R>, Subscription> combineLatest
7277
/**
7378
* @see #combineLatest(Observable w0, Observable w1, Func2 combineLatestFunction)
7479
*/
75-
public static <T0, T1, T2, R> Func1<Observer<? super R>, Subscription> combineLatest(Observable<T0> w0, Observable<T1> w1, Observable<T2> w2, Func3<? super T0, ? super T1, ? super T2, ? extends R> combineLatestFunction) {
80+
public static <T0, T1, T2, R> Func1<Observer<? super R>, Subscription> combineLatest(Observable<? extends T0> w0, Observable<? extends T1> w1, Observable<? extends T2> w2,
81+
Func3<? super T0, ? super T1, ? super T2, ? extends R> combineLatestFunction) {
7682
Aggregator<R> a = new Aggregator<R>(Functions.fromFunc(combineLatestFunction));
7783
a.addObserver(new CombineObserver<R, T0>(a, w0));
7884
a.addObserver(new CombineObserver<R, T1>(a, w1));
@@ -83,7 +89,8 @@ public static <T0, T1, T2, R> Func1<Observer<? super R>, Subscription> combineLa
8389
/**
8490
* @see #combineLatest(Observable w0, Observable w1, Func2 combineLatestFunction)
8591
*/
86-
public static <T0, T1, T2, T3, R> Func1<Observer<? super R>, Subscription> combineLatest(Observable<T0> w0, Observable<T1> w1, Observable<T2> w2, Observable<T3> w3, Func4<? super T0, ? super T1, ? super T2, ? super T3, ? extends R> combineLatestFunction) {
92+
public static <T0, T1, T2, T3, R> Func1<Observer<? super R>, Subscription> combineLatest(Observable<? extends T0> w0, Observable<? extends T1> w1, Observable<? extends T2> w2, Observable<? extends T3> w3,
93+
Func4<? super T0, ? super T1, ? super T2, ? super T3, ? extends R> combineLatestFunction) {
8794
Aggregator<R> a = new Aggregator<R>(Functions.fromFunc(combineLatestFunction));
8895
a.addObserver(new CombineObserver<R, T0>(a, w0));
8996
a.addObserver(new CombineObserver<R, T1>(a, w1));
@@ -92,12 +99,92 @@ public static <T0, T1, T2, T3, R> Func1<Observer<? super R>, Subscription> combi
9299
return a;
93100
}
94101

102+
/**
103+
* @see #combineLatest(Observable w0, Observable w1, Func2 combineLatestFunction)
104+
*/
105+
public static <T0, T1, T2, T3, T4, R> Func1<Observer<? super R>, Subscription> combineLatest(Observable<? extends T0> w0, Observable<? extends T1> w1, Observable<? extends T2> w2, Observable<? extends T3> w3, Observable<? extends T4> w4,
106+
Func5<? super T0, ? super T1, ? super T2, ? super T3, ? super T4, ? extends R> combineLatestFunction) {
107+
Aggregator<R> a = new Aggregator<R>(Functions.fromFunc(combineLatestFunction));
108+
a.addObserver(new CombineObserver<R, T0>(a, w0));
109+
a.addObserver(new CombineObserver<R, T1>(a, w1));
110+
a.addObserver(new CombineObserver<R, T2>(a, w2));
111+
a.addObserver(new CombineObserver<R, T3>(a, w3));
112+
a.addObserver(new CombineObserver<R, T4>(a, w4));
113+
return a;
114+
}
115+
116+
/**
117+
* @see #combineLatest(Observable w0, Observable w1, Func2 combineLatestFunction)
118+
*/
119+
public static <T0, T1, T2, T3, T4, T5, R> Func1<Observer<? super R>, Subscription> combineLatest(Observable<? extends T0> w0, Observable<? extends T1> w1, Observable<? extends T2> w2, Observable<? extends T3> w3, Observable<? extends T4> w4, Observable<? extends T5> w5,
120+
Func6<? super T0, ? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? extends R> combineLatestFunction) {
121+
Aggregator<R> a = new Aggregator<R>(Functions.fromFunc(combineLatestFunction));
122+
a.addObserver(new CombineObserver<R, T0>(a, w0));
123+
a.addObserver(new CombineObserver<R, T1>(a, w1));
124+
a.addObserver(new CombineObserver<R, T2>(a, w2));
125+
a.addObserver(new CombineObserver<R, T3>(a, w3));
126+
a.addObserver(new CombineObserver<R, T4>(a, w4));
127+
a.addObserver(new CombineObserver<R, T5>(a, w5));
128+
return a;
129+
}
130+
131+
/**
132+
* @see #combineLatest(Observable w0, Observable w1, Func2 combineLatestFunction)
133+
*/
134+
public static <T0, T1, T2, T3, T4, T5, T6, R> Func1<Observer<? super R>, Subscription> combineLatest(Observable<? extends T0> w0, Observable<? extends T1> w1, Observable<? extends T2> w2, Observable<? extends T3> w3, Observable<? extends T4> w4, Observable<? extends T5> w5, Observable<? extends T6> w6,
135+
Func7<? super T0, ? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? extends R> combineLatestFunction) {
136+
Aggregator<R> a = new Aggregator<R>(Functions.fromFunc(combineLatestFunction));
137+
a.addObserver(new CombineObserver<R, T0>(a, w0));
138+
a.addObserver(new CombineObserver<R, T1>(a, w1));
139+
a.addObserver(new CombineObserver<R, T2>(a, w2));
140+
a.addObserver(new CombineObserver<R, T3>(a, w3));
141+
a.addObserver(new CombineObserver<R, T4>(a, w4));
142+
a.addObserver(new CombineObserver<R, T5>(a, w5));
143+
a.addObserver(new CombineObserver<R, T6>(a, w6));
144+
return a;
145+
}
146+
147+
/**
148+
* @see #combineLatest(Observable w0, Observable w1, Func2 combineLatestFunction)
149+
*/
150+
public static <T0, T1, T2, T3, T4, T5, T6, T7, R> Func1<Observer<? super R>, Subscription> combineLatest(Observable<? extends T0> w0, Observable<? extends T1> w1, Observable<? extends T2> w2, Observable<? extends T3> w3, Observable<? extends T4> w4, Observable<? extends T5> w5, Observable<? extends T6> w6, Observable<? extends T7> w7,
151+
Func8<? super T0, ? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? super T7, ? extends R> combineLatestFunction) {
152+
Aggregator<R> a = new Aggregator<R>(Functions.fromFunc(combineLatestFunction));
153+
a.addObserver(new CombineObserver<R, T0>(a, w0));
154+
a.addObserver(new CombineObserver<R, T1>(a, w1));
155+
a.addObserver(new CombineObserver<R, T2>(a, w2));
156+
a.addObserver(new CombineObserver<R, T3>(a, w3));
157+
a.addObserver(new CombineObserver<R, T4>(a, w4));
158+
a.addObserver(new CombineObserver<R, T5>(a, w5));
159+
a.addObserver(new CombineObserver<R, T6>(a, w6));
160+
a.addObserver(new CombineObserver<R, T7>(a, w7));
161+
return a;
162+
}
163+
164+
/**
165+
* @see #combineLatest(Observable w0, Observable w1, Func2 combineLatestFunction)
166+
*/
167+
public static <T0, T1, T2, T3, T4, T5, T6, T7, T8, R> Func1<Observer<? super R>, Subscription> combineLatest(Observable<? extends T0> w0, Observable<? extends T1> w1, Observable<? extends T2> w2, Observable<? extends T3> w3, Observable<? extends T4> w4, Observable<? extends T5> w5, Observable<? extends T6> w6, Observable<? extends T7> w7, Observable<? extends T8> w8,
168+
Func9<? super T0, ? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? super T7, ? super T8, ? extends R> combineLatestFunction) {
169+
Aggregator<R> a = new Aggregator<R>(Functions.fromFunc(combineLatestFunction));
170+
a.addObserver(new CombineObserver<R, T0>(a, w0));
171+
a.addObserver(new CombineObserver<R, T1>(a, w1));
172+
a.addObserver(new CombineObserver<R, T2>(a, w2));
173+
a.addObserver(new CombineObserver<R, T3>(a, w3));
174+
a.addObserver(new CombineObserver<R, T4>(a, w4));
175+
a.addObserver(new CombineObserver<R, T5>(a, w5));
176+
a.addObserver(new CombineObserver<R, T6>(a, w6));
177+
a.addObserver(new CombineObserver<R, T7>(a, w7));
178+
a.addObserver(new CombineObserver<R, T8>(a, w8));
179+
return a;
180+
}
181+
95182
private static class CombineObserver<R, T> implements Observer<T> {
96-
final Observable<T> w;
183+
final Observable<? extends T> w;
97184
final Aggregator<R> a;
98185
private Subscription subscription;
99186

100-
public CombineObserver(Aggregator<R> a, Observable<T> w) {
187+
public CombineObserver(Aggregator<R> a, Observable<? extends T> w) {
101188
this.a = a;
102189
this.w = w;
103190
}

0 commit comments

Comments
 (0)