Skip to content

Commit 04f35cd

Browse files
author
jmhofer
committed
generalized BlockingObservable and the execution hook further
1 parent 1ca5900 commit 04f35cd

File tree

2 files changed

+19
-19
lines changed

2 files changed

+19
-19
lines changed

rxjava-core/src/main/java/rx/observables/BlockingObservable.java

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ private BlockingObservable() {
7575
/**
7676
* Convert an Observable into a BlockingObservable.
7777
*/
78-
public static <T> BlockingObservable<T> from(final Observable<T> o) {
78+
public static <T> BlockingObservable<T> from(final Observable<? extends T> o) {
7979
return new BlockingObservable<T>(new Func1<Observer<? super T>, Subscription>() {
8080

8181
@Override
@@ -97,7 +97,7 @@ public Subscription call(Observer<? super T> observer) {
9797
* the type of items emitted by the source {@link Observable}
9898
* @return an {@link Iterator} that can iterate over the items emitted by the {@link Observable}
9999
*/
100-
public static <T> Iterator<T> toIterator(Observable<T> source) {
100+
public static <T> Iterator<T> toIterator(Observable<? extends T> source) {
101101
return OperationToIterator.toIterator(source);
102102
}
103103

@@ -110,7 +110,7 @@ public static <T> Iterator<T> toIterator(Observable<T> source) {
110110
* the source {@link Observable}
111111
* @return the last item emitted by the source {@link Observable}
112112
*/
113-
public static <T> T last(final Observable<T> source) {
113+
public static <T> T last(final Observable<? extends T> source) {
114114
return from(source).last();
115115
}
116116

@@ -126,7 +126,7 @@ public static <T> T last(final Observable<T> source) {
126126
* @return the last item emitted by the {@link Observable} for which the predicate function
127127
* returns <code>true</code>
128128
*/
129-
public static <T> T last(final Observable<T> source, final Func1<? super T, Boolean> predicate) {
129+
public static <T> T last(final Observable<? extends T> source, final Func1<? super T, Boolean> predicate) {
130130
return last(source.filter(predicate));
131131
}
132132

@@ -145,7 +145,7 @@ public static <T> T last(final Observable<T> source, final Func1<? super T, Bool
145145
* @return the last item emitted by an {@link Observable}, or the default value if no item is
146146
* emitted
147147
*/
148-
public static <T> T lastOrDefault(Observable<T> source, T defaultValue) {
148+
public static <T> T lastOrDefault(Observable<? extends T> source, T defaultValue) {
149149
return from(source).lastOrDefault(defaultValue);
150150
}
151151

@@ -166,7 +166,7 @@ public static <T> T lastOrDefault(Observable<T> source, T defaultValue) {
166166
* @return the last item emitted by an {@link Observable} that matches the predicate, or the
167167
* default value if no matching item is emitted
168168
*/
169-
public static <T> T lastOrDefault(Observable<T> source, T defaultValue, Func1<? super T, Boolean> predicate) {
169+
public static <T> T lastOrDefault(Observable<? extends T> source, T defaultValue, Func1<? super T, Boolean> predicate) {
170170
return lastOrDefault(source.filter(predicate), defaultValue);
171171
}
172172

@@ -186,7 +186,7 @@ public static <T> T lastOrDefault(Observable<T> source, T defaultValue, Func1<?
186186
* @return an {@link Iterable} that on each iteration returns the item that the
187187
* {@link Observable} has most recently emitted
188188
*/
189-
public static <T> Iterable<T> mostRecent(Observable<T> source, T initialValue) {
189+
public static <T> Iterable<T> mostRecent(Observable<? extends T> source, T initialValue) {
190190
return OperationMostRecent.mostRecent(source, initialValue);
191191
}
192192

@@ -203,12 +203,12 @@ public static <T> Iterable<T> mostRecent(Observable<T> source, T initialValue) {
203203
* @return an {@link Iterable} that blocks upon each iteration until the {@link Observable}
204204
* emits a new item, whereupon the Iterable returns that item
205205
*/
206-
public static <T> Iterable<T> next(Observable<T> items) {
206+
public static <T> Iterable<T> next(Observable<? extends T> items) {
207207
return OperationNext.next(items);
208208
}
209209

210-
private static <T> T _singleOrDefault(BlockingObservable<T> source, boolean hasDefault, T defaultValue) {
211-
Iterator<T> it = source.toIterable().iterator();
210+
private static <T> T _singleOrDefault(BlockingObservable<? extends T> source, boolean hasDefault, T defaultValue) {
211+
Iterator<? extends T> it = source.toIterable().iterator();
212212

213213
if (!it.hasNext()) {
214214
if (hasDefault) {
@@ -238,7 +238,7 @@ private static <T> T _singleOrDefault(BlockingObservable<T> source, boolean hasD
238238
* @throws IllegalStateException
239239
* if the {@link Observable} does not emit exactly one item
240240
*/
241-
public static <T> T single(Observable<T> source) {
241+
public static <T> T single(Observable<? extends T> source) {
242242
return from(source).single();
243243
}
244244

@@ -257,7 +257,7 @@ public static <T> T single(Observable<T> source) {
257257
* if the {@link Observable} does not emit exactly one item that matches the
258258
* predicate
259259
*/
260-
public static <T> T single(Observable<T> source, Func1<? super T, Boolean> predicate) {
260+
public static <T> T single(Observable<? extends T> source, Func1<? super T, Boolean> predicate) {
261261
return from(source).single(predicate);
262262
}
263263

@@ -274,7 +274,7 @@ public static <T> T single(Observable<T> source, Func1<? super T, Boolean> predi
274274
* @return the single item emitted by the source {@link Observable}, or a default value if no
275275
* value is emitted
276276
*/
277-
public static <T> T singleOrDefault(Observable<T> source, T defaultValue) {
277+
public static <T> T singleOrDefault(Observable<? extends T> source, T defaultValue) {
278278
return from(source).singleOrDefault(defaultValue);
279279
}
280280

@@ -293,7 +293,7 @@ public static <T> T singleOrDefault(Observable<T> source, T defaultValue) {
293293
* @return the single item emitted by the source {@link Observable} that matches the predicate,
294294
* or a default value if no such value is emitted
295295
*/
296-
public static <T> T singleOrDefault(Observable<T> source, T defaultValue, Func1<? super T, Boolean> predicate) {
296+
public static <T> T singleOrDefault(Observable<? extends T> source, T defaultValue, Func1<? super T, Boolean> predicate) {
297297
return from(source).singleOrDefault(defaultValue, predicate);
298298
}
299299

@@ -310,7 +310,7 @@ public static <T> T singleOrDefault(Observable<T> source, T defaultValue, Func1<
310310
* the source {@link Observable}
311311
* @return a Future that expects a single item to be emitted by the source {@link Observable}
312312
*/
313-
public static <T> Future<T> toFuture(final Observable<T> source) {
313+
public static <T> Future<T> toFuture(final Observable<? extends T> source) {
314314
return OperationToFuture.toFuture(source);
315315
}
316316

@@ -323,7 +323,7 @@ public static <T> Future<T> toFuture(final Observable<T> source) {
323323
* the source {@link Observable}
324324
* @return an {@link Iterable} version of the underlying {@link Observable}
325325
*/
326-
public static <T> Iterable<T> toIterable(final Observable<T> source) {
326+
public static <T> Iterable<T> toIterable(final Observable<? extends T> source) {
327327
return from(source).toIterable();
328328
}
329329

@@ -334,7 +334,7 @@ public static <T> Iterable<T> toIterable(final Observable<T> source) {
334334
* See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect
335335
* calls to user code from within an operator"
336336
*/
337-
private Subscription protectivelyWrapAndSubscribe(Observer<T> o) {
337+
private Subscription protectivelyWrapAndSubscribe(Observer<? super T> o) {
338338
SafeObservableSubscription subscription = new SafeObservableSubscription();
339339
return subscription.wrap(subscribe(new SafeObserver<T>(subscription, o)));
340340
}

rxjava-core/src/main/java/rx/plugins/RxJavaObservableExecutionHook.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ public abstract class RxJavaObservableExecutionHook {
6363
* original {@link Subscription}
6464
* @return {@link Subscription} subscription that can be modified, decorated, replaced or just returned as a pass-thru.
6565
*/
66-
public <T> Subscription onSubscribeReturn(Observable<T> observableInstance, Subscription subscription) {
66+
public <T> Subscription onSubscribeReturn(Observable<? extends T> observableInstance, Subscription subscription) {
6767
// pass-thru by default
6868
return subscription;
6969
}
@@ -80,7 +80,7 @@ public <T> Subscription onSubscribeReturn(Observable<T> observableInstance, Subs
8080
* Throwable thrown by {@link Observable#subscribe(Observer)}
8181
* @return Throwable that can be decorated, replaced or just returned as a pass-thru.
8282
*/
83-
public <T> Throwable onSubscribeError(Observable<T> observableInstance, Throwable e) {
83+
public <T> Throwable onSubscribeError(Observable<? extends T> observableInstance, Throwable e) {
8484
// pass-thru by default
8585
return e;
8686
}

0 commit comments

Comments
 (0)