Skip to content

Commit 68652e9

Browse files
committed
Merge pull request #1559 from mattrjacobs/scheduler-plugin
More consistent hooks for scheduler plugins. This is a rework of #1514.
2 parents b58860b + 6f3f505 commit 68652e9

File tree

5 files changed

+138
-1
lines changed

5 files changed

+138
-1
lines changed

rxjava-core/src/main/java/rx/internal/schedulers/NewThreadWorker.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
import rx.Scheduler;
1919
import rx.Subscription;
2020
import rx.functions.Action0;
21+
import rx.plugins.RxJavaPlugins;
22+
import rx.plugins.RxJavaSchedulersHook;
2123
import rx.subscriptions.Subscriptions;
2224

2325
import java.util.concurrent.*;
@@ -27,11 +29,13 @@
2729
*/
2830
public class NewThreadWorker extends Scheduler.Worker implements Subscription {
2931
private final ScheduledExecutorService executor;
32+
private final RxJavaSchedulersHook schedulersHook;
3033
volatile boolean isUnsubscribed;
3134

3235
/* package */
3336
public NewThreadWorker(ThreadFactory threadFactory) {
3437
executor = Executors.newScheduledThreadPool(1, threadFactory);
38+
schedulersHook = RxJavaPlugins.getInstance().getSchedulersHook();
3539
}
3640

3741
@Override
@@ -55,7 +59,8 @@ public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit
5559
* @return
5660
*/
5761
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
58-
ScheduledAction run = new ScheduledAction(action);
62+
Action0 decoratedAction = schedulersHook.onSchedule(action);
63+
ScheduledAction run = new ScheduledAction(decoratedAction);
5964
Future<?> f;
6065
if (delayTime <= 0) {
6166
f = executor.submit(run);

rxjava-core/src/main/java/rx/plugins/RxJavaDefaultSchedulers.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
* See {@link RxJavaPlugins} or the RxJava GitHub Wiki for information on configuring plugins:
2424
* <a href="https://github.com/Netflix/RxJava/wiki/Plugins">https://github.com/Netflix/RxJava/wiki/Plugins</a>.
2525
*/
26+
@Deprecated
2627
public abstract class RxJavaDefaultSchedulers {
2728

2829
/**

rxjava-core/src/main/java/rx/plugins/RxJavaDefaultSchedulersDefault.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
*
2323
* @ExcludeFromJavadoc
2424
*/
25+
@Deprecated
2526
public class RxJavaDefaultSchedulersDefault extends RxJavaDefaultSchedulers {
2627

2728
private static RxJavaDefaultSchedulersDefault INSTANCE = new RxJavaDefaultSchedulersDefault();

rxjava-core/src/main/java/rx/plugins/RxJavaPlugins.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,9 @@ public class RxJavaPlugins {
3434

3535
private final AtomicReference<RxJavaErrorHandler> errorHandler = new AtomicReference<RxJavaErrorHandler>();
3636
private final AtomicReference<RxJavaObservableExecutionHook> observableExecutionHook = new AtomicReference<RxJavaObservableExecutionHook>();
37+
//deprecated
3738
private final AtomicReference<RxJavaDefaultSchedulers> schedulerOverrides = new AtomicReference<RxJavaDefaultSchedulers>();
39+
private final AtomicReference<RxJavaSchedulersHook> schedulersHook = new AtomicReference<RxJavaSchedulersHook>();
3840

3941
/**
4042
* Retrieves the single {@code RxJavaPlugins} instance.
@@ -52,6 +54,7 @@ public static RxJavaPlugins getInstance() {
5254
/* package accessible for unit tests */void reset() {
5355
INSTANCE.errorHandler.set(null);
5456
INSTANCE.observableExecutionHook.set(null);
57+
INSTANCE.schedulersHook.set(null);
5558
}
5659

5760
/**
@@ -207,4 +210,46 @@ public void registerDefaultSchedulers(RxJavaDefaultSchedulers impl) {
207210
throw new IllegalStateException("Another strategy was already registered: " + schedulerOverrides.get());
208211
}
209212
}
213+
214+
/**
215+
* Retrieves the instance of {@link RxJavaSchedulersHook} to use based on order of precedence as defined
216+
* in the {@link RxJavaPlugins} class header.
217+
* <p>
218+
* Override the default by calling {@link #registerSchedulersHook(RxJavaSchedulersHook)} or by setting
219+
* the property {@code rxjava.plugin.RxJavaSchedulersHook.implementation} with the full classname to
220+
* load.
221+
*
222+
* @return the {@link RxJavaSchedulersHook} implementation in use
223+
*/
224+
public RxJavaSchedulersHook getSchedulersHook() {
225+
if (schedulersHook.get() == null) {
226+
// check for an implementation from System.getProperty first
227+
Object impl = getPluginImplementationViaProperty(RxJavaSchedulersHook.class);
228+
if (impl == null) {
229+
// nothing set via properties so initialize with default
230+
schedulersHook.compareAndSet(null, RxJavaSchedulersHook.getDefaultInstance());
231+
// we don't return from here but call get() again in case of thread-race so the winner will always get returned
232+
} else {
233+
// we received an implementation from the system property so use it
234+
schedulersHook.compareAndSet(null, (RxJavaSchedulersHook) impl);
235+
}
236+
}
237+
return schedulersHook.get();
238+
}
239+
240+
/**
241+
* Registers an {@link RxJavaSchedulersHook} implementation as a global override of any injected or
242+
* default implementations.
243+
*
244+
* @param impl
245+
* {@link RxJavaSchedulersHook} implementation
246+
* @throws IllegalStateException
247+
* if called more than once or after the default was initialized (if usage occurs before trying
248+
* to register)
249+
*/
250+
public void registerSchedulersHook(RxJavaSchedulersHook impl) {
251+
if (!schedulersHook.compareAndSet(null, impl)) {
252+
throw new IllegalStateException("Another strategy was already registered: " + schedulersHook.get());
253+
}
254+
}
210255
}
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package rx.plugins;
18+
19+
import rx.Scheduler;
20+
import rx.functions.Action0;
21+
22+
/**
23+
* This plugin class provides 2 ways to customize {@link Scheduler} functionality
24+
* 1. You may redefine entire schedulers, if you so choose. To do so, override
25+
* the 3 methods that return Scheduler (io(), computation(), newThread()).
26+
* 2. You may wrap/decorate an {@link Action0}, before it is handed off to a Scheduler. The system-
27+
* supplied Schedulers (Schedulers.ioScheduler, Schedulers.computationScheduler,
28+
* Scheduler.newThreadScheduler) all use this hook, so it's a convenient way to
29+
* modify Scheduler functionality without redefining Schedulers wholesale.
30+
*
31+
* Also, when redefining Schedulers, you are free to use/not use the onSchedule decoration hook.
32+
* <p>
33+
* See {@link RxJavaPlugins} or the RxJava GitHub Wiki for information on configuring plugins:
34+
* <a href="https://github.com/Netflix/RxJava/wiki/Plugins">https://github.com/Netflix/RxJava/wiki/Plugins</a>.
35+
*/
36+
public class RxJavaSchedulersHook {
37+
38+
protected RxJavaSchedulersHook() {
39+
40+
}
41+
42+
private final static RxJavaSchedulersHook DEFAULT_INSTANCE = new RxJavaSchedulersHook();
43+
44+
/**
45+
* Scheduler to return from {@link rx.schedulers.Schedulers#computation()} or null if default should be
46+
* used.
47+
*
48+
* This instance should be or behave like a stateless singleton;
49+
*/
50+
public Scheduler getComputationScheduler() {
51+
return null;
52+
}
53+
54+
/**
55+
* Scheduler to return from {@link rx.schedulers.Schedulers#io()} or null if default should be used.
56+
*
57+
* This instance should be or behave like a stateless singleton;
58+
*/
59+
public Scheduler getIOScheduler() {
60+
return null;
61+
}
62+
63+
/**
64+
* Scheduler to return from {@link rx.schedulers.Schedulers#newThread()} or null if default should be used.
65+
*
66+
* This instance should be or behave like a stateless singleton;
67+
*/
68+
public Scheduler getNewThreadScheduler() {
69+
return null;
70+
}
71+
72+
/**
73+
* Invoked before the Action is handed over to the scheduler. Can be used for wrapping/decorating/logging.
74+
* The default is just a passthrough.
75+
* @param action action to schedule
76+
* @return wrapped action to schedule
77+
*/
78+
public Action0 onSchedule(Action0 action) {
79+
return action;
80+
}
81+
82+
public static RxJavaSchedulersHook getDefaultInstance() {
83+
return DEFAULT_INSTANCE;
84+
}
85+
}

0 commit comments

Comments
 (0)