Skip to content

Commit 3082558

Browse files
committed
RxRingBuffer with synchronization
1 parent ccada81 commit 3082558

File tree

3 files changed

+77
-51
lines changed

3 files changed

+77
-51
lines changed

src/main/java/rx/internal/operators/OperatorMerge.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -308,7 +308,7 @@ private void handleScalarSynchronousObservableWithRequestLimits(ScalarSynchronou
308308
private RxRingBuffer getOrCreateScalarValueQueue() {
309309
RxRingBuffer svq = scalarValueQueue;
310310
if (svq == null) {
311-
svq = RxRingBuffer.getSpmcInstance();
311+
svq = RxRingBuffer.getSpscInstance();
312312
scalarValueQueue = svq;
313313
}
314314
return svq;
@@ -581,7 +581,7 @@ private static final class InnerSubscriber<T> extends Subscriber<T> {
581581
@SuppressWarnings("rawtypes")
582582
static final AtomicIntegerFieldUpdater<InnerSubscriber> ONCE_TERMINATED = AtomicIntegerFieldUpdater.newUpdater(InnerSubscriber.class, "terminated");
583583

584-
private final RxRingBuffer q = RxRingBuffer.getSpmcInstance();
584+
private final RxRingBuffer q = RxRingBuffer.getSpscInstance();
585585

586586
public InnerSubscriber(MergeSubscriber<T> parent, MergeProducer<T> producer) {
587587
this.parentSubscriber = parent;

src/main/java/rx/internal/util/RxRingBuffer.java

Lines changed: 62 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public class RxRingBuffer implements Subscription {
3434
public static RxRingBuffer getSpscInstance() {
3535
if (UnsafeAccess.isUnsafeAvailable()) {
3636
// TODO the SpscArrayQueue isn't ready yet so using SpmcArrayQueue for now
37-
return new RxRingBuffer(SPMC_POOL, SIZE);
37+
return new RxRingBuffer(SPSC_POOL, SIZE);
3838
} else {
3939
return new RxRingBuffer();
4040
}
@@ -306,12 +306,13 @@ private RxRingBuffer(ObjectPool<Queue<Object>> pool, int size) {
306306
this.size = size;
307307
}
308308

309-
public void release() {
310-
if (pool != null) {
311-
Queue<Object> q = queue;
309+
public synchronized void release() {
310+
Queue<Object> q = queue;
311+
ObjectPool<Queue<Object>> p = pool;
312+
if (p != null && q != null) {
312313
q.clear();
313314
queue = null;
314-
pool.returnObject(q);
315+
p.returnObject(q);
315316
}
316317
}
317318

@@ -331,10 +332,21 @@ public void unsubscribe() {
331332
* if more onNext are sent than have been requested
332333
*/
333334
public void onNext(Object o) throws MissingBackpressureException {
334-
if (queue == null) {
335+
boolean iae = false;
336+
boolean mbe = false;
337+
synchronized (this) {
338+
Queue<Object> q = queue;
339+
if (q != null) {
340+
mbe = !q.offer(on.next(o));
341+
} else {
342+
iae = true;
343+
}
344+
}
345+
346+
if (iae) {
335347
throw new IllegalStateException("This instance has been unsubscribed and the queue is no longer usable.");
336348
}
337-
if (!queue.offer(on.next(o))) {
349+
if (mbe) {
338350
throw new MissingBackpressureException();
339351
}
340352
}
@@ -362,55 +374,66 @@ public int capacity() {
362374
}
363375

364376
public int count() {
365-
if (queue == null) {
377+
Queue<Object> q = queue;
378+
if (q == null) {
366379
return 0;
367380
}
368-
return queue.size();
381+
return q.size();
369382
}
370383

371384
public boolean isEmpty() {
372-
if (queue == null) {
385+
Queue<Object> q = queue;
386+
if (q == null) {
373387
return true;
374388
}
375-
return queue.isEmpty();
389+
return q.isEmpty();
376390
}
377391

378392
public Object poll() {
379-
if (queue == null) {
380-
// we are unsubscribed and have released the undelrying queue
381-
return null;
382-
}
383393
Object o;
384-
o = queue.poll();
385-
/*
386-
* benjchristensen July 10 2014 => The check for 'queue.isEmpty()' came from a very rare concurrency bug where poll()
387-
* is invoked, then an "onNext + onCompleted/onError" arrives before hitting the if check below. In that case,
388-
* "o == null" and there is a terminal state, but now "queue.isEmpty()" and we should NOT return the terminalState.
389-
*
390-
* The queue.size() check is a double-check that works to handle this, without needing to synchronize poll with on*
391-
* or needing to enqueue terminalState.
392-
*
393-
* This did make me consider eliminating the 'terminalState' ref and enqueuing it ... but then that requires
394-
* a +1 of the size, or -1 of how many onNext can be sent. See comment on 'terminalState' above for why it
395-
* is currently the way it is.
396-
*/
397-
if (o == null && terminalState != null && queue.isEmpty()) {
398-
o = terminalState;
399-
// once emitted we clear so a poll loop will finish
400-
terminalState = null;
394+
synchronized (this) {
395+
Queue<Object> q = queue;
396+
if (q == null) {
397+
// we are unsubscribed and have released the undelrying queue
398+
return null;
399+
}
400+
o = q.poll();
401+
402+
/*
403+
* benjchristensen July 10 2014 => The check for 'queue.isEmpty()' came from a very rare concurrency bug where poll()
404+
* is invoked, then an "onNext + onCompleted/onError" arrives before hitting the if check below. In that case,
405+
* "o == null" and there is a terminal state, but now "queue.isEmpty()" and we should NOT return the terminalState.
406+
*
407+
* The queue.size() check is a double-check that works to handle this, without needing to synchronize poll with on*
408+
* or needing to enqueue terminalState.
409+
*
410+
* This did make me consider eliminating the 'terminalState' ref and enqueuing it ... but then that requires
411+
* a +1 of the size, or -1 of how many onNext can be sent. See comment on 'terminalState' above for why it
412+
* is currently the way it is.
413+
*/
414+
Object ts = terminalState;
415+
if (o == null && ts != null && q.peek() == null) {
416+
o = ts;
417+
// once emitted we clear so a poll loop will finish
418+
terminalState = null;
419+
}
401420
}
402421
return o;
403422
}
404423

405424
public Object peek() {
406-
if (queue == null) {
407-
// we are unsubscribed and have released the undelrying queue
408-
return null;
409-
}
410425
Object o;
411-
o = queue.peek();
412-
if (o == null && terminalState != null && queue.isEmpty()) {
413-
o = terminalState;
426+
synchronized (this) {
427+
Queue<Object> q = queue;
428+
if (q == null) {
429+
// we are unsubscribed and have released the undelrying queue
430+
return null;
431+
}
432+
o = q.peek();
433+
Object ts = terminalState;
434+
if (o == null && ts != null && q.peek() == null) {
435+
o = ts;
436+
}
414437
}
415438
return o;
416439
}

src/main/java/rx/internal/util/unsafe/SpscArrayQueue.java

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -118,19 +118,22 @@ public SpscArrayQueue(final int capacity) {
118118
*/
119119
@Override
120120
public boolean offer(final E e) {
121-
if (null == e) {
122-
throw new NullPointerException("Null is not a valid element");
123-
}
121+
// if (null == e) {
122+
// throw new NullPointerException("Null is not a valid element");
123+
// }
124124
// local load of field to avoid repeated loads after volatile reads
125125
final E[] lElementBuffer = buffer;
126126
final long offset = calcElementOffset(producerIndex);
127-
if (producerIndex >= producerLookAhead) {
128-
if (null == lvElement(lElementBuffer, calcElementOffset(producerIndex + lookAheadStep))) {// LoadLoad
129-
producerLookAhead = producerIndex + lookAheadStep;
130-
}
131-
else if (null != lvElement(lElementBuffer, offset)){
132-
return false;
133-
}
127+
// if (producerIndex >= producerLookAhead) {
128+
// if (null == lvElement(lElementBuffer, calcElementOffset(producerIndex + lookAheadStep))) {// LoadLoad
129+
// producerLookAhead = producerIndex + lookAheadStep;
130+
// }
131+
// else if (null != lvElement(lElementBuffer, offset)){
132+
// return false;
133+
// }
134+
// }
135+
if (null != lvElement(lElementBuffer, offset)){
136+
return false;
134137
}
135138
producerIndex++; // do increment here so the ordered store give both a barrier
136139
soElement(lElementBuffer, offset, e);// StoreStore

0 commit comments

Comments
 (0)