Skip to content

Commit 4b7772c

Browse files
Observable.toFuture
1 parent ff92254 commit 4b7772c

File tree

2 files changed

+208
-10
lines changed

2 files changed

+208
-10
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 41 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,9 @@
4242
import rx.operators.OperationConcat;
4343
import rx.operators.OperationDefer;
4444
import rx.operators.OperationDematerialize;
45-
import rx.operators.OperationGroupBy;
4645
import rx.operators.OperationFilter;
4746
import rx.operators.OperationFinally;
47+
import rx.operators.OperationGroupBy;
4848
import rx.operators.OperationMap;
4949
import rx.operators.OperationMaterialize;
5050
import rx.operators.OperationMerge;
@@ -64,6 +64,7 @@
6464
import rx.operators.OperationTakeLast;
6565
import rx.operators.OperationTakeUntil;
6666
import rx.operators.OperationTakeWhile;
67+
import rx.operators.OperationToFuture;
6768
import rx.operators.OperationToIterator;
6869
import rx.operators.OperationToObservableFuture;
6970
import rx.operators.OperationToObservableIterable;
@@ -590,9 +591,11 @@ public void call(Object args) {
590591

591592
/**
592593
* Returns a connectable observable sequence that upon connection causes the source sequence to push results into the specified subject.
593-
*
594-
* @param subject the subject to push source elements into.
595-
* @param <R> result type
594+
*
595+
* @param subject
596+
* the subject to push source elements into.
597+
* @param <R>
598+
* result type
596599
* @return a connectable observable sequence that upon connection causes the source sequence to push results into the specified subject.
597600
*/
598601
public <R> ConnectableObservable<R> multicast(Subject<T, R> subject) {
@@ -2005,6 +2008,19 @@ public Boolean call(T t, Integer integer)
20052008
}));
20062009
}
20072010

