Skip to content

Commit fb555df

Browse files
Synchronization of Merge operator (fixes)
- return AtomicSubscription not MergeSubscription which I was accidentally still returning - try/finally in unit test so threads are released even if assertion is thrown
1 parent effc08d commit fb555df

File tree

1 file changed

+10
-7
lines changed

1 file changed

+10
-7
lines changed

rxjava-core/src/main/java/rx/operators/OperationMerge.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ private MergeObservable(Observable<Observable<T>> sequences) {
118118
this.sequences = sequences;
119119
}
120120

121-
public MergeSubscription call(Observer<T> actualObserver) {
121+
public Subscription call(Observer<T> actualObserver) {
122122

123123
/**
124124
* We must synchronize a merge because we subscribe to multiple sequences in parallel that will each be emitting.
@@ -127,15 +127,16 @@ public MergeSubscription call(Observer<T> actualObserver) {
127127
* <p>
128128
* Bug report: https://github.com/Netflix/RxJava/issues/200
129129
*/
130-
SynchronizedObserver<T> synchronizedObserver = new SynchronizedObserver<T>(actualObserver, new AtomicObservableSubscription(ourSubscription));
130+
AtomicObservableSubscription subscription = new AtomicObservableSubscription(ourSubscription);
131+
SynchronizedObserver<T> synchronizedObserver = new SynchronizedObserver<T>(actualObserver, subscription);
131132

132133
/**
133134
* Subscribe to the parent Observable to get to the children Observables
134135
*/
135136
sequences.subscribe(new ParentObserver(synchronizedObserver));
136137

137138
/* return our subscription to allow unsubscribing */
138-
return ourSubscription;
139+
return subscription;
139140
}
140141

141142
/**
@@ -439,11 +440,13 @@ public void onNext(String v) {
439440
// wait for both observables to send (one should be blocked)
440441
o1.onNextBeingSent.await();
441442
o2.onNextBeingSent.await();
442-
443-
assertEquals(1, concurrentCounter.get());
444443

445-
// release so it can finish
446-
endLatch.countDown();
444+
try { // in try/finally so threads are released via latch countDown even if assertion fails
445+
assertEquals(1, concurrentCounter.get());
446+
} finally {
447+
// release so it can finish
448+
endLatch.countDown();
449+
}
447450

448451
try {
449452
o1.t.join();

0 commit comments

Comments
 (0)