Skip to content

Commit b0d975c

Browse files
Merge pull request #860 from abersnaze/merge-generics
Fixing the generics for merge and lift
2 parents 1a86347 + 98e75c2 commit b0d975c

File tree

7 files changed

+39
-143
lines changed

7 files changed

+39
-143
lines changed

rxjava-contrib/rxjava-debug/src/main/java/rx/operators/DebugSubscriber.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,15 @@ public final class DebugSubscriber<T> extends Subscriber<T> {
1111
private final Func1<T, T> onNextHook;
1212
final Action1<DebugNotification> events;
1313
final Observer<? super T> o;
14-
Operator<T, ?> from = null;
15-
Operator<?, T> to = null;
14+
Operator<? extends T, ?> from = null;
15+
Operator<?, ? super T> to = null;
1616

1717
public DebugSubscriber(
1818
Func1<T, T> onNextHook,
1919
Action1<DebugNotification> _events,
2020
Subscriber<? super T> _o,
21-
Operator<T, ?> _out,
22-
Operator<?, T> _in) {
21+
Operator<? extends T, ?> _out,
22+
Operator<?, ? super T> _in) {
2323
super(_o);
2424
this.events = _events;
2525
this.o = _o;
@@ -47,19 +47,19 @@ public void onNext(T t) {
4747
o.onNext(onNextHook.call(t));
4848
}
4949

50-
public Operator<T, ?> getFrom() {
50+
public Operator<? extends T, ?> getFrom() {
5151
return from;
5252
}
5353

54-
public void setFrom(Operator<T, ?> op) {
55-
this.from = op;
54+
public void setFrom(Operator<? extends T, ?> bind) {
55+
this.from = bind;
5656
}
5757

58-
public Operator<?, T> getTo() {
58+
public Operator<?, ? super T> getTo() {
5959
return to;
6060
}
6161

62-
public void setTo(Operator<?, T> op) {
62+
public void setTo(Operator<?, ? super T> op) {
6363
this.to = op;
6464
}
6565

rxjava-contrib/rxjava-debug/src/main/java/rx/plugins/DebugHook.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ public void call(Subscriber<? super T> o) {
6363
}
6464

6565
@Override
66-
public <T, R> Operator<R, T> onLift(final Operator<R, T> bind) {
66+
public <T, R> Operator<? extends R, ? super T> onLift(final Operator<? extends R, ? super T> bind) {
6767
return new Operator<R, T>() {
6868
@Override
6969
public Subscriber<? super T> call(final Subscriber<? super R> o) {
@@ -78,7 +78,7 @@ public <T> Subscription onAdd(Subscriber<T> subscriber, Subscription s) {
7878
}
7979

8080
@SuppressWarnings("unchecked")
81-
private <R> Subscriber<? super R> wrapOutbound(Operator<R, ?> bind, Subscriber<? super R> o) {
81+
private <R> Subscriber<? super R> wrapOutbound(Operator<? extends R, ?> bind, Subscriber<? super R> o) {
8282
if (o instanceof DebugSubscriber) {
8383
if (bind != null)
8484
((DebugSubscriber<R>) o).setFrom(bind);
@@ -88,7 +88,7 @@ private <R> Subscriber<? super R> wrapOutbound(Operator<R, ?> bind, Subscriber<?
8888
}
8989

9090
@SuppressWarnings("unchecked")
91-
private <T> Subscriber<? super T> wrapInbound(Operator<?, T> bind, Subscriber<? super T> o) {
91+
private <T> Subscriber<? super T> wrapInbound(Operator<?, ? super T> bind, Subscriber<? super T> o) {
9292
if (o instanceof DebugSubscriber) {
9393
if (bind != null)
9494
((DebugSubscriber<T>) o).setTo(bind);

rxjava-contrib/rxjava-debug/src/main/java/rx/plugins/DebugNotification.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,17 +13,17 @@ public static enum Kind {
1313
}
1414

1515
private final OnSubscribe<T> source;
16-
private final Operator<T, ?> from;
16+
private final Operator<? extends T, ?> from;
1717
private final Kind kind;
1818
private final Notification<T> notification;
19-
private final Operator<?, T> to;
19+
private final Operator<?, ? super T> to;
2020
private final long nanoTime;
2121
private final long threadId;
2222
private Observer o;
2323

2424
public static <T> DebugNotification<T> createSubscribe(Observer<? super T> o, OnSubscribe<T> source) {
25-
Operator<?, T> to = null;
26-
Operator<T, ?> from = null;
25+
Operator<?, ? super T> to = null;
26+
Operator<? extends T, ?> from = null;
2727
if (o instanceof DebugSubscriber) {
2828
to = ((DebugSubscriber<T>) o).getTo();
2929
from = ((DebugSubscriber<T>) o).getFrom();
@@ -32,23 +32,23 @@ public static <T> DebugNotification<T> createSubscribe(Observer<? super T> o, On
3232
return new DebugNotification<T>(o, from, Kind.Subscribe, null, to, source);
3333
}
3434

35-
public static <T> DebugNotification<T> createOnNext(Observer<? super T> o, Operator<T, ?> from, T t, Operator<?, T> to) {
35+
public static <T> DebugNotification<T> createOnNext(Observer<? super T> o, Operator<? extends T, ?> from, T t, Operator<?, ? super T> to) {
3636
return new DebugNotification<T>(o, from, Kind.OnNext, Notification.createOnNext(t), to, null);
3737
}
3838

39-
public static <T> DebugNotification<T> createOnError(Observer<? super T> o, Operator<T, ?> from, Throwable e, Operator<?, T> to) {
39+
public static <T> DebugNotification<T> createOnError(Observer<? super T> o, Operator<? extends T, ?> from, Throwable e, Operator<?, ? super T> to) {
4040
return new DebugNotification<T>(o, from, Kind.OnError, Notification.<T> createOnError(e), to, null);
4141
}
4242

43-
public static <T> DebugNotification<T> createOnCompleted(Observer<? super T> o, Operator<T, ?> from, Operator<?, T> to) {
43+
public static <T> DebugNotification<T> createOnCompleted(Observer<? super T> o, Operator<? extends T, ?> from, Operator<?, ? super T> to) {
4444
return new DebugNotification<T>(o, from, Kind.OnCompleted, Notification.<T> createOnCompleted(), to, null);
4545
}
4646

47-
public static <T> DebugNotification<T> createUnsubscribe(Observer<? super T> o, Operator<T, ?> from, Operator<?, T> to) {
47+
public static <T> DebugNotification<T> createUnsubscribe(Observer<? super T> o, Operator<? extends T, ?> from, Operator<?, ? super T> to) {
4848
return new DebugNotification<T>(o, from, Kind.Unsubscribe, null, to, null);
4949
}
5050

51-
private DebugNotification(Observer o, Operator<T, ?> from, Kind kind, Notification<T> notification, Operator<?, T> to, OnSubscribe<T> source) {
51+
private DebugNotification(Observer o, Operator<? extends T, ?> from, Kind kind, Notification<T> notification, Operator<?, ? super T> to, OnSubscribe<T> source) {
5252
this.o = (o instanceof SafeSubscriber) ? ((SafeSubscriber) o).getActual() : o;
5353
this.from = from;
5454
this.kind = kind;
@@ -59,15 +59,15 @@ private DebugNotification(Observer o, Operator<T, ?> from, Kind kind, Notificati
5959
this.threadId = Thread.currentThread().getId();
6060
}
6161

62-
public Operator<T, ?> getFrom() {
62+
public Operator<? extends T, ?> getFrom() {
6363
return from;
6464
}
6565

6666
public Notification<T> getNotification() {
6767
return notification;
6868
}
6969

70-
public Operator<?, T> getTo() {
70+
public Operator<?, ? super T> getTo() {
7171
return to;
7272
}
7373

rxjava-contrib/rxjava-debug/src/main/java/rx/plugins/NotificationEvent.java

Lines changed: 0 additions & 104 deletions
This file was deleted.

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,7 @@ public static interface OnSubscribeFunc<T> extends Function {
261261
* @param bind
262262
* @return an Observable that emits values that are the result of applying the bind function to the values of the current Observable
263263
*/
264-
public <R> Observable<R> lift(final Operator<R, T> bind) {
264+
public <R> Observable<R> lift(final Operator<? extends R, ? super T> bind) {
265265
return new Observable<R>(new OnSubscribe<R>() {
266266
@Override
267267
public void call(Subscriber<? super R> o) {
@@ -1777,7 +1777,7 @@ public final static <T> Observable<T> merge(Iterable<? extends Observable<? exte
17771777
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229099.aspx">MSDN: Observable.Merge</a>
17781778
*/
17791779
public final static <T> Observable<T> merge(Observable<? extends Observable<? extends T>> source) {
1780-
return source.lift(new OperatorMerge()); // any idea how to get these generics working?!
1780+
return source.lift(new OperatorMerge<T>());
17811781
}
17821782

17831783
/**
@@ -1821,7 +1821,7 @@ public final static <T> Observable<T> merge(Observable<? extends Observable<? ex
18211821
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229099.aspx">MSDN: Observable.Merge</a>
18221822
*/
18231823
public final static <T> Observable<T> merge(Observable<? extends T> t1, Observable<? extends T> t2) {
1824-
return merge(from(t1, t2));
1824+
return merge(from(Arrays.asList(t1, t2)));
18251825
}
18261826

18271827
/**
@@ -1843,7 +1843,7 @@ public final static <T> Observable<T> merge(Observable<? extends T> t1, Observab
18431843
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229099.aspx">MSDN: Observable.Merge</a>
18441844
*/
18451845
public final static <T> Observable<T> merge(Observable<? extends T> t1, Observable<? extends T> t2, Observable<? extends T> t3) {
1846-
return merge(from(t1, t2, t3));
1846+
return merge(from(Arrays.asList(t1, t2, t3)));
18471847
}
18481848

18491849
/**
@@ -1867,7 +1867,7 @@ public final static <T> Observable<T> merge(Observable<? extends T> t1, Observab
18671867
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229099.aspx">MSDN: Observable.Merge</a>
18681868
*/
18691869
public final static <T> Observable<T> merge(Observable<? extends T> t1, Observable<? extends T> t2, Observable<? extends T> t3, Observable<? extends T> t4) {
1870-
return merge(from(t1, t2, t3, t4));
1870+
return merge(from(Arrays.asList(t1, t2, t3, t4)));
18711871
}
18721872

18731873
/**
@@ -1893,7 +1893,7 @@ public final static <T> Observable<T> merge(Observable<? extends T> t1, Observab
18931893
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229099.aspx">MSDN: Observable.Merge</a>
18941894
*/
18951895
public final static <T> Observable<T> merge(Observable<? extends T> t1, Observable<? extends T> t2, Observable<? extends T> t3, Observable<? extends T> t4, Observable<? extends T> t5) {
1896-
return merge(from(t1, t2, t3, t4, t5));
1896+
return merge(from(Arrays.asList(t1, t2, t3, t4, t5)));
18971897
}
18981898

18991899
/**
@@ -1921,7 +1921,7 @@ public final static <T> Observable<T> merge(Observable<? extends T> t1, Observab
19211921
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229099.aspx">MSDN: Observable.Merge</a>
19221922
*/
19231923
public final static <T> Observable<T> merge(Observable<? extends T> t1, Observable<? extends T> t2, Observable<? extends T> t3, Observable<? extends T> t4, Observable<? extends T> t5, Observable<? extends T> t6) {
1924-
return merge(from(t1, t2, t3, t4, t5, t6));
1924+
return merge(from(Arrays.asList(t1, t2, t3, t4, t5, t6)));
19251925
}
19261926

19271927
/**
@@ -1951,7 +1951,7 @@ public final static <T> Observable<T> merge(Observable<? extends T> t1, Observab
19511951
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229099.aspx">MSDN: Observable.Merge</a>
19521952
*/
19531953
public final static <T> Observable<T> merge(Observable<? extends T> t1, Observable<? extends T> t2, Observable<? extends T> t3, Observable<? extends T> t4, Observable<? extends T> t5, Observable<? extends T> t6, Observable<? extends T> t7) {
1954-
return merge(from(t1, t2, t3, t4, t5, t6, t7));
1954+
return merge(from(Arrays.asList(t1, t2, t3, t4, t5, t6, t7)));
19551955
}
19561956

19571957
/**
@@ -1984,7 +1984,7 @@ public final static <T> Observable<T> merge(Observable<? extends T> t1, Observab
19841984
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229099.aspx">MSDN: Observable.Merge</a>
19851985
*/
19861986
public final static <T> Observable<T> merge(Observable<? extends T> t1, Observable<? extends T> t2, Observable<? extends T> t3, Observable<? extends T> t4, Observable<? extends T> t5, Observable<? extends T> t6, Observable<? extends T> t7, Observable<? extends T> t8) {
1987-
return merge(from(t1, t2, t3, t4, t5, t6, t7, t8));
1987+
return merge(from(Arrays.asList(t1, t2, t3, t4, t5, t6, t7, t8)));
19881988
}
19891989

19901990
/**
@@ -2019,7 +2019,7 @@ public final static <T> Observable<T> merge(Observable<? extends T> t1, Observab
20192019
*/
20202020
// suppress because the types are checked by the method signature before using a vararg
20212021
public final static <T> Observable<T> merge(Observable<? extends T> t1, Observable<? extends T> t2, Observable<? extends T> t3, Observable<? extends T> t4, Observable<? extends T> t5, Observable<? extends T> t6, Observable<? extends T> t7, Observable<? extends T> t8, Observable<? extends T> t9) {
2022-
return merge(from(t1, t2, t3, t4, t5, t6, t7, t8, t9));
2022+
return merge(from(Arrays.asList(t1, t2, t3, t4, t5, t6, t7, t8, t9)));
20232023
}
20242024

20252025
/**
@@ -4863,7 +4863,7 @@ public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
48634863
* @deprecated use {@link #flatMap(Func1)}
48644864
*/
48654865
@Deprecated
4866-
public final <R> Observable<R> mapMany(Func1<? super T, ? extends Observable<? extends R>> func) {
4866+
public final <R> Observable<R> mapMany(Func1<? super T, ? extends Observable<R>> func) {
48674867
return mergeMap(func);
48684868
}
48694869

rxjava-core/src/main/java/rx/operators/OperatorMerge.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,13 @@
3030
* You can combine the items emitted by multiple Observables so that they act like a single
3131
* Observable, by using the merge operation.
3232
*/
33-
public final class OperatorMerge<T> implements Operator<T, Observable<T>> {
33+
public final class OperatorMerge<T> implements Operator<T, Observable<? extends T>> {
3434

3535
@Override
36-
public Subscriber<Observable<T>> call(final Subscriber<? super T> outerOperation) {
36+
public Subscriber<Observable<? extends T>> call(final Subscriber<? super T> outerOperation) {
3737

3838
final Subscriber<T> o = new SynchronizedSubscriber<T>(outerOperation);
39-
return new Subscriber<Observable<T>>(outerOperation) {
39+
return new Subscriber<Observable<? extends T>>(outerOperation) {
4040

4141
private volatile boolean completed = false;
4242
private final AtomicInteger runningCount = new AtomicInteger();
@@ -55,7 +55,7 @@ public void onError(Throwable e) {
5555
}
5656

5757
@Override
58-
public void onNext(Observable<T> innerObservable) {
58+
public void onNext(Observable<? extends T> innerObservable) {
5959
runningCount.incrementAndGet();
6060
innerObservable.subscribe(new InnerObserver());
6161
}

rxjava-core/src/main/java/rx/plugins/RxJavaObservableExecutionHook.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ public <T> OnSubscribe<T> onCreate(OnSubscribe<T> f) {
9797
return f;
9898
}
9999

100-
public <T, R> Operator<R, T> onLift(final Operator<R, T> bind) {
100+
public <T, R> Operator<? extends R, ? super T> onLift(final Operator<? extends R, ? super T> bind) {
101101
return bind;
102102
}
103103

0 commit comments

Comments
 (0)