Skip to content

Commit 6b6fced

Browse files
Merge pull request #1422 from benjchristensen/issue-1420
Concurrency Fixes for RxRingBuffer & Merge
2 parents 14f3f51 + 3a0d891 commit 6b6fced

File tree

4 files changed

+133
-83
lines changed

4 files changed

+133
-83
lines changed

rxjava-core/src/main/java/rx/Subscriber.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,9 +108,10 @@ protected Producer onSetProducer(Producer producer) {
108108

109109
public final void setProducer(Producer producer) {
110110
producer = onSetProducer(producer);
111-
int toRequest = requested;
111+
int toRequest;
112112
boolean setProducer = false;
113113
synchronized (this) {
114+
toRequest = requested;
114115
p = producer;
115116
if (op != null) {
116117
// middle operator ... we pass thru unless a request has been made

rxjava-core/src/main/java/rx/internal/operators/OperatorMerge.java

Lines changed: 47 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ public void onStart() {
9191
// we decouple the Producer chain while keeping the Subscription chain together (perf benefit) via super(actual)
9292
request(RxRingBuffer.SIZE);
9393
}
94-
94+
9595
/*
9696
* This is expected to be executed sequentially as per the Rx contract or it will not work.
9797
*/
@@ -309,12 +309,7 @@ public Boolean call(InnerSubscriber<T> s) {
309309
if (s.q != null) {
310310
long r = mergeProducer.requested;
311311
int emitted = 0;
312-
if (r < 0) {
313-
emitted += drainAll(s);
314-
} else if (r > 0) {
315-
emitted += drainRequested(s, r);
316-
}
317-
312+
emitted += s.drainQueue();
318313
if (emitted > 0) {
319314
/*
320315
* `s.emitted` is not volatile (because of performance impact of making it so shown by JMH tests)
@@ -336,46 +331,6 @@ public Boolean call(InnerSubscriber<T> s) {
336331
return Boolean.TRUE;
337332
}
338333

339-
private int drainRequested(InnerSubscriber<T> s, long r) {
340-
int emitted = 0;
341-
// drain what was requested
342-
long toEmit = r;
343-
Object o;
344-
for (int i = 0; i < toEmit; i++) {
345-
o = s.q.poll();
346-
if (o == null) {
347-
// no more items
348-
break;
349-
} else if (s.q.isCompleted(o)) {
350-
completeInner(s);
351-
} else {
352-
if (!s.q.accept(o, actual)) {
353-
emitted++;
354-
}
355-
}
356-
}
357-
358-
// decrement the number we emitted from outstanding requests
359-
mergeProducer.REQUESTED.getAndAdd(mergeProducer, -emitted);
360-
return emitted;
361-
}
362-
363-
private int drainAll(InnerSubscriber<T> s) {
364-
int emitted = 0;
365-
// drain it all
366-
Object o;
367-
while ((o = s.q.poll()) != null) {
368-
if (s.q.isCompleted(o)) {
369-
completeInner(s);
370-
} else {
371-
if (!s.q.accept(o, actual)) {
372-
emitted++;
373-
}
374-
}
375-
}
376-
return emitted;
377-
}
378-
379334
};
380335

381336
@Override
@@ -451,7 +406,6 @@ private static final class InnerSubscriber<T> extends Subscriber<T> {
451406
@SuppressWarnings("rawtypes")
452407
static final AtomicIntegerFieldUpdater<InnerSubscriber> ONCE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(InnerSubscriber.class, "once");
453408
private final RxRingBuffer q = RxRingBuffer.getSpmcInstance();
454-
private boolean mayNeedToDrain = false;
455409
/* protected by emitLock */
456410
int emitted = 0;
457411
final int THRESHOLD = (int) (q.capacity() * 0.7);
@@ -526,12 +480,9 @@ private void emit(T t, boolean complete) {
526480
if (parentSubscriber.getEmitLock()) {
527481
enqueue = false;
528482
try {
529-
// when we have the lock, nothing else can cause producer.requested to decrement, but it can increment at any time
530-
if (mayNeedToDrain) {
531-
// drain the queue if there is anything in it before emitting the current value
532-
emitted += drainQueue();
533-
mayNeedToDrain = false;
534-
}
483+
// drain the queue if there is anything in it before emitting the current value
484+
emitted += drainQueue();
485+
// }
535486
if (producer == null) {
536487
// no backpressure requested
537488
if (complete) {
@@ -600,7 +551,7 @@ private void emit(T t, boolean complete) {
600551
* r.o.OperatorMergePerf.mergeNSyncStreamsOfN 1000 thrpt 5 78.795 1.766 ops/s
601552
* } </pre>
602553
*/
603-
mayNeedToDrain = !parentSubscriber.drainQueuesIfNeeded();
554+
parentSubscriber.drainQueuesIfNeeded();
604555
}
605556
}
606557

@@ -611,41 +562,57 @@ private void enqueue(T t, boolean complete) {
611562
} else {
612563
q.onNext(t);
613564
}
614-
mayNeedToDrain = true;
615565
} catch (MissingBackpressureException e) {
616566
onError(e);
617567
}
618568
}
619569

620-
private int drainQueue() {
621-
int emittedWhileDraining = 0;
622-
if (q != null) {
623-
if (producer == null) {
624-
Object o;
625-
while ((o = q.poll()) != null) {
626-
if (!q.accept(o, parentSubscriber.actual)) {
627-
// non-terminal event so let's increment count
628-
emittedWhileDraining++;
629-
}
570+
private int drainRequested() {
571+
int emitted = 0;
572+
// drain what was requested
573+
long toEmit = producer.requested;
574+
Object o;
575+
for (int i = 0; i < toEmit; i++) {
576+
o = q.poll();
577+
if (o == null) {
578+
// no more items
579+
break;
580+
} else if (q.isCompleted(o)) {
581+
parentSubscriber.completeInner(this);
582+
} else {
583+
if (!q.accept(o, parentSubscriber.actual)) {
584+
emitted++;
630585
}
586+
}
587+
}
588+
589+
// decrement the number we emitted from outstanding requests
590+
producer.REQUESTED.getAndAdd(producer, -emitted);
591+
return emitted;
592+
}
593+
594+
private int drainAll() {
595+
int emitted = 0;
596+
// drain it all
597+
Object o;
598+
while ((o = q.poll()) != null) {
599+
if (q.isCompleted(o)) {
600+
parentSubscriber.completeInner(this);
631601
} else {
632-
long toEmit = producer.requested;
633-
for (int i = 0; i < toEmit; i++) {
634-
Object o = q.poll();
635-
if (o == null) {
636-
break;
637-
} else {
638-
if (!q.accept(o, parentSubscriber.actual)) {
639-
// non-terminal event so let's increment count
640-
emittedWhileDraining++;
641-
}
642-
}
602+
if (!q.accept(o, parentSubscriber.actual)) {
603+
emitted++;
643604
}
644-
// decrement the number we emitted from outstanding requests
645-
producer.REQUESTED.getAndAdd(producer, -emittedWhileDraining);
646605
}
647606
}
648-
return emittedWhileDraining;
607+
return emitted;
608+
}
609+
610+
private int drainQueue() {
611+
if (producer != null) {
612+
return drainRequested();
613+
} else {
614+
return drainAll();
615+
}
649616
}
650617
}
651618
}

rxjava-core/src/main/java/rx/internal/util/RxRingBuffer.java

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@
2525
import rx.internal.util.unsafe.SpscArrayQueue;
2626
import rx.internal.util.unsafe.UnsafeAccess;
2727

28+
/**
29+
* This assumes Spsc or Spmc usage. This means only a single producer calling the on* methods. This is the Rx contract of an Observer.
30+
* Concurrent invocations of on* methods will not be thread-safe.
31+
*/
2832
public class RxRingBuffer implements Subscription {
2933

3034
public static RxRingBuffer getSpscInstance() {
@@ -71,6 +75,29 @@ public static RxRingBuffer getSpmcInstance() {
7175
* r.i.RxRingBufferPerf.ringBufferAddRemove1 thrpt 5 23951121.098 1982380.330 ops/s
7276
* r.i.RxRingBufferPerf.ringBufferAddRemove1000 thrpt 5 1142.351 33.592 ops/s
7377
*
78+
* With SynchronizedQueue (synchronized LinkedList)
79+
*
80+
* r.i.RxRingBufferPerf.createUseAndDestroy1 thrpt 5 33231667.136 685757.510 ops/s
81+
* r.i.RxRingBufferPerf.createUseAndDestroy1000 thrpt 5 74623.614 5493.766 ops/s
82+
* r.i.RxRingBufferPerf.ringBufferAddRemove1 thrpt 5 22907359.257 707026.632 ops/s
83+
* r.i.RxRingBufferPerf.ringBufferAddRemove1000 thrpt 5 22222.410 320.829 ops/s
84+
*
85+
* With ArrayBlockingQueue
86+
*
87+
* Benchmark Mode Samples Score Score error Units
88+
* r.i.RxRingBufferPerf.createUseAndDestroy1 thrpt 5 2389804.664 68990.804 ops/s
89+
* r.i.RxRingBufferPerf.createUseAndDestroy1000 thrpt 5 27384.274 1411.789 ops/s
90+
* r.i.RxRingBufferPerf.ringBufferAddRemove1 thrpt 5 26497037.559 91176.247 ops/s
91+
* r.i.RxRingBufferPerf.ringBufferAddRemove1000 thrpt 5 17985.144 237.771 ops/s
92+
*
93+
* With ArrayBlockingQueue and Object Pool
94+
*
95+
* Benchmark Mode Samples Score Score error Units
96+
* r.i.RxRingBufferPerf.createUseAndDestroy1 thrpt 5 12465685.522 399070.770 ops/s
97+
* r.i.RxRingBufferPerf.createUseAndDestroy1000 thrpt 5 27701.294 395.217 ops/s
98+
* r.i.RxRingBufferPerf.ringBufferAddRemove1 thrpt 5 26399625.086 695639.436 ops/s
99+
* r.i.RxRingBufferPerf.ringBufferAddRemove1000 thrpt 5 17985.427 253.190 ops/s
100+
*
74101
* With SpscArrayQueue (single consumer, so failing 1 unit test)
75102
* - requires access to Unsafe
76103
*
@@ -130,7 +157,15 @@ public static RxRingBuffer getSpmcInstance() {
130157
private final int size;
131158
private final ObjectPool<Queue<Object>> pool;
132159

133-
private volatile Object terminalState;
160+
/**
161+
* We store the terminal state separately so it doesn't count against the size.
162+
* We don't just +1 the size since some of the queues require sizes that are a power of 2.
163+
* This is a subjective thing ... wanting to keep the size (ie 1024) the actual number of onNext
164+
* that can be sent rather than something like 1023 onNext + 1 terminal event. It also simplifies
165+
* checking that we have received only 1 terminal event, as we don't need to peek at the last item
166+
* or retain a boolean flag.
167+
*/
168+
public volatile Object terminalState;
134169

135170
public static final int SIZE = 1024;
136171

@@ -233,7 +268,22 @@ public Object poll() {
233268
}
234269
Object o;
235270
o = queue.poll();
236-
if (o == null && terminalState != null) {
271+
/*
272+
* benjchristensen July 10 2014 => The check for 'queue.size() == 0' came from a very rare concurrency bug where poll()
273+
* is invoked, then an "onNext + onCompleted/onError" arrives before hitting the if check below. In that case,
274+
* "o == null" and there is a terminal state, but now "queue.size() > 0" and we should NOT return the terminalState.
275+
*
276+
* The queue.size() check is a double-check that works to handle this, without needing to synchronize poll with on*
277+
* or needing to enqueue terminalState.
278+
*
279+
* This did make me consider eliminating the 'terminalState' ref and enqueuing it ... but then that requires
280+
* a +1 of the size, or -1 of how many onNext can be sent. See comment on 'terminalState' above for why it
281+
* is currently the way it is.
282+
*
283+
* This performs fine as long as we don't use a queue implementation where the size() impl has to scan the whole list,
284+
* such as ConcurrentLinkedQueue.
285+
*/
286+
if (o == null && terminalState != null && queue.size() == 0) {
237287
o = terminalState;
238288
// once emitted we clear so a poll loop will finish
239289
terminalState = null;

rxjava-core/src/test/java/rx/internal/operators/OperatorMergeTest.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -973,4 +973,36 @@ public boolean hasNext() {
973973
return observable;
974974
}
975975

976+
@Test
977+
public void mergeManyAsyncSingle() {
978+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
979+
Observable<Observable<Integer>> os = Observable.range(1, 10000).map(new Func1<Integer, Observable<Integer>>() {
980+
981+
@Override
982+
public Observable<Integer> call(final Integer i) {
983+
return Observable.create(new OnSubscribe<Integer>() {
984+
985+
@Override
986+
public void call(Subscriber<? super Integer> s) {
987+
if (i < 500) {
988+
try {
989+
Thread.sleep(1);
990+
} catch (InterruptedException e) {
991+
e.printStackTrace();
992+
}
993+
}
994+
s.onNext(i);
995+
s.onCompleted();
996+
}
997+
998+
}).subscribeOn(Schedulers.computation()).cache();
999+
}
1000+
1001+
});
1002+
Observable.merge(os).subscribe(ts);
1003+
ts.awaitTerminalEvent();
1004+
ts.assertNoErrors();
1005+
assertEquals(10000, ts.getOnNextEvents().size());
1006+
}
1007+
9761008
}

0 commit comments

Comments
 (0)