Skip to content

Commit b2697cb

Browse files
committed
Implement TrustedObservableTester.assertTrustedObservable()
1 parent 4919472 commit b2697cb

File tree

1 file changed

+253
-0
lines changed

1 file changed

+253
-0
lines changed
Lines changed: 253 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,253 @@
1+
package rx.testing;
2+
3+
import org.junit.Test;
4+
import rx.Observable;
5+
import rx.Observer;
6+
import rx.Subscription;
7+
import rx.subscriptions.Subscriptions;
8+
import rx.util.functions.Func1;
9+
10+
import java.lang.Thread.UncaughtExceptionHandler;
11+
import java.util.ArrayList;
12+
import java.util.List;
13+
import java.util.concurrent.atomic.AtomicBoolean;
14+
import java.util.concurrent.atomic.AtomicReference;
15+
16+
import static org.junit.Assert.assertEquals;
17+
import static org.junit.Assert.assertFalse;
18+
import static org.junit.Assert.assertNotNull;
19+
20+
public class TrustedObservableTester
21+
{
22+
private TrustedObservableTester() {}
23+
24+
public static <T> Func1<Observer<T>, Subscription> assertTrustedObservable(final Func1<Observer<T>, Subscription> source)
25+
{
26+
return new Func1<Observer<T>, Subscription>()
27+
{
28+
@Override
29+
public Subscription call(Observer<T> observer)
30+
{
31+
return source.call(new TestingObserver<T>(observer));
32+
}
33+
};
34+
}
35+
36+
public static class TestingObserver<T> implements Observer<T> {
37+
38+
private final Observer<T> actual;
39+
private final AtomicBoolean isFinished = new AtomicBoolean(false);
40+
private final AtomicBoolean isInCallback = new AtomicBoolean(false);
41+
42+
public TestingObserver(Observer<T> actual) {
43+
this.actual = actual;
44+
}
45+
46+
@Override
47+
public void onCompleted() {
48+
assertFalse("previous call to onCompleted() or onError()", !isFinished.compareAndSet(false, true));
49+
assertFalse("concurrent callback pending", !isInCallback.compareAndSet(false, true));
50+
actual.onCompleted();
51+
isInCallback.set(false);
52+
}
53+
54+
@Override
55+
public void onError(Exception e) {
56+
assertFalse("previous call to onCompleted() or onError()", !isFinished.compareAndSet(false, true));
57+
assertFalse("concurrent callback pending", !isInCallback.compareAndSet(false, true));
58+
actual.onError(e);
59+
isInCallback.set(false);
60+
}
61+
62+
@Override
63+
public void onNext(T args) {
64+
assertFalse("previous call to onCompleted() or onError()", isFinished.get());
65+
assertFalse("concurrent callback pending", !isInCallback.compareAndSet(false, true));
66+
actual.onNext(args);
67+
isInCallback.set(false);
68+
}
69+
70+
}
71+
72+
public static class UnitTest {
73+
@Test(expected = AssertionError.class)
74+
public void testDoubleCompleted() {
75+
Observable.create(assertTrustedObservable(new Func1<Observer<String>, Subscription>()
76+
{
77+
@Override
78+
public Subscription call(Observer<String> observer)
79+
{
80+
observer.onCompleted();
81+
observer.onCompleted();
82+
return Subscriptions.empty();
83+
}
84+
})).lastOrDefault("end");
85+
86+
}
87+
88+
@Test(expected = AssertionError.class)
89+
public void testCompletedError() {
90+
Observable.create(assertTrustedObservable(new Func1<Observer<String>, Subscription>()
91+
{
92+
@Override
93+
public Subscription call(Observer<String> observer)
94+
{
95+
observer.onCompleted();
96+
observer.onError(new Exception());
97+
return Subscriptions.empty();
98+
}
99+
})).lastOrDefault("end");
100+
}
101+
102+
@Test(expected = AssertionError.class)
103+
public void testCompletedNext() {
104+
Observable.create(assertTrustedObservable(new Func1<Observer<String>, Subscription>()
105+
{
106+
@Override
107+
public Subscription call(Observer<String> observer)
108+
{
109+
observer.onCompleted();
110+
observer.onNext("one");
111+
return Subscriptions.empty();
112+
}
113+
})).lastOrDefault("end");
114+
}
115+
116+
@Test(expected = AssertionError.class)
117+
public void testErrorCompleted() {
118+
Observable.create(assertTrustedObservable(new Func1<Observer<String>, Subscription>()
119+
{
120+
@Override
121+
public Subscription call(Observer<String> observer)
122+
{
123+
observer.onError(new Exception());
124+
observer.onCompleted();
125+
return Subscriptions.empty();
126+
}
127+
})).lastOrDefault("end");
128+
}
129+
130+
@Test(expected = AssertionError.class)
131+
public void testDoubleError() {
132+
Observable.create(assertTrustedObservable(new Func1<Observer<String>, Subscription>()
133+
{
134+
@Override
135+
public Subscription call(Observer<String> observer)
136+
{
137+
observer.onError(new Exception());
138+
observer.onError(new Exception());
139+
return Subscriptions.empty();
140+
}
141+
})).lastOrDefault("end");
142+
}
143+
144+
145+
@Test(expected = AssertionError.class)
146+
public void testErrorNext() {
147+
Observable.create(assertTrustedObservable(new Func1<Observer<String>, Subscription>()
148+
{
149+
@Override
150+
public Subscription call(Observer<String> observer)
151+
{
152+
observer.onError(new Exception());
153+
observer.onNext("one");
154+
return Subscriptions.empty();
155+
}
156+
})).lastOrDefault("end");
157+
}
158+
159+
@Test
160+
public void testNextCompleted() {
161+
Observable.create(assertTrustedObservable(new Func1<Observer<String>, Subscription>()
162+
{
163+
@Override
164+
public Subscription call(Observer<String> observer)
165+
{
166+
observer.onNext("one");
167+
observer.onCompleted();
168+
return Subscriptions.empty();
169+
}
170+
})).lastOrDefault("end");
171+
}
172+
173+
@Test
174+
public void testConcurrentNextNext() {
175+
final List<Thread> threads = new ArrayList<Thread>();
176+
final AtomicReference<Throwable> threadFailure = new AtomicReference<Throwable>();
177+
Observable.create(assertTrustedObservable(new Func1<Observer<String>, Subscription>()
178+
{
179+
@Override
180+
public Subscription call(final Observer<String> observer)
181+
{
182+
threads.add(new Thread(new Runnable()
183+
{
184+
@Override
185+
public void run()
186+
{
187+
observer.onNext("one");
188+
}
189+
}));
190+
threads.add(new Thread(new Runnable()
191+
{
192+
@Override
193+
public void run()
194+
{
195+
observer.onNext("two");
196+
}
197+
}));
198+
return Subscriptions.empty();
199+
}
200+
})).subscribe(new SlowObserver());
201+
for (Thread thread : threads) {
202+
thread.setUncaughtExceptionHandler(new UncaughtExceptionHandler()
203+
{
204+
@Override
205+
public void uncaughtException(Thread thread, Throwable throwable)
206+
{
207+
threadFailure.set(throwable);
208+
}
209+
});
210+
thread.start();
211+
}
212+
for (Thread thread : threads) {
213+
try {
214+
thread.join();
215+
} catch (InterruptedException ignored) {
216+
}
217+
}
218+
// Junit seems pretty bad about exposing test failures inside of created threads.
219+
assertNotNull("exception thrown by thread", threadFailure.get());
220+
assertEquals("class of exception thrown by thread", AssertionError.class, threadFailure.get().getClass());
221+
}
222+
223+
private static class SlowObserver implements Observer<String>
224+
{
225+
@Override
226+
public void onCompleted()
227+
{
228+
try {
229+
Thread.sleep(10);
230+
} catch (InterruptedException ignored) {
231+
}
232+
}
233+
234+
@Override
235+
public void onError(Exception e)
236+
{
237+
try {
238+
Thread.sleep(10);
239+
} catch (InterruptedException ignored) {
240+
}
241+
}
242+
243+
@Override
244+
public void onNext(String args)
245+
{
246+
try {
247+
Thread.sleep(10);
248+
} catch (InterruptedException ignored) {
249+
}
250+
}
251+
}
252+
}
253+
}

0 commit comments

Comments
 (0)