Skip to content

Commit 7ec374c

Browse files
Merge pull request #619 from benjchristensen/executorSchedulerConcurrencyTests
UnitTest to assert thread safety of Executor Scheduler
2 parents cf494e8 + 3ec9ef3 commit 7ec374c

File tree

1 file changed

+86
-0
lines changed

1 file changed

+86
-0
lines changed
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.schedulers;
17+
18+
import static org.junit.Assert.*;
19+
20+
import java.util.HashMap;
21+
import java.util.concurrent.CountDownLatch;
22+
23+
import org.junit.Test;
24+
25+
import rx.Scheduler;
26+
import rx.Subscription;
27+
import rx.util.functions.Action0;
28+
import rx.util.functions.Action1;
29+
import rx.util.functions.Func2;
30+
31+
public class ExecutorSchedulerTests {
32+
33+
@Test
34+
public void testThreadSafetyWhenSchedulerIsHoppingBetweenThreads() {
35+
36+
final int NUM = 1000000;
37+
final CountDownLatch latch = new CountDownLatch(1);
38+
HashMap<String, Integer> statefulMap = new HashMap<String, Integer>();
39+
Schedulers.threadPoolForComputation().schedule(statefulMap,
40+
new Func2<Scheduler, HashMap<String, Integer>, Subscription>() {
41+
42+
@Override
43+
public Subscription call(Scheduler innerScheduler, final HashMap<String, Integer> statefulMap) {
44+
return innerScheduler.schedule(new Action1<Action0>() {
45+
46+
int nonThreadSafeCounter = 0;
47+
48+
@Override
49+
public void call(Action0 self) {
50+
Integer i = statefulMap.get("a");
51+
if (i == null) {
52+
i = 1;
53+
statefulMap.put("a", i);
54+
statefulMap.put("b", i);
55+
} else {
56+
i++;
57+
statefulMap.put("a", i);
58+
statefulMap.put("b", i);
59+
}
60+
nonThreadSafeCounter++;
61+
statefulMap.put("nonThreadSafeCounter", nonThreadSafeCounter);
62+
if (i < NUM) {
63+
self.call();
64+
} else {
65+
latch.countDown();
66+
}
67+
}
68+
});
69+
}
70+
});
71+
72+
try {
73+
latch.await();
74+
} catch (InterruptedException e) {
75+
e.printStackTrace();
76+
}
77+
78+
System.out.println("Count A: " + statefulMap.get("a"));
79+
System.out.println("Count B: " + statefulMap.get("b"));
80+
System.out.println("nonThreadSafeCounter: " + statefulMap.get("nonThreadSafeCounter"));
81+
82+
assertEquals(NUM, statefulMap.get("a").intValue());
83+
assertEquals(NUM, statefulMap.get("b").intValue());
84+
assertEquals(NUM, statefulMap.get("nonThreadSafeCounter").intValue());
85+
}
86+
}

0 commit comments

Comments
 (0)