Skip to content

Commit 93d4e69

Browse files
Update Scheduler Tests for Recursion and Common Testing
1 parent 094788f commit 93d4e69

File tree

6 files changed

+331
-120
lines changed

6 files changed

+331
-120
lines changed
Lines changed: 273 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,273 @@
1+
package rx.schedulers;
2+
3+
import static org.junit.Assert.*;
4+
5+
import java.util.concurrent.CountDownLatch;
6+
import java.util.concurrent.TimeUnit;
7+
import java.util.concurrent.atomic.AtomicInteger;
8+
9+
import org.junit.Test;
10+
11+
import rx.Observable;
12+
import rx.Observable.OnSubscribeFunc;
13+
import rx.Observer;
14+
import rx.Scheduler;
15+
import rx.Subscription;
16+
import rx.operators.SafeObservableSubscription;
17+
import rx.subscriptions.BooleanSubscription;
18+
import rx.subscriptions.CompositeSubscription;
19+
import rx.subscriptions.Subscriptions;
20+
import rx.util.functions.Action0;
21+
import rx.util.functions.Action1;
22+
import rx.util.functions.Func1;
23+
import rx.util.functions.Func2;
24+
25+
public abstract class AbstractSchedulerTests {
26+
27+
/**
28+
* The scheduler to test
29+
*
30+
* @return
31+
*/
32+
protected abstract Scheduler getScheduler();
33+
34+
@Test
35+
public void testUnsubscribeRecursiveScheduleWithStateAndFunc2() throws InterruptedException {
36+
final CountDownLatch latch = new CountDownLatch(1);
37+
final CountDownLatch unsubscribeLatch = new CountDownLatch(1);
38+
final AtomicInteger counter = new AtomicInteger();
39+
Subscription s = getScheduler().schedule(0L, new Func2<Scheduler, Long, Subscription>() {
40+
41+
@Override
42+
public Subscription call(Scheduler innerScheduler, Long i) {
43+
i++;
44+
// System.out.println("i: " + i);
45+
if (i == 10) {
46+
latch.countDown();
47+
try {
48+
// wait for unsubscribe to finish so we are not racing it
49+
unsubscribeLatch.await();
50+
} catch (InterruptedException e) {
51+
// we expect the countDown if unsubscribe is not working
52+
// or to be interrupted if unsubscribe is successful since
53+
// the unsubscribe will interrupt it as it is calling Future.cancel(true)
54+
// so we will ignore the stacktrace
55+
}
56+
}
57+
58+
counter.incrementAndGet();
59+
return innerScheduler.schedule(i, this);
60+
}
61+
});
62+
63+
latch.await();
64+
s.unsubscribe();
65+
unsubscribeLatch.countDown();
66+
Thread.sleep(200); // let time pass to see if the scheduler is still doing work
67+
assertEquals(10, counter.get());
68+
}
69+
70+
/**
71+
* Bug report: https://github.com/Netflix/RxJava/issues/431
72+
*/
73+
@Test
74+
public void testUnSubscribeForScheduler() throws InterruptedException {
75+
76+
final AtomicInteger countReceived = new AtomicInteger();
77+
final AtomicInteger countGenerated = new AtomicInteger();
78+
final SafeObservableSubscription s = new SafeObservableSubscription();
79+
final CountDownLatch latch = new CountDownLatch(1);
80+
81+
s.wrap(Observable.interval(50, TimeUnit.MILLISECONDS)
82+
.map(new Func1<Long, Long>() {
83+
@Override
84+
public Long call(Long aLong) {
85+
countGenerated.incrementAndGet();
86+
return aLong;
87+
}
88+
})
89+
.subscribeOn(getScheduler())
90+
.observeOn(getScheduler())
91+
.subscribe(new Observer<Long>() {
92+
@Override
93+
public void onCompleted() {
94+
System.out.println("--- completed");
95+
}
96+
97+
@Override
98+
public void onError(Throwable e) {
99+
System.out.println("--- onError");
100+
}
101+
102+
@Override
103+
public void onNext(Long args) {
104+
if (countReceived.incrementAndGet() == 2) {
105+
s.unsubscribe();
106+
latch.countDown();
107+
}
108+
System.out.println("==> Received " + args);
109+
}
110+
}));
111+
112+
latch.await(1000, TimeUnit.MILLISECONDS);
113+
114+
System.out.println("----------- it thinks it is finished ------------------ ");
115+
Thread.sleep(100);
116+
117+
assertEquals(2, countGenerated.get());
118+
}
119+
120+
@Test
121+
public void scheduleMultipleTasksOnOuterForSequentialExecution() throws InterruptedException {
122+
final AtomicInteger countExecutions = new AtomicInteger();
123+
final CountDownLatch latch = new CountDownLatch(1);
124+
final SafeObservableSubscription outerSubscription = new SafeObservableSubscription();
125+
final Func2<Scheduler, Long, Subscription> fInner = new Func2<Scheduler, Long, Subscription>() {
126+
127+
@Override
128+
public Subscription call(Scheduler innerScheduler, Long i) {
129+
countExecutions.incrementAndGet();
130+
i++;
131+
System.out.println("i: " + i);
132+
if (i == 1000) {
133+
outerSubscription.unsubscribe();
134+
latch.countDown();
135+
}
136+
if (i < 10000) {
137+
return innerScheduler.schedule(i, this);
138+
} else {
139+
latch.countDown();
140+
return Subscriptions.empty();
141+
}
142+
}
143+
};
144+
145+
Func2<Scheduler, Long, Subscription> fOuter = new Func2<Scheduler, Long, Subscription>() {
146+
147+
@Override
148+
public Subscription call(Scheduler innerScheduler, Long i) {
149+
CompositeSubscription s = new CompositeSubscription();
150+
s.add(innerScheduler.schedule(i, fInner));
151+
s.add(innerScheduler.schedule(i, fInner));
152+
return s;
153+
}
154+
};
155+
156+
outerSubscription.wrap(getScheduler().schedule(0L, fOuter));
157+
latch.await();
158+
Thread.sleep(200); // let time pass to see if the scheduler is still doing work
159+
System.out.println("Count: " + countExecutions.get());
160+
// we unsubscribe on first to 1000 so we hit 1999 instead of 2000
161+
assertEquals(1999, countExecutions.get());
162+
}
163+
164+
@Test
165+
public void unsubscribeWithFastProducerWithSlowConsumerCausingQueuing() throws InterruptedException {
166+
final AtomicInteger countEmitted = new AtomicInteger();
167+
final AtomicInteger countTaken = new AtomicInteger();
168+
int value = Observable.create(new OnSubscribeFunc<Integer>() {
169+
170+
@Override
171+
public Subscription onSubscribe(final Observer<? super Integer> o) {
172+
final BooleanSubscription s = BooleanSubscription.create();
173+
Thread t = new Thread(new Runnable() {
174+
175+
@Override
176+
public void run() {
177+
int i = 1;
178+
while (!s.isUnsubscribed() && i <= 100) {
179+
System.out.println("onNext from fast producer: " + i);
180+
o.onNext(i++);
181+
}
182+
o.onCompleted();
183+
}
184+
});
185+
t.setDaemon(true);
186+
t.start();
187+
return s;
188+
}
189+
}).doOnNext(new Action1<Integer>() {
190+
191+
@Override
192+
public void call(Integer i) {
193+
countEmitted.incrementAndGet();
194+
}
195+
}).doOnCompleted(new Action0() {
196+
197+
@Override
198+
public void call() {
199+
System.out.println("-------- Done Emitting from Source ---------");
200+
}
201+
}).observeOn(getScheduler()).doOnNext(new Action1<Integer>() {
202+
203+
@Override
204+
public void call(Integer i) {
205+
System.out.println(">> onNext to slowConsumer pre-take: " + i);
206+
//force it to be slower than the producer
207+
try {
208+
Thread.sleep(10);
209+
} catch (InterruptedException e) {
210+
e.printStackTrace();
211+
}
212+
countTaken.incrementAndGet();
213+
}
214+
}).take(10).toBlockingObservable().last();
215+
216+
// they will all emit because the consumer is running slow
217+
assertEquals(100, countEmitted.get());
218+
// number received after take (but take will filter any extra)
219+
assertEquals(10, value);
220+
// so we also want to check the doOnNext after observeOn to see if it got unsubscribed
221+
Thread.sleep(200); // let time pass to see if the scheduler is still doing work
222+
// we expect only 10 to make it through the observeOn side
223+
assertEquals(10, countTaken.get());
224+
}
225+
226+
@Test(timeout = 8000)
227+
public void recursionUsingFunc2() throws InterruptedException {
228+
final CountDownLatch latch = new CountDownLatch(1);
229+
getScheduler().schedule(0L, new Func2<Scheduler, Long, Subscription>() {
230+
231+
@Override
232+
public Subscription call(Scheduler innerScheduler, Long i) {
233+
i++;
234+
if (i % 100000 == 0) {
235+
System.out.println(i + " Total Memory: " + Runtime.getRuntime().totalMemory() + " Free: " + Runtime.getRuntime().freeMemory());
236+
}
237+
if (i < 5000000L) {
238+
return innerScheduler.schedule(i, this);
239+
} else {
240+
latch.countDown();
241+
return Subscriptions.empty();
242+
}
243+
}
244+
});
245+
246+
latch.await();
247+
}
248+
249+
@Test(timeout = 8000)
250+
public void recursionUsingAction0() throws InterruptedException {
251+
final CountDownLatch latch = new CountDownLatch(1);
252+
getScheduler().schedule(new Action1<Action0>() {
253+
254+
private long i = 0;
255+
256+
@Override
257+
public void call(Action0 self) {
258+
i++;
259+
if (i % 100000 == 0) {
260+
System.out.println(i + " Total Memory: " + Runtime.getRuntime().totalMemory() + " Free: " + Runtime.getRuntime().freeMemory());
261+
}
262+
if (i < 5000000L) {
263+
self.call();
264+
} else {
265+
latch.countDown();
266+
}
267+
}
268+
});
269+
270+
latch.await();
271+
}
272+
273+
}

