Skip to content

Commit 1ca5900

Browse files
author
jmhofer
committed
Generalized all the operators, too
1 parent 78a0a1b commit 1ca5900

23 files changed

+78
-78
lines changed

rxjava-core/src/main/java/rx/operators/OperationAll.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,18 +34,18 @@
3434
*/
3535
public class OperationAll {
3636

37-
public static <T> Func1<Observer<? super Boolean>, Subscription> all(Observable<T> sequence, Func1<? super T, Boolean> predicate) {
37+
public static <T> Func1<Observer<? super Boolean>, Subscription> all(Observable<? extends T> sequence, Func1<? super T, Boolean> predicate) {
3838
return new AllObservable<T>(sequence, predicate);
3939
}
4040

4141
private static class AllObservable<T> implements Func1<Observer<? super Boolean>, Subscription> {
42-
private final Observable<T> sequence;
42+
private final Observable<? extends T> sequence;
4343
private final Func1<? super T, Boolean> predicate;
4444

4545
private final SafeObservableSubscription subscription = new SafeObservableSubscription();
4646

4747

48-
private AllObservable(Observable<T> sequence, Func1<? super T, Boolean> predicate) {
48+
private AllObservable(Observable<? extends T> sequence, Func1<? super T, Boolean> predicate) {
4949
this.sequence = sequence;
5050
this.predicate = predicate;
5151
}

rxjava-core/src/main/java/rx/operators/OperationCache.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@
4747
*/
4848
public class OperationCache {
4949

50-
public static <T> Func1<Observer<? super T>, Subscription> cache(final Observable<T> source) {
50+
public static <T> Func1<Observer<? super T>, Subscription> cache(final Observable<? extends T> source) {
5151
return new Func1<Observer<? super T>, Subscription>() {
5252

5353
final AtomicBoolean subscribed = new AtomicBoolean(false);

rxjava-core/src/main/java/rx/operators/OperationDefer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,12 @@
3737
*/
3838
public final class OperationDefer {
3939

40-
public static <T> Func1<Observer<? super T>, Subscription> defer(final Func0<? extends Observable<T>> observableFactory) {
40+
public static <T> Func1<Observer<? super T>, Subscription> defer(final Func0<? extends Observable<? extends T>> observableFactory) {
4141

4242
return new Func1<Observer<? super T>, Subscription>() {
4343
@Override
4444
public Subscription call(Observer<? super T> observer) {
45-
Observable<T> obs = observableFactory.call();
45+
Observable<? extends T> obs = observableFactory.call();
4646
return obs.subscribe(observer);
4747
}
4848
};

rxjava-core/src/main/java/rx/operators/OperationFilter.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,16 +33,16 @@
3333
*/
3434
public final class OperationFilter<T> {
3535

36-
public static <T> Func1<Observer<? super T>, Subscription> filter(Observable<T> that, Func1<? super T, Boolean> predicate) {
36+
public static <T> Func1<Observer<? super T>, Subscription> filter(Observable<? extends T> that, Func1<? super T, Boolean> predicate) {
3737
return new Filter<T>(that, predicate);
3838
}
3939

4040
private static class Filter<T> implements Func1<Observer<? super T>, Subscription> {
4141

42-
private final Observable<T> that;
42+
private final Observable<? extends T> that;
4343
private final Func1<? super T, Boolean> predicate;
4444

45-
public Filter(Observable<T> that, Func1<? super T, Boolean> predicate) {
45+
public Filter(Observable<? extends T> that, Func1<? super T, Boolean> predicate) {
4646
this.that = that;
4747
this.predicate = predicate;
4848
}

rxjava-core/src/main/java/rx/operators/OperationFinally.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public final class OperationFinally {
5252
* the given action will be called.
5353
* @see <a href="http://msdn.microsoft.com/en-us/library/hh212133(v=vs.103).aspx">MSDN Observable.Finally method</a>
5454
*/
55-
public static <T> Func1<Observer<? super T>, Subscription> finallyDo(final Observable<T> sequence, final Action0 action) {
55+
public static <T> Func1<Observer<? super T>, Subscription> finallyDo(final Observable<? extends T> sequence, final Action0 action) {
5656
return new Func1<Observer<? super T>, Subscription>() {
5757
@Override
5858
public Subscription call(Observer<? super T> observer) {
@@ -62,10 +62,10 @@ public Subscription call(Observer<? super T> observer) {
6262
}
6363

6464
private static class Finally<T> implements Func1<Observer<? super T>, Subscription> {
65-
private final Observable<T> sequence;
65+
private final Observable<? extends T> sequence;
6666
private final Action0 finalAction;
6767

68-
Finally(final Observable<T> sequence, Action0 finalAction) {
68+
Finally(final Observable<? extends T> sequence, Action0 finalAction) {
6969
this.sequence = sequence;
7070
this.finalAction = finalAction;
7171
}

rxjava-core/src/main/java/rx/operators/OperationGroupBy.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@
4848
*/
4949
public final class OperationGroupBy {
5050

51-
public static <K, T, R> Func1<Observer<? super GroupedObservable<K, R>>, Subscription> groupBy(Observable<T> source, final Func1<? super T, ? extends K> keySelector, final Func1<? super T, ? extends R> elementSelector) {
51+
public static <K, T, R> Func1<Observer<? super GroupedObservable<K, R>>, Subscription> groupBy(Observable<? extends T> source, final Func1<? super T, ? extends K> keySelector, final Func1<? super T, ? extends R> elementSelector) {
5252

5353
final Observable<KeyValue<K, R>> keyval = source.map(new Func1<T, KeyValue<K, R>>() {
5454
@Override
@@ -63,7 +63,7 @@ public KeyValue<K, R> call(T t) {
6363
return new GroupBy<K, R>(keyval);
6464
}
6565

66-
public static <K, T> Func1<Observer<? super GroupedObservable<K, T>>, Subscription> groupBy(Observable<T> source, final Func1<? super T, ? extends K> keySelector) {
66+
public static <K, T> Func1<Observer<? super GroupedObservable<K, T>>, Subscription> groupBy(Observable<? extends T> source, final Func1<? super T, ? extends K> keySelector) {
6767
return groupBy(source, keySelector, Functions.<T> identity());
6868
}
6969

rxjava-core/src/main/java/rx/operators/OperationMostRecent.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
*/
3838
public final class OperationMostRecent {
3939

40-
public static <T> Iterable<T> mostRecent(final Observable<T> source, T initialValue) {
40+
public static <T> Iterable<T> mostRecent(final Observable<? extends T> source, T initialValue) {
4141

4242
MostRecentObserver<T> mostRecentObserver = new MostRecentObserver<T>(initialValue);
4343
final MostRecentIterator<T> nextIterator = new MostRecentIterator<T>(mostRecentObserver);

rxjava-core/src/main/java/rx/operators/OperationMulticast.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,19 +28,19 @@
2828
import rx.util.functions.Func1;
2929

3030
public class OperationMulticast {
31-
public static <T, R> ConnectableObservable<R> multicast(Observable<T> source, final Subject<T, R> subject) {
31+
public static <T, R> ConnectableObservable<R> multicast(Observable<? extends T> source, final Subject<T, R> subject) {
3232
return new MulticastConnectableObservable<T, R>(source, subject);
3333
}
3434

3535
private static class MulticastConnectableObservable<T, R> extends ConnectableObservable<R> {
3636
private final Object lock = new Object();
3737

38-
private final Observable<T> source;
38+
private final Observable<? extends T> source;
3939
private final Subject<T, R> subject;
4040

4141
private Subscription subscription;
4242

43-
public MulticastConnectableObservable(Observable<T> source, final Subject<T, R> subject) {
43+
public MulticastConnectableObservable(Observable<? extends T> source, final Subject<T, R> subject) {
4444
super(new Func1<Observer<? super R>, Subscription>() {
4545
@Override
4646
public Subscription call(Observer<? super R> observer) {

rxjava-core/src/main/java/rx/operators/OperationNext.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@
4747
*/
4848
public final class OperationNext {
4949

50-
public static <T> Iterable<T> next(final Observable<T> items) {
50+
public static <T> Iterable<T> next(final Observable<? extends T> items) {
5151

5252
NextObserver<T> nextObserver = new NextObserver<T>();
5353
final NextIterator<T> nextIterator = new NextIterator<T>(nextObserver);
@@ -65,9 +65,9 @@ public Iterator<T> iterator() {
6565

6666
private static class NextIterator<T> implements Iterator<T> {
6767

68-
private final NextObserver<T> observer;
68+
private final NextObserver<? extends T> observer;
6969

70-
private NextIterator(NextObserver<T> observer) {
70+
private NextIterator(NextObserver<? extends T> observer) {
7171
this.observer = observer;
7272
}
7373

@@ -99,8 +99,8 @@ public void remove() {
9999
}
100100
}
101101

102-
private static class NextObserver<T> implements Observer<Notification<T>> {
103-
private final BlockingQueue<Notification<T>> buf = new ArrayBlockingQueue<Notification<T>>(1);
102+
private static class NextObserver<T> implements Observer<Notification<? extends T>> {
103+
private final BlockingQueue<Notification<? extends T>> buf = new ArrayBlockingQueue<Notification<? extends T>>(1);
104104
private final AtomicBoolean waiting = new AtomicBoolean(false);
105105

106106
@Override
@@ -114,12 +114,12 @@ public void onError(Throwable e) {
114114
}
115115

116116
@Override
117-
public void onNext(Notification<T> args) {
117+
public void onNext(Notification<? extends T> args) {
118118

119119
if (waiting.getAndSet(false) || !args.isOnNext()) {
120-
Notification<T> toOffer = args;
120+
Notification<? extends T> toOffer = args;
121121
while (!buf.offer(toOffer)) {
122-
Notification<T> concurrentItem = buf.poll();
122+
Notification<? extends T> concurrentItem = buf.poll();
123123

124124
// in case if we won race condition with onComplete/onError method
125125
if (!concurrentItem.isOnNext()) {
@@ -135,7 +135,7 @@ public void await() {
135135
}
136136

137137
public boolean isCompleted(boolean rethrowExceptionIfExists) {
138-
Notification<T> lastItem = buf.peek();
138+
Notification<? extends T> lastItem = buf.peek();
139139
if (lastItem == null) {
140140
return false;
141141
}
@@ -152,7 +152,7 @@ public boolean isCompleted(boolean rethrowExceptionIfExists) {
152152
}
153153

154154
public T takeNext() throws InterruptedException {
155-
Notification<T> next = buf.take();
155+
Notification<? extends T> next = buf.take();
156156

157157
if (next.isOnError()) {
158158
throw Exceptions.propagate(next.getThrowable());

rxjava-core/src/main/java/rx/operators/OperationObserveOn.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,15 +41,15 @@
4141
*/
4242
public class OperationObserveOn {
4343

44-
public static <T> Func1<Observer<? super T>, Subscription> observeOn(Observable<T> source, Scheduler scheduler) {
44+
public static <T> Func1<Observer<? super T>, Subscription> observeOn(Observable<? extends T> source, Scheduler scheduler) {
4545
return new ObserveOn<T>(source, scheduler);
4646
}
4747

4848
private static class ObserveOn<T> implements Func1<Observer<? super T>, Subscription> {
49-
private final Observable<T> source;
49+
private final Observable<? extends T> source;
5050
private final Scheduler scheduler;
5151

52-
public ObserveOn(Observable<T> source, Scheduler scheduler) {
52+
public ObserveOn(Observable<? extends T> source, Scheduler scheduler) {
5353
this.source = source;
5454
this.scheduler = scheduler;
5555
}

0 commit comments

Comments
 (0)