Skip to content

Commit c758d13

Browse files
Custom InterruptibleBlockingQueue for ObserveOn
Since we are blocking the producer on* notifications we need to interrupt it on unsubscribe events. I need to do it on the data structure and not the thread as the thread could change for each onNext and that could have unexpected consequences.
1 parent eea02d8 commit c758d13

File tree

1 file changed

+85
-4
lines changed

1 file changed

+85
-4
lines changed

rxjava-core/src/main/java/rx/operators/OperatorObserveOn.java

Lines changed: 85 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
*/
1616
package rx.operators;
1717

18-
import java.util.concurrent.ArrayBlockingQueue;
18+
import java.util.concurrent.Semaphore;
1919
import java.util.concurrent.atomic.AtomicLong;
2020

2121
import rx.Scheduler;
@@ -24,11 +24,25 @@
2424
import rx.schedulers.ImmediateScheduler;
2525
import rx.schedulers.TestScheduler;
2626
import rx.schedulers.TrampolineScheduler;
27+
import rx.subscriptions.Subscriptions;
28+
import rx.util.functions.Action0;
2729
import rx.util.functions.Action1;
2830

2931
/**
30-
* Asynchronously notify Observers on the specified Scheduler.
32+
* Delivers events on the specified Scheduler.
3133
* <p>
34+
* This provides backpressure by blocking the incoming onNext when there is already one in the queue.
35+
* <p>
36+
* This means that at any given time the max number of "onNext" in flight is 3:
37+
* -> 1 being delivered on the Scheduler
38+
* -> 1 in the queue waiting for the Scheduler
39+
* -> 1 blocking on the queue waiting to deliver it
40+
*
41+
* I have chosen to allow 1 in the queue rather than using an Exchanger style process so that the Scheduler
42+
* can loop and have something to do each time around to optimize for avoiding rescheduling when it
43+
* can instead just loop. I'm avoiding having the Scheduler thread ever block as it could be an event-loop
44+
* thus if the queue is empty it exits and next time something is added it will reschedule.
45+
*
3246
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/observeOn.png">
3347
*/
3448
public class OperatorObserveOn<T> implements Operator<T, T> {
@@ -73,7 +87,7 @@ private class ObserveOnSubscriber extends Subscriber<T> {
7387
final Subscriber<? super T> observer;
7488
private volatile Scheduler.Inner recursiveScheduler;
7589

76-
private final ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(1);
90+
private final InterruptibleBlockingQueue queue = new InterruptibleBlockingQueue();
7791
final AtomicLong counter = new AtomicLong(0);
7892

7993
public ObserveOnSubscriber(Subscriber<? super T> observer) {
@@ -93,7 +107,9 @@ public void onNext(final T t) {
93107
}
94108
schedule();
95109
} catch (InterruptedException e) {
96-
onError(e);
110+
if (!isUnsubscribed()) {
111+
onError(e);
112+
}
97113
}
98114
}
99115

@@ -125,6 +141,18 @@ public void onError(final Throwable e) {
125141
protected void schedule() {
126142
if (counter.getAndIncrement() == 0) {
127143
if (recursiveScheduler == null) {
144+
// first time through, register a Subscription
145+
// that can interrupt this thread
146+
add(Subscriptions.create(new Action0() {
147+
148+
@Override
149+
public void call() {
150+
// we have to interrupt the parent thread because
151+
// it can be blocked on queue.put
152+
queue.interrupt();
153+
}
154+
155+
}));
128156
add(scheduler.schedule(new Action1<Inner>() {
129157

130158
@Override
@@ -167,4 +195,57 @@ private void pollQueue() {
167195

168196
}
169197

198+
/**
199+
* Same behavior as ArrayBlockingQueue<Object>(1) except that we can interrupt/unsubscribe it.
200+
*/
201+
private class InterruptibleBlockingQueue {
202+
203+
private final Semaphore semaphore = new Semaphore(1);
204+
private volatile Object item;
205+
private volatile boolean interrupted = false;
206+
207+
public Object poll() {
208+
if (interrupted) {
209+
return null;
210+
}
211+
if (item == null) {
212+
return null;
213+
}
214+
try {
215+
return item;
216+
} finally {
217+
item = null;
218+
semaphore.release();
219+
}
220+
}
221+
222+
/**
223+
* Add an Object, blocking if an item is already in the queue.
224+
*
225+
* @param o
226+
* @throws InterruptedException
227+
*/
228+
public void put(Object o) throws InterruptedException {
229+
if (interrupted) {
230+
throw new InterruptedException("Interrupted by Unsubscribe");
231+
}
232+
semaphore.acquire();
233+
if (interrupted) {
234+
throw new InterruptedException("Interrupted by Unsubscribe");
235+
}
236+
if (o == null) {
237+
throw new IllegalArgumentException("Can not put null");
238+
}
239+
item = o;
240+
}
241+
242+
/**
243+
* Used to unsubscribe and interrupt the producer if blocked in put()
244+
*/
245+
public void interrupt() {
246+
interrupted = true;
247+
semaphore.release();
248+
}
249+
}
250+
170251
}

0 commit comments

Comments
 (0)