Skip to content

Commit 3e7f71f

Browse files
Merge pull request #1736 from benjchristensen/issue-1702
Fix TrampolineScheduler NullPointerException
2 parents 651429a + 348ecce commit 3e7f71f

File tree

1 file changed

+7
-10
lines changed

1 file changed

+7
-10
lines changed

src/main/java/rx/schedulers/TrampolineScheduler.java

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -45,18 +45,12 @@ public Worker createWorker() {
4545
/* package accessible for unit tests */TrampolineScheduler() {
4646
}
4747

48-
private static final ThreadLocal<PriorityQueue<TimedAction>> QUEUE = new ThreadLocal<PriorityQueue<TimedAction>>() {
49-
@Override
50-
protected PriorityQueue<TimedAction> initialValue() {
51-
return new PriorityQueue<TimedAction>();
52-
}
53-
};
54-
5548
volatile int counter;
5649
static final AtomicIntegerFieldUpdater<TrampolineScheduler> COUNTER_UPDATER = AtomicIntegerFieldUpdater.newUpdater(TrampolineScheduler.class, "counter");
5750

5851
private class InnerCurrentThreadScheduler extends Scheduler.Worker implements Subscription {
5952

53+
final PriorityQueue<TimedAction> queue = new PriorityQueue<TimedAction>();
6054
private final BooleanSubscription innerSubscription = new BooleanSubscription();
6155
private final AtomicInteger wip = new AtomicInteger();
6256

@@ -76,13 +70,16 @@ private Subscription enqueue(Action0 action, long execTime) {
7670
if (innerSubscription.isUnsubscribed()) {
7771
return Subscriptions.empty();
7872
}
79-
PriorityQueue<TimedAction> queue = QUEUE.get();
8073
final TimedAction timedAction = new TimedAction(action, execTime, COUNTER_UPDATER.incrementAndGet(TrampolineScheduler.this));
8174
queue.add(timedAction);
8275

8376
if (wip.getAndIncrement() == 0) {
8477
do {
85-
queue.poll().action.call();
78+
TimedAction polled = queue.poll();
79+
// check for null as it could have been unsubscribed and removed
80+
if (polled != null) {
81+
polled.action.call();
82+
}
8683
} while (wip.decrementAndGet() > 0);
8784
return Subscriptions.empty();
8885
} else {
@@ -91,7 +88,7 @@ private Subscription enqueue(Action0 action, long execTime) {
9188

9289
@Override
9390
public void call() {
94-
PriorityQueue<TimedAction> _q = QUEUE.get();
91+
PriorityQueue<TimedAction> _q = queue;
9592
if (_q != null) {
9693
_q.remove(timedAction);
9794
}

0 commit comments

Comments
 (0)