1717
1818import java .util .PriorityQueue ;
1919import java .util .concurrent .TimeUnit ;
20+ import java .util .concurrent .atomic .AtomicInteger ;
2021import java .util .concurrent .atomic .AtomicIntegerFieldUpdater ;
2122
2223import rx .Scheduler ;
@@ -44,15 +45,20 @@ public Worker createWorker() {
4445 /* package accessible for unit tests */ TrampolineScheduler () {
4546 }
4647
47- private static final ThreadLocal <PriorityQueue <TimedAction >> QUEUE = new ThreadLocal <PriorityQueue <TimedAction >>();
48+ private static final ThreadLocal <PriorityQueue <TimedAction >> QUEUE = new ThreadLocal <PriorityQueue <TimedAction >>() {
49+ @ Override
50+ protected PriorityQueue <TimedAction > initialValue () {
51+ return new PriorityQueue <TimedAction >();
52+ }
53+ };
4854
4955 volatile int counter ;
50- static final AtomicIntegerFieldUpdater <TrampolineScheduler > COUNTER_UPDATER
51- = AtomicIntegerFieldUpdater .newUpdater (TrampolineScheduler .class , "counter" );
56+ static final AtomicIntegerFieldUpdater <TrampolineScheduler > COUNTER_UPDATER = AtomicIntegerFieldUpdater .newUpdater (TrampolineScheduler .class , "counter" );
5257
5358 private class InnerCurrentThreadScheduler extends Scheduler .Worker implements Subscription {
5459
5560 private final BooleanSubscription innerSubscription = new BooleanSubscription ();
61+ private final AtomicInteger wip = new AtomicInteger ();
5662
5763 @ Override
5864 public Subscription schedule (Action0 action ) {
@@ -71,24 +77,16 @@ private Subscription enqueue(Action0 action, long execTime) {
7177 return Subscriptions .empty ();
7278 }
7379 PriorityQueue <TimedAction > queue = QUEUE .get ();
74- boolean exec = queue == null ;
75-
76- if (exec ) {
77- queue = new PriorityQueue <TimedAction >();
78- QUEUE .set (queue );
79- }
80-
8180 final TimedAction timedAction = new TimedAction (action , execTime , COUNTER_UPDATER .incrementAndGet (TrampolineScheduler .this ));
8281 queue .add (timedAction );
8382
84- if (exec ) {
85- while (! queue . isEmpty ()) {
83+ if (wip . getAndIncrement () == 0 ) {
84+ do {
8685 queue .poll ().action .call ();
87- }
88-
89- QUEUE .set (null );
86+ } while (wip .decrementAndGet () > 0 );
9087 return Subscriptions .empty ();
9188 } else {
89+ // queue wasn't empty, a parent is already processing so we just add to the end of the queue
9290 return Subscriptions .create (new Action0 () {
9391
9492 @ Override
@@ -118,9 +116,9 @@ public boolean isUnsubscribed() {
118116 private static class TimedAction implements Comparable <TimedAction > {
119117 final Action0 action ;
120118 final Long execTime ;
121- final Integer count ; // In case if time between enqueueing took less than 1ms
119+ final int count ; // In case if time between enqueueing took less than 1ms
122120
123- private TimedAction (Action0 action , Long execTime , Integer count ) {
121+ private TimedAction (Action0 action , Long execTime , int count ) {
124122 this .action = action ;
125123 this .execTime = execTime ;
126124 this .count = count ;
@@ -130,10 +128,15 @@ private TimedAction(Action0 action, Long execTime, Integer count) {
130128 public int compareTo (TimedAction that ) {
131129 int result = execTime .compareTo (that .execTime );
132130 if (result == 0 ) {
133- return count . compareTo ( that .count );
131+ return compare ( count , that .count );
134132 }
135133 return result ;
136134 }
137135 }
136+
137+ // because I can't use Integer.compare from Java 7
138+ private static int compare (int x , int y ) {
139+ return (x < y ) ? -1 : ((x == y ) ? 0 : 1 );
140+ }
138141
139142}
0 commit comments