2222import io .reactivex .internal .disposables .*;
2323import io .reactivex .internal .functions .Functions ;
2424import io .reactivex .internal .queue .MpscLinkedQueue ;
25- import io .reactivex .internal .schedulers .ExecutorScheduler .ExecutorWorker .BooleanRunnable ;
25+ import io .reactivex .internal .schedulers .ExecutorScheduler .ExecutorWorker .* ;
2626import io .reactivex .plugins .RxJavaPlugins ;
2727import io .reactivex .schedulers .*;
2828
3131 */
3232public final class ExecutorScheduler extends Scheduler {
3333
34+ final boolean interruptibleWorker ;
35+
3436 @ NonNull
3537 final Executor executor ;
3638
3739 static final Scheduler HELPER = Schedulers .single ();
3840
39- public ExecutorScheduler (@ NonNull Executor executor ) {
41+ public ExecutorScheduler (@ NonNull Executor executor , boolean interruptibleWorker ) {
4042 this .executor = executor ;
43+ this .interruptibleWorker = interruptibleWorker ;
4144 }
4245
4346 @ NonNull
4447 @ Override
4548 public Worker createWorker () {
46- return new ExecutorWorker (executor );
49+ return new ExecutorWorker (executor , interruptibleWorker );
4750 }
4851
4952 @ NonNull
@@ -58,9 +61,15 @@ public Disposable scheduleDirect(@NonNull Runnable run) {
5861 return task ;
5962 }
6063
61- BooleanRunnable br = new BooleanRunnable (decoratedRun );
62- executor .execute (br );
63- return br ;
64+ if (interruptibleWorker ) {
65+ InterruptibleRunnable interruptibleTask = new InterruptibleRunnable (decoratedRun , null );
66+ executor .execute (interruptibleTask );
67+ return interruptibleTask ;
68+ } else {
69+ BooleanRunnable br = new BooleanRunnable (decoratedRun );
70+ executor .execute (br );
71+ return br ;
72+ }
6473 } catch (RejectedExecutionException ex ) {
6574 RxJavaPlugins .onError (ex );
6675 return EmptyDisposable .INSTANCE ;
@@ -111,6 +120,9 @@ public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initial
111120 }
112121 /* public: test support. */
113122 public static final class ExecutorWorker extends Scheduler .Worker implements Runnable {
123+
124+ final boolean interruptibleWorker ;
125+
114126 final Executor executor ;
115127
116128 final MpscLinkedQueue <Runnable > queue ;
@@ -121,9 +133,10 @@ public static final class ExecutorWorker extends Scheduler.Worker implements Run
121133
122134 final CompositeDisposable tasks = new CompositeDisposable ();
123135
124- public ExecutorWorker (Executor executor ) {
136+ public ExecutorWorker (Executor executor , boolean interruptibleWorker ) {
125137 this .executor = executor ;
126138 this .queue = new MpscLinkedQueue <Runnable >();
139+ this .interruptibleWorker = interruptibleWorker ;
127140 }
128141
129142 @ NonNull
@@ -134,9 +147,24 @@ public Disposable schedule(@NonNull Runnable run) {
134147 }
135148
136149 Runnable decoratedRun = RxJavaPlugins .onSchedule (run );
137- BooleanRunnable br = new BooleanRunnable (decoratedRun );
138150
139- queue .offer (br );
151+ Runnable task ;
152+ Disposable disposable ;
153+
154+ if (interruptibleWorker ) {
155+ InterruptibleRunnable interruptibleTask = new InterruptibleRunnable (decoratedRun , tasks );
156+ tasks .add (interruptibleTask );
157+
158+ task = interruptibleTask ;
159+ disposable = interruptibleTask ;
160+ } else {
161+ BooleanRunnable runnableTask = new BooleanRunnable (decoratedRun );
162+
163+ task = runnableTask ;
164+ disposable = runnableTask ;
165+ }
166+
167+ queue .offer (task );
140168
141169 if (wip .getAndIncrement () == 0 ) {
142170 try {
@@ -149,7 +177,7 @@ public Disposable schedule(@NonNull Runnable run) {
149177 }
150178 }
151179
152- return br ;
180+ return disposable ;
153181 }
154182
155183 @ NonNull
@@ -288,6 +316,97 @@ public void run() {
288316 mar .replace (schedule (decoratedRun ));
289317 }
290318 }
319+
320+ /**
321+ * Wrapper for a {@link Runnable} with additional logic for handling interruption on
322+ * a shared thread, similar to how Java Executors do it.
323+ */
324+ static final class InterruptibleRunnable extends AtomicInteger implements Runnable , Disposable {
325+
326+ private static final long serialVersionUID = -3603436687413320876L ;
327+
328+ final Runnable run ;
329+
330+ final DisposableContainer tasks ;
331+
332+ volatile Thread thread ;
333+
334+ static final int READY = 0 ;
335+
336+ static final int RUNNING = 1 ;
337+
338+ static final int FINISHED = 2 ;
339+
340+ static final int INTERRUPTING = 3 ;
341+
342+ static final int INTERRUPTED = 4 ;
343+
344+ InterruptibleRunnable (Runnable run , DisposableContainer tasks ) {
345+ this .run = run ;
346+ this .tasks = tasks ;
347+ }
348+
349+ @ Override
350+ public void run () {
351+ if (get () == READY ) {
352+ thread = Thread .currentThread ();
353+ if (compareAndSet (READY , RUNNING )) {
354+ try {
355+ run .run ();
356+ } finally {
357+ thread = null ;
358+ if (compareAndSet (RUNNING , FINISHED )) {
359+ cleanup ();
360+ } else {
361+ while (get () == INTERRUPTING ) {
362+ Thread .yield ();
363+ }
364+ Thread .interrupted ();
365+ }
366+ }
367+ } else {
368+ thread = null ;
369+ }
370+ }
371+ }
372+
373+ @ Override
374+ public void dispose () {
375+ for (;;) {
376+ int state = get ();
377+ if (state >= FINISHED ) {
378+ break ;
379+ } else if (state == READY ) {
380+ if (compareAndSet (READY , INTERRUPTED )) {
381+ cleanup ();
382+ break ;
383+ }
384+ } else {
385+ if (compareAndSet (RUNNING , INTERRUPTING )) {
386+ Thread t = thread ;
387+ if (t != null ) {
388+ t .interrupt ();
389+ thread = null ;
390+ }
391+ set (INTERRUPTED );
392+ cleanup ();
393+ break ;
394+ }
395+ }
396+ }
397+ }
398+
399+ void cleanup () {
400+ if (tasks != null ) {
401+ tasks .delete (this );
402+ }
403+ }
404+
405+ @ Override
406+ public boolean isDisposed () {
407+ return get () >= FINISHED ;
408+ }
409+ }
291410 }
292411
293412 static final class DelayedRunnable extends AtomicReference <Runnable >
0 commit comments