Skip to content

Commit 15a20e1

Browse files
Remove ExecutorScheduler - New ComputationScheduler
- see #711 and #713
1 parent c37fc2b commit 15a20e1

File tree

21 files changed

+199
-449
lines changed

21 files changed

+199
-449
lines changed

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/schedulers/ExecutorScheduler.scala

Lines changed: 0 additions & 41 deletions
This file was deleted.

rxjava-contrib/rxjava-async-util/src/test/java/rx/util/async/operators/OperationDeferFutureTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public Observable<Integer> call() throws Exception {
5858
}
5959
};
6060

61-
Observable<Integer> result = Async.deferFuture(func, Schedulers.threadPoolForComputation());
61+
Observable<Integer> result = Async.deferFuture(func, Schedulers.computation());
6262

6363
final Observer<Integer> observer = mock(Observer.class);
6464

rxjava-contrib/rxjava-async-util/src/test/java/rx/util/async/operators/OperationForEachFutureTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public void testSimple() {
5454

5555
try {
5656
Observable<Integer> source = Observable.from(1, 2, 3)
57-
.subscribeOn(Schedulers.threadPoolForComputation());
57+
.subscribeOn(Schedulers.computation());
5858

5959
final AtomicInteger sum = new AtomicInteger();
6060
Action1<Integer> add = new Action1<Integer>() {
@@ -93,7 +93,7 @@ public void testSimpleThrowing() {
9393

9494
try {
9595
Observable<Integer> source = Observable.<Integer>error(new CustomException())
96-
.subscribeOn(Schedulers.threadPoolForComputation());
96+
.subscribeOn(Schedulers.computation());
9797

9898
final AtomicInteger sum = new AtomicInteger();
9999
Action1<Integer> add = new Action1<Integer>() {
@@ -128,7 +128,7 @@ public void call(Integer t1) {
128128
@Test
129129
public void testSimpleScheduled() {
130130
Observable<Integer> source = Observable.from(1, 2, 3)
131-
.subscribeOn(Schedulers.threadPoolForComputation());
131+
.subscribeOn(Schedulers.computation());
132132

133133
final AtomicInteger sum = new AtomicInteger();
134134
Action1<Integer> add = new Action1<Integer>() {
@@ -158,7 +158,7 @@ public void call(Integer t1) {
158158
public void testSimpleScheduledThrowing() {
159159

160160
Observable<Integer> source = Observable.<Integer>error(new CustomException())
161-
.subscribeOn(Schedulers.threadPoolForComputation());
161+
.subscribeOn(Schedulers.computation());
162162

163163
final AtomicInteger sum = new AtomicInteger();
164164
Action1<Integer> add = new Action1<Integer>() {

rxjava-contrib/rxjava-async-util/src/test/java/rx/util/async/operators/OperationStartFutureTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ public Integer call() throws Exception {
104104
}
105105
};
106106

107-
Observable<Integer> result = Async.startFuture(func, Schedulers.threadPoolForComputation());
107+
Observable<Integer> result = Async.startFuture(func, Schedulers.computation());
108108

109109
final Observer<Integer> observer = mock(Observer.class);
110110

rxjava-contrib/rxjava-computation-expressions/src/test/java/rx/operators/OperationConditionalsTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -442,7 +442,7 @@ public void testWhileDoZeroTimes() {
442442

443443
@Test
444444
public void testWhileDoManyTimes() {
445-
Observable<Integer> source1 = Observable.from(1, 2, 3).subscribeOn(Schedulers.currentThread());
445+
Observable<Integer> source1 = Observable.from(1, 2, 3).subscribeOn(Schedulers.trampoline());
446446

447447
List<Integer> expected = new ArrayList<Integer>(numRecursion * 3);
448448
for (int i = 0; i < numRecursion; i++) {

rxjava-core/src/main/java/rx/operators/OperationBuffer.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ public Subscription onSubscribe(final Observer<? super List<T>> observer) {
197197
* the {@link Func1} object representing the specified buffer operation
198198
*/
199199
public static <T> OnSubscribeFunc<List<T>> buffer(Observable<T> source, long timespan, TimeUnit unit) {
200-
return buffer(source, timespan, unit, Schedulers.threadPoolForComputation());
200+
return buffer(source, timespan, unit, Schedulers.computation());
201201
}
202202

203203
/**
@@ -259,7 +259,7 @@ public Subscription onSubscribe(final Observer<? super List<T>> observer) {
259259
* the {@link Func1} object representing the specified buffer operation
260260
*/
261261
public static <T> OnSubscribeFunc<List<T>> buffer(Observable<T> source, long timespan, TimeUnit unit, int count) {
262-
return buffer(source, timespan, unit, count, Schedulers.threadPoolForComputation());
262+
return buffer(source, timespan, unit, count, Schedulers.computation());
263263
}
264264

265265
/**
@@ -325,7 +325,7 @@ public Subscription onSubscribe(final Observer<? super List<T>> observer) {
325325
* the {@link Func1} object representing the specified buffer operation
326326
*/
327327
public static <T> OnSubscribeFunc<List<T>> buffer(Observable<T> source, long timespan, long timeshift, TimeUnit unit) {
328-
return buffer(source, timespan, timeshift, unit, Schedulers.threadPoolForComputation());
328+
return buffer(source, timespan, timeshift, unit, Schedulers.computation());
329329
}
330330

331331
/**

rxjava-core/src/main/java/rx/operators/OperationWindow.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ public Subscription onSubscribe(final Observer<? super Observable<T>> observer)
195195
* the {@link rx.functions.Func1} object representing the specified window operation
196196
*/
197197
public static <T> OnSubscribeFunc<Observable<T>> window(Observable<? extends T> source, long timespan, TimeUnit unit) {
198-
return window(source, timespan, unit, Schedulers.threadPoolForComputation());
198+
return window(source, timespan, unit, Schedulers.computation());
199199
}
200200

201201
/**
@@ -255,7 +255,7 @@ public Subscription onSubscribe(final Observer<? super Observable<T>> observer)
255255
* the {@link rx.functions.Func1} object representing the specified window operation
256256
*/
257257
public static <T> OnSubscribeFunc<Observable<T>> window(Observable<? extends T> source, long timespan, TimeUnit unit, int count) {
258-
return window(source, timespan, unit, count, Schedulers.threadPoolForComputation());
258+
return window(source, timespan, unit, count, Schedulers.computation());
259259
}
260260

261261
/**
@@ -318,7 +318,7 @@ public Subscription onSubscribe(final Observer<? super Observable<T>> observer)
318318
* the {@link rx.functions.Func1} object representing the specified window operation
319319
*/
320320
public static <T> OnSubscribeFunc<Observable<T>> window(Observable<? extends T> source, long timespan, long timeshift, TimeUnit unit) {
321-
return window(source, timespan, timeshift, unit, Schedulers.threadPoolForComputation());
321+
return window(source, timespan, timeshift, unit, Schedulers.computation());
322322
}
323323

324324
/**
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
package rx.schedulers;
2+
3+
import java.util.concurrent.ThreadFactory;
4+
import java.util.concurrent.TimeUnit;
5+
import java.util.concurrent.atomic.AtomicInteger;
6+
7+
import rx.Scheduler;
8+
import rx.Subscription;
9+
import rx.functions.Action0;
10+
import rx.schedulers.NewThreadScheduler.OnActionComplete;
11+
import rx.subscriptions.CompositeSubscription;
12+
import rx.subscriptions.Subscriptions;
13+
14+
/* package */class ComputationScheduler extends Scheduler {
15+
16+
private static class ComputationSchedulerPool {
17+
final int cores = Runtime.getRuntime().availableProcessors();
18+
final ThreadFactory factory = new ThreadFactory() {
19+
final AtomicInteger counter = new AtomicInteger();
20+
21+
@Override
22+
public Thread newThread(Runnable r) {
23+
Thread t = new Thread(r, "RxComputationThreadPool-" + counter.incrementAndGet());
24+
t.setDaemon(true);
25+
return t;
26+
}
27+
};
28+
29+
final EventLoopScheduler[] eventLoops;
30+
31+
ComputationSchedulerPool() {
32+
// initialize event loops
33+
eventLoops = new EventLoopScheduler[cores];
34+
for (int i = 0; i < cores; i++) {
35+
eventLoops[i] = new EventLoopScheduler(factory);
36+
}
37+
}
38+
39+
private static ComputationSchedulerPool INSTANCE = new ComputationSchedulerPool();
40+
41+
long n = 0;
42+
43+
public EventLoopScheduler getEventLoop() {
44+
// round-robin selection (improvements to come)
45+
return eventLoops[(int) (n++ % cores)];
46+
}
47+
48+
}
49+
50+
@Override
51+
public Inner createInner() {
52+
return new EventLoop();
53+
}
54+
55+
private static class EventLoop extends Scheduler.Inner {
56+
private final CompositeSubscription innerSubscription = new CompositeSubscription();
57+
private final EventLoopScheduler pooledEventLoop;
58+
private final OnActionComplete onComplete;
59+
60+
EventLoop() {
61+
pooledEventLoop = ComputationSchedulerPool.INSTANCE.getEventLoop();
62+
onComplete = new OnActionComplete() {
63+
64+
@Override
65+
public void complete(Subscription s) {
66+
innerSubscription.remove(s);
67+
}
68+
69+
};
70+
}
71+
72+
@Override
73+
public void unsubscribe() {
74+
innerSubscription.unsubscribe();
75+
}
76+
77+
@Override
78+
public boolean isUnsubscribed() {
79+
return innerSubscription.isUnsubscribed();
80+
}
81+
82+
@Override
83+
public Subscription schedule(Action0 action) {
84+
if (innerSubscription.isUnsubscribed()) {
85+
// don't schedule, we are unsubscribed
86+
return Subscriptions.empty();
87+
}
88+
return pooledEventLoop.schedule(action, onComplete);
89+
}
90+
91+
@Override
92+
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
93+
if (innerSubscription.isUnsubscribed()) {
94+
// don't schedule, we are unsubscribed
95+
return Subscriptions.empty();
96+
}
97+
98+
return pooledEventLoop.schedule(action, delayTime, unit, onComplete);
99+
}
100+
101+
}
102+
103+
private static class EventLoopScheduler extends NewThreadScheduler.EventLoopScheduler {
104+
EventLoopScheduler(ThreadFactory threadFactory) {
105+
super(threadFactory);
106+
}
107+
}
108+
109+
}

0 commit comments

Comments
 (0)