Skip to content

Commit 47ea6c8

Browse files
Merge pull request #1276 from jbripley/cachedthreadscheduler2
Re-submission of CachedThreadScheduler
2 parents 60a7d82 + b6af321 commit 47ea6c8

File tree

4 files changed

+232
-37
lines changed

4 files changed

+232
-37
lines changed
Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.schedulers;
17+
18+
import rx.Scheduler;
19+
import rx.Subscription;
20+
import rx.functions.Action0;
21+
import rx.subscriptions.CompositeSubscription;
22+
import rx.subscriptions.Subscriptions;
23+
24+
import java.util.Iterator;
25+
import java.util.concurrent.*;
26+
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
27+
28+
/* package */final class CachedThreadScheduler extends Scheduler {
29+
private static final String WORKER_THREAD_NAME_PREFIX = "RxCachedThreadScheduler-";
30+
private static final NewThreadScheduler.RxThreadFactory WORKER_THREAD_FACTORY =
31+
new NewThreadScheduler.RxThreadFactory(WORKER_THREAD_NAME_PREFIX);
32+
33+
private static final String EVICTOR_THREAD_NAME_PREFIX = "RxCachedWorkerPoolEvictor-";
34+
private static final NewThreadScheduler.RxThreadFactory EVICTOR_THREAD_FACTORY =
35+
new NewThreadScheduler.RxThreadFactory(EVICTOR_THREAD_NAME_PREFIX);
36+
37+
private static final class CachedWorkerPool {
38+
private final long keepAliveTime;
39+
private final ConcurrentLinkedQueue<ThreadWorker> expiringWorkerQueue;
40+
private final ScheduledExecutorService evictExpiredWorkerExecutor;
41+
42+
CachedWorkerPool(long keepAliveTime, TimeUnit unit) {
43+
this.keepAliveTime = unit.toNanos(keepAliveTime);
44+
this.expiringWorkerQueue = new ConcurrentLinkedQueue<ThreadWorker>();
45+
46+
evictExpiredWorkerExecutor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
47+
evictExpiredWorkerExecutor.scheduleWithFixedDelay(
48+
new Runnable() {
49+
@Override
50+
public void run() {
51+
evictExpiredWorkers();
52+
}
53+
}, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS
54+
);
55+
}
56+
57+
private static CachedWorkerPool INSTANCE = new CachedWorkerPool(
58+
60L, TimeUnit.SECONDS
59+
);
60+
61+
ThreadWorker get() {
62+
while (!expiringWorkerQueue.isEmpty()) {
63+
ThreadWorker threadWorker = expiringWorkerQueue.poll();
64+
if (threadWorker != null) {
65+
return threadWorker;
66+
}
67+
}
68+
69+
// No cached worker found, so create a new one.
70+
return new ThreadWorker(WORKER_THREAD_FACTORY);
71+
}
72+
73+
void release(ThreadWorker threadWorker) {
74+
// Refresh expire time before putting worker back in pool
75+
threadWorker.setExpirationTime(now() + keepAliveTime);
76+
77+
expiringWorkerQueue.offer(threadWorker);
78+
}
79+
80+
void evictExpiredWorkers() {
81+
if (!expiringWorkerQueue.isEmpty()) {
82+
long currentTimestamp = now();
83+
84+
Iterator<ThreadWorker> threadWorkerIterator = expiringWorkerQueue.iterator();
85+
while (threadWorkerIterator.hasNext()) {
86+
ThreadWorker threadWorker = threadWorkerIterator.next();
87+
if (threadWorker.getExpirationTime() <= currentTimestamp) {
88+
threadWorkerIterator.remove();
89+
threadWorker.unsubscribe();
90+
} else {
91+
// Queue is ordered with the worker that will expire first in the beginning, so when we
92+
// find a non-expired worker we can stop evicting.
93+
break;
94+
}
95+
}
96+
}
97+
}
98+
99+
long now() {
100+
return System.nanoTime();
101+
}
102+
}
103+
104+
@Override
105+
public Worker createWorker() {
106+
return new EventLoopWorker(CachedWorkerPool.INSTANCE.get());
107+
}
108+
109+
private static final class EventLoopWorker extends Scheduler.Worker {
110+
private final CompositeSubscription innerSubscription = new CompositeSubscription();
111+
private final ThreadWorker threadWorker;
112+
volatile int once;
113+
static final AtomicIntegerFieldUpdater<EventLoopWorker> ONCE_UPDATER
114+
= AtomicIntegerFieldUpdater.newUpdater(EventLoopWorker.class, "once");
115+
116+
EventLoopWorker(ThreadWorker threadWorker) {
117+
this.threadWorker = threadWorker;
118+
}
119+
120+
@Override
121+
public void unsubscribe() {
122+
if (ONCE_UPDATER.compareAndSet(this, 0, 1)) {
123+
// unsubscribe should be idempotent, so only do this once
124+
CachedWorkerPool.INSTANCE.release(threadWorker);
125+
}
126+
innerSubscription.unsubscribe();
127+
}
128+
129+
@Override
130+
public boolean isUnsubscribed() {
131+
return innerSubscription.isUnsubscribed();
132+
}
133+
134+
@Override
135+
public Subscription schedule(Action0 action) {
136+
return schedule(action, 0, null);
137+
}
138+
139+
@Override
140+
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
141+
if (innerSubscription.isUnsubscribed()) {
142+
// don't schedule, we are unsubscribed
143+
return Subscriptions.empty();
144+
}
145+
146+
NewThreadScheduler.NewThreadWorker.ScheduledAction s = threadWorker.scheduleActual(action, delayTime, unit);
147+
innerSubscription.add(s);
148+
s.addParent(innerSubscription);
149+
return s;
150+
}
151+
}
152+
153+
private static final class ThreadWorker extends NewThreadScheduler.NewThreadWorker {
154+
private long expirationTime;
155+
156+
ThreadWorker(ThreadFactory threadFactory) {
157+
super(threadFactory);
158+
this.expirationTime = 0L;
159+
}
160+
161+
public long getExpirationTime() {
162+
return expirationTime;
163+
}
164+
165+
public void setExpirationTime(long expirationTime) {
166+
this.expirationTime = expirationTime;
167+
}
168+
}
169+
}

