@@ -33,8 +33,7 @@ public class RxRingBuffer implements Subscription {
3333
3434 public static RxRingBuffer getSpscInstance () {
3535 if (UnsafeAccess .isUnsafeAvailable ()) {
36- // TODO the SpscArrayQueue isn't ready yet so using SpmcArrayQueue for now
37- return new RxRingBuffer (SPMC_POOL , SIZE );
36+ return new RxRingBuffer (SPSC_POOL , SIZE );
3837 } else {
3938 return new RxRingBuffer ();
4039 }
@@ -306,12 +305,13 @@ private RxRingBuffer(ObjectPool<Queue<Object>> pool, int size) {
306305 this .size = size ;
307306 }
308307
309- public void release () {
310- if (pool != null ) {
311- Queue <Object > q = queue ;
308+ public synchronized void release () {
309+ Queue <Object > q = queue ;
310+ ObjectPool <Queue <Object >> p = pool ;
311+ if (p != null && q != null ) {
312312 q .clear ();
313313 queue = null ;
314- pool .returnObject (q );
314+ p .returnObject (q );
315315 }
316316 }
317317
@@ -331,10 +331,21 @@ public void unsubscribe() {
331331 * if more onNext are sent than have been requested
332332 */
333333 public void onNext (Object o ) throws MissingBackpressureException {
334- if (queue == null ) {
334+ boolean iae = false ;
335+ boolean mbe = false ;
336+ synchronized (this ) {
337+ Queue <Object > q = queue ;
338+ if (q != null ) {
339+ mbe = !q .offer (on .next (o ));
340+ } else {
341+ iae = true ;
342+ }
343+ }
344+
345+ if (iae ) {
335346 throw new IllegalStateException ("This instance has been unsubscribed and the queue is no longer usable." );
336347 }
337- if (! queue . offer ( on . next ( o )) ) {
348+ if (mbe ) {
338349 throw new MissingBackpressureException ();
339350 }
340351 }
@@ -362,55 +373,54 @@ public int capacity() {
362373 }
363374
364375 public int count () {
365- if (queue == null ) {
376+ Queue <Object > q = queue ;
377+ if (q == null ) {
366378 return 0 ;
367379 }
368- return queue .size ();
380+ return q .size ();
369381 }
370382
371383 public boolean isEmpty () {
372- if (queue == null ) {
384+ Queue <Object > q = queue ;
385+ if (q == null ) {
373386 return true ;
374387 }
375- return queue .isEmpty ();
388+ return q .isEmpty ();
376389 }
377390
378391 public Object poll () {
379- if (queue == null ) {
380- // we are unsubscribed and have released the undelrying queue
381- return null ;
382- }
383392 Object o ;
384- o = queue .poll ();
385- /*
386- * benjchristensen July 10 2014 => The check for 'queue.isEmpty()' came from a very rare concurrency bug where poll()
387- * is invoked, then an "onNext + onCompleted/onError" arrives before hitting the if check below. In that case,
388- * "o == null" and there is a terminal state, but now "queue.isEmpty()" and we should NOT return the terminalState.
389- *
390- * The queue.size() check is a double-check that works to handle this, without needing to synchronize poll with on*
391- * or needing to enqueue terminalState.
392- *
393- * This did make me consider eliminating the 'terminalState' ref and enqueuing it ... but then that requires
394- * a +1 of the size, or -1 of how many onNext can be sent. See comment on 'terminalState' above for why it
395- * is currently the way it is.
396- */
397- if (o == null && terminalState != null && queue .isEmpty ()) {
398- o = terminalState ;
399- // once emitted we clear so a poll loop will finish
400- terminalState = null ;
393+ synchronized (this ) {
394+ Queue <Object > q = queue ;
395+ if (q == null ) {
396+ // we are unsubscribed and have released the underlying queue
397+ return null ;
398+ }
399+ o = q .poll ();
400+
401+ Object ts = terminalState ;
402+ if (o == null && ts != null && q .peek () == null ) {
403+ o = ts ;
404+ // once emitted we clear so a poll loop will finish
405+ terminalState = null ;
406+ }
401407 }
402408 return o ;
403409 }
404410
405411 public Object peek () {
406- if (queue == null ) {
407- // we are unsubscribed and have released the undelrying queue
408- return null ;
409- }
410412 Object o ;
411- o = queue .peek ();
412- if (o == null && terminalState != null && queue .isEmpty ()) {
413- o = terminalState ;
413+ synchronized (this ) {
414+ Queue <Object > q = queue ;
415+ if (q == null ) {
416+ // we are unsubscribed and have released the underlying queue
417+ return null ;
418+ }
419+ o = q .peek ();
420+ Object ts = terminalState ;
421+ if (o == null && ts != null && q .peek () == null ) {
422+ o = ts ;
423+ }
414424 }
415425 return o ;
416426 }
0 commit comments