File tree Expand file tree Collapse file tree 1 file changed +1
-14
lines changed
src/main/java/rx/internal/util Expand file tree Collapse file tree 1 file changed +1
-14
lines changed Original file line number Diff line number Diff line change @@ -33,7 +33,6 @@ public class RxRingBuffer implements Subscription {
33
33
34
34
public static RxRingBuffer getSpscInstance () {
35
35
if (UnsafeAccess .isUnsafeAvailable ()) {
36
- // TODO the SpscArrayQueue isn't ready yet so using SpmcArrayQueue for now
37
36
return new RxRingBuffer (SPSC_POOL , SIZE );
38
37
} else {
39
38
return new RxRingBuffer ();
@@ -394,23 +393,11 @@ public Object poll() {
394
393
synchronized (this ) {
395
394
Queue <Object > q = queue ;
396
395
if (q == null ) {
397
- // we are unsubscribed and have released the undelrying queue
396
+ // we are unsubscribed and have released the underlying queue
398
397
return null ;
399
398
}
400
399
o = q .poll ();
401
400
402
- /*
403
- * benjchristensen July 10 2014 => The check for 'queue.isEmpty()' came from a very rare concurrency bug where poll()
404
- * is invoked, then an "onNext + onCompleted/onError" arrives before hitting the if check below. In that case,
405
- * "o == null" and there is a terminal state, but now "queue.isEmpty()" and we should NOT return the terminalState.
406
- *
407
- * The queue.size() check is a double-check that works to handle this, without needing to synchronize poll with on*
408
- * or needing to enqueue terminalState.
409
- *
410
- * This did make me consider eliminating the 'terminalState' ref and enqueuing it ... but then that requires
411
- * a +1 of the size, or -1 of how many onNext can be sent. See comment on 'terminalState' above for why it
412
- * is currently the way it is.
413
- */
414
401
Object ts = terminalState ;
415
402
if (o == null && ts != null && q .peek () == null ) {
416
403
o = ts ;
You can’t perform that action at this time.
0 commit comments