rxjava-core/src/test/java/rx/schedulers/CurrentThreadSchedulerTest.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
/**
22
* Copyright 2013 Netflix, Inc.
3-
*
3+
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
66
* You may obtain a copy of the License at
7-
*
8-
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
1010
* Unless required by applicable law or agreed to in writing, software
1111
* distributed under the License is distributed on an "AS IS" BASIS,
1212
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -22,9 +22,15 @@
2222
import org.junit.Test;
2323
import org.mockito.InOrder;
2424

25+
import rx.Scheduler;
2526
import rx.util.functions.Action0;
2627

27-
public class CurrentThreadSchedulerTest {
28+
public class CurrentThreadSchedulerTest extends AbstractSchedulerTests {
29+
30+
@Override
31+
protected Scheduler getScheduler() {
32+
return CurrentThreadScheduler.getInstance();
33+
}
2834

2935
@Test
3036
public void testNestedActions() {

rxjava-core/src/test/java/rx/schedulers/ExecutorSchedulerTests.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,13 @@
2828
import rx.util.functions.Action1;
2929
import rx.util.functions.Func2;
3030

31-
public class ExecutorSchedulerTests {
31+
public class ExecutorSchedulerTests extends AbstractSchedulerTests {
32+
33+
@Override
34+
protected Scheduler getScheduler() {
35+
// this is an implementation of ExecutorScheduler
36+
return Schedulers.threadPoolForComputation();
37+
}
3238

3339
@Test
3440
public void testThreadSafetyWhenSchedulerIsHoppingBetweenThreads() {

rxjava-core/src/test/java/rx/schedulers/ImmediateSchedulerTest.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
/**
22
* Copyright 2013 Netflix, Inc.
3-
*
3+
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
66
* You may obtain a copy of the License at
7-
*
8-
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
1010
* Unless required by applicable law or agreed to in writing, software
1111
* distributed under the License is distributed on an "AS IS" BASIS,
1212
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,9 +20,16 @@
2020
import org.junit.Test;
2121
import org.mockito.InOrder;
2222

23+
import rx.Scheduler;
2324
import rx.util.functions.Action0;
2425

25-
public class ImmediateSchedulerTest {
26+
public class ImmediateSchedulerTest extends AbstractSchedulerTests {
27+
28+
@Override
29+
protected Scheduler getScheduler() {
30+
return ImmediateScheduler.getInstance();
31+
}
32+
2633
@Test
2734
public void testNestedActions() {
2835
final ImmediateScheduler scheduler = new ImmediateScheduler();
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package rx.schedulers;
18+
19+
import rx.Scheduler;
20+
21+
public class NewThreadSchedulerTest extends AbstractSchedulerTests {
22+
23+
@Override
24+
protected Scheduler getScheduler() {
25+
return NewThreadScheduler.getInstance();
26+
}
27+
28+
}

0 commit comments

Comments
 (0)