Skip to content

Commit 9f35594

Browse files
Scheduler Unit Tests
- passing for all but ExecutorScheduler
1 parent b1d9c1c commit 9f35594

File tree

6 files changed

+306
-331
lines changed

6 files changed

+306
-331
lines changed
Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
package rx.schedulers;
2+
3+
import static org.junit.Assert.*;
4+
5+
import java.util.concurrent.CountDownLatch;
6+
import java.util.concurrent.atomic.AtomicInteger;
7+
8+
import org.junit.Test;
9+
10+
import rx.Observable;
11+
import rx.Observable.OnSubscribeFunc;
12+
import rx.Observer;
13+
import rx.Scheduler;
14+
import rx.Subscription;
15+
import rx.operators.SafeObservableSubscription;
16+
import rx.subscriptions.BooleanSubscription;
17+
import rx.subscriptions.CompositeSubscription;
18+
import rx.subscriptions.Subscriptions;
19+
import rx.util.functions.Action0;
20+
import rx.util.functions.Action1;
21+
import rx.util.functions.Func2;
22+
23+
/**
24+
* Base tests for schedulers that involve threads (concurrency).
25+
*
26+
* These can only run on Schedulers that launch threads since they expect async/concurrent behavior.
27+
*
28+
* The Current/Immediate schedulers will not work with these tests.
29+
*/
30+
public abstract class AbstractSchedulerConcurrencyTests extends AbstractSchedulerTests {
31+
32+
@Test
33+
public void testUnsubscribeRecursiveScheduleWithStateAndFunc2() throws InterruptedException {
34+
final CountDownLatch latch = new CountDownLatch(1);
35+
final CountDownLatch unsubscribeLatch = new CountDownLatch(1);
36+
final AtomicInteger counter = new AtomicInteger();
37+
Subscription s = getScheduler().schedule(0L, new Func2<Scheduler, Long, Subscription>() {
38+
39+
@Override
40+
public Subscription call(Scheduler innerScheduler, Long i) {
41+
i++;
42+
// System.out.println("i: " + i);
43+
if (i == 10) {
44+
latch.countDown();
45+
try {
46+
// wait for unsubscribe to finish so we are not racing it
47+
unsubscribeLatch.await();
48+
} catch (InterruptedException e) {
49+
// we expect the countDown if unsubscribe is not working
50+
// or to be interrupted if unsubscribe is successful since
51+
// the unsubscribe will interrupt it as it is calling Future.cancel(true)
52+
// so we will ignore the stacktrace
53+
}
54+
}
55+
56+
counter.incrementAndGet();
57+
return innerScheduler.schedule(i, this);
58+
}
59+
});
60+
61+
latch.await();
62+
s.unsubscribe();
63+
unsubscribeLatch.countDown();
64+
Thread.sleep(200); // let time pass to see if the scheduler is still doing work
65+
assertEquals(10, counter.get());
66+
}
67+
68+
@Test
69+
public void scheduleMultipleTasksOnOuterForSequentialExecution() throws InterruptedException {
70+
final AtomicInteger countExecutions = new AtomicInteger();
71+
final CountDownLatch latch = new CountDownLatch(1);
72+
final SafeObservableSubscription outerSubscription = new SafeObservableSubscription();
73+
final Func2<Scheduler, Long, Subscription> fInner = new Func2<Scheduler, Long, Subscription>() {
74+
75+
@Override
76+
public Subscription call(Scheduler innerScheduler, Long i) {
77+
countExecutions.incrementAndGet();
78+
i++;
79+
System.out.println("i: " + i);
80+
if (i == 1000) {
81+
outerSubscription.unsubscribe();
82+
latch.countDown();
83+
}
84+
if (i < 10000) {
85+
return innerScheduler.schedule(i, this);
86+
} else {
87+
latch.countDown();
88+
return Subscriptions.empty();
89+
}
90+
}
91+
};
92+
93+
Func2<Scheduler, Long, Subscription> fOuter = new Func2<Scheduler, Long, Subscription>() {
94+
95+
@Override
96+
public Subscription call(Scheduler innerScheduler, Long i) {
97+
CompositeSubscription s = new CompositeSubscription();
98+
s.add(innerScheduler.schedule(i, fInner));
99+
s.add(innerScheduler.schedule(i, fInner));
100+
return s;
101+
}
102+
};
103+
104+
outerSubscription.wrap(getScheduler().schedule(0L, fOuter));
105+
latch.await();
106+
Thread.sleep(200); // let time pass to see if the scheduler is still doing work
107+
System.out.println("Count: " + countExecutions.get());
108+
// we unsubscribe on first to 1000 so we hit 1999 instead of 2000
109+
assertEquals(1999, countExecutions.get());
110+
}
111+
112+
@Test(timeout = 8000)
113+
public void recursionUsingFunc2() throws InterruptedException {
114+
final CountDownLatch latch = new CountDownLatch(1);
115+
getScheduler().schedule(0L, new Func2<Scheduler, Long, Subscription>() {
116+
117+
@Override
118+
public Subscription call(Scheduler innerScheduler, Long i) {
119+
i++;
120+
if (i % 100000 == 0) {
121+
System.out.println(i + " Total Memory: " + Runtime.getRuntime().totalMemory() + " Free: " + Runtime.getRuntime().freeMemory());
122+
}
123+
if (i < 5000000L) {
124+
return innerScheduler.schedule(i, this);
125+
} else {
126+
latch.countDown();
127+
return Subscriptions.empty();
128+
}
129+
}
130+
});
131+
132+
latch.await();
133+
}
134+
135+
@Test(timeout = 8000)
136+
public void recursionUsingAction0() throws InterruptedException {
137+
final CountDownLatch latch = new CountDownLatch(1);
138+
getScheduler().schedule(new Action1<Action0>() {
139+
140+
private long i = 0;
141+
142+
@Override
143+
public void call(Action0 self) {
144+
i++;
145+
if (i % 100000 == 0) {
146+
System.out.println(i + " Total Memory: " + Runtime.getRuntime().totalMemory() + " Free: " + Runtime.getRuntime().freeMemory());
147+
}
148+
if (i < 5000000L) {
149+
self.call();
150+
} else {
151+
latch.countDown();
152+
}
153+
}
154+
});
155+
156+
latch.await();
157+
}
158+
159+
}

0 commit comments

Comments
 (0)