1616package rx .schedulers ;
1717
1818import rx .Scheduler ;
19- import rx .internal .schedulers .*;
19+ import rx .annotations .Experimental ;
20+ import rx .internal .schedulers .ExecutorScheduler ;
21+ import rx .internal .schedulers .GenericScheduledExecutorService ;
22+ import rx .internal .schedulers .SchedulerLifecycle ;
2023import rx .internal .util .RxRingBuffer ;
2124import rx .plugins .RxJavaPlugins ;
2225import rx .plugins .RxJavaSchedulersHook ;
2326
2427import java .util .concurrent .Executor ;
28+ import java .util .concurrent .atomic .AtomicReference ;
2529
2630/**
2731 * Static factory methods for creating Schedulers.
@@ -32,7 +36,22 @@ public final class Schedulers {
3236 private final Scheduler ioScheduler ;
3337 private final Scheduler newThreadScheduler ;
3438
35- private static final Schedulers INSTANCE = new Schedulers ();
39+ private static final AtomicReference <Schedulers > INSTANCE = new AtomicReference <Schedulers >();
40+
41+ private static Schedulers getInstance () {
42+ for (;;) {
43+ Schedulers current = INSTANCE .get ();
44+ if (current != null ) {
45+ return current ;
46+ }
47+ current = new Schedulers ();
48+ if (INSTANCE .compareAndSet (null , current )) {
49+ return current ;
50+ } else {
51+ shutdown ();
52+ }
53+ }
54+ }
3655
3756 private Schedulers () {
3857 RxJavaSchedulersHook hook = RxJavaPlugins .getInstance ().getSchedulersHook ();
@@ -86,7 +105,7 @@ public static Scheduler trampoline() {
86105 * @return a {@link Scheduler} that creates new threads
87106 */
88107 public static Scheduler newThread () {
89- return INSTANCE .newThreadScheduler ;
108+ return getInstance () .newThreadScheduler ;
90109 }
91110
92111 /**
@@ -101,7 +120,7 @@ public static Scheduler newThread() {
101120 * @return a {@link Scheduler} meant for computation-bound work
102121 */
103122 public static Scheduler computation () {
104- return INSTANCE .computationScheduler ;
123+ return getInstance () .computationScheduler ;
105124 }
106125
107126 /**
@@ -118,7 +137,7 @@ public static Scheduler computation() {
118137 * @return a {@link Scheduler} meant for IO-bound work
119138 */
120139 public static Scheduler io () {
121- return INSTANCE .ioScheduler ;
140+ return getInstance () .ioScheduler ;
122141 }
123142
124143 /**
@@ -141,13 +160,24 @@ public static TestScheduler test() {
141160 public static Scheduler from (Executor executor ) {
142161 return new ExecutorScheduler (executor );
143162 }
163+
164+ /**
165+ * Resets the current {@link Schedulers} instance.
166+ * This will re-init the cached schedulers on the next usage,
167+ * which can be useful in testing.
168+ */
169+ @ Experimental
170+ public static void reset () {
171+ shutdown ();
172+ INSTANCE .set (null );
173+ }
144174
145175 /**
146176 * Starts those standard Schedulers which support the SchedulerLifecycle interface.
147177 * <p>The operation is idempotent and threadsafe.
148178 */
149179 /* public test only */ static void start () {
150- Schedulers s = INSTANCE ;
180+ Schedulers s = getInstance () ;
151181 synchronized (s ) {
152182 if (s .computationScheduler instanceof SchedulerLifecycle ) {
153183 ((SchedulerLifecycle ) s .computationScheduler ).start ();
@@ -170,7 +200,7 @@ public static Scheduler from(Executor executor) {
170200 * <p>The operation is idempotent and threadsafe.
171201 */
172202 public static void shutdown () {
173- Schedulers s = INSTANCE ;
203+ Schedulers s = getInstance () ;
174204 synchronized (s ) {
175205 if (s .computationScheduler instanceof SchedulerLifecycle ) {
176206 ((SchedulerLifecycle ) s .computationScheduler ).shutdown ();
@@ -181,12 +211,12 @@ public static void shutdown() {
181211 if (s .newThreadScheduler instanceof SchedulerLifecycle ) {
182212 ((SchedulerLifecycle ) s .newThreadScheduler ).shutdown ();
183213 }
184-
214+
185215 GenericScheduledExecutorService .INSTANCE .shutdown ();
186-
216+
187217 RxRingBuffer .SPSC_POOL .shutdown ();
188-
218+
189219 RxRingBuffer .SPMC_POOL .shutdown ();
190220 }
191221 }
192- }
222+ }
0 commit comments