Skip to content

Commit 52342a2

Browse files
author
jmhofer
committed
added variance to Func1 (hopefully) everywhere...
1 parent 34942ad commit 52342a2

21 files changed

+72
-80
lines changed

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/RxImplicits.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -338,7 +338,8 @@ class UnitTestSuite extends JUnitSuite {
338338
@Test def testMap {
339339
val numbers = Observable.from(1, 2, 3, 4, 5, 6, 7, 8, 9)
340340
val mappedNumbers = ArrayBuffer.empty[Int]
341-
numbers.map((x: Int) => x * x).subscribe((squareVal: Int) => {
341+
val mapped: Observable[Int] = numbers map ((x: Int) => x * x)
342+
mapped.subscribe((squareVal: Int) => {
342343
mappedNumbers.append(squareVal)
343344
})
344345
assertEquals(List(1, 4, 9, 16, 25, 36, 49, 64, 81), mappedNumbers.toList)
@@ -559,7 +560,7 @@ class UnitTestSuite extends JUnitSuite {
559560

560561
@Test def testFilterInForComprehension {
561562
val doubler = (i: Int) => Observable.from(i, i)
562-
val filteredObservable = for {
563+
val filteredObservable: Observable[Int] = for {
563564
i: Int <- Observable.from(1, 2, 3, 4)
564565
j: Int <- doubler(i) if isOdd(i)
565566
} yield j

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ public class Observable<T> {
108108

109109
private final static RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook();
110110

111-
private final Func1<Observer<T>, Subscription> onSubscribe;
111+
private final Func1<? super Observer<T>, ? extends Subscription> onSubscribe;
112112

113113
/**
114114
* Observable with Function to execute when subscribed to.
@@ -119,7 +119,7 @@ public class Observable<T> {
119119
* @param onSubscribe
120120
* {@link Func1} to be executed when {@link #subscribe(Observer)} is called.
121121
*/
122-
protected Observable(Func1<Observer<T>, Subscription> onSubscribe) {
122+
protected Observable(Func1<? super Observer<T>, ? extends Subscription> onSubscribe) {
123123
this.onSubscribe = onSubscribe;
124124
}
125125

@@ -157,7 +157,7 @@ protected Observable() {
157157
*/
158158
public Subscription subscribe(Observer<T> observer) {
159159
// allow the hook to intercept and/or decorate
160-
Func1<Observer<T>, Subscription> onSubscribeFunction = hook.onSubscribeStart(this, onSubscribe);
160+
Func1<? super Observer<T>, ? extends Subscription> onSubscribeFunction = hook.onSubscribeStart(this, onSubscribe);
161161
// validate and proceed
162162
if (observer == null) {
163163
throw new IllegalArgumentException("observer can not be null");
@@ -461,7 +461,7 @@ public Subscription call(Observer<T> observer) {
461461
* @return an Observable that, when an {@link Observer} subscribes to it, will execute the given
462462
* function
463463
*/
464-
public static <T> Observable<T> create(Func1<Observer<T>, Subscription> func) {
464+
public static <T> Observable<T> create(Func1<? super Observer<T>, ? extends Subscription> func) {
465465
return new Observable<T>(func);
466466
}
467467

@@ -1096,7 +1096,7 @@ public Observable<List<T>> buffer(Func0<? extends Observable<BufferClosing>> buf
10961096
* @return
10971097
* An {@link Observable} which produces buffers which are created and emitted when the specified {@link Observable}s publish certain objects.
10981098
*/
1099-
public Observable<List<T>> buffer(Observable<BufferOpening> bufferOpenings, Func1<BufferOpening, Observable<BufferClosing>> bufferClosingSelector) {
1099+
public Observable<List<T>> buffer(Observable<BufferOpening> bufferOpenings, Func1<? super BufferOpening, ? extends Observable<BufferClosing>> bufferClosingSelector) {
11001100
return create(OperationBuffer.buffer(this, bufferOpenings, bufferClosingSelector));
11011101
}
11021102

@@ -1325,7 +1325,7 @@ public static <R> Observable<R> zip(Collection<Observable<?>> ws, FuncN<R> reduc
13251325
* @return an Observable that emits only those items in the original Observable that the filter
13261326
* evaluates as {@code true}
13271327
*/
1328-
public Observable<T> filter(Func1<T, Boolean> predicate) {
1328+
public Observable<T> filter(Func1<? super T, Boolean> predicate) {
13291329
return create(OperationFilter.filter(this, predicate));
13301330
}
13311331

@@ -1360,7 +1360,7 @@ public Observable<T> finallyDo(Action0 action) {
13601360
* obtained from this transformation.
13611361
* @see #mapMany(Func1)
13621362
*/
1363-
public <R> Observable<R> flatMap(Func1<T, Observable<R>> func) {
1363+
public <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<R>> func) {
13641364
return mapMany(func);
13651365
}
13661366

@@ -1374,7 +1374,7 @@ public <R> Observable<R> flatMap(Func1<T, Observable<R>> func) {
13741374
* evaluates as {@code true}
13751375
* @see #filter(Func1)
13761376
*/
1377-
public Observable<T> where(Func1<T, Boolean> predicate) {
1377+
public Observable<T> where(Func1<? super T, Boolean> predicate) {
13781378
return filter(predicate);
13791379
}
13801380

@@ -1389,7 +1389,7 @@ public Observable<T> where(Func1<T, Boolean> predicate) {
13891389
* @return an Observable that emits the items from the source Observable, transformed by the
13901390
* given function
13911391
*/
1392-
public <R> Observable<R> map(Func1<T, R> func) {
1392+
public <R> Observable<R> map(Func1<? super T, ? extends R> func) {
13931393
return create(OperationMap.map(this, func));
13941394
}
13951395

@@ -1410,7 +1410,7 @@ public <R> Observable<R> map(Func1<T, R> func) {
14101410
* obtained from this transformation.
14111411
* @see #flatMap(Func1)
14121412
*/
1413-
public <R> Observable<R> mapMany(Func1<T, Observable<R>> func) {
1413+
public <R> Observable<R> mapMany(Func1<? super T, ? extends Observable<R>> func) {
14141414
return create(OperationMap.mapMany(this, func));
14151415
}
14161416

@@ -1496,7 +1496,7 @@ public <T2> Observable<T2> dematerialize() {
14961496
* encounters an error
14971497
* @return the original Observable, with appropriately modified behavior
14981498
*/
1499-
public Observable<T> onErrorResumeNext(final Func1<Throwable, Observable<T>> resumeFunction) {
1499+
public Observable<T> onErrorResumeNext(final Func1<? super Throwable, ? extends Observable<T>> resumeFunction) {
15001500
return create(OperationOnErrorResumeNextViaFunction.onErrorResumeNextViaFunction(this, resumeFunction));
15011501
}
15021502

@@ -1581,7 +1581,7 @@ public Observable<T> onExceptionResumeNext(final Observable<T> resumeSequence) {
15811581
* Observable encounters an error
15821582
* @return the original Observable with appropriately modified behavior
15831583
*/
1584-
public Observable<T> onErrorReturn(Func1<Throwable, T> resumeFunction) {
1584+
public Observable<T> onErrorReturn(Func1<? super Throwable, ? extends T> resumeFunction) {
15851585
return create(OperationOnErrorReturn.onErrorReturn(this, resumeFunction));
15861586
}
15871587

@@ -1800,7 +1800,7 @@ public <R> Observable<R> scan(R initialValue, Func2<R, T, R> accumulator) {
18001800
* @return an Observable that emits <code>true</code> if all items emitted by the source
18011801
* Observable satisfy the predicate; otherwise, <code>false</code>
18021802
*/
1803-
public Observable<Boolean> all(Func1<T, Boolean> predicate) {
1803+
public Observable<Boolean> all(Func1<? super T, Boolean> predicate) {
18041804
return create(OperationAll.all(this, predicate));
18051805
}
18061806

@@ -1853,7 +1853,7 @@ public Observable<T> take(final int num) {
18531853
* @return an Observable that emits the items from the source Observable so long as each item
18541854
* satisfies the condition defined by <code>predicate</code>
18551855
*/
1856-
public Observable<T> takeWhile(final Func1<T, Boolean> predicate) {
1856+
public Observable<T> takeWhile(final Func1<? super T, Boolean> predicate) {
18571857
return create(OperationTakeWhile.takeWhile(this, predicate));
18581858
}
18591859

@@ -1992,7 +1992,7 @@ public Observable<T> startWith(T... values) {
19921992
* unique key value and emits items representing items from the source Observable that
19931993
* share that key value
19941994
*/
1995-
public <K, R> Observable<GroupedObservable<K, R>> groupBy(final Func1<T, K> keySelector, final Func1<T, R> elementSelector) {
1995+
public <K, R> Observable<GroupedObservable<K, R>> groupBy(final Func1<? super T, ? extends K> keySelector, final Func1<? super T, ? extends R> elementSelector) {
19961996
return create(OperationGroupBy.groupBy(this, keySelector, elementSelector));
19971997
}
19981998

@@ -2010,7 +2010,7 @@ public <K, R> Observable<GroupedObservable<K, R>> groupBy(final Func1<T, K> keyS
20102010
* unique key value and emits items representing items from the source Observable that
20112011
* share that key value
20122012
*/
2013-
public <K> Observable<GroupedObservable<K, T>> groupBy(final Func1<T, K> keySelector) {
2013+
public <K> Observable<GroupedObservable<K, T>> groupBy(final Func1<? super T, ? extends K> keySelector) {
20142014
return create(OperationGroupBy.groupBy(this, keySelector));
20152015
}
20162016

rxjava-core/src/main/java/rx/observables/BlockingObservable.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@
6060
*/
6161
public class BlockingObservable<T> extends Observable<T> {
6262

63-
protected BlockingObservable(Func1<Observer<T>, Subscription> onSubscribe) {
63+
protected BlockingObservable(Func1<? super Observer<T>, ? extends Subscription> onSubscribe) {
6464
super(onSubscribe);
6565
}
6666

@@ -126,7 +126,7 @@ public static <T> T last(final Observable<T> source) {
126126
* @return the last item emitted by the {@link Observable} for which the predicate function
127127
* returns <code>true</code>
128128
*/
129-
public static <T> T last(final Observable<T> source, final Func1<T, Boolean> predicate) {
129+
public static <T> T last(final Observable<T> source, final Func1<? super T, Boolean> predicate) {
130130
return last(source.filter(predicate));
131131
}
132132

@@ -166,7 +166,7 @@ public static <T> T lastOrDefault(Observable<T> source, T defaultValue) {
166166
* @return the last item emitted by an {@link Observable} that matches the predicate, or the
167167
* default value if no matching item is emitted
168168
*/
169-
public static <T> T lastOrDefault(Observable<T> source, T defaultValue, Func1<T, Boolean> predicate) {
169+
public static <T> T lastOrDefault(Observable<T> source, T defaultValue, Func1<? super T, Boolean> predicate) {
170170
return lastOrDefault(source.filter(predicate), defaultValue);
171171
}
172172

@@ -257,7 +257,7 @@ public static <T> T single(Observable<T> source) {
257257
* if the {@link Observable} does not emit exactly one item that matches the
258258
* predicate
259259
*/
260-
public static <T> T single(Observable<T> source, Func1<T, Boolean> predicate) {
260+
public static <T> T single(Observable<T> source, Func1<? super T, Boolean> predicate) {
261261
return from(source).single(predicate);
262262
}
263263

@@ -293,7 +293,7 @@ public static <T> T singleOrDefault(Observable<T> source, T defaultValue) {
293293
* @return the single item emitted by the source {@link Observable} that matches the predicate,
294294
* or a default value if no such value is emitted
295295
*/
296-
public static <T> T singleOrDefault(Observable<T> source, T defaultValue, Func1<T, Boolean> predicate) {
296+
public static <T> T singleOrDefault(Observable<T> source, T defaultValue, Func1<? super T, Boolean> predicate) {
297297
return from(source).singleOrDefault(defaultValue, predicate);
298298
}
299299

@@ -443,7 +443,7 @@ public T last() {
443443
* a predicate function to evaluate items emitted by the {@link Observable}
444444
* @return the last item emitted by the {@link Observable} that matches the predicate
445445
*/
446-
public T last(final Func1<T, Boolean> predicate) {
446+
public T last(final Func1<? super T, Boolean> predicate) {
447447
return last(this, predicate);
448448
}
449449

@@ -487,7 +487,7 @@ public T lastOrDefault(T defaultValue) {
487487
* @return the last item emitted by the {@link Observable} that matches the predicate, or the
488488
* default value if no matching items are emitted
489489
*/
490-
public T lastOrDefault(T defaultValue, Func1<T, Boolean> predicate) {
490+
public T lastOrDefault(T defaultValue, Func1<? super T, Boolean> predicate) {
491491
return lastOrDefault(this, defaultValue, predicate);
492492
}
493493

@@ -542,7 +542,7 @@ public T single() {
542542
* a predicate function to evaluate items emitted by the {@link Observable}
543543
* @return the single item emitted by the source {@link Observable} that matches the predicate
544544
*/
545-
public T single(Func1<T, Boolean> predicate) {
545+
public T single(Func1<? super T, Boolean> predicate) {
546546
return _singleOrDefault(from(this.filter(predicate)), false, null);
547547
}
548548

@@ -575,7 +575,7 @@ public T singleOrDefault(T defaultValue) {
575575
* @return the single item emitted by the {@link Observable} that matches the predicate, or the
576576
* default value if no such items are emitted
577577
*/
578-
public T singleOrDefault(T defaultValue, Func1<T, Boolean> predicate) {
578+
public T singleOrDefault(T defaultValue, Func1<? super T, Boolean> predicate) {
579579
return _singleOrDefault(from(this.filter(predicate)), true, defaultValue);
580580
}
581581

rxjava-core/src/main/java/rx/observables/ConnectableObservable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737

3838
public abstract class ConnectableObservable<T> extends Observable<T> {
3939

40-
protected ConnectableObservable(Func1<Observer<T>, Subscription> onSubscribe) {
40+
protected ConnectableObservable(Func1<? super Observer<T>, ? extends Subscription> onSubscribe) {
4141
super(onSubscribe);
4242
}
4343

rxjava-core/src/main/java/rx/observables/GroupedObservable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
public class GroupedObservable<K, T> extends Observable<T> {
3333
private final K key;
3434

35-
public GroupedObservable(K key, Func1<Observer<T>, Subscription> onSubscribe) {
35+
public GroupedObservable(K key, Func1<? super Observer<T>, ? extends Subscription> onSubscribe) {
3636
super(onSubscribe);
3737
this.key = key;
3838
}

rxjava-core/src/main/java/rx/operators/OperationAll.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,18 +34,18 @@
3434
*/
3535
public class OperationAll {
3636

37-
public static <T> Func1<Observer<Boolean>, Subscription> all(Observable<T> sequence, Func1<T, Boolean> predicate) {
37+
public static <T> Func1<Observer<Boolean>, Subscription> all(Observable<T> sequence, Func1<? super T, Boolean> predicate) {
3838
return new AllObservable<T>(sequence, predicate);
3939
}
4040

4141
private static class AllObservable<T> implements Func1<Observer<Boolean>, Subscription> {
4242
private final Observable<T> sequence;
43-
private final Func1<T, Boolean> predicate;
43+
private final Func1<? super T, Boolean> predicate;
4444

4545
private final SafeObservableSubscription subscription = new SafeObservableSubscription();
4646

4747

48-
private AllObservable(Observable<T> sequence, Func1<T, Boolean> predicate) {
48+
private AllObservable(Observable<T> sequence, Func1<? super T, Boolean> predicate) {
4949
this.sequence = sequence;
5050
this.predicate = predicate;
5151
}

rxjava-core/src/main/java/rx/operators/OperationBuffer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ public Subscription call(final Observer<List<T>> observer) {
107107
* @return
108108
* the {@link Func1} object representing the specified buffer operation.
109109
*/
110-
public static <T> Func1<Observer<List<T>>, Subscription> buffer(final Observable<T> source, final Observable<BufferOpening> bufferOpenings, final Func1<BufferOpening, Observable<BufferClosing>> bufferClosingSelector) {
110+
public static <T> Func1<Observer<List<T>>, Subscription> buffer(final Observable<T> source, final Observable<BufferOpening> bufferOpenings, final Func1<? super BufferOpening, ? extends Observable<BufferClosing>> bufferClosingSelector) {
111111
return new Func1<Observer<List<T>>, Subscription>() {
112112
@Override
113113
public Subscription call(final Observer<List<T>> observer) {
@@ -489,7 +489,7 @@ private static class ObservableBasedMultiBufferCreator<T> implements BufferCreat
489489

490490
private final SafeObservableSubscription subscription = new SafeObservableSubscription();
491491

492-
public ObservableBasedMultiBufferCreator(final OverlappingBuffers<T> buffers, Observable<BufferOpening> bufferOpenings, final Func1<BufferOpening, Observable<BufferClosing>> bufferClosingSelector) {
492+
public ObservableBasedMultiBufferCreator(final OverlappingBuffers<T> buffers, Observable<BufferOpening> bufferOpenings, final Func1<? super BufferOpening, ? extends Observable<BufferClosing>> bufferClosingSelector) {
493493
subscription.wrap(bufferOpenings.subscribe(new Action1<BufferOpening>() {
494494
@Override
495495
public void call(BufferOpening opening) {

rxjava-core/src/main/java/rx/operators/OperationFilter.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,16 +33,16 @@
3333
*/
3434
public final class OperationFilter<T> {
3535

36-
public static <T> Func1<Observer<T>, Subscription> filter(Observable<T> that, Func1<T, Boolean> predicate) {
36+
public static <T> Func1<Observer<T>, Subscription> filter(Observable<T> that, Func1<? super T, Boolean> predicate) {
3737
return new Filter<T>(that, predicate);
3838
}
3939

4040
private static class Filter<T> implements Func1<Observer<T>, Subscription> {
4141

4242
private final Observable<T> that;
43-
private final Func1<T, Boolean> predicate;
43+
private final Func1<? super T, Boolean> predicate;
4444

45-
public Filter(Observable<T> that, Func1<T, Boolean> predicate) {
45+
public Filter(Observable<T> that, Func1<? super T, Boolean> predicate) {
4646
this.that = that;
4747
this.predicate = predicate;
4848
}

rxjava-core/src/main/java/rx/operators/OperationGroupBy.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@
4848
*/
4949
public final class OperationGroupBy {
5050

51-
public static <K, T, R> Func1<Observer<GroupedObservable<K, R>>, Subscription> groupBy(Observable<T> source, final Func1<T, K> keySelector, final Func1<T, R> elementSelector) {
51+
public static <K, T, R> Func1<Observer<GroupedObservable<K, R>>, Subscription> groupBy(Observable<T> source, final Func1<? super T, ? extends K> keySelector, final Func1<? super T, ? extends R> elementSelector) {
5252

5353
final Observable<KeyValue<K, R>> keyval = source.map(new Func1<T, KeyValue<K, R>>() {
5454
@Override
@@ -63,7 +63,7 @@ public KeyValue<K, R> call(T t) {
6363
return new GroupBy<K, R>(keyval);
6464
}
6565

66-
public static <K, T> Func1<Observer<GroupedObservable<K, T>>, Subscription> groupBy(Observable<T> source, final Func1<T, K> keySelector) {
66+
public static <K, T> Func1<Observer<GroupedObservable<K, T>>, Subscription> groupBy(Observable<T> source, final Func1<? super T, ? extends K> keySelector) {
6767
return groupBy(source, keySelector, Functions.<T> identity());
6868
}
6969

@@ -204,7 +204,7 @@ public void unsubscribe() {
204204

205205
private final AtomicReference<Observer<T>> subscribedObserver;
206206

207-
public GroupedSubject(K key, Func1<Observer<T>, Subscription> onSubscribe, AtomicReference<Observer<T>> subscribedObserver) {
207+
public GroupedSubject(K key, Func1<? super Observer<T>, ? extends Subscription> onSubscribe, AtomicReference<Observer<T>> subscribedObserver) {
208208
super(key, onSubscribe);
209209
this.subscribedObserver = subscribedObserver;
210210
}

0 commit comments

Comments
 (0)