Skip to content

Commit 446acf5

Browse files
ObserveOn with Backpressure
1 parent 9e26917 commit 446acf5

File tree

4 files changed

+143
-146
lines changed

4 files changed

+143
-146
lines changed

rxjava-core/src/main/java/rx/operators/OperatorObserveOn.java

Lines changed: 134 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -15,53 +15,156 @@
1515
*/
1616
package rx.operators;
1717

18-
import rx.Notification;
18+
import java.util.concurrent.ArrayBlockingQueue;
19+
import java.util.concurrent.atomic.AtomicLong;
20+
1921
import rx.Scheduler;
22+
import rx.Scheduler.Inner;
2023
import rx.Subscriber;
21-
import rx.subscriptions.CompositeSubscription;
22-
import rx.util.functions.Action0;
24+
import rx.schedulers.ImmediateScheduler;
25+
import rx.schedulers.TestScheduler;
26+
import rx.schedulers.TrampolineScheduler;
27+
import rx.util.functions.Action1;
2328

2429
/**
25-
* Move the observation of events to another thread via Scheduler.
26-
* @param <T> the item type
30+
* Asynchronously notify Observers on the specified Scheduler.
31+
* <p>
32+
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/observeOn.png">
2733
*/
2834
public class OperatorObserveOn<T> implements Operator<T, T> {
29-
final Scheduler scheduler;
35+
36+
private final Scheduler scheduler;
37+
3038
public OperatorObserveOn(Scheduler scheduler) {
3139
this.scheduler = scheduler;
3240
}
3341

3442
@Override
35-
public Subscriber<? super T> call(final Subscriber<? super T> t1) {
36-
final QueueDrain qd = new QueueDrain(t1);
37-
final CompositeSubscription csub = new CompositeSubscription();
38-
t1.add(csub);
39-
return new Subscriber<T>(t1) {
40-
/** Dispatch the notification value. */
41-
void run(final Notification<T> nt) {
42-
qd.enqueue(new Action0() {
43-
@Override
44-
public void call() {
45-
nt.accept(t1);
46-
}
47-
});
48-
qd.tryDrainAsync(scheduler, csub);
43+
public Subscriber<? super T> call(Subscriber<? super T> child) {
44+
if (scheduler instanceof ImmediateScheduler) {
45+
// avoid overhead, execute directly
46+
return child;
47+
} else if (scheduler instanceof TrampolineScheduler) {
48+
// avoid overhead, execute directly
49+
return child;
50+
} else if (scheduler instanceof TestScheduler) {
51+
// this one will deadlock as it is single-threaded and won't run the scheduled
52+
// work until it manually advances, which it won't be able to do as it will block
53+
return child;
54+
} else {
55+
return new ObserveOnSubscriber(child);
56+
}
57+
58+
}
59+
60+
private static Object NULL_SENTINEL = new Object();
61+
private static Object COMPLETE_SENTINEL = new Object();
62+
63+
private static class ErrorSentinel {
64+
final Throwable e;
65+
66+
ErrorSentinel(Throwable e) {
67+
this.e = e;
68+
}
69+
}
70+
71+
/** Observe through individual queue per observer. */
72+
private class ObserveOnSubscriber extends Subscriber<T> {
73+
final Subscriber<? super T> observer;
74+
private volatile Scheduler.Inner recursiveScheduler;
75+
76+
private final ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(1);
77+
final AtomicLong counter = new AtomicLong(0);
78+
79+
public ObserveOnSubscriber(Subscriber<? super T> observer) {
80+
super(observer);
81+
this.observer = observer;
82+
}
83+
84+
@Override
85+
public void onNext(final T t) {
86+
try {
87+
// we want to block for natural back-pressure
88+
// so that the producer waits for each value to be consumed
89+
if (t == null) {
90+
queue.put(NULL_SENTINEL);
91+
} else {
92+
queue.put(t);
93+
}
94+
schedule();
95+
} catch (InterruptedException e) {
96+
onError(e);
4997
}
50-
@Override
51-
public void onNext(final T args) {
52-
run(Notification.createOnNext(args));
98+
}
99+
100+
@Override
101+
public void onCompleted() {
102+
try {
103+
// we want to block for natural back-pressure
104+
// so that the producer waits for each value to be consumed
105+
queue.put(COMPLETE_SENTINEL);
106+
schedule();
107+
} catch (InterruptedException e) {
108+
onError(e);
53109
}
110+
}
54111

55-
@Override
56-
public void onError(final Throwable e) {
57-
run(Notification.<T>createOnError(e));
112+
@Override
113+
public void onError(final Throwable e) {
114+
try {
115+
// we want to block for natural back-pressure
116+
// so that the producer waits for each value to be consumed
117+
queue.put(new ErrorSentinel(e));
118+
schedule();
119+
} catch (InterruptedException e2) {
120+
// call directly if we can't schedule
121+
observer.onError(e2);
58122
}
123+
}
124+
125+
protected void schedule() {
126+
if (counter.getAndIncrement() == 0) {
127+
if (recursiveScheduler == null) {
128+
add(scheduler.schedule(new Action1<Inner>() {
129+
130+
@Override
131+
public void call(Inner inner) {
132+
recursiveScheduler = inner;
133+
pollQueue();
134+
}
135+
136+
}));
137+
} else {
138+
recursiveScheduler.schedule(new Action1<Inner>() {
59139

60-
@Override
61-
public void onCompleted() {
62-
run(Notification.<T>createOnCompleted());
140+
@Override
141+
public void call(Inner inner) {
142+
pollQueue();
143+
}
144+
145+
});
146+
}
63147
}
64-
};
148+
}
149+
150+
@SuppressWarnings("unchecked")
151+
private void pollQueue() {
152+
do {
153+
Object v = queue.poll();
154+
if (v != null) {
155+
if (v == NULL_SENTINEL) {
156+
observer.onNext(null);
157+
} else if (v == COMPLETE_SENTINEL) {
158+
observer.onCompleted();
159+
} else if (v instanceof ErrorSentinel) {
160+
observer.onError(((ErrorSentinel) v).e);
161+
} else {
162+
observer.onNext((T) v);
163+
}
164+
}
165+
} while (counter.decrementAndGet() > 0);
166+
}
167+
65168
}
66-
169+
67170
}

rxjava-core/src/main/java/rx/operators/QueueDrain.java

Lines changed: 0 additions & 106 deletions
This file was deleted.

rxjava-core/src/main/java/rx/schedulers/TestScheduler.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,24 +19,22 @@
1919
import java.util.PriorityQueue;
2020
import java.util.Queue;
2121
import java.util.concurrent.TimeUnit;
22-
import java.util.concurrent.atomic.AtomicBoolean;
2322

2423
import rx.Scheduler;
2524
import rx.Subscription;
2625
import rx.subscriptions.BooleanSubscription;
27-
import rx.subscriptions.Subscriptions;
28-
import rx.util.functions.Action0;
2926
import rx.util.functions.Action1;
30-
import rx.util.functions.Func2;
3127

3228
public class TestScheduler extends Scheduler {
3329
private final Queue<TimedAction> queue = new PriorityQueue<TimedAction>(11, new CompareActionsByTime());
30+
private static long counter = 0;
3431

3532
private static class TimedAction {
3633

3734
private final long time;
3835
private final Action1<Inner> action;
3936
private final Inner scheduler;
37+
private final long count = counter++; // for differentiating tasks at same time
4038

4139
private TimedAction(Inner scheduler, long time, Action1<Inner> action) {
4240
this.time = time;
@@ -53,7 +51,11 @@ public String toString() {
5351
private static class CompareActionsByTime implements Comparator<TimedAction> {
5452
@Override
5553
public int compare(TimedAction action1, TimedAction action2) {
56-
return Long.valueOf(action1.time).compareTo(Long.valueOf(action2.time));
54+
if (action1.time == action2.time) {
55+
return Long.valueOf(action1.count).compareTo(Long.valueOf(action2.count));
56+
} else {
57+
return Long.valueOf(action1.time).compareTo(Long.valueOf(action2.time));
58+
}
5759
}
5860
}
5961

rxjava-core/src/test/java/rx/operators/OperatorObserveOnTest.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030

3131
import rx.Observable;
3232
import rx.Observer;
33+
import rx.Scheduler;
3334
import rx.schedulers.Schedulers;
3435
import rx.schedulers.TestScheduler;
3536
import rx.util.functions.Action0;
@@ -141,7 +142,7 @@ public void call() {
141142

142143
@Test
143144
public void observeOnTheSameSchedulerTwice() {
144-
TestScheduler scheduler = new TestScheduler();
145+
Scheduler scheduler = Schedulers.immediate();
145146

146147
Observable<Integer> o = Observable.from(1, 2, 3);
147148
Observable<Integer> o2 = o.observeOn(scheduler);
@@ -157,8 +158,6 @@ public void observeOnTheSameSchedulerTwice() {
157158
o2.subscribe(observer1);
158159
o2.subscribe(observer2);
159160

160-
scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
161-
162161
inOrder1.verify(observer1, times(1)).onNext(1);
163162
inOrder1.verify(observer1, times(1)).onNext(2);
164163
inOrder1.verify(observer1, times(1)).onNext(3);
@@ -172,7 +171,6 @@ public void observeOnTheSameSchedulerTwice() {
172171
inOrder2.verify(observer2, times(1)).onCompleted();
173172
verify(observer2, never()).onError(any(Throwable.class));
174173
inOrder2.verifyNoMoreInteractions();
175-
176174
}
177175

178176
@Test

0 commit comments

Comments
 (0)