Skip to content

Commit dd94e7b

Browse files
committed
Added ChannelObservable.subscribe
1 parent 6c7ac7f commit dd94e7b

File tree

6 files changed

+261
-6
lines changed

6 files changed

+261
-6
lines changed

rxjava-contrib/rxjava-quasar/README.md

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,18 @@ and for Ivy:
2929
<dependency org="com.netflix.rxjava" name="rxjava-quasar" rev="x.y.z" />
3030
```
3131

32-
# Sample Usage
32+
# Usage
3333

34+
As always when using Quasar, the java agent has to be started by adding the following JVM option:
3435

36+
```
37+
-javaagent:path-to-quasar-jar.jar
38+
```
39+
40+
Alternatively, you can use AOT instrumentation (see [the Quasar documentation](http://docs.paralleluniverse.co/quasar/#instrumentation)).
41+
42+
Or, if you're running in Tomcat, you can use the Quasar class loader (see [the Comsat documentation](http://docs.paralleluniverse.co/comsat/#enabling-comsat)).
43+
44+
`Observer`, `Function` or `Action` method implementations can call fiber-blocking operations when using the `NewFiberScheduler` if the method is annotated with `@Suspendable`.
45+
(rx-core `Observer`s, `Function`s or `Action`s that manipulate, transform or delegate to other `Observer`s, `Function`s or `Action`s are automatically instrumented).
3546

rxjava-contrib/rxjava-quasar/src/main/java/rx/quasar/ChannelObservable.java

Lines changed: 157 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,24 @@
1717
import co.paralleluniverse.fibers.SuspendExecution;
1818
import co.paralleluniverse.fibers.Suspendable;
1919
import co.paralleluniverse.strands.Strand;
20+
import co.paralleluniverse.strands.Timeout;
21+
import co.paralleluniverse.strands.channels.Channel;
22+
import co.paralleluniverse.strands.channels.Channels;
2023
import co.paralleluniverse.strands.channels.ReceivePort;
2124
import co.paralleluniverse.strands.channels.SendPort;
25+
import java.util.concurrent.TimeUnit;
2226
import rx.Observable;
2327
import rx.Observer;
2428
import rx.Scheduler;
29+
import rx.util.Exceptions;
2530
import rx.util.OnErrorNotImplementedException;
2631

2732
/**
28-
*
33+
* This class contains static methods that connect {@link Observable}s and {@link Channel}s.
2934
*/
30-
public class ChannelObservable {
35+
public final class ChannelObservable {
36+
private ChannelObservable() {
37+
}
3138

3239
/**
3340
* Converts an {@link Iterable} sequence into an Observable that emits each message received on the channel.
@@ -102,4 +109,152 @@ public void onError(Throwable e) {
102109
}
103110
};
104111
}
112+
113+
/**
114+
* Creates a {@link ReceivePort} subscribed to an {@link Observable}.
115+
* <p>
116+
* @param <T> the type of messages emitted by the observable and received on the channel.
117+
* @param bufferSize the channel's buffer size
118+
* @param policy the channel's {@link Channels.OverflowPolicy OverflowPolicy}
119+
* @param o the observable
120+
* @return A new channel with the given buffer size and overflow policy that will receive all events emitted by the observable.
121+
*/
122+
public final static <T> ReceivePort<T> subscribe(int bufferSize, Channels.OverflowPolicy policy, Observable<T> o) {
123+
final ChannelWithErrors<T> channel = new ChannelWithErrors<T>(Channels.newChannel(bufferSize, policy));
124+
125+
o.subscribe(new Observer<T>() {
126+
@Override
127+
@Suspendable
128+
public void onNext(T t) {
129+
try {
130+
channel.sendPort().send(t);
131+
} catch (InterruptedException ex) {
132+
Strand.interrupted();
133+
} catch (SuspendExecution ex) {
134+
throw new AssertionError(ex);
135+
}
136+
}
137+
138+
@Override
139+
public void onCompleted() {
140+
channel.sendPort().close();
141+
}
142+
143+
@Override
144+
public void onError(Throwable e) {
145+
channel.error(e);
146+
}
147+
});
148+
return channel.receivePort();
149+
}
150+
151+
/**
152+
* Creates a {@link ReceivePort} subscribed to an {@link Observable}.
153+
* <p>
154+
* @param <T> the type of messages emitted by the observable and received on the channel.
155+
* @param bufferSize the channel's buffer size
156+
* @param policy the channel's {@link Channels.OverflowPolicy OverflowPolicy}
157+
* @param o the observable
158+
* @param scheduler the scheduler used to emit the observable's events
159+
* @return A new channel with the given buffer size and overflow policy that will receive all events emitted by the observable.
160+
*/
161+
public final static <T> ReceivePort<T> subscribe(int bufferSize, Channels.OverflowPolicy policy, Observable<T> o, Scheduler scheduler) {
162+
final ChannelWithErrors<T> channel = new ChannelWithErrors<T>(Channels.newChannel(bufferSize, policy));
163+
164+
o.subscribe(new Observer<T>() {
165+
@Override
166+
@Suspendable
167+
public void onNext(T t) {
168+
try {
169+
channel.sendPort().send(t);
170+
} catch (InterruptedException ex) {
171+
Strand.interrupted();
172+
} catch (SuspendExecution ex) {
173+
throw new AssertionError(ex);
174+
}
175+
}
176+
177+
@Override
178+
public void onCompleted() {
179+
channel.sendPort().close();
180+
}
181+
182+
@Override
183+
public void onError(Throwable e) {
184+
channel.error(e);
185+
}
186+
}, scheduler);
187+
return channel.receivePort();
188+
}
189+
190+
private static class ChannelWithErrors<T> {
191+
private final Channel<Object> ch;
192+
193+
public ChannelWithErrors(Channel<Object> ch) {
194+
this.ch = ch;
195+
}
196+
197+
@Suspendable
198+
public void error(Throwable t) {
199+
try {
200+
ch.send(new ThrowableWrapper(t));
201+
ch.close();
202+
} catch (InterruptedException e) {
203+
} catch (SuspendExecution e) {
204+
throw new AssertionError(e);
205+
}
206+
}
207+
208+
public ReceivePort<T> receivePort() {
209+
return new ReceivePort<T>() {
210+
@Override
211+
public T receive() throws SuspendExecution, InterruptedException {
212+
return get(ch.receive());
213+
}
214+
215+
@Override
216+
public T receive(long timeout, TimeUnit unit) throws SuspendExecution, InterruptedException {
217+
return get(ch.receive(timeout, unit));
218+
}
219+
220+
@Override
221+
public T receive(Timeout timeout) throws SuspendExecution, InterruptedException {
222+
return get(ch.receive(timeout));
223+
}
224+
225+
@Override
226+
public T tryReceive() {
227+
return get(ch.tryReceive());
228+
}
229+
230+
@Override
231+
public void close() {
232+
ch.close();
233+
}
234+
235+
@Override
236+
public boolean isClosed() {
237+
return ch.isClosed();
238+
}
239+
};
240+
}
241+
242+
public SendPort<T> sendPort() {
243+
return (SendPort<T>) ch;
244+
}
245+
246+
private T get(Object m) {
247+
if (m instanceof ThrowableWrapper)
248+
throw Exceptions.propagate(((ThrowableWrapper) m).t);
249+
return (T) m;
250+
}
251+
252+
private static class ThrowableWrapper {
253+
final Throwable t;
254+
255+
public ThrowableWrapper(Throwable t) {
256+
this.t = t;
257+
}
258+
}
259+
}
105260
}

rxjava-contrib/rxjava-quasar/src/main/java/rx/quasar/OnSubscribeFromChannel.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public void call(Subscriber<? super T> o) {
5050
break;
5151
} catch (Exception e) {
5252
o.onError(e);
53-
continue;
53+
return;
5454
}
5555

5656
o.onNext(m);
Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,25 @@
1-
rx.util.functions.Action1.call
21
rx.Observer.onNext
2+
rx.Observer.onError
3+
rx.Observer.onCompleted
4+
rx.util.functions.Action0.call
5+
rx.util.functions.Action1.call
6+
rx.util.functions.Action2.call
7+
rx.util.functions.Action3.call
8+
rx.util.functions.Action4.call
9+
rx.util.functions.Action5.call
10+
rx.util.functions.Action6.call
11+
rx.util.functions.Action7.call
12+
rx.util.functions.Action8.call
13+
rx.util.functions.Action9.call
14+
rx.util.functions.ActionN.call
15+
rx.util.functions.Func0.call
16+
rx.util.functions.Func1.call
17+
rx.util.functions.Func2.call
18+
rx.util.functions.Func3.call
19+
rx.util.functions.Func4.call
20+
rx.util.functions.Func5.call
21+
rx.util.functions.Func6.call
22+
rx.util.functions.Func7.call
23+
rx.util.functions.Func8.call
24+
rx.util.functions.Func9.call
25+
rx.util.functions.FuncN.call
Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,33 @@
1+
rx.Observer.onNext
2+
rx.Observer.onError
3+
rx.Observer.onCompleted
14
rx.Observable.subscribe
2-
rx.observers.SafeSubscriber.onNext
5+
rx.util.functions.Actions.Actions$1.call
6+
rx.util.functions.Actions.Actions$2.call
7+
rx.util.functions.Actions.Actions$3.call
8+
rx.util.functions.Actions.Actions$4.call
9+
rx.util.functions.Actions.Actions$5.call
10+
rx.util.functions.Actions.Actions$6.call
11+
rx.util.functions.Actions.Actions$7.call
12+
rx.util.functions.Actions.Actions$8.call
13+
rx.util.functions.Actions.Actions$9.call
14+
rx.util.functions.Actions.Actions$10.call
15+
rx.util.functions.Actions.Actions$11.call
16+
rx.util.functions.Actions.Actions$12.call
17+
rx.util.functions.Actions.Actions$13.call
18+
rx.util.functions.Actions.Actions$14.call
19+
rx.util.functions.Actions.Functions$1.call
20+
rx.util.functions.Actions.Functions$2.call
21+
rx.util.functions.Actions.Functions$3.call
22+
rx.util.functions.Actions.Functions$4.call
23+
rx.util.functions.Actions.Functions$5.call
24+
rx.util.functions.Actions.Functions$6.call
25+
rx.util.functions.Actions.Functions$7.call
26+
rx.util.functions.Actions.Functions$8.call
27+
rx.util.functions.Actions.Functions$9.call
28+
rx.util.functions.Actions.Functions$10.call
29+
rx.util.functions.Actions.Functions$11.call
30+
rx.util.functions.Actions.Functions$12.call
31+
rx.util.functions.Actions.Functions$13.call
32+
rx.util.functions.Actions.Functions$14.call
33+
rx.util.functions.Actions.Functions$15.call

rxjava-contrib/rxjava-quasar/src/test/java/rx/quasar/ChannelObservableTest.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import co.paralleluniverse.strands.Strand;
2121
import co.paralleluniverse.strands.channels.Channel;
2222
import co.paralleluniverse.strands.channels.Channels;
23+
import co.paralleluniverse.strands.channels.ReceivePort;
2324
import java.util.ArrayList;
2425
import java.util.Arrays;
2526
import java.util.Queue;
@@ -30,6 +31,7 @@
3031
import org.junit.Test;
3132
import rx.Observable;
3233
import rx.Observer;
34+
import rx.subjects.PublishSubject;
3335

3436
public class ChannelObservableTest {
3537
@Test
@@ -91,4 +93,37 @@ public void testObserverChannel() throws Exception {
9193
assertThat(c.receive(), equalTo("c"));
9294
assertThat(c.receive(), is(nullValue()));
9395
}
96+
97+
@Test
98+
public void testObserverChannel2() throws Exception {
99+
ReceivePort<String> c = ChannelObservable.subscribe(10, Channels.OverflowPolicy.BLOCK, Observable.from(Arrays.asList("a", "b", "c")));
100+
101+
assertThat(c.receive(), equalTo("a"));
102+
assertThat(c.receive(), equalTo("b"));
103+
assertThat(c.receive(), equalTo("c"));
104+
assertThat(c.receive(), is(nullValue()));
105+
}
106+
107+
@Test
108+
public void testObserverChannelWithError() throws Exception {
109+
PublishSubject<String> o = PublishSubject.create();
110+
ReceivePort<String> c = ChannelObservable.subscribe(10, Channels.OverflowPolicy.BLOCK, o);
111+
112+
o.onNext("a");
113+
o.onError(new MyException());
114+
o.onNext("c");
115+
116+
assertThat(c.receive(), equalTo("a"));
117+
try {
118+
c.receive();
119+
fail();
120+
} catch(MyException e) {
121+
122+
}
123+
assertThat(c.receive(), is(nullValue()));
124+
}
125+
126+
static class MyException extends RuntimeException {
127+
128+
}
94129
}

0 commit comments

Comments
 (0)