Skip to content

Commit ab8460b

Browse files
committed
Merge: fixed hangs & missed scalar emissions
1 parent cf5ae70 commit ab8460b

File tree

2 files changed

+60
-40
lines changed

2 files changed

+60
-40
lines changed

src/main/java/rx/internal/operators/OperatorMerge.java

Lines changed: 43 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,13 @@
1717

1818
import java.util.Queue;
1919
import java.util.concurrent.ConcurrentLinkedQueue;
20-
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
21-
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
20+
import java.util.concurrent.atomic.*;
2221

23-
import rx.Observable;
22+
import rx.*;
2423
import rx.Observable.Operator;
25-
import rx.Producer;
26-
import rx.Subscriber;
27-
import rx.exceptions.CompositeException;
28-
import rx.exceptions.MissingBackpressureException;
29-
import rx.exceptions.OnErrorThrowable;
24+
import rx.exceptions.*;
3025
import rx.functions.Func1;
31-
import rx.internal.util.RxRingBuffer;
32-
import rx.internal.util.ScalarSynchronousObservable;
33-
import rx.internal.util.SubscriptionIndexedRingBuffer;
26+
import rx.internal.util.*;
3427

3528
/**
3629
* Flattens a list of {@link Observable}s into one {@code Observable}, without any transformation.
@@ -135,7 +128,7 @@ private static final class MergeSubscriber<T> extends Subscriber<Observable<? ex
135128

136129
private volatile SubscriptionIndexedRingBuffer<InnerSubscriber<T>> childrenSubscribers;
137130

138-
private RxRingBuffer scalarValueQueue = null;
131+
private volatile RxRingBuffer scalarValueQueue = null;
139132

140133
/* protected by lock on MergeSubscriber instance */
141134
private int missedEmitting = 0;
@@ -266,9 +259,8 @@ private void handleScalarSynchronousObservableWithoutRequestLimits(ScalarSynchro
266259
request(1);
267260
return;
268261
} else {
269-
initScalarValueQueueIfNeeded();
270262
try {
271-
scalarValueQueue.onNext(value);
263+
getOrCreateScalarValueQueue().onNext(value);
272264
} catch (MissingBackpressureException e) {
273265
onError(e);
274266
}
@@ -306,19 +298,20 @@ private void handleScalarSynchronousObservableWithRequestLimits(ScalarSynchronou
306298

307299
// if we didn't return above we need to enqueue
308300
// enqueue the values for later delivery
309-
initScalarValueQueueIfNeeded();
310301
try {
311-
scalarValueQueue.onNext(t.get());
302+
getOrCreateScalarValueQueue().onNext(t.get());
312303
} catch (MissingBackpressureException e) {
313304
onError(e);
314305
}
315306
}
316307

317-
private void initScalarValueQueueIfNeeded() {
318-
if (scalarValueQueue == null) {
319-
scalarValueQueue = RxRingBuffer.getSpmcInstance();
320-
add(scalarValueQueue);
308+
private RxRingBuffer getOrCreateScalarValueQueue() {
309+
RxRingBuffer svq = scalarValueQueue;
310+
if (svq == null) {
311+
svq = RxRingBuffer.getSpmcInstance();
312+
scalarValueQueue = svq;
321313
}
314+
return svq;
322315
}
323316

324317
private synchronized boolean releaseEmitLock() {
@@ -381,21 +374,22 @@ private void drainChildrenQueues() {
381374
* ONLY call when holding the EmitLock.
382375
*/
383376
private int drainScalarValueQueue() {
384-
if (scalarValueQueue != null) {
377+
RxRingBuffer svq = scalarValueQueue;
378+
if (svq != null) {
385379
long r = mergeProducer.requested;
386380
int emittedWhileDraining = 0;
387381
if (r < 0) {
388382
// drain it all
389383
Object o = null;
390-
while ((o = scalarValueQueue.poll()) != null) {
384+
while ((o = svq.poll()) != null) {
391385
on.accept(actual, o);
392386
emittedWhileDraining++;
393387
}
394388
} else if (r > 0) {
395389
// drain what was requested
396390
long toEmit = r;
397391
for (int i = 0; i < toEmit; i++) {
398-
Object o = scalarValueQueue.poll();
392+
Object o = svq.poll();
399393
if (o == null) {
400394
break;
401395
} else {
@@ -469,7 +463,7 @@ public void onCompleted() {
469463
boolean c = false;
470464
synchronized (this) {
471465
completed = true;
472-
if (wip == 0 && (scalarValueQueue == null || scalarValueQueue.isEmpty())) {
466+
if (wip == 0) {
473467
c = true;
474468
}
475469
}
@@ -494,25 +488,38 @@ void completeInner(InnerSubscriber<T> s) {
494488
}
495489

496490
private void drainAndComplete() {
497-
drainQueuesIfNeeded(); // TODO need to confirm whether this is needed or not
498-
if (delayErrors) {
499-
Queue<Throwable> es = null;
491+
boolean moreToDrain = true;
492+
while (moreToDrain) {
500493
synchronized (this) {
501-
es = exceptions;
494+
missedEmitting = 0;
502495
}
503-
if (es != null) {
504-
if (es.isEmpty()) {
505-
actual.onCompleted();
506-
} else if (es.size() == 1) {
507-
actual.onError(es.poll());
496+
drainScalarValueQueue();
497+
drainChildrenQueues();
498+
synchronized (this) {
499+
moreToDrain = missedEmitting > 0;
500+
}
501+
}
502+
RxRingBuffer svq = scalarValueQueue;
503+
if (svq == null || svq.isEmpty()) {
504+
if (delayErrors) {
505+
Queue<Throwable> es = null;
506+
synchronized (this) {
507+
es = exceptions;
508+
}
509+
if (es != null) {
510+
if (es.isEmpty()) {
511+
actual.onCompleted();
512+
} else if (es.size() == 1) {
513+
actual.onError(es.poll());
514+
} else {
515+
actual.onError(new CompositeException(es));
516+
}
508517
} else {
509-
actual.onError(new CompositeException(es));
518+
actual.onCompleted();
510519
}
511520
} else {
512521
actual.onCompleted();
513522
}
514-
} else {
515-
actual.onCompleted();
516523
}
517524
}
518525

src/test/java/rx/internal/operators/OperatorMergeTest.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -628,6 +628,16 @@ public void onNext(Integer t) {
628628
assertTrue(generated1.get() >= RxRingBuffer.SIZE * 2 && generated1.get() <= RxRingBuffer.SIZE * 4);
629629
}
630630

631+
@Test
632+
public void testBackpressureUpstream2InLoop() throws InterruptedException {
633+
for (int i = 0; i < 1000; i++) {
634+
System.err.flush();
635+
System.out.println("---");
636+
System.out.flush();
637+
testBackpressureUpstream2();
638+
}
639+
}
640+
631641
@Test
632642
public void testBackpressureUpstream2() throws InterruptedException {
633643
final AtomicInteger generated1 = new AtomicInteger();
@@ -636,21 +646,24 @@ public void testBackpressureUpstream2() throws InterruptedException {
636646
TestSubscriber<Integer> testSubscriber = new TestSubscriber<Integer>() {
637647
@Override
638648
public void onNext(Integer t) {
639-
System.err.println("testSubscriber received => " + t + " on thread " + Thread.currentThread());
640649
super.onNext(t);
641650
}
642651
};
643652

644653
Observable.merge(o1.take(RxRingBuffer.SIZE * 2), Observable.just(-99)).subscribe(testSubscriber);
645654
testSubscriber.awaitTerminalEvent();
655+
656+
List<Integer> onNextEvents = testSubscriber.getOnNextEvents();
657+
658+
System.out.println("Generated 1: " + generated1.get() + " / received: " + onNextEvents.size());
659+
System.out.println(onNextEvents);
660+
646661
if (testSubscriber.getOnErrorEvents().size() > 0) {
647662
testSubscriber.getOnErrorEvents().get(0).printStackTrace();
648663
}
649664
testSubscriber.assertNoErrors();
650-
System.err.println(testSubscriber.getOnNextEvents());
651-
assertEquals(RxRingBuffer.SIZE * 2 + 1, testSubscriber.getOnNextEvents().size());
665+
assertEquals(RxRingBuffer.SIZE * 2 + 1, onNextEvents.size());
652666
// it should be between the take num and requested batch size across the async boundary
653-
System.out.println("Generated 1: " + generated1.get());
654667
assertTrue(generated1.get() >= RxRingBuffer.SIZE * 2 && generated1.get() <= RxRingBuffer.SIZE * 3);
655668
}
656669

0 commit comments

Comments
 (0)