Skip to content

Commit 0010178

Browse files
Merge pull request #201 from benjchristensen/issue-200-merge-synchronization
Synchronize Observer on OperationMerge
2 parents 645c7b4 + 169e7e0 commit 0010178

File tree

1 file changed

+95
-3
lines changed

1 file changed

+95
-3
lines changed

rxjava-core/src/main/java/rx/operators/OperationMerge.java

Lines changed: 95 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@
2323
import java.util.Arrays;
2424
import java.util.List;
2525
import java.util.concurrent.ConcurrentHashMap;
26+
import java.util.concurrent.CountDownLatch;
2627
import java.util.concurrent.atomic.AtomicBoolean;
28+
import java.util.concurrent.atomic.AtomicInteger;
2729

2830
import org.junit.Before;
2931
import org.junit.Test;
@@ -33,6 +35,8 @@
3335
import rx.Observable;
3436
import rx.Observer;
3537
import rx.Subscription;
38+
import rx.util.AtomicObservableSubscription;
39+
import rx.util.SynchronizedObserver;
3640
import rx.util.functions.Func1;
3741

3842
public final class OperationMerge {
@@ -114,14 +118,25 @@ private MergeObservable(Observable<Observable<T>> sequences) {
114118
this.sequences = sequences;
115119
}
116120

117-
public MergeSubscription call(Observer<T> actualObserver) {
121+
public Subscription call(Observer<T> actualObserver) {
122+
123+
/**
124+
* We must synchronize a merge because we subscribe to multiple sequences in parallel that will each be emitting.
125+
* <p>
126+
* The calls from each sequence must be serialized.
127+
* <p>
128+
* Bug report: https://github.com/Netflix/RxJava/issues/200
129+
*/
130+
AtomicObservableSubscription subscription = new AtomicObservableSubscription(ourSubscription);
131+
SynchronizedObserver<T> synchronizedObserver = new SynchronizedObserver<T>(actualObserver, subscription);
132+
118133
/**
119134
* Subscribe to the parent Observable to get to the children Observables
120135
*/
121-
sequences.subscribe(new ParentObserver(actualObserver));
136+
sequences.subscribe(new ParentObserver(synchronizedObserver));
122137

123138
/* return our subscription to allow unsubscribing */
124-
return ourSubscription;
139+
return subscription;
125140
}
126141

127142
/**
@@ -380,6 +395,79 @@ public void testMergeArrayWithThreading() {
380395
verify(stringObserver, times(1)).onCompleted();
381396
}
382397

398+
@Test
399+
public void testSynchronizationOfMultipleSequences() throws Exception {
400+
final TestASynchronousObservable o1 = new TestASynchronousObservable();
401+
final TestASynchronousObservable o2 = new TestASynchronousObservable();
402+
403+
// use this latch to cause onNext to wait until we're ready to let it go
404+
final CountDownLatch endLatch = new CountDownLatch(1);
405+
406+
final AtomicInteger concurrentCounter = new AtomicInteger();
407+
final AtomicInteger totalCounter = new AtomicInteger();
408+
409+
@SuppressWarnings("unchecked")
410+
Observable<String> m = Observable.create(merge(o1, o2));
411+
m.subscribe(new Observer<String>() {
412+
413+
@Override
414+
public void onCompleted() {
415+
416+
}
417+
418+
@Override
419+
public void onError(Exception e) {
420+
throw new RuntimeException("failed", e);
421+
}
422+
423+
@Override
424+
public void onNext(String v) {
425+
totalCounter.incrementAndGet();
426+
concurrentCounter.incrementAndGet();
427+
try {
428+
// wait here until we're done asserting
429+
endLatch.await();
430+
} catch (InterruptedException e) {
431+
e.printStackTrace();
432+
throw new RuntimeException("failed", e);
433+
} finally {
434+
concurrentCounter.decrementAndGet();
435+
}
436+
}
437+
438+
});
439+
440+
// wait for both observables to send (one should be blocked)
441+
o1.onNextBeingSent.await();
442+
o2.onNextBeingSent.await();
443+
444+
// I can't think of a way to know for sure that both threads have or are trying to send onNext
445+
// since I can't use a CountDownLatch for "after" onNext since I want to catch during it
446+
// but I can't know for sure onNext is invoked
447+
// so I'm unfortunately reverting to using a Thread.sleep to allow the process scheduler time
448+
// to make sure after o1.onNextBeingSent and o2.onNextBeingSent are hit that the following
449+
// onNext is invoked.
450+
451+
Thread.sleep(300);
452+
453+
try { // in try/finally so threads are released via latch countDown even if assertion fails
454+
assertEquals(1, concurrentCounter.get());
455+
} finally {
456+
// release so it can finish
457+
endLatch.countDown();
458+
}
459+
460+
try {
461+
o1.t.join();
462+
o2.t.join();
463+
} catch (InterruptedException e) {
464+
throw new RuntimeException(e);
465+
}
466+
467+
assertEquals(2, totalCounter.get());
468+
assertEquals(0, concurrentCounter.get());
469+
}
470+
383471
/**
384472
* unit test from OperationMergeDelayError backported here to show how these use cases work with normal merge
385473
*/
@@ -452,14 +540,18 @@ public void unsubscribe() {
452540

453541
private static class TestASynchronousObservable extends Observable<String> {
454542
Thread t;
543+
final CountDownLatch onNextBeingSent = new CountDownLatch(1);
455544

456545
@Override
457546
public Subscription subscribe(final Observer<String> observer) {
458547
t = new Thread(new Runnable() {
459548

460549
@Override
461550
public void run() {
551+
onNextBeingSent.countDown();
462552
observer.onNext("hello");
553+
// I can't use a countDownLatch to prove we are actually sending 'onNext'
554+
// since it will block if synchronized and I'll deadlock
463555
observer.onCompleted();
464556
}
465557

0 commit comments

Comments
 (0)