Skip to content

Commit 538d245

Browse files
committed
Implement a cached thread scheduler using event loops
1 parent 566e892 commit 538d245

File tree

4 files changed

+243
-37
lines changed

4 files changed

+243
-37
lines changed
Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.schedulers;
17+
18+
import rx.Scheduler;
19+
import rx.Subscription;
20+
import rx.functions.Action0;
21+
import rx.subscriptions.CompositeSubscription;
22+
import rx.subscriptions.Subscriptions;
23+
24+
import java.util.Iterator;
25+
import java.util.concurrent.*;
26+
import java.util.concurrent.atomic.AtomicBoolean;
27+
import java.util.concurrent.atomic.AtomicInteger;
28+
29+
/* package */class CachedThreadScheduler extends Scheduler {
30+
private static final class CachedWorkerPool {
31+
final ThreadFactory factory = new ThreadFactory() {
32+
final AtomicInteger counter = new AtomicInteger();
33+
34+
@Override
35+
public Thread newThread(Runnable r) {
36+
Thread t = new Thread(r, "RxCachedThreadScheduler-" + counter.incrementAndGet());
37+
t.setDaemon(true);
38+
return t;
39+
}
40+
};
41+
42+
private final long keepAliveTime;
43+
private final ConcurrentLinkedQueue<PoolWorker> expiringQueue;
44+
private final ScheduledExecutorService evictExpiredWorkerExecutor;
45+
46+
CachedWorkerPool(long keepAliveTime, TimeUnit unit) {
47+
this.keepAliveTime = unit.toNanos(keepAliveTime);
48+
this.expiringQueue = new ConcurrentLinkedQueue<PoolWorker>();
49+
50+
evictExpiredWorkerExecutor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
51+
final AtomicInteger counter = new AtomicInteger();
52+
53+
@Override
54+
public Thread newThread(Runnable r) {
55+
Thread t = new Thread(r, "RxCachedWorkerPoolEvictor-" + counter.incrementAndGet());
56+
t.setDaemon(true);
57+
return t;
58+
}
59+
});
60+
evictExpiredWorkerExecutor.scheduleWithFixedDelay(
61+
new Runnable() {
62+
@Override
63+
public void run() {
64+
evictExpiredWorkers();
65+
}
66+
}, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS
67+
);
68+
}
69+
70+
private static CachedWorkerPool INSTANCE = new CachedWorkerPool(
71+
60L, TimeUnit.SECONDS
72+
);
73+
74+
PoolWorker get() {
75+
while (!expiringQueue.isEmpty()) {
76+
PoolWorker poolWorker = expiringQueue.poll();
77+
if (poolWorker != null) {
78+
return poolWorker;
79+
}
80+
}
81+
82+
// No cached worker found, so create a new one.
83+
return new PoolWorker(factory);
84+
}
85+
86+
void release(PoolWorker poolWorker) {
87+
// Refresh expire time before putting worker back in pool
88+
poolWorker.setExpirationTime(now() + keepAliveTime);
89+
90+
expiringQueue.add(poolWorker);
91+
}
92+
93+
void evictExpiredWorkers() {
94+
if (!expiringQueue.isEmpty()) {
95+
long currentTimestamp = now();
96+
97+
Iterator<PoolWorker> poolWorkerIterator = expiringQueue.iterator();
98+
while (poolWorkerIterator.hasNext()) {
99+
PoolWorker poolWorker = poolWorkerIterator.next();
100+
if (poolWorker.getExpirationTime() <= currentTimestamp) {
101+
poolWorkerIterator.remove();
102+
poolWorker.unsubscribe();
103+
} else {
104+
// Queue is ordered with the worker that will expire first in the beginning, so when we
105+
// find a non-expired worker we can stop evicting.
106+
break;
107+
}
108+
}
109+
}
110+
}
111+
112+
long now() {
113+
return System.nanoTime();
114+
}
115+
}
116+
117+
@Override
118+
public Worker createWorker() {
119+
return new EventLoopWorker(CachedWorkerPool.INSTANCE.get());
120+
}
121+
122+
private static class EventLoopWorker extends Scheduler.Worker {
123+
private final CompositeSubscription innerSubscription = new CompositeSubscription();
124+
private final PoolWorker poolWorker;
125+
private final AtomicBoolean releasePoolWorkerOnce = new AtomicBoolean(false);
126+
127+
EventLoopWorker(PoolWorker poolWorker) {
128+
this.poolWorker = poolWorker;
129+
}
130+
131+
@Override
132+
public void unsubscribe() {
133+
if (releasePoolWorkerOnce.compareAndSet(false, true)) {
134+
// unsubscribe should be idempotent, so only do this once
135+
CachedWorkerPool.INSTANCE.release(poolWorker);
136+
}
137+
innerSubscription.unsubscribe();
138+
}
139+
140+
@Override
141+
public boolean isUnsubscribed() {
142+
return innerSubscription.isUnsubscribed();
143+
}
144+
145+
@Override
146+
public Subscription schedule(Action0 action) {
147+
return schedule(action, 0, null);
148+
}
149+
150+
@Override
151+
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
152+
if (innerSubscription.isUnsubscribed()) {
153+
// don't schedule, we are unsubscribed
154+
return Subscriptions.empty();
155+
}
156+
157+
NewThreadScheduler.NewThreadWorker.ScheduledAction s = poolWorker.scheduleActual(action, delayTime, unit);
158+
innerSubscription.add(s);
159+
s.addParent(innerSubscription);
160+
return s;
161+
}
162+
}
163+
164+
private static final class PoolWorker extends NewThreadScheduler.NewThreadWorker {
165+
private long expirationTime;
166+
167+
PoolWorker(ThreadFactory threadFactory) {
168+
super(threadFactory);
169+
this.expirationTime = 0L;
170+
}
171+
172+
public long getExpirationTime() {
173+
return expirationTime;
174+
}
175+
176+
public void setExpirationTime(long expirationTime) {
177+
this.expirationTime = expirationTime;
178+
}
179+
}
180+
}

