Skip to content

Commit effc08d

Browse files
Synchronize Observer on OperationMerge
fixes #200 This is necessary because by definition Merge is subscribing to multiple sequences in parallel and is supposed to serialize them into a single Observable.
1 parent f4968d6 commit effc08d

File tree

1 file changed

+79
-1
lines changed

1 file changed

+79
-1
lines changed

rxjava-core/src/main/java/rx/operators/OperationMerge.java

Lines changed: 79 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@
2323
import java.util.Arrays;
2424
import java.util.List;
2525
import java.util.concurrent.ConcurrentHashMap;
26+
import java.util.concurrent.CountDownLatch;
2627
import java.util.concurrent.atomic.AtomicBoolean;
28+
import java.util.concurrent.atomic.AtomicInteger;
2729

2830
import org.junit.Before;
2931
import org.junit.Test;
@@ -33,6 +35,8 @@
3335
import rx.Observable;
3436
import rx.Observer;
3537
import rx.Subscription;
38+
import rx.util.AtomicObservableSubscription;
39+
import rx.util.SynchronizedObserver;
3640
import rx.util.functions.Func1;
3741

3842
public final class OperationMerge {
@@ -115,10 +119,20 @@ private MergeObservable(Observable<Observable<T>> sequences) {
115119
}
116120

117121
public MergeSubscription call(Observer<T> actualObserver) {
122+
123+
/**
124+
* We must synchronize a merge because we subscribe to multiple sequences in parallel that will each be emitting.
125+
* <p>
126+
* The calls from each sequence must be serialized.
127+
* <p>
128+
* Bug report: https://github.com/Netflix/RxJava/issues/200
129+
*/
130+
SynchronizedObserver<T> synchronizedObserver = new SynchronizedObserver<T>(actualObserver, new AtomicObservableSubscription(ourSubscription));
131+
118132
/**
119133
* Subscribe to the parent Observable to get to the children Observables
120134
*/
121-
sequences.subscribe(new ParentObserver(actualObserver));
135+
sequences.subscribe(new ParentObserver(synchronizedObserver));
122136

123137
/* return our subscription to allow unsubscribing */
124138
return ourSubscription;
@@ -380,6 +394,68 @@ public void testMergeArrayWithThreading() {
380394
verify(stringObserver, times(1)).onCompleted();
381395
}
382396

397+
@Test
398+
public void testSynchronizationOfMultipleSequences() throws Exception {
399+
final TestASynchronousObservable o1 = new TestASynchronousObservable();
400+
final TestASynchronousObservable o2 = new TestASynchronousObservable();
401+
402+
// use this latch to cause onNext to wait until we're ready to let it go
403+
final CountDownLatch endLatch = new CountDownLatch(1);
404+
405+
final AtomicInteger concurrentCounter = new AtomicInteger();
406+
final AtomicInteger totalCounter = new AtomicInteger();
407+
408+
@SuppressWarnings("unchecked")
409+
Observable<String> m = Observable.create(merge(o1, o2));
410+
m.subscribe(new Observer<String>() {
411+
412+
@Override
413+
public void onCompleted() {
414+
415+
}
416+
417+
@Override
418+
public void onError(Exception e) {
419+
throw new RuntimeException("failed", e);
420+
}
421+
422+
@Override
423+
public void onNext(String v) {
424+
totalCounter.incrementAndGet();
425+
concurrentCounter.incrementAndGet();
426+
try {
427+
// wait here until we're done asserting
428+
endLatch.await();
429+
} catch (InterruptedException e) {
430+
e.printStackTrace();
431+
throw new RuntimeException("failed", e);
432+
} finally {
433+
concurrentCounter.decrementAndGet();
434+
}
435+
}
436+
437+
});
438+
439+
// wait for both observables to send (one should be blocked)
440+
o1.onNextBeingSent.await();
441+
o2.onNextBeingSent.await();
442+
443+
assertEquals(1, concurrentCounter.get());
444+
445+
// release so it can finish
446+
endLatch.countDown();
447+
448+
try {
449+
o1.t.join();
450+
o2.t.join();
451+
} catch (InterruptedException e) {
452+
throw new RuntimeException(e);
453+
}
454+
455+
assertEquals(2, totalCounter.get());
456+
assertEquals(0, concurrentCounter.get());
457+
}
458+
383459
/**
384460
* unit test from OperationMergeDelayError backported here to show how these use cases work with normal merge
385461
*/
@@ -452,13 +528,15 @@ public void unsubscribe() {
452528

453529
private static class TestASynchronousObservable extends Observable<String> {
454530
Thread t;
531+
final CountDownLatch onNextBeingSent = new CountDownLatch(1);
455532

456533
@Override
457534
public Subscription subscribe(final Observer<String> observer) {
458535
t = new Thread(new Runnable() {
459536

460537
@Override
461538
public void run() {
539+
onNextBeingSent.countDown();
462540
observer.onNext("hello");
463541
observer.onCompleted();
464542
}

0 commit comments

Comments
 (0)