1818import java .util .concurrent .Future ;
1919import java .util .concurrent .TimeUnit ;
2020
21- import rx .Observable .OnSubscribeFunc ;
22- import rx .Observer ;
23- import rx .Subscription ;
21+ import rx .Observable .OnSubscribe ;
22+ import rx .Subscriber ;
23+ import rx .functions . Action0 ;
2424import rx .subscriptions .Subscriptions ;
2525
2626/**
3434 * This is blocking so the Subscription returned when calling
3535 * <code>Observable.unsafeSubscribe(Observer)</code> does nothing.
3636 */
37- public class OperationToObservableFuture {
38- /* package accessible for unit tests */ static class ToObservableFuture <T > implements OnSubscribeFunc <T > {
37+ public class OperatorToObservableFuture {
38+ /* package accessible for unit tests */ static class ToObservableFuture <T > implements OnSubscribe <T > {
3939 private final Future <? extends T > that ;
40- private final Long time ;
40+ private final long time ;
4141 private final TimeUnit unit ;
4242
4343 public ToObservableFuture (Future <? extends T > that ) {
4444 this .that = that ;
45- this .time = null ;
45+ this .time = 0 ;
4646 this .unit = null ;
4747 }
4848
@@ -53,29 +53,34 @@ public ToObservableFuture(Future<? extends T> that, long time, TimeUnit unit) {
5353 }
5454
5555 @ Override
56- public Subscription onSubscribe ( Observer <? super T > observer ) {
57- try {
58- T value = ( time == null ) ? ( T ) that . get () : ( T ) that . get ( time , unit );
59-
60- if (! that . isCancelled ()) {
61- observer . onNext ( value );
56+ public void call ( Subscriber <? super T > subscriber ) {
57+ subscriber . add ( Subscriptions . create ( new Action0 () {
58+ @ Override
59+ public void call () {
60+ // If the Future is already completed, "cancel" does nothing.
61+ that . cancel ( true );
6262 }
63- observer .onCompleted ();
63+ }));
64+ try {
65+ T value = (unit == null ) ? (T ) that .get () : (T ) that .get (time , unit );
66+ subscriber .onNext (value );
67+ subscriber .onCompleted ();
6468 } catch (Throwable e ) {
65- observer .onError (e );
69+ // If this Observable is unsubscribed, we will receive an CancellationException.
70+ // However, CancellationException will not be passed to the final Subscriber
71+ // since it's already subscribed.
72+ // If the Future is canceled in other place, CancellationException will be still
73+ // passed to the final Subscriber.
74+ subscriber .onError (e );
6675 }
67-
68- // the get() has already completed so there is no point in
69- // giving the user a way to cancel.
70- return Subscriptions .empty ();
7176 }
7277 }
7378
74- public static <T > OnSubscribeFunc <T > toObservableFuture (final Future <? extends T > that ) {
79+ public static <T > OnSubscribe <T > toObservableFuture (final Future <? extends T > that ) {
7580 return new ToObservableFuture <T >(that );
7681 }
7782
78- public static <T > OnSubscribeFunc <T > toObservableFuture (final Future <? extends T > that , long time , TimeUnit unit ) {
83+ public static <T > OnSubscribe <T > toObservableFuture (final Future <? extends T > that , long time , TimeUnit unit ) {
7984 return new ToObservableFuture <T >(that , time , unit );
8085 }
8186}
0 commit comments