Skip to content

Commit 797bf4d

Browse files
author
jmhofer
committed
Tried to adapt according to Ben's review comments.
1 parent 1500859 commit 797bf4d

File tree

2 files changed

+14
-10
lines changed

2 files changed

+14
-10
lines changed

rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@
3434
import rx.Observer;
3535
import rx.Subscription;
3636
import rx.subscriptions.Subscriptions;
37+
import rx.util.AtomicObservableSubscription;
38+
import rx.util.SynchronizedObserver;
3739
import rx.util.functions.Func1;
3840
import rx.util.functions.Func2;
3941
import rx.util.functions.Func3;
@@ -124,7 +126,7 @@ public void onNext(T args) {
124126
*/
125127
private static class Aggregator<R> implements Func1<Observer<? super R>, Subscription> {
126128

127-
private Observer<? super R> observer;
129+
private volatile Observer<? super R> observer;
128130

129131
private final FuncN<? extends R> combineLatestFunction;
130132
private final AtomicBoolean running = new AtomicBoolean(true);
@@ -228,19 +230,21 @@ public Subscription call(Observer<? super R> observer) {
228230
if (this.observer != null) {
229231
throw new IllegalStateException("Only one Observer can subscribe to this Observable.");
230232
}
231-
this.observer = observer;
233+
234+
AtomicObservableSubscription subscription = new AtomicObservableSubscription(new Subscription() {
235+
@Override
236+
public void unsubscribe() {
237+
stop();
238+
}
239+
});
240+
this.observer = new SynchronizedObserver<R>(observer, subscription);
232241

233242
/* start the observers */
234243
for (CombineObserver<R, ?> rw : observers) {
235244
rw.startWatching();
236245
}
237246

238-
return new Subscription() {
239-
@Override
240-
public void unsubscribe() {
241-
stop();
242-
}
243-
};
247+
return subscription;
244248
}
245249

246250
private void stop() {

rxjava-core/src/main/java/rx/util/SynchronizedObserver.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,12 +64,12 @@ public final class SynchronizedObserver<T> implements Observer<T> {
6464
* // TODO composing of this class should rarely happen now with updated design so this decision should be revisited
6565
*/
6666

67-
private final Observer<T> observer;
67+
private final Observer<? super T> observer;
6868
private final AtomicObservableSubscription subscription;
6969
private volatile boolean finishRequested = false;
7070
private volatile boolean finished = false;
7171

72-
public SynchronizedObserver(Observer<T> Observer, AtomicObservableSubscription subscription) {
72+
public SynchronizedObserver(Observer<? super T> Observer, AtomicObservableSubscription subscription) {
7373
this.observer = Observer;
7474
this.subscription = subscription;
7575
}

0 commit comments

Comments
 (0)