Skip to content

Commit 6c7ac7f

Browse files
committed
tests pass
1 parent 0554dc0 commit 6c7ac7f

File tree

3 files changed

+34
-22
lines changed

3 files changed

+34
-22
lines changed

rxjava-contrib/rxjava-quasar/src/main/java/rx/quasar/ChannelObservable.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import rx.Observable;
2323
import rx.Observer;
2424
import rx.Scheduler;
25+
import rx.util.OnErrorNotImplementedException;
2526

2627
/**
2728
*
@@ -97,6 +98,7 @@ public void onCompleted() {
9798

9899
@Override
99100
public void onError(Throwable e) {
101+
throw new OnErrorNotImplementedException(e);
100102
}
101103
};
102104
}
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
rx.Observable.subscribe
2+
rx.observers.SafeSubscriber.onNext

rxjava-contrib/rxjava-quasar/src/test/java/rx/quasar/ChannelObservableTest.java

Lines changed: 30 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -22,44 +22,38 @@
2222
import co.paralleluniverse.strands.channels.Channels;
2323
import java.util.ArrayList;
2424
import java.util.Arrays;
25-
import java.util.Collection;
2625
import java.util.Queue;
2726
import java.util.concurrent.ConcurrentLinkedQueue;
2827
import java.util.concurrent.atomic.AtomicBoolean;
2928
import static org.hamcrest.CoreMatchers.*;
3029
import static org.junit.Assert.*;
3130
import org.junit.Test;
31+
import rx.Observable;
3232
import rx.Observer;
3333

3434
public class ChannelObservableTest {
35-
private static final int BUFFER_SIZE = 0;
36-
private static final Channels.OverflowPolicy OVERFLOW_POLICY = Channels.OverflowPolicy.BLOCK;
37-
3835
@Test
3936
public void testObservableFromChannel() throws Exception {
40-
final Channel<String> c = Channels.newChannel(BUFFER_SIZE, OVERFLOW_POLICY);
37+
final Channel<String> c = Channels.newChannel(0);
38+
39+
System.out.println("===== " + c);
4140

4241
final Queue<String> result = new ConcurrentLinkedQueue<String>();
4342
final AtomicBoolean completed = new AtomicBoolean();
44-
45-
ChannelObservable.from(c).subscribeOn(NewFiberScheduler.getDefaultInstance()).subscribe(new Observer<String>() {
43+
44+
ChannelObservable.from(c, NewFiberScheduler.getDefaultInstance()).subscribe(new Observer<String>() {
4645
@Override
4746
@Suspendable
4847
public void onNext(String t) {
49-
// try {
48+
try {
5049
System.out.println("GOT: " + t);
5150
assertTrue(Strand.isCurrentFiber());
52-
System.out.println("GOT2: " + t);
53-
//Strand.sleep(100);
54-
System.out.println("GOT3: " + t);
51+
Strand.sleep(100);
5552
result.add(t);
56-
System.out.println("GOT4: " + t);
57-
// } catch(InterruptedException e) {
58-
//
59-
// } catch (SuspendExecution e) {
60-
// System.err.println("WHA????? " + e);
61-
// throw new AssertionError(e);
62-
// }
53+
} catch (InterruptedException e) {
54+
} catch (SuspendExecution e) {
55+
throw new AssertionError(e);
56+
}
6357
}
6458

6559
@Override
@@ -69,18 +63,32 @@ public void onCompleted() {
6963

7064
@Override
7165
public void onError(Throwable e) {
72-
66+
7367
}
7468
});
75-
69+
7670
c.send("a");
7771
c.send("b");
7872
c.send("c");
7973
c.close();
80-
74+
8175
Thread.sleep(500);
82-
76+
8377
assertThat(new ArrayList<String>(result), equalTo(Arrays.asList("a", "b", "c")));
8478
assertThat(completed.get(), is(true));
8579
}
80+
81+
@Test
82+
public void testObserverChannel() throws Exception {
83+
final Channel<String> c = Channels.newChannel(10); // must use a buffer, otherwise will block on subscribe
84+
85+
System.out.println("===== " + c);
86+
87+
Observable.from(Arrays.asList("a", "b", "c")).subscribe(ChannelObservable.to(c));
88+
89+
assertThat(c.receive(), equalTo("a"));
90+
assertThat(c.receive(), equalTo("b"));
91+
assertThat(c.receive(), equalTo("c"));
92+
assertThat(c.receive(), is(nullValue()));
93+
}
8694
}

0 commit comments

Comments
 (0)