Skip to content

Commit 29289d1

Browse files
author
jmhofer
committed
Timestamped, Notification and Future are now also treated as covariant
1 parent eba4857 commit 29289d1

File tree

8 files changed

+25
-36
lines changed

8 files changed

+25
-36
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -850,7 +850,7 @@ public Observable<Timestamped<T>> timestamp() {
850850
* be emitted by the resulting Observable
851851
* @return an Observable that emits the item from the source Future
852852
*/
853-
public static <T> Observable<T> from(Future<T> future) {
853+
public static <T> Observable<T> from(Future<? extends T> future) {
854854
return create(OperationToObservableFuture.toObservableFuture(future));
855855
}
856856

@@ -873,7 +873,7 @@ public static <T> Observable<T> from(Future<T> future) {
873873
* be emitted by the resulting Observable
874874
* @return an Observable that emits the item from the source Future
875875
*/
876-
public static <T> Observable<T> from(Future<T> future, Scheduler scheduler) {
876+
public static <T> Observable<T> from(Future<? extends T> future, Scheduler scheduler) {
877877
return create(OperationToObservableFuture.toObservableFuture(future)).subscribeOn(scheduler);
878878
}
879879

@@ -899,7 +899,7 @@ public static <T> Observable<T> from(Future<T> future, Scheduler scheduler) {
899899
* be emitted by the resulting Observable
900900
* @return an Observable that emits the item from the source {@link Future}
901901
*/
902-
public static <T> Observable<T> from(Future<T> future, long timeout, TimeUnit unit) {
902+
public static <T> Observable<T> from(Future<? extends T> future, long timeout, TimeUnit unit) {
903903
return create(OperationToObservableFuture.toObservableFuture(future, timeout, unit));
904904
}
905905

@@ -1703,7 +1703,7 @@ public Observable<T> observeOn(Scheduler scheduler) {
17031703
*/
17041704
@SuppressWarnings("unchecked")
17051705
public <T2> Observable<T2> dematerialize() {
1706-
return create(OperationDematerialize.dematerialize((Observable<? extends Notification<T2>>) this));
1706+
return create(OperationDematerialize.dematerialize((Observable<? extends Notification<? extends T2>>) this));
17071707
}
17081708

17091709
/**

rxjava-core/src/main/java/rx/operators/OperationDematerialize.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,21 +45,21 @@ public final class OperationDematerialize {
4545
* @return An observable sequence exhibiting the behavior corresponding to the source sequence's notification values.
4646
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229047(v=vs.103).aspx">Observable.Dematerialize(TSource) Method </a>
4747
*/
48-
public static <T> Func1<Observer<? super T>, Subscription> dematerialize(final Observable<? extends Notification<T>> sequence) {
48+
public static <T> Func1<Observer<? super T>, Subscription> dematerialize(final Observable<? extends Notification<? extends T>> sequence) {
4949
return new DematerializeObservable<T>(sequence);
5050
}
5151

5252
private static class DematerializeObservable<T> implements Func1<Observer<? super T>, Subscription> {
5353

54-
private final Observable<? extends Notification<T>> sequence;
54+
private final Observable<? extends Notification<? extends T>> sequence;
5555

56-
public DematerializeObservable(Observable<? extends Notification<T>> sequence) {
56+
public DematerializeObservable(Observable<? extends Notification<? extends T>> sequence) {
5757
this.sequence = sequence;
5858
}
5959

6060
@Override
6161
public Subscription call(final Observer<? super T> observer) {
62-
return sequence.subscribe(new Observer<Notification<T>>() {
62+
return sequence.subscribe(new Observer<Notification<? extends T>>() {
6363
@Override
6464
public void onCompleted() {
6565
}
@@ -69,7 +69,7 @@ public void onError(Throwable e) {
6969
}
7070

7171
@Override
72-
public void onNext(Notification<T> value) {
72+
public void onNext(Notification<? extends T> value) {
7373
switch (value.getKind()) {
7474
case OnNext:
7575
observer.onNext(value.getValue());

rxjava-core/src/main/java/rx/operators/OperationMaterialize.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,15 +47,15 @@ public final class OperationMaterialize {
4747
* @return An observable sequence whose elements are the result of materializing the notifications of the given sequence.
4848
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229453(v=VS.103).aspx">Observable.Materialize(TSource) Method </a>
4949
*/
50-
public static <T> Func1<Observer<? super Notification<T>>, Subscription> materialize(final Observable<T> sequence) {
50+
public static <T> Func1<Observer<? super Notification<T>>, Subscription> materialize(final Observable<? extends T> sequence) {
5151
return new MaterializeObservable<T>(sequence);
5252
}
5353

5454
private static class MaterializeObservable<T> implements Func1<Observer<? super Notification<T>>, Subscription> {
5555

56-
private final Observable<T> sequence;
56+
private final Observable<? extends T> sequence;
5757

58-
public MaterializeObservable(Observable<T> sequence) {
58+
public MaterializeObservable(Observable<? extends T> sequence) {
5959
this.sequence = sequence;
6060
}
6161

rxjava-core/src/main/java/rx/operators/OperationTimestamp.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public final class OperationTimestamp {
3737
* the type of the input sequence.
3838
* @return a sequence of timestamped values created by adding timestamps to each item in the input sequence.
3939
*/
40-
public static <T> Func1<Observer<? super Timestamped<T>>, Subscription> timestamp(Observable<T> sequence) {
40+
public static <T> Func1<Observer<? super Timestamped<T>>, Subscription> timestamp(Observable<? extends T> sequence) {
4141
return OperationMap.map(sequence, new Func1<T, Timestamped<T>>() {
4242
@Override
4343
public Timestamped<T> call(T value) {

rxjava-core/src/main/java/rx/operators/OperationToObservableFuture.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,17 +40,17 @@
4040
*/
4141
public class OperationToObservableFuture {
4242
private static class ToObservableFuture<T> implements Func1<Observer<? super T>, Subscription> {
43-
private final Future<T> that;
43+
private final Future<? extends T> that;
4444
private final Long time;
4545
private final TimeUnit unit;
4646

47-
public ToObservableFuture(Future<T> that) {
47+
public ToObservableFuture(Future<? extends T> that) {
4848
this.that = that;
4949
this.time = null;
5050
this.unit = null;
5151
}
5252

53-
public ToObservableFuture(Future<T> that, long time, TimeUnit unit) {
53+
public ToObservableFuture(Future<? extends T> that, long time, TimeUnit unit) {
5454
this.that = that;
5555
this.time = time;
5656
this.unit = unit;
@@ -75,11 +75,11 @@ public Subscription call(Observer<? super T> observer) {
7575
}
7676
}
7777

78-
public static <T> Func1<Observer<? super T>, Subscription> toObservableFuture(final Future<T> that) {
78+
public static <T> Func1<Observer<? super T>, Subscription> toObservableFuture(final Future<? extends T> that) {
7979
return new ToObservableFuture<T>(that);
8080
}
8181

82-
public static <T> Func1<Observer<? super T>, Subscription> toObservableFuture(final Future<T> that, long time, TimeUnit unit) {
82+
public static <T> Func1<Observer<? super T>, Subscription> toObservableFuture(final Future<? extends T> that, long time, TimeUnit unit) {
8383
return new ToObservableFuture<T>(that, time, unit);
8484
}
8585

rxjava-core/src/main/java/rx/operators/ScheduledObserver.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
private final Observer<? super T> underlying;
2828
private final Scheduler scheduler;
2929

30-
private final ConcurrentLinkedQueue<Notification<T>> queue = new ConcurrentLinkedQueue<Notification<T>>();
30+
private final ConcurrentLinkedQueue<Notification<? extends T>> queue = new ConcurrentLinkedQueue<Notification<? extends T>>();
3131
private final AtomicInteger counter = new AtomicInteger(0);
3232

3333
public ScheduledObserver(Observer<? super T> underlying, Scheduler scheduler) {
@@ -50,7 +50,7 @@ public void onNext(final T args) {
5050
enqueue(new Notification<T>(args));
5151
}
5252

53-
private void enqueue(Notification<T> notification) {
53+
private void enqueue(Notification<? extends T> notification) {
5454
// this must happen before 'counter' is used to provide synchronization between threads
5555
queue.offer(notification);
5656

@@ -66,7 +66,7 @@ private void processQueue() {
6666
scheduler.schedule(new Action0() {
6767
@Override
6868
public void call() {
69-
Notification<T> not = queue.poll();
69+
Notification<? extends T> not = queue.poll();
7070

7171
switch (not.getKind()) {
7272
case OnNext:

rxjava-core/src/main/java/rx/subjects/PublishSubject.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@
6868
public class PublishSubject<T> extends Subject<T, T> {
6969
public static <T> PublishSubject<T> create() {
7070
final ConcurrentHashMap<Subscription, Observer<? super T>> observers = new ConcurrentHashMap<Subscription, Observer<? super T>>();
71-
final AtomicReference<Notification<T>> terminalState = new AtomicReference<Notification<T>>();
71+
final AtomicReference<Notification<? extends T>> terminalState = new AtomicReference<Notification<? extends T>>();
7272

7373
Func1<Observer<? super T>, Subscription> onSubscribe = new Func1<Observer<? super T>, Subscription>() {
7474
@Override
@@ -111,7 +111,7 @@ public void unsubscribe() {
111111
}
112112

113113
private Subscription checkTerminalState(Observer<? super T> observer) {
114-
Notification<T> n = terminalState.get();
114+
Notification<? extends T> n = terminalState.get();
115115
if (n != null) {
116116
// we are terminated to immediately emit and don't continue with subscription
117117
if (n.isOnCompleted()) {
@@ -130,9 +130,9 @@ private Subscription checkTerminalState(Observer<? super T> observer) {
130130
}
131131

132132
private final ConcurrentHashMap<Subscription, Observer<? super T>> observers;
133-
private final AtomicReference<Notification<T>> terminalState;
133+
private final AtomicReference<Notification<? extends T>> terminalState;
134134

135-
protected PublishSubject(Func1<? super Observer<? super T>, ? extends Subscription> onSubscribe, ConcurrentHashMap<Subscription, Observer<? super T>> observers, AtomicReference<Notification<T>> terminalState) {
135+
protected PublishSubject(Func1<? super Observer<? super T>, ? extends Subscription> onSubscribe, ConcurrentHashMap<Subscription, Observer<? super T>> observers, AtomicReference<Notification<? extends T>> terminalState) {
136136
super(onSubscribe);
137137
this.observers = observers;
138138
this.terminalState = terminalState;

rxjava-core/src/test/java/rx/CovarianceTest.java

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -28,52 +28,41 @@ public ExtendedResult call(Media m, Rating r) {
2828
};
2929

3030
Observable.<Movie, CoolRating, Result> zip(horrors, ratings, combine).toBlockingObservable().forEach(new Action1<Result>() {
31-
3231
@Override
3332
public void call(Result t1) {
3433
System.out.println("Result: " + t1);
3534
}
36-
3735
});
3836

3937
Observable.<Movie, CoolRating, Result> zip(horrors, ratings, combine).toBlockingObservable().forEach(new Action1<Result>() {
40-
4138
@Override
4239
public void call(Result t1) {
4340
System.out.println("Result: " + t1);
4441
}
45-
4642
});
4743

4844
Observable.<Media, Rating, ExtendedResult> zip(horrors, ratings, combine).toBlockingObservable().forEach(new Action1<ExtendedResult>() {
49-
5045
@Override
5146
public void call(ExtendedResult t1) {
5247
System.out.println("Result: " + t1);
5348
}
54-
5549
});
5650

5751
Observable.<Media, Rating, Result> zip(horrors, ratings, combine).toBlockingObservable().forEach(new Action1<Result>() {
58-
5952
@Override
6053
public void call(Result t1) {
6154
System.out.println("Result: " + t1);
6255
}
63-
6456
});
6557

6658
Observable.<Media, Rating, ExtendedResult> zip(horrors, ratings, combine).toBlockingObservable().forEach(new Action1<Result>() {
67-
6859
@Override
6960
public void call(Result t1) {
7061
System.out.println("Result: " + t1);
7162
}
72-
7363
});
7464

7565
Observable.<Movie, CoolRating, Result> zip(horrors, ratings, combine);
76-
7766
}
7867

7968
static class Media {

0 commit comments

Comments
 (0)