Skip to content

Commit 8d6a186

Browse files
committed
Avoid self-interrupting of a completing ScheduledAction.
1 parent 4439cc1 commit 8d6a186

File tree

3 files changed

+85
-4
lines changed

3 files changed

+85
-4
lines changed

src/main/java/rx/internal/schedulers/NewThreadWorker.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public ScheduledAction scheduleActual(final Action0 action, long delayTime, Time
6767
} else {
6868
f = executor.schedule(run, delayTime, unit);
6969
}
70-
run.add(Subscriptions.from(f));
70+
run.add(f);
7171

7272
return run;
7373
}

src/main/java/rx/internal/schedulers/ScheduledAction.java

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,15 @@
1515
*/
1616
package rx.internal.schedulers;
1717

18+
import java.util.concurrent.Future;
1819
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
1920

2021
import rx.Subscription;
2122
import rx.exceptions.OnErrorNotImplementedException;
2223
import rx.functions.Action0;
2324
import rx.plugins.RxJavaPlugins;
2425
import rx.subscriptions.CompositeSubscription;
26+
import rx.subscriptions.Subscriptions;
2527

2628
/**
2729
* A {@code Runnable} that executes an {@code Action0} and can be cancelled. The analog is the
@@ -33,6 +35,8 @@ public final class ScheduledAction implements Runnable, Subscription {
3335
volatile int once;
3436
static final AtomicIntegerFieldUpdater<ScheduledAction> ONCE_UPDATER
3537
= AtomicIntegerFieldUpdater.newUpdater(ScheduledAction.class, "once");
38+
/** Set by the run() method to avoid self interrupting at the end of the run method. */
39+
volatile Thread runner;
3640

3741
public ScheduledAction(Action0 action) {
3842
this.action = action;
@@ -42,6 +46,7 @@ public ScheduledAction(Action0 action) {
4246
@Override
4347
public void run() {
4448
try {
49+
runner = Thread.currentThread();
4550
action.call();
4651
} catch (Throwable e) {
4752
// nothing to do but print a System error as this is fatal and there is nowhere else to throw this
@@ -72,15 +77,24 @@ public void unsubscribe() {
7277
}
7378

7479
/**
75-
* @warn javadoc missing
80+
* Adds a general Subscription to this {@code ScheduledAction} that will be unsubscribed
81+
* if the underlying {@code action} completes or the this scheduled action is cancelled.
7682
*
77-
* @param s
78-
* @warn param "s" undescribed
83+
* @param s the Subscription to add
7984
*/
8085
public void add(Subscription s) {
8186
cancel.add(s);
8287
}
8388

89+
/**
90+
* Adds the given Future to the unsubscription composite in order to support
91+
* cancelling the underlying task in the executor framework.
92+
* @param f the future to add
93+
*/
94+
public void add(final Future<?> f) {
95+
cancel.add(Subscriptions.create(new FutureCompleter(f)));
96+
}
97+
8498
/**
8599
* Adds a parent {@link CompositeSubscription} to this {@code ScheduledAction} so when the action is
86100
* cancelled or terminates, it can remove itself from this parent.
@@ -92,10 +106,32 @@ public void addParent(CompositeSubscription parent) {
92106
cancel.add(new Remover(this, parent));
93107
}
94108

109+
/**
110+
* Cancels the captured future if the caller of the call method
111+
* is not the same as the runner of the outer ScheduledAction to
112+
* prevent unnecessary self-interrupting if the unsubscription
113+
* happens from the same thread.
114+
*/
115+
private final class FutureCompleter implements Action0 {
116+
private final Future<?> f;
117+
118+
private FutureCompleter(Future<?> f) {
119+
this.f = f;
120+
}
121+
122+
@Override
123+
public void call() {
124+
if (runner != Thread.currentThread()) {
125+
f.cancel(true);
126+
}
127+
}
128+
}
129+
95130
/** Remove a child subscription from a composite when unsubscribing. */
96131
private static final class Remover implements Subscription {
97132
final Subscription s;
98133
final CompositeSubscription parent;
134+
@SuppressWarnings("unused")
99135
volatile int once;
100136
static final AtomicIntegerFieldUpdater<Remover> ONCE_UPDATER
101137
= AtomicIntegerFieldUpdater.newUpdater(Remover.class, "once");

src/test/java/rx/schedulers/NewThreadSchedulerTest.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,18 @@
1616

1717
package rx.schedulers;
1818

19+
import java.util.concurrent.CountDownLatch;
20+
import java.util.concurrent.atomic.AtomicBoolean;
21+
import java.util.concurrent.atomic.AtomicReference;
22+
23+
import junit.framework.Assert;
24+
1925
import org.junit.Test;
26+
2027
import rx.Scheduler;
28+
import rx.functions.Action0;
29+
import rx.internal.schedulers.ScheduledAction;
30+
import rx.subscriptions.Subscriptions;
2131

2232
public class NewThreadSchedulerTest extends AbstractSchedulerConcurrencyTests {
2333

@@ -35,4 +45,39 @@ public final void testUnhandledErrorIsDeliveredToThreadHandler() throws Interrup
3545
public final void testHandledErrorIsNotDeliveredToThreadHandler() throws InterruptedException {
3646
SchedulerTests.testHandledErrorIsNotDeliveredToThreadHandler(getScheduler());
3747
}
48+
@Test(timeout = 3000)
49+
public void testNoSelfInterrupt() throws InterruptedException {
50+
Scheduler.Worker worker = Schedulers.newThread().createWorker();
51+
52+
final CountDownLatch run = new CountDownLatch(1);
53+
final CountDownLatch done = new CountDownLatch(1);
54+
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
55+
final AtomicBoolean interruptFlag = new AtomicBoolean();
56+
57+
ScheduledAction sa = (ScheduledAction)worker.schedule(new Action0() {
58+
@Override
59+
public void call() {
60+
try {
61+
run.await();
62+
} catch (InterruptedException ex) {
63+
exception.set(ex);
64+
}
65+
}
66+
});
67+
68+
sa.add(Subscriptions.create(new Action0() {
69+
@Override
70+
public void call() {
71+
interruptFlag.set(Thread.currentThread().isInterrupted());
72+
done.countDown();
73+
}
74+
}));
75+
76+
run.countDown();
77+
78+
done.await();
79+
80+
Assert.assertEquals(null, exception.get());
81+
Assert.assertFalse("Interrupted?!", interruptFlag.get());
82+
}
3883
}

0 commit comments

Comments
 (0)