File tree Expand file tree Collapse file tree 2 files changed +41
-1
lines changed Expand file tree Collapse file tree 2 files changed +41
-1
lines changed Original file line number Diff line number Diff line change @@ -96,11 +96,20 @@ public Subscription schedule(Action0 action) {
9696 @ Override
9797 public void run () {
9898 do {
99+ if (tasks .isUnsubscribed ()) {
100+ queue .clear ();
101+ return ;
102+ }
103+
99104 ScheduledAction sa = queue .poll ();
105+ if (sa == null ) {
106+ return ;
107+ }
108+
100109 if (!sa .isUnsubscribed ()) {
101110 sa .run ();
102111 }
103- } while (wip .decrementAndGet () > 0 );
112+ } while (wip .decrementAndGet () != 0 );
104113 }
105114
106115 @ Override
@@ -170,6 +179,7 @@ public boolean isUnsubscribed() {
170179 @ Override
171180 public void unsubscribe () {
172181 tasks .unsubscribe ();
182+ queue .clear ();
173183 }
174184
175185 }
Original file line number Diff line number Diff line change 1818import static org .junit .Assert .*;
1919
2020import java .lang .management .*;
21+ import java .util .Queue ;
2122import java .util .concurrent .*;
2223import java .util .concurrent .atomic .AtomicInteger ;
2324
25+ import org .junit .Assert ;
2426import org .junit .Test ;
2527
2628import rx .*;
@@ -275,4 +277,32 @@ public void call() {
275277
276278 assertFalse (w .tasks .hasSubscriptions ());
277279 }
280+
281+ @ Test
282+ public void workerUnderConcurrentUnsubscribeShouldNotAllowLaterTasksToRunDueToUnsubscriptionRace () {
283+ Scheduler scheduler = Schedulers .from (Executors .newFixedThreadPool (1 ));
284+ for (int i = 0 ; i < 1000 ; i ++) {
285+ Worker worker = scheduler .createWorker ();
286+ final Queue <Integer > q = new ConcurrentLinkedQueue <Integer >();
287+ Action0 action1 = new Action0 () {
288+
289+ @ Override
290+ public void call () {
291+ q .add (1 );
292+ }};
293+ Action0 action2 = new Action0 () {
294+
295+ @ Override
296+ public void call () {
297+ q .add (2 );
298+ }};
299+ worker .schedule (action1 );
300+ worker .schedule (action2 );
301+ worker .unsubscribe ();
302+ if (q .size ()==1 && q .poll () == 2 ) {
303+ //expect a queue of 1,2 or 1. If queue is just 2 then we have a problem!
304+ Assert .fail ("wrong order on loop " + i );
305+ }
306+ }
307+ }
278308}
You can’t perform that action at this time.
0 commit comments