2011+
/**
2012+
* Return a Future representing a single value of the Observable.
2013+
* <p>
2014+
* This will throw an exception if the Observable emits more than 1 value. If more than 1 are expected then use <code>toList().toFuture()</code>.
2015+
*
2016+
* @param that
2017+
* the source Observable
2018+
* @returna Future that expects a single item emitted by the source Observable
2019+
*/
2020+
public static <T> Future<T> toFuture(final Observable<T> that) {
2021+
return OperationToFuture.toFuture(that);
2022+
}
2023+
20082024
/**
20092025
* Returns an Observable that emits a single item, a list composed of all the items emitted by
20102026
* the source Observable.
@@ -2088,11 +2104,15 @@ public static <T> Iterable<T> mostRecent(Observable<T> source, T initialValue) {
20882104

20892105
/**
20902106
* Returns a connectable observable sequence that upon connection causes the source sequence to push results into the specified subject.
2091-
*
2092-
* @param source the source sequence whose elements will be pushed into the specified subject.
2093-
* @param subject the subject to push source elements into.
2094-
* @param <T> source type
2095-
* @param <R> result type
2107+
*
2108+
* @param source
2109+
* the source sequence whose elements will be pushed into the specified subject.
2110+
* @param subject
2111+
* the subject to push source elements into.
2112+
* @param <T>
2113+
* source type
2114+
* @param <R>
2115+
* result type
20962116
* @return a connectable observable sequence that upon connection causes the source sequence to push results into the specified subject.
20972117
*/
20982118
public static <T, R> ConnectableObservable<R> multicast(Observable<T> source, final Subject<T, R> subject) {
@@ -2101,7 +2121,7 @@ public static <T, R> ConnectableObservable<R> multicast(Observable<T> source, fi
21012121

21022122
/**
21032123
* Returns the only element of an observable sequence and throws an exception if there is not exactly one element in the observable sequence.
2104-
*
2124+
*
21052125
* @param that
21062126
* the source Observable
21072127
* @return The single element in the observable sequence.
@@ -3360,6 +3380,17 @@ public <E> Observable<T> takeUntil(Observable<E> other) {
33603380
return takeUntil(this, other);
33613381
}
33623382

3383+
/**
3384+
* Return a Future representing a single value of the Observable.
3385+
* <p>
3386+
* This will throw an exception if the Observable emits more than 1 value. If more than 1 are expected then use <code>toList().toFuture()</code>.
3387+
*
3388+
* @returna Future that expects a single item emitted by the source Observable
3389+
*/
3390+
public Future<T> toFuture() {
3391+
return toFuture(this);
3392+
}
3393+
33633394
/**
33643395
* Returns an Observable that emits a single item, a list composed of all the items emitted by
33653396
* the source Observable.
Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
package rx.operators;
2+
3+
import static org.junit.Assert.*;
4+
5+
import java.util.List;
6+
import java.util.concurrent.CountDownLatch;
7+
import java.util.concurrent.ExecutionException;
8+
import java.util.concurrent.Future;
9+
import java.util.concurrent.TimeUnit;
10+
import java.util.concurrent.TimeoutException;
11+
import java.util.concurrent.atomic.AtomicReference;
12+
13+
import org.junit.Test;
14+
15+
import rx.Observable;
16+
import rx.Observer;
17+
import rx.Subscription;
18+
import rx.subscriptions.Subscriptions;
19+
import rx.util.functions.Func1;
20+
21+
/**
22+
* Convert an Observable into a Future.
23+
*/
24+
public class OperationToFuture {
25+
26+
/**
27+
* Returns a Future that expects a single item from the observable.
28+
*
29+
* @param that
30+
* an observable sequence to get a Future for.
31+
* @param <T>
32+
* the type of source.
33+
* @return the Future to retrieve a single elements from an Observable
34+
*/
35+
public static <T> Future<T> toFuture(Observable<T> that) {
36+
37+
final CountDownLatch finished = new CountDownLatch(1);
38+
final AtomicReference<T> value = new AtomicReference<T>();
39+
final AtomicReference<Exception> error = new AtomicReference<Exception>();
40+
41+
final Subscription s = that.subscribe(new Observer<T>() {
42+
43+
@Override
44+
public void onCompleted() {
45+
finished.countDown();
46+
}
47+
48+
@Override
49+
public void onError(Exception e) {
50+
error.compareAndSet(null, e);
51+
finished.countDown();
52+
}
53+
54+
@Override
55+
public void onNext(T v) {
56+
if (!value.compareAndSet(null, v)) {
57+
// this means we received more than one value and must fail as a Future can handle only a single value
58+
error.compareAndSet(null, new IllegalStateException("Observable.toFuture() only supports sequences with a single value. Use .toList().toFuture() if multiple values are expected."));
59+
finished.countDown();
60+
}
61+
}
62+
});
63+
64+
return new Future<T>() {
65+
66+
private volatile boolean cancelled = false;
67+
68+
@Override
69+
public boolean cancel(boolean mayInterruptIfRunning) {
70+
if (finished.getCount() > 0) {
71+
cancelled = true;
72+
s.unsubscribe();
73+
// release the latch (a race condition may have already released it by now)
74+
finished.countDown();
75+
return true;
76+
} else {
77+
// can't cancel
78+
return false;
79+
}
80+
}
81+
82+
@Override
83+
public boolean isCancelled() {
84+
return cancelled;
85+
}
86+
87+
@Override
88+
public boolean isDone() {
89+
return finished.getCount() == 0;
90+
}
91+
92+
@Override
93+
public T get() throws InterruptedException, ExecutionException {
94+
finished.await();
95+
return getValue();
96+
}
97+
98+
@Override
99+
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
100+
if (finished.await(timeout, unit)) {
101+
return getValue();
102+
} else {
103+
throw new TimeoutException("Timed out after " + unit.toMillis(timeout) + "ms waiting for underlying Observable.");
104+
}
105+
}
106+
107+
private T getValue() throws ExecutionException {
108+
if (error.get() != null) {
109+
throw new ExecutionException("Observable onError", error.get());
110+
} else {
111+
return value.get();
112+
}
113+
}
114+
115+
};
116+
117+
}
118+
119+
@Test
120+
public void testToFuture() throws InterruptedException, ExecutionException {
121+
Observable<String> obs = Observable.toObservable("one");
122+
Future<String> f = toFuture(obs);
123+
assertEquals("one", f.get());
124+
}
125+
126+
@Test
127+
public void testToFutureList() throws InterruptedException, ExecutionException {
128+
Observable<String> obs = Observable.toObservable("one", "two", "three");
129+
Future<List<String>> f = toFuture(obs.toList());
130+
assertEquals("one", f.get().get(0));
131+
assertEquals("two", f.get().get(1));
132+
assertEquals("three", f.get().get(2));
133+
}
134+
135+
@Test(expected = ExecutionException.class)
136+
public void testExceptionWithMoreThanOneElement() throws InterruptedException, ExecutionException {
137+
Observable<String> obs = Observable.toObservable("one", "two");
138+
Future<String> f = toFuture(obs);
139+
assertEquals("one", f.get());
140+
// we expect an exception since there are more than 1 element
141+
}
142+
143+
@Test
144+
public void testToFutureWithException() {
145+
Observable<String> obs = Observable.create(new Func1<Observer<String>, Subscription>() {
146+
147+
@Override
148+
public Subscription call(Observer<String> observer) {
149+
observer.onNext("one");
150+
observer.onError(new TestException());
151+
return Subscriptions.empty();
152+
}
153+
});
154+
155+
Future<String> f = toFuture(obs);
156+
try {
157+
f.get();
158+
fail("expected exception");
159+
} catch (Exception e) {
160+
assertEquals(TestException.class, e.getCause().getClass());
161+
}
162+
}
163+
164+
private static class TestException extends RuntimeException {
165+
private static final long serialVersionUID = 1L;
166+
}
167+
}

0 commit comments

Comments
 (0)