Skip to content

Commit 53fb292

Browse files
committed
map Observable.onError(Throwable) to SendPort.close(Throwable) in
1 parent f4ea073 commit 53fb292

File tree

2 files changed

+10
-84
lines changed

2 files changed

+10
-84
lines changed

rxjava-contrib/rxjava-quasar/src/main/java/rx/quasar/ChannelObservable.java

Lines changed: 6 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,13 @@
1717
import co.paralleluniverse.fibers.SuspendExecution;
1818
import co.paralleluniverse.fibers.Suspendable;
1919
import co.paralleluniverse.strands.Strand;
20-
import co.paralleluniverse.strands.Timeout;
2120
import co.paralleluniverse.strands.channels.Channel;
2221
import co.paralleluniverse.strands.channels.Channels;
2322
import co.paralleluniverse.strands.channels.ReceivePort;
2423
import co.paralleluniverse.strands.channels.SendPort;
25-
import java.util.concurrent.TimeUnit;
2624
import rx.Observable;
2725
import rx.Observer;
2826
import rx.Scheduler;
29-
import rx.util.Exceptions;
30-
import rx.util.OnErrorNotImplementedException;
3127
import rx.util.functions.Action2;
3228
import rx.util.functions.Actions;
3329
import rx.util.functions.Func1;
@@ -109,7 +105,7 @@ public void onCompleted() {
109105

110106
@Override
111107
public void onError(Throwable e) {
112-
throw new OnErrorNotImplementedException(e);
108+
channel.close(e);
113109
}
114110
};
115111
}
@@ -124,7 +120,7 @@ public void onError(Throwable e) {
124120
* @return A new channel with the given buffer size and overflow policy that will receive all events emitted by the observable.
125121
*/
126122
public final static <T> ReceivePort<T> subscribe(int bufferSize, Channels.OverflowPolicy policy, Observable<T> o) {
127-
final ChannelWithErrors<T> channel = new ChannelWithErrors<T>(Channels.newChannel(bufferSize, policy));
123+
final Channel<T> channel = Channels.newChannel(bufferSize, policy);
128124

129125
System.out.println(Functions.fromFunc(new Func1<String, String>() {
130126

@@ -146,7 +142,7 @@ public void call(String t1, String t2) {
146142
@Suspendable
147143
public void onNext(T t) {
148144
try {
149-
channel.sendPort().send(t);
145+
channel.send(t);
150146
} catch (InterruptedException ex) {
151147
Strand.interrupted();
152148
} catch (SuspendExecution ex) {
@@ -156,85 +152,14 @@ public void onNext(T t) {
156152

157153
@Override
158154
public void onCompleted() {
159-
channel.sendPort().close();
155+
channel.close();
160156
}
161157

162158
@Override
163159
public void onError(Throwable e) {
164-
channel.error(e);
160+
channel.close(e);
165161
}
166162
});
167-
return channel.receivePort();
168-
}
169-
170-
private static class ChannelWithErrors<T> {
171-
private final Channel<Object> ch;
172-
173-
public ChannelWithErrors(Channel<Object> ch) {
174-
this.ch = ch;
175-
}
176-
177-
@Suspendable
178-
public void error(Throwable t) {
179-
try {
180-
ch.send(new ThrowableWrapper(t));
181-
ch.close();
182-
} catch (InterruptedException e) {
183-
} catch (SuspendExecution e) {
184-
throw new AssertionError(e);
185-
}
186-
}
187-
188-
public ReceivePort<T> receivePort() {
189-
return new ReceivePort<T>() {
190-
@Override
191-
public T receive() throws SuspendExecution, InterruptedException {
192-
return get(ch.receive());
193-
}
194-
195-
@Override
196-
public T receive(long timeout, TimeUnit unit) throws SuspendExecution, InterruptedException {
197-
return get(ch.receive(timeout, unit));
198-
}
199-
200-
@Override
201-
public T receive(Timeout timeout) throws SuspendExecution, InterruptedException {
202-
return get(ch.receive(timeout));
203-
}
204-
205-
@Override
206-
public T tryReceive() {
207-
return get(ch.tryReceive());
208-
}
209-
210-
@Override
211-
public void close() {
212-
ch.close();
213-
}
214-
215-
@Override
216-
public boolean isClosed() {
217-
return ch.isClosed();
218-
}
219-
};
220-
}
221-
222-
public SendPort<T> sendPort() {
223-
return (SendPort<T>) ch;
224-
}
225-
226-
private T get(Object m) {
227-
if (m instanceof ThrowableWrapper)
228-
throw Exceptions.propagate(((ThrowableWrapper) m).t);
229-
return (T) m;
230-
}
231-
232-
private static class ThrowableWrapper {
233-
final Throwable t;
234-
235-
public ThrowableWrapper(Throwable t) {
236-
this.t = t;
237-
}
238-
}
163+
return channel;
239164
}
240165
}

rxjava-contrib/rxjava-quasar/src/test/java/rx/quasar/ChannelObservableTest.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import co.paralleluniverse.strands.Strand;
2121
import co.paralleluniverse.strands.channels.Channel;
2222
import co.paralleluniverse.strands.channels.Channels;
23+
import co.paralleluniverse.strands.channels.ProducerException;
2324
import co.paralleluniverse.strands.channels.ReceivePort;
2425
import java.util.ArrayList;
2526
import java.util.Arrays;
@@ -117,10 +118,10 @@ public void testObserverChannelWithError() throws Exception {
117118
try {
118119
c.receive();
119120
fail();
120-
} catch(MyException e) {
121-
121+
} catch(ProducerException e) {
122+
assertThat(e.getCause(), instanceOf(MyException.class));
122123
}
123-
assertThat(c.receive(), is(nullValue()));
124+
assertThat(c.isClosed(), is(true));
124125
}
125126

126127
static class MyException extends RuntimeException {

0 commit comments

Comments
 (0)