Skip to content

Commit 6bece88

Browse files
committed
Added ChannelObservable.get
1 parent 43719c7 commit 6bece88

File tree

2 files changed

+157
-9
lines changed

2 files changed

+157
-9
lines changed

rxjava-contrib/rxjava-quasar/src/main/java/rx/quasar/ChannelObservable.java

Lines changed: 77 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,17 @@
1414
* limitations under the License.
1515
*/package rx.quasar;
1616

17+
import co.paralleluniverse.fibers.FiberAsync;
1718
import co.paralleluniverse.fibers.SuspendExecution;
1819
import co.paralleluniverse.fibers.Suspendable;
1920
import co.paralleluniverse.strands.Strand;
2021
import co.paralleluniverse.strands.channels.Channel;
2122
import co.paralleluniverse.strands.channels.Channels;
2223
import co.paralleluniverse.strands.channels.ReceivePort;
2324
import co.paralleluniverse.strands.channels.SendPort;
25+
import java.util.concurrent.ExecutionException;
26+
import java.util.concurrent.TimeUnit;
27+
import java.util.concurrent.TimeoutException;
2428
import rx.Observable;
2529
import rx.Observer;
2630
import rx.Scheduler;
@@ -45,7 +49,7 @@ private ChannelObservable() {
4549
* @return an Observable that emits each message received on the source {@link ReceivePort}
4650
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#wiki-from">RxJava Wiki: from()</a>
4751
*/
48-
public final static <T> Observable<T> from(ReceivePort<T> channel) {
52+
public static <T> Observable<T> from(ReceivePort<T> channel) {
4953
return Observable.create(new OnSubscribeFromChannel<T>(channel));
5054
}
5155

@@ -67,7 +71,7 @@ public final static <T> Observable<T> from(ReceivePort<T> channel) {
6771
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#wiki-from">RxJava Wiki: from()</a>
6872
* @see <a href="http://msdn.microsoft.com/en-us/library/hh212140.aspx">MSDN: Observable.ToObservable</a>
6973
*/
70-
public final static <T> Observable<T> from(ReceivePort<T> channel, Scheduler scheduler) {
74+
public static <T> Observable<T> from(ReceivePort<T> channel, Scheduler scheduler) {
7175
return Observable.create(new OnSubscribeFromChannel<T>(channel)).subscribeOn(scheduler);
7276
}
7377

@@ -79,7 +83,7 @@ public final static <T> Observable<T> from(ReceivePort<T> channel, Scheduler sch
7983
* @param channel the target {@link SendPort}
8084
* @return
8185
*/
82-
public final static <T> Observer<T> to(final SendPort<T> channel) {
86+
public static <T> Observer<T> to(final SendPort<T> channel) {
8387
return new Observer<T>() {
8488

8589
@Override
@@ -115,9 +119,9 @@ public void onError(Throwable e) {
115119
* @param o the observable
116120
* @return A new channel with the given buffer size and overflow policy that will receive all events emitted by the observable.
117121
*/
118-
public final static <T> ReceivePort<T> subscribe(int bufferSize, Channels.OverflowPolicy policy, Observable<T> o) {
122+
public static <T> ReceivePort<T> subscribe(int bufferSize, Channels.OverflowPolicy policy, Observable<T> o) {
119123
final Channel<T> channel = Channels.newChannel(bufferSize, policy);
120-
124+
121125
o.subscribe(new Observer<T>() {
122126
@Override
123127
@Suspendable
@@ -143,4 +147,72 @@ public void onError(Throwable e) {
143147
});
144148
return channel;
145149
}
150+
151+
/**
152+
* Takes an observable that generates <i>at most one value</i>, blocks until it completes and returns the result.
153+
* If the observable completes before a value has been emitted, this method returns {@code null}.
154+
* It the observable fails, this function throws an {@link ExecutionException} that wraps the observable's exception.
155+
*
156+
* @param o the observable
157+
* @return the observable's result, or {@code null} if the observable completes before a value is emitted.
158+
* @throws ExecutionException if the observable fails
159+
*/
160+
public static <T> T get(final Observable<T> o) throws ExecutionException, SuspendExecution, InterruptedException {
161+
return new AsyncObservable<T>(o).run();
162+
}
163+
164+
/**
165+
* Takes an observable that generates <i>at most one value</i>, blocks until it completes or the timeout expires, and returns the result.
166+
* If the observable completes before a value has been emitted, this method returns {@code null}.
167+
* It the observable fails, this function throws an {@link ExecutionException} that wraps the observable's exception.
168+
*
169+
* @param o the observable
170+
* @param timeout the maximum time this method will blcok
171+
* @param unit the timeout's time unit
172+
* @return the observable's result, or {@code null} if the observable completes before a value is emitted.
173+
* @throws ExecutionException if the observable fails
174+
* @throws TimeoutException if the timeout expires before the observable completes
175+
*/
176+
public static <T> T get(final Observable<T> o, long timeout, TimeUnit unit) throws ExecutionException, SuspendExecution, InterruptedException, TimeoutException {
177+
return new AsyncObservable<T>(o).run(timeout, unit);
178+
}
179+
180+
private static class AsyncObservable<T> extends FiberAsync<T, Void, ExecutionException> implements Observer<T> {
181+
private final Observable<T> o;
182+
183+
public AsyncObservable(Observable<T> o) {
184+
this.o = o;
185+
}
186+
187+
@Override
188+
protected Void requestAsync() {
189+
o.subscribe(this);
190+
return null;
191+
}
192+
193+
@Override
194+
public void onNext(T t) {
195+
if (isCompleted())
196+
throw new IllegalStateException("Operation already completed");
197+
asyncCompleted(t);
198+
}
199+
200+
@Override
201+
public void onError(Throwable e) {
202+
if (isCompleted())
203+
throw new IllegalStateException("Operation already completed");
204+
asyncFailed(e);
205+
}
206+
207+
@Override
208+
public void onCompleted() {
209+
if (!isCompleted())
210+
asyncCompleted(null);
211+
}
212+
213+
@Override
214+
protected ExecutionException wrapException(Throwable t) {
215+
return new ExecutionException(t);
216+
}
217+
}
146218
}

rxjava-contrib/rxjava-quasar/src/test/java/rx/quasar/ChannelObservableTest.java

Lines changed: 80 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,11 @@
1515
*/
1616
package rx.quasar;
1717

18+
import co.paralleluniverse.fibers.Fiber;
1819
import co.paralleluniverse.fibers.SuspendExecution;
1920
import co.paralleluniverse.fibers.Suspendable;
2021
import co.paralleluniverse.strands.Strand;
22+
import co.paralleluniverse.strands.SuspendableCallable;
2123
import co.paralleluniverse.strands.channels.Channel;
2224
import co.paralleluniverse.strands.channels.Channels;
2325
import co.paralleluniverse.strands.channels.ProducerException;
@@ -26,6 +28,7 @@
2628
import java.util.Arrays;
2729
import java.util.Queue;
2830
import java.util.concurrent.ConcurrentLinkedQueue;
31+
import java.util.concurrent.ExecutionException;
2932
import java.util.concurrent.atomic.AtomicBoolean;
3033
import static org.hamcrest.CoreMatchers.*;
3134
import static org.junit.Assert.*;
@@ -113,18 +116,91 @@ public void testObserverChannelWithError() throws Exception {
113116
o.onNext("a");
114117
o.onError(new MyException());
115118
o.onNext("c");
116-
119+
117120
assertThat(c.receive(), equalTo("a"));
118121
try {
119122
c.receive();
120123
fail();
121-
} catch(ProducerException e) {
124+
} catch (ProducerException e) {
122125
assertThat(e.getCause(), instanceOf(MyException.class));
123126
}
124127
assertThat(c.isClosed(), is(true));
125128
}
126-
129+
130+
@Test
131+
public void whenGetThenBlockAndReturnResult() throws Exception {
132+
final PublishSubject<String> o = PublishSubject.create();
133+
134+
Fiber<String> f = new Fiber<String>(new SuspendableCallable<String>() {
135+
136+
@Override
137+
public String run() throws SuspendExecution, InterruptedException {
138+
try {
139+
return ChannelObservable.get(o);
140+
} catch (ExecutionException e) {
141+
throw new AssertionError();
142+
}
143+
}
144+
}).start();
145+
146+
Thread.sleep(100);
147+
148+
o.onNext("foo");
149+
o.onCompleted();
150+
151+
assertThat(f.get(), equalTo("foo"));
152+
}
153+
154+
@Test
155+
public void whenGetAndObservableFailsThenThrowExecutionException() throws Exception {
156+
final PublishSubject<String> o = PublishSubject.create();
157+
158+
Fiber<String> f = new Fiber<String>(new SuspendableCallable<String>() {
159+
160+
@Override
161+
public String run() throws SuspendExecution, InterruptedException {
162+
try {
163+
return ChannelObservable.get(o);
164+
} catch (ExecutionException e) {
165+
return e.getCause().getMessage();
166+
}
167+
}
168+
}).start();
169+
170+
Thread.sleep(100);
171+
172+
o.onError(new Exception("ohoh"));
173+
174+
assertThat(f.get(), equalTo("ohoh"));
175+
}
176+
177+
@Test
178+
public void whenGetAndObservableEmitsTwoValuesThenBlowup() throws Exception {
179+
final PublishSubject<String> o = PublishSubject.create();
180+
181+
Fiber<String> f = new Fiber<String>(new SuspendableCallable<String>() {
182+
183+
@Override
184+
public String run() throws SuspendExecution, InterruptedException {
185+
try {
186+
return ChannelObservable.get(o);
187+
} catch (ExecutionException e) {
188+
throw new AssertionError();
189+
}
190+
}
191+
}).start();
192+
193+
Thread.sleep(100);
194+
195+
o.onNext("foo");
196+
try {
197+
o.onNext("bar");
198+
fail();
199+
} catch (Exception e) {
200+
}
201+
}
202+
127203
static class MyException extends RuntimeException {
128-
204+
129205
}
130206
}

0 commit comments

Comments
 (0)