rxjava-core/src/main/java/rx/schedulers/Schedulers.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,11 @@
1515
*/
1616
package rx.schedulers;
1717

18-
import java.util.concurrent.Executor;
19-
2018
import rx.Scheduler;
2119
import rx.plugins.RxJavaPlugins;
2220

21+
import java.util.concurrent.Executor;
22+
2323
/**
2424
* Static factory methods for creating Schedulers.
2525
*/
@@ -43,7 +43,7 @@ private Schedulers() {
4343
if (io != null) {
4444
ioScheduler = io;
4545
} else {
46-
ioScheduler = NewThreadScheduler.instance(); // defaults to new thread
46+
ioScheduler = new CachedThreadScheduler();
4747
}
4848

4949
Scheduler nt = RxJavaPlugins.getInstance().getDefaultSchedulers().getNewThreadScheduler();
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package rx.schedulers;
18+
19+
import org.junit.Test;
20+
import rx.Observable;
21+
import rx.Scheduler;
22+
import rx.functions.Action1;
23+
import rx.functions.Func1;
24+
25+
import static org.junit.Assert.assertTrue;
26+
27+
public class CachedThreadSchedulerTest extends AbstractSchedulerConcurrencyTests {
28+
29+
@Override
30+
protected Scheduler getScheduler() {
31+
return Schedulers.io();
32+
}
33+
34+
/**
35+
* IO scheduler defaults to using CachedThreadScheduler
36+
*/
37+
@Test
38+
public final void testIOScheduler() {
39+
40+
Observable<Integer> o1 = Observable.from(1, 2, 3, 4, 5);
41+
Observable<Integer> o2 = Observable.from(6, 7, 8, 9, 10);
42+
Observable<String> o = Observable.merge(o1, o2).map(new Func1<Integer, String>() {
43+
44+
@Override
45+
public String call(Integer t) {
46+
assertTrue(Thread.currentThread().getName().startsWith("RxCachedThreadScheduler"));
47+
return "Value_" + t + "_Thread_" + Thread.currentThread().getName();
48+
}
49+
});
50+
51+
o.subscribeOn(Schedulers.io()).toBlocking().forEach(new Action1<String>() {
52+
53+
@Override
54+
public void call(String t) {
55+
System.out.println("t: " + t);
56+
}
57+
});
58+
}
59+
60+
}

rxjava-core/src/test/java/rx/schedulers/NewThreadSchedulerTest.java

Lines changed: 0 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -16,46 +16,12 @@
1616

1717
package rx.schedulers;
1818

19-
import static org.junit.Assert.assertTrue;
20-
21-
import org.junit.Test;
22-
23-
import rx.Observable;
2419
import rx.Scheduler;
25-
import rx.functions.Action1;
26-
import rx.functions.Func1;
2720

2821
public class NewThreadSchedulerTest extends AbstractSchedulerConcurrencyTests {
2922

3023
@Override
3124
protected Scheduler getScheduler() {
3225
return Schedulers.newThread();
3326
}
34-
35-
/**
36-
* IO scheduler defaults to using NewThreadScheduler
37-
*/
38-
@Test
39-
public final void testIOScheduler() {
40-
41-
Observable<Integer> o1 = Observable.<Integer> from(1, 2, 3, 4, 5);
42-
Observable<Integer> o2 = Observable.<Integer> from(6, 7, 8, 9, 10);
43-
Observable<String> o = Observable.<Integer> merge(o1, o2).map(new Func1<Integer, String>() {
44-
45-
@Override
46-
public String call(Integer t) {
47-
assertTrue(Thread.currentThread().getName().startsWith("RxNewThreadScheduler"));
48-
return "Value_" + t + "_Thread_" + Thread.currentThread().getName();
49-
}
50-
});
51-
52-
o.subscribeOn(Schedulers.io()).toBlocking().forEach(new Action1<String>() {
53-
54-
@Override
55-
public void call(String t) {
56-
System.out.println("t: " + t);
57-
}
58-
});
59-
}
60-
6127
}

0 commit comments

Comments
 (0)