rxjava-core/src/main/java/rx/schedulers/Schedulers.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,11 @@
1515
*/
1616
package rx.schedulers;
1717

18-
import java.util.concurrent.Executor;
19-
2018
import rx.Scheduler;
2119
import rx.plugins.RxJavaPlugins;
2220

21+
import java.util.concurrent.Executor;
22+
2323
/**
2424
* Static factory methods for creating Schedulers.
2525
*/
@@ -43,7 +43,7 @@ private Schedulers() {
4343
if (io != null) {
4444
ioScheduler = io;
4545
} else {
46-
ioScheduler = NewThreadScheduler.instance(); // defaults to new thread
46+
ioScheduler = new CachedThreadScheduler();
4747
}
4848

4949
Scheduler nt = RxJavaPlugins.getInstance().getDefaultSchedulers().getNewThreadScheduler();
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package rx.schedulers;
18+
19+
import org.junit.Test;
20+
import rx.Observable;
21+
import rx.Scheduler;
22+
import rx.functions.Action1;
23+
import rx.functions.Func1;
24+
25+
import static org.junit.Assert.assertTrue;
26+
27+
public class CachedThreadSchedulerTest extends AbstractSchedulerConcurrencyTests {
28+
29+
@Override
30+
protected Scheduler getScheduler() {
31+
return Schedulers.io();
32+
}
33+
34+
/**
35+
* IO scheduler defaults to using CachedThreadScheduler
36+
*/
37+
@Test
38+
public final void testIOScheduler() {
39+
40+
Observable<Integer> o1 = Observable.from(1, 2, 3, 4, 5);
41+
Observable<Integer> o2 = Observable.from(6, 7, 8, 9, 10);
42+
Observable<String> o = Observable.merge(o1, o2).map(new Func1<Integer, String>() {
43+
44+
@Override
45+
public String call(Integer t) {
46+
assertTrue(Thread.currentThread().getName().startsWith("RxCachedThreadScheduler"));
47+
return "Value_" + t + "_Thread_" + Thread.currentThread().getName();
48+
}
49+
});
50+
51+
o.subscribeOn(Schedulers.io()).toBlocking().forEach(new Action1<String>() {
52+
53+
@Override
54+
public void call(String t) {
55+
System.out.println("t: " + t);
56+
}
57+
});
58+
}
59+
60+
}

rxjava-core/src/test/java/rx/schedulers/NewThreadSchedulerTest.java

Lines changed: 0 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -16,46 +16,12 @@
1616

1717
package rx.schedulers;
1818

19-
import static org.junit.Assert.assertTrue;
20-
21-
import org.junit.Test;
22-
23-
import rx.Observable;
2419
import rx.Scheduler;
25-
import rx.functions.Action1;
26-
import rx.functions.Func1;
2720

2821
public class NewThreadSchedulerTest extends AbstractSchedulerConcurrencyTests {
2922

3023
@Override
3124
protected Scheduler getScheduler() {
3225
return Schedulers.newThread();
3326
}
34-
35-
/**
36-
* IO scheduler defaults to using NewThreadScheduler
37-
*/
38-
@Test
39-
public final void testIOScheduler() {
40-
41-
Observable<Integer> o1 = Observable.<Integer> from(1, 2, 3, 4, 5);
42-
Observable<Integer> o2 = Observable.<Integer> from(6, 7, 8, 9, 10);
43-
Observable<String> o = Observable.<Integer> merge(o1, o2).map(new Func1<Integer, String>() {
44-
45-
@Override
46-
public String call(Integer t) {
47-
assertTrue(Thread.currentThread().getName().startsWith("RxNewThreadScheduler"));
48-
return "Value_" + t + "_Thread_" + Thread.currentThread().getName();
49-
}
50-
});
51-
52-
o.subscribeOn(Schedulers.io()).toBlocking().forEach(new Action1<String>() {
53-
54-
@Override
55-
public void call(String t) {
56-
System.out.println("t: " + t);
57-
}
58-
});
59-
}
60-
6127
}

0 commit comments

Comments
 (0)