Skip to content

Commit e9c3a9c

Browse files
Merge pull request #855 from benjchristensen/observeOn+queue
Move InterruptibleBlockingQueue Inside ObserveOn
2 parents 6d3066f + 5d1006e commit e9c3a9c

File tree

2 files changed

+108
-113
lines changed

2 files changed

+108
-113
lines changed

rxjava-core/src/main/java/rx/operators/OperatorObserveOn.java

Lines changed: 108 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package rx.operators;
1717

18+
import java.util.concurrent.Semaphore;
1819
import java.util.concurrent.atomic.AtomicLong;
1920

2021
import rx.Scheduler;
@@ -24,7 +25,6 @@
2425
import rx.schedulers.TestScheduler;
2526
import rx.schedulers.TrampolineScheduler;
2627
import rx.subscriptions.Subscriptions;
27-
import rx.util.InterruptibleBlockingQueue;
2828
import rx.util.functions.Action0;
2929
import rx.util.functions.Action1;
3030

@@ -112,7 +112,7 @@ private class ObserveOnSubscriber extends Subscriber<T> {
112112
final Subscriber<? super T> observer;
113113
private volatile Scheduler.Inner recursiveScheduler;
114114

115-
private final InterruptibleBlockingQueue queue = new InterruptibleBlockingQueue(bufferSize);
115+
private final InterruptibleBlockingQueue<Object> queue = new InterruptibleBlockingQueue<Object>(bufferSize);
116116
final AtomicLong counter = new AtomicLong(0);
117117

118118
public ObserveOnSubscriber(Subscriber<? super T> observer) {
@@ -220,4 +220,110 @@ private void pollQueue() {
220220

221221
}
222222

223+
/**
224+
* Single-producer-single-consumer queue (only thread-safe for 1 producer thread with 1 consumer thread).
225+
*
226+
* This supports an interrupt() being called externally rather than needing to interrupt the thread. This allows
227+
* unsubscribe behavior when this queue is being used.
228+
*
229+
* @param <E>
230+
*/
231+
private static class InterruptibleBlockingQueue<E> {
232+
233+
private final Semaphore semaphore;
234+
private volatile boolean interrupted = false;
235+
236+
private final E[] buffer;
237+
238+
private AtomicLong tail = new AtomicLong();
239+
private AtomicLong head = new AtomicLong();
240+
private final int capacity;
241+
private final int mask;
242+
243+
@SuppressWarnings("unchecked")
244+
public InterruptibleBlockingQueue(final int size) {
245+
this.semaphore = new Semaphore(size);
246+
this.capacity = size;
247+
this.mask = size - 1;
248+
buffer = (E[]) new Object[size];
249+
}
250+
251+
/**
252+
* Used to unsubscribe and interrupt the producer if blocked in put()
253+
*/
254+
public void interrupt() {
255+
interrupted = true;
256+
semaphore.release();
257+
}
258+
259+
public void addBlocking(final E e) throws InterruptedException {
260+
if (interrupted) {
261+
throw new InterruptedException("Interrupted by Unsubscribe");
262+
}
263+
semaphore.acquire();
264+
if (interrupted) {
265+
throw new InterruptedException("Interrupted by Unsubscribe");
266+
}
267+
if (e == null) {
268+
throw new IllegalArgumentException("Can not put null");
269+
}
270+
271+
if (offer(e)) {
272+
return;
273+
} else {
274+
throw new IllegalStateException("Queue is full");
275+
}
276+
}
277+
278+
private boolean offer(final E e) {
279+
final long _t = tail.get();
280+
if (_t - head.get() == capacity) {
281+
// queue is full
282+
return false;
283+
}
284+
int index = (int) (_t & mask);
285+
buffer[index] = e;
286+
// move the tail forward
287+
tail.lazySet(_t + 1);
288+
289+
return true;
290+
}
291+
292+
public E poll() {
293+
if (interrupted) {
294+
return null;
295+
}
296+
final long _h = head.get();
297+
if (tail.get() == _h) {
298+
// nothing available
299+
return null;
300+
}
301+
int index = (int) (_h & mask);
302+
303+
// fetch the item
304+
E v = buffer[index];
305+
// allow GC to happen
306+
buffer[index] = null;
307+
// increment and signal we're done
308+
head.lazySet(_h + 1);
309+
if (v != null) {
310+
semaphore.release();
311+
}
312+
return v;
313+
}
314+
315+
public int size()
316+
{
317+
int size;
318+
do
319+
{
320+
final long currentHead = head.get();
321+
final long currentTail = tail.get();
322+
size = (int) (currentTail - currentHead);
323+
} while (size > buffer.length);
324+
325+
return size;
326+
}
327+
328+
}
223329
}

rxjava-core/src/main/java/rx/util/InterruptibleBlockingQueue.java

Lines changed: 0 additions & 111 deletions
This file was deleted.

0 commit comments

Comments
 (0)