Skip to content

Commit 1ea15ba

Browse files
Fix ObserveOn, NewThreadScheduler and ScheduledObserver bugs
@headinthebox and I were working on some code and found differences in behavior between Rx.Net and RxJava with observeOn. This commit should fix that.
1 parent 2878c2a commit 1ea15ba

File tree

8 files changed

+380
-84
lines changed

8 files changed

+380
-84
lines changed

rxjava-core/src/main/java/rx/concurrency/NewThreadScheduler.java

Lines changed: 64 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,15 @@
1515
*/
1616
package rx.concurrency;
1717

18+
import java.util.concurrent.ExecutorService;
19+
import java.util.concurrent.Executors;
1820
import java.util.concurrent.ScheduledFuture;
21+
import java.util.concurrent.ThreadFactory;
1922
import java.util.concurrent.TimeUnit;
23+
import java.util.concurrent.atomic.AtomicLong;
2024

2125
import rx.Scheduler;
2226
import rx.Subscription;
23-
import rx.operators.SafeObservableSubscription;
2427
import rx.subscriptions.CompositeSubscription;
2528
import rx.subscriptions.Subscriptions;
2629
import rx.util.functions.Func2;
@@ -29,27 +32,74 @@
2932
* Schedules work on a new thread.
3033
*/
3134
public class NewThreadScheduler extends Scheduler {
32-
private static final NewThreadScheduler INSTANCE = new NewThreadScheduler();
35+
36+
private final static NewThreadScheduler INSTANCE = new NewThreadScheduler();
37+
private final static AtomicLong count = new AtomicLong();
3338

3439
public static NewThreadScheduler getInstance() {
3540
return INSTANCE;
3641
}
3742

38-
@Override
39-
public <T> Subscription schedule(final T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action) {
40-
final SafeObservableSubscription subscription = new SafeObservableSubscription();
41-
final Scheduler _scheduler = this;
43+
private NewThreadScheduler() {
4244

43-
Thread t = new Thread(new Runnable() {
44-
@Override
45-
public void run() {
46-
subscription.wrap(action.call(_scheduler, state));
47-
}
48-
}, "RxNewThreadScheduler");
45+
}
4946

50-
t.start();
47+
private static class EventLoopScheduler extends Scheduler {
48+
private final ExecutorService executor;
5149

52-
return subscription;
50+
private EventLoopScheduler() {
51+
executor = Executors.newFixedThreadPool(1, new ThreadFactory() {
52+
53+
@Override
54+
public Thread newThread(Runnable r) {
55+
return new Thread(r, "RxNewThreadScheduler-" + count.incrementAndGet());
56+
}
57+
});
58+
}
59+
60+
@Override
61+
public <T> Subscription schedule(final T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action) {
62+
final Scheduler _scheduler = this;
63+
return Subscriptions.from(executor.submit(new Runnable() {
64+
65+
@Override
66+
public void run() {
67+
action.call(_scheduler, state);
68+
}
69+
}));
70+
}
71+
72+
@Override
73+
public <T> Subscription schedule(final T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action, final long delayTime, final TimeUnit unit) {
74+
// we will use the system scheduler since it doesn't make sense to launch a new Thread and then sleep
75+
// we will instead schedule the event then launch the thread after the delay has passed
76+
final Scheduler _scheduler = this;
77+
final CompositeSubscription subscription = new CompositeSubscription();
78+
ScheduledFuture<?> f = GenericScheduledExecutorService.getInstance().schedule(new Runnable() {
79+
80+
@Override
81+
public void run() {
82+
if (!subscription.isUnsubscribed()) {
83+
// when the delay has passed we now do the work on the actual scheduler
84+
Subscription s = _scheduler.schedule(state, action);
85+
// add the subscription to the CompositeSubscription so it is unsubscribed
86+
subscription.add(s);
87+
}
88+
}
89+
}, delayTime, unit);
90+
91+
// add the ScheduledFuture as a subscription so we can cancel the scheduled action if an unsubscribe happens
92+
subscription.add(Subscriptions.create(f));
93+
94+
return subscription;
95+
}
96+
97+
}
98+
99+
@Override
100+
public <T> Subscription schedule(final T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action) {
101+
EventLoopScheduler s = new EventLoopScheduler();
102+
return s.schedule(state, action);
53103
}
54104

55105
@Override

rxjava-core/src/main/java/rx/operators/OperationObserveOn.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import java.util.concurrent.CountDownLatch;
2222
import java.util.concurrent.TimeUnit;
23+
import java.util.concurrent.atomic.AtomicReference;
2324

2425
import org.junit.Test;
2526
import org.mockito.InOrder;
@@ -33,6 +34,8 @@
3334
import rx.Subscription;
3435
import rx.concurrency.ImmediateScheduler;
3536
import rx.concurrency.Schedulers;
37+
import rx.subscriptions.CompositeSubscription;
38+
import rx.util.functions.Func2;
3639

3740
/**
3841
* Asynchronously notify Observers on the specified Scheduler.
@@ -60,7 +63,9 @@ public Subscription onSubscribe(final Observer<? super T> observer) {
6063
// do nothing if we request ImmediateScheduler so we don't invoke overhead
6164
return source.subscribe(observer);
6265
} else {
63-
return source.subscribe(new ScheduledObserver<T>(observer, scheduler));
66+
CompositeSubscription s = new CompositeSubscription();
67+
s.add(source.subscribe(new ScheduledObserver<T>(s, observer, scheduler)));
68+
return s;
6469
}
6570
}
6671
}

rxjava-core/src/main/java/rx/operators/OperationRetry.java

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,13 @@
2626
import rx.Observable;
2727
import rx.Observable.OnSubscribeFunc;
2828
import rx.Observer;
29+
import rx.Scheduler;
2930
import rx.Subscription;
3031
import rx.concurrency.Schedulers;
3132
import rx.subscriptions.CompositeSubscription;
33+
import rx.subscriptions.MultipleAssignmentSubscription;
3234
import rx.subscriptions.Subscriptions;
33-
import rx.util.functions.Action0;
35+
import rx.util.functions.Func2;
3436

3537
public class OperationRetry {
3638

@@ -58,17 +60,19 @@ public Retry(Observable<T> source, int retryCount) {
5860

5961
@Override
6062
public Subscription onSubscribe(Observer<? super T> observer) {
61-
subscription.add(Schedulers.currentThread().schedule(attemptSubscription(observer)));
63+
MultipleAssignmentSubscription rescursiveSubscription = new MultipleAssignmentSubscription();
64+
subscription.add(Schedulers.currentThread().schedule(rescursiveSubscription, attemptSubscription(observer)));
65+
subscription.add(rescursiveSubscription);
6266
return subscription;
6367
}
6468

65-
private Action0 attemptSubscription(final Observer<? super T> observer) {
66-
return new Action0() {
69+
private Func2<Scheduler, MultipleAssignmentSubscription, Subscription> attemptSubscription(final Observer<? super T> observer) {
70+
return new Func2<Scheduler, MultipleAssignmentSubscription, Subscription>() {
6771

6872
@Override
69-
public void call() {
73+
public Subscription call(final Scheduler scheduler, final MultipleAssignmentSubscription rescursiveSubscription) {
7074
attempts.incrementAndGet();
71-
source.subscribe(new Observer<T>() {
75+
return source.subscribe(new Observer<T>() {
7276

7377
@Override
7478
public void onCompleted() {
@@ -79,10 +83,8 @@ public void onCompleted() {
7983
public void onError(Throwable e) {
8084
if ((retryCount == INFINITE_RETRY || attempts.get() <= retryCount) && !subscription.isUnsubscribed()) {
8185
// retry again
82-
// remove the last subscription since we have completed (so as we retry we don't build up a huge list)
83-
subscription.removeLast();
84-
// add the new subscription and schedule a retry
85-
subscription.add(Schedulers.currentThread().schedule(attemptSubscription(observer)));
86+
// add the new subscription and schedule a retry recursively
87+
rescursiveSubscription.setSubscription(scheduler.schedule(rescursiveSubscription, attemptSubscription(observer)));
8688
} else {
8789
// give up and pass the failure
8890
observer.onError(e);
@@ -96,6 +98,7 @@ public void onNext(T v) {
9698
});
9799

98100
}
101+
99102
};
100103
}
101104

@@ -157,7 +160,7 @@ public void testRetrySuccess() {
157160
inOrder.verify(observer, times(1)).onCompleted();
158161
inOrder.verifyNoMoreInteractions();
159162
}
160-
163+
161164
@Test
162165
public void testInfiniteRetry() {
163166
int NUM_FAILURES = 20;

rxjava-core/src/main/java/rx/operators/ScheduledObserver.java

Lines changed: 78 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -16,21 +16,28 @@
1616
package rx.operators;
1717

1818
import java.util.concurrent.ConcurrentLinkedQueue;
19+
import java.util.concurrent.atomic.AtomicBoolean;
1920
import java.util.concurrent.atomic.AtomicInteger;
2021

2122
import rx.Notification;
2223
import rx.Observer;
2324
import rx.Scheduler;
24-
import rx.util.functions.Action0;
25+
import rx.Subscription;
26+
import rx.subscriptions.CompositeSubscription;
27+
import rx.subscriptions.MultipleAssignmentSubscription;
28+
import rx.util.functions.Func2;
2529

2630
/* package */class ScheduledObserver<T> implements Observer<T> {
2731
private final Observer<? super T> underlying;
2832
private final Scheduler scheduler;
33+
private final CompositeSubscription parentSubscription;
34+
private final EventLoop eventLoop = new EventLoop();
2935

3036
private final ConcurrentLinkedQueue<Notification<? extends T>> queue = new ConcurrentLinkedQueue<Notification<? extends T>>();
31-
private final AtomicInteger counter = new AtomicInteger(0);
37+
private final AtomicBoolean started = new AtomicBoolean();
3238

33-
public ScheduledObserver(Observer<? super T> underlying, Scheduler scheduler) {
39+
public ScheduledObserver(CompositeSubscription s, Observer<? super T> underlying, Scheduler scheduler) {
40+
this.parentSubscription = s;
3441
this.underlying = underlying;
3542
this.scheduler = scheduler;
3643
}
@@ -50,46 +57,83 @@ public void onNext(final T args) {
5057
enqueue(new Notification<T>(args));
5158
}
5259

60+
final AtomicInteger counter = new AtomicInteger();
61+
5362
private void enqueue(Notification<? extends T> notification) {
54-
// this must happen before 'counter' is used to provide synchronization between threads
63+
// this must happen before synchronization between threads
5564
queue.offer(notification);
5665

57-
// we now use counter to atomically determine if we need to start processing or not
58-
// it will be 0 if it's the first notification or the scheduler has finished processing work
59-
// and we need to start doing it again
60-
if (counter.getAndIncrement() == 0) {
61-
processQueue();
66+
/**
67+
* If the counter is currently at 0 (before incrementing with this addition)
68+
* we will schedule the work.
69+
*/
70+
if (counter.getAndIncrement() <= 0) {
71+
if (!started.get() && started.compareAndSet(false, true)) {
72+
// first time we use the parent scheduler to start the event loop
73+
MultipleAssignmentSubscription recursiveSubscription = new MultipleAssignmentSubscription();
74+
parentSubscription.add(scheduler.schedule(recursiveSubscription, eventLoop));
75+
parentSubscription.add(recursiveSubscription);
76+
} else {
77+
// subsequent times we reschedule existing one
78+
eventLoop.reschedule();
79+
}
6280
}
6381
}
6482

65-
private void processQueue() {
66-
scheduler.schedule(new Action0() {
67-
@Override
68-
public void call() {
69-
Notification<? extends T> not = queue.poll();
70-
71-
switch (not.getKind()) {
72-
case OnNext:
73-
underlying.onNext(not.getValue());
74-
break;
75-
case OnError:
76-
underlying.onError(not.getThrowable());
77-
break;
78-
case OnCompleted:
79-
underlying.onCompleted();
80-
break;
81-
default:
82-
throw new IllegalStateException("Unknown kind of notification " + not);
83+
private class EventLoop implements Func2<Scheduler, MultipleAssignmentSubscription, Subscription> {
8384

84-
}
85+
volatile Scheduler _recursiveScheduler;
86+
volatile MultipleAssignmentSubscription _recursiveSubscription;
8587

86-
// decrement count and if we still have work to do
87-
// recursively schedule ourselves to process again
88-
if (counter.decrementAndGet() > 0) {
89-
scheduler.schedule(this);
90-
}
88+
public void reschedule() {
89+
_recursiveSubscription.setSubscription(_recursiveScheduler.schedule(_recursiveSubscription, this));
90+
}
9191

92+
@Override
93+
public Subscription call(Scheduler s, MultipleAssignmentSubscription recursiveSubscription) {
94+
/*
95+
* --------------------------------------------------------------------------------------
96+
* Set these the first time through so we can externally trigger recursive execution again
97+
*/
98+
if (_recursiveScheduler == null) {
99+
_recursiveScheduler = s;
100+
}
101+
if (_recursiveSubscription == null) {
102+
_recursiveSubscription = recursiveSubscription;
92103
}
93-
});
104+
/*
105+
* Back to regular flow
106+
* --------------------------------------------------------------------------------------
107+
*/
108+
109+
do {
110+
Notification<? extends T> notification = queue.poll();
111+
// if we got a notification, send it
112+
if (notification != null) {
113+
114+
// if unsubscribed stop working
115+
if (parentSubscription.isUnsubscribed()) {
116+
return parentSubscription;
117+
}
118+
// process notification
119+
120+
switch (notification.getKind()) {
121+
case OnNext:
122+
underlying.onNext(notification.getValue());
123+
break;
124+
case OnError:
125+
underlying.onError(notification.getThrowable());
126+
break;
127+
case OnCompleted:
128+
underlying.onCompleted();
129+
break;
130+
default:
131+
throw new IllegalStateException("Unknown kind of notification " + notification);
132+
}
133+
}
134+
} while (counter.decrementAndGet() > 0);
135+
136+
return parentSubscription;
137+
}
94138
}
95139
}

0 commit comments

Comments
 (0)