Skip to content

Commit 5d1006e

Browse files
Move InterruptibleBlockingQueue Inside ObserveOn
I decided I'm not ready to commit to this in the public API so am leaving it an implementation detail of ObserveOn. While working on groupBy, parallel and subscribeOn this queue was not the right solution.
1 parent 6d3066f commit 5d1006e

File tree

2 files changed

+108
-113
lines changed

2 files changed

+108
-113
lines changed

rxjava-core/src/main/java/rx/operators/OperatorObserveOn.java

Lines changed: 108 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package rx.operators;
1717

18+
import java.util.concurrent.Semaphore;
1819
import java.util.concurrent.atomic.AtomicLong;
1920

2021
import rx.Scheduler;
@@ -24,7 +25,6 @@
2425
import rx.schedulers.TestScheduler;
2526
import rx.schedulers.TrampolineScheduler;
2627
import rx.subscriptions.Subscriptions;
27-
import rx.util.InterruptibleBlockingQueue;
2828
import rx.util.functions.Action0;
2929
import rx.util.functions.Action1;
3030

@@ -112,7 +112,7 @@ private class ObserveOnSubscriber extends Subscriber<T> {
112112
final Subscriber<? super T> observer;
113113
private volatile Scheduler.Inner recursiveScheduler;
114114

115-
private final InterruptibleBlockingQueue queue = new InterruptibleBlockingQueue(bufferSize);
115+
private final InterruptibleBlockingQueue<Object> queue = new InterruptibleBlockingQueue<Object>(bufferSize);
116116
final AtomicLong counter = new AtomicLong(0);
117117

118118
public ObserveOnSubscriber(Subscriber<? super T> observer) {
@@ -220,4 +220,110 @@ private void pollQueue() {
220220

221221
}
222222

223+
/**
224+
* Single-producer-single-consumer queue (only thread-safe for 1 producer thread with 1 consumer thread).
225+
*
226+
* This supports an interrupt() being called externally rather than needing to interrupt the thread. This allows
227+
* unsubscribe behavior when this queue is being used.
228+
*
229+
* @param <E>
230+
*/
231+
private static class InterruptibleBlockingQueue<E> {
232+
233+
private final Semaphore semaphore;
234+
private volatile boolean interrupted = false;
235+
236+
private final E[] buffer;
237+
238+
private AtomicLong tail = new AtomicLong();
239+
private AtomicLong head = new AtomicLong();
240+
private final int capacity;
241+
private final int mask;
242+
243+
@SuppressWarnings("unchecked")
244+
public InterruptibleBlockingQueue(final int size) {
245+
this.semaphore = new Semaphore(size);
246+
this.capacity = size;
247+
this.mask = size - 1;
248+
buffer = (E[]) new Object[size];
249+
}
250+
251+
/**
252+
* Used to unsubscribe and interrupt the producer if blocked in put()
253+
*/
254+
public void interrupt() {
255+
interrupted = true;
256+
semaphore.release();
257+
}
258+
259+
public void addBlocking(final E e) throws InterruptedException {
260+
if (interrupted) {
261+
throw new InterruptedException("Interrupted by Unsubscribe");
262+
}
263+
semaphore.acquire();
264+
if (interrupted) {
265+
throw new InterruptedException("Interrupted by Unsubscribe");
266+
}
267+
if (e == null) {
268+
throw new IllegalArgumentException("Can not put null");
269+
}
270+
271+
if (offer(e)) {
272+
return;
273+
} else {
274+
throw new IllegalStateException("Queue is full");
275+
}
276+
}
277+
278+
private boolean offer(final E e) {
279+
final long _t = tail.get();
280+
if (_t - head.get() == capacity) {
281+
// queue is full
282+
return false;
283+
}
284+
int index = (int) (_t & mask);
285+
buffer[index] = e;
286+
// move the tail forward
287+
tail.lazySet(_t + 1);
288+
289+
return true;
290+
}
291+
292+
public E poll() {
293+
if (interrupted) {
294+
return null;
295+
}
296+
final long _h = head.get();
297+
if (tail.get() == _h) {
298+
// nothing available
299+
return null;
300+
}
301+
int index = (int) (_h & mask);
302+
303+
// fetch the item
304+
E v = buffer[index];
305+
// allow GC to happen
306+
buffer[index] = null;
307+
// increment and signal we're done
308+
head.lazySet(_h + 1);
309+
if (v != null) {
310+
semaphore.release();
311+
}
312+
return v;
313+
}
314+
315+
public int size()
316+
{
317+
int size;
318+
do
319+
{
320+
final long currentHead = head.get();
321+
final long currentTail = tail.get();
322+
size = (int) (currentTail - currentHead);
323+
} while (size > buffer.length);
324+
325+
return size;
326+
}
327+
328+
}
223329
}

rxjava-core/src/main/java/rx/util/InterruptibleBlockingQueue.java

Lines changed: 0 additions & 111 deletions
This file was deleted.

0 commit comments

Comments
 (0)