Skip to content

Commit ceff938

Browse files
committed
Fixed deadlock in Subjects + OperatorCache.
1 parent d3ead3a commit ceff938

File tree

9 files changed

+262
-151
lines changed

9 files changed

+262
-151
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@
5656
import rx.operators.OperationAsObservable;
5757
import rx.operators.OperationAverage;
5858
import rx.operators.OperationBuffer;
59-
import rx.operators.OperationCache;
6059
import rx.operators.OperationCombineLatest;
6160
import rx.operators.OperationConcat;
6261
import rx.operators.OperationDebounce;
@@ -105,6 +104,7 @@
105104
import rx.operators.OperationUsing;
106105
import rx.operators.OperationWindow;
107106
import rx.operators.OperatorAmb;
107+
import rx.operators.OperatorCache;
108108
import rx.operators.OperatorCast;
109109
import rx.operators.OperatorDoOnEach;
110110
import rx.operators.OperatorFilter;
@@ -3863,7 +3863,7 @@ public final <B> Observable<List<T>> buffer(Observable<B> boundary, int initialC
38633863
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#wiki-cache">RxJava Wiki: cache()</a>
38643864
*/
38653865
public final Observable<T> cache() {
3866-
return create(OperationCache.cache(this));
3866+
return create(new OperatorCache<T>(this));
38673867
}
38683868

38693869
/**

rxjava-core/src/main/java/rx/operators/OperationCache.java renamed to rxjava-core/src/main/java/rx/operators/OperatorCache.java

Lines changed: 30 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,13 @@
1818
import java.util.concurrent.atomic.AtomicBoolean;
1919

2020
import rx.Observable;
21+
import rx.Observable.OnSubscribe;
2122
import rx.Observable.OnSubscribeFunc;
2223
import rx.Observer;
24+
import rx.Subscriber;
2325
import rx.Subscription;
2426
import rx.subjects.ReplaySubject;
27+
import rx.subjects.Subject;
2528

2629
/**
2730
* This method has similar behavior to {@link Observable#replay()} except that this auto-subscribes
@@ -35,30 +38,35 @@
3538
* NOTE: You sacrifice the ability to unsubscribe from the origin when you use this operator, so be
3639
* careful not to use this operator on Observables that emit infinite or very large numbers of
3740
* items, as this will use up memory.
41+
*
42+
* @param <T> the cached value type
3843
*/
39-
public class OperationCache {
40-
41-
public static <T> OnSubscribeFunc<T> cache(final Observable<? extends T> source) {
42-
return new OnSubscribeFunc<T>() {
43-
44-
final AtomicBoolean subscribed = new AtomicBoolean(false);
45-
private final ReplaySubject<T> cache = ReplaySubject.create();
44+
public final class OperatorCache<T> implements OnSubscribe<T> {
45+
protected final Observable<? extends T> source;
46+
protected final Subject<? super T, ? extends T> cache;
47+
protected final AtomicBoolean sourceSubscribed;
4648

47-
@Override
48-
public Subscription onSubscribe(Observer<? super T> observer) {
49-
if (subscribed.compareAndSet(false, true)) {
50-
// subscribe to the source once
51-
source.subscribe(cache);
52-
/*
53-
* Note that we will never unsubscribe from 'source' as we want to receive and cache all of its values.
54-
*
55-
* This means this should never be used on an infinite or very large sequence, similar to toList().
56-
*/
57-
}
58-
59-
return cache.subscribe(observer);
60-
}
49+
public OperatorCache(Observable<? extends T> source) {
50+
this(source, ReplaySubject.<T>create());
51+
}
52+
53+
/** Test support. */
54+
public OperatorCache(Observable<? extends T> source, Subject<? super T, ? extends T> cache) {
55+
this.source = source;
56+
this.cache = cache;
57+
this.sourceSubscribed = new AtomicBoolean();
58+
}
6159

62-
};
60+
@Override
61+
public void call(Subscriber<? super T> t1) {
62+
if (sourceSubscribed.compareAndSet(false, true)) {
63+
source.subscribe(cache);
64+
/*
65+
* Note that we will never unsubscribe from 'source' as we want to receive and cache all of its values.
66+
*
67+
* This means this should never be used on an infinite or very large sequence, similar to toList().
68+
*/
69+
}
70+
cache.subscribe(t1);
6371
}
6472
}

rxjava-core/src/main/java/rx/subjects/AsyncSubject.java

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -105,35 +105,40 @@ protected AsyncSubject(OnSubscribe<T> onSubscribe, SubjectSubscriptionManager<T>
105105

106106
@Override
107107
public void onCompleted() {
108+
Collection<SubjectObserver<? super T>> observers =
108109
subscriptionManager.terminate(new Action1<Collection<SubjectObserver<? super T>>>() {
109110

110111
@Override
111112
public void call(Collection<SubjectObserver<? super T>> observers) {
112-
for (Observer<? super T> o : observers) {
113-
emitValueToObserver(lastNotification.get(), o);
114-
}
115113
}
116114
});
115+
if (observers != null) {
116+
for (Observer<? super T> o : observers) {
117+
emitValueToObserver(lastNotification.get(), o);
118+
}
119+
}
117120
}
118121

119122
@Override
120123
public void onError(final Throwable e) {
124+
Collection<SubjectObserver<? super T>> observers =
121125
subscriptionManager.terminate(new Action1<Collection<SubjectObserver<? super T>>>() {
122-
123126
@Override
124127
public void call(Collection<SubjectObserver<? super T>> observers) {
125-
lastNotification.set(new Notification<T>(e));
126-
for (Observer<? super T> o : observers) {
127-
emitValueToObserver(lastNotification.get(), o);
128-
}
128+
lastNotification.set(Notification.<T>createOnError(e));
129129
}
130130
});
131+
if (observers != null) {
132+
for (Observer<? super T> o : observers) {
133+
emitValueToObserver(lastNotification.get(), o);
134+
}
135+
}
131136

132137
}
133138

134139
@Override
135140
public void onNext(T v) {
136-
lastNotification.set(new Notification<T>(v));
141+
lastNotification.set(Notification.createOnNext(v));
137142
}
138143

139144
}

rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -144,39 +144,44 @@ protected BehaviorSubject(OnSubscribe<T> onSubscribe, SubjectSubscriptionManager
144144

145145
@Override
146146
public void onCompleted() {
147+
Collection<SubjectObserver<? super T>> observers =
147148
subscriptionManager.terminate(new Action1<Collection<SubjectObserver<? super T>>>() {
148149

149150
@Override
150151
public void call(Collection<SubjectObserver<? super T>> observers) {
151-
lastNotification.set(new Notification<T>());
152-
for (Observer<? super T> o : observers) {
153-
o.onCompleted();
154-
}
152+
lastNotification.set(Notification.<T>createOnCompleted());
155153
}
156154
});
155+
if (observers != null) {
156+
for (Observer<? super T> o : observers) {
157+
o.onCompleted();
158+
}
159+
}
157160
}
158161

159162
@Override
160163
public void onError(final Throwable e) {
164+
Collection<SubjectObserver<? super T>> observers =
161165
subscriptionManager.terminate(new Action1<Collection<SubjectObserver<? super T>>>() {
162166

163167
@Override
164168
public void call(Collection<SubjectObserver<? super T>> observers) {
165-
lastNotification.set(new Notification<T>(e));
166-
for (Observer<? super T> o : observers) {
167-
o.onError(e);
168-
}
169+
lastNotification.set(Notification.<T>createOnError(e));
169170
}
170171
});
171-
172+
if (observers != null) {
173+
for (Observer<? super T> o : observers) {
174+
o.onError(e);
175+
}
176+
}
172177
}
173178

174179
@Override
175180
public void onNext(T v) {
176181
// do not overwrite a terminal notification
177182
// so new subscribers can get them
178183
if (lastNotification.get().isOnNext()) {
179-
lastNotification.set(new Notification<T>(v));
184+
lastNotification.set(Notification.createOnNext(v));
180185
for (Observer<? super T> o : subscriptionManager.rawSnapshot()) {
181186
o.onNext(v);
182187
}

rxjava-core/src/main/java/rx/subjects/PublishSubject.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -95,31 +95,35 @@ protected PublishSubject(OnSubscribe<T> onSubscribe, SubjectSubscriptionManager<
9595

9696
@Override
9797
public void onCompleted() {
98+
Collection<SubjectObserver<? super T>> observers =
9899
subscriptionManager.terminate(new Action1<Collection<SubjectObserver<? super T>>>() {
99-
100100
@Override
101101
public void call(Collection<SubjectObserver<? super T>> observers) {
102102
lastNotification.set(Notification.<T> createOnCompleted());
103-
for (Observer<? super T> o : observers) {
104-
o.onCompleted();
105-
}
106103
}
107104
});
105+
if (observers != null) {
106+
for (Observer<? super T> o : observers) {
107+
o.onCompleted();
108+
}
109+
}
108110
}
109111

110112
@Override
111113
public void onError(final Throwable e) {
114+
Collection<SubjectObserver<? super T>> observers =
112115
subscriptionManager.terminate(new Action1<Collection<SubjectObserver<? super T>>>() {
113116

114117
@Override
115118
public void call(Collection<SubjectObserver<? super T>> observers) {
116119
lastNotification.set(Notification.<T>createOnError(e));
117-
for (Observer<? super T> o : observers) {
118-
o.onError(e);
119-
}
120120
}
121121
});
122-
122+
if (observers != null) {
123+
for (Observer<? super T> o : observers) {
124+
o.onError(e);
125+
}
126+
}
123127
}
124128

125129
@Override

rxjava-core/src/main/java/rx/subjects/ReplaySubject.java

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -122,34 +122,40 @@ protected ReplaySubject(OnSubscribe<T> onSubscribe, SubjectSubscriptionManager<T
122122

123123
@Override
124124
public void onCompleted() {
125+
Collection<SubjectObserver<? super T>> observers =
125126
subscriptionManager.terminate(new Action1<Collection<SubjectObserver<? super T>>>() {
126127

127128
@Override
128129
public void call(Collection<SubjectObserver<? super T>> observers) {
129130
state.history.complete(Notification.<T>createOnCompleted());
130-
for (SubjectObserver<? super T> o : observers) {
131-
if (caughtUp(o)) {
132-
o.onCompleted();
133-
}
134-
}
135131
}
136132
});
133+
if (observers != null) {
134+
for (SubjectObserver<? super T> o : observers) {
135+
if (caughtUp(o)) {
136+
o.onCompleted();
137+
}
138+
}
139+
}
137140
}
138141

139142
@Override
140143
public void onError(final Throwable e) {
144+
Collection<SubjectObserver<? super T>> observers =
141145
subscriptionManager.terminate(new Action1<Collection<SubjectObserver<? super T>>>() {
142146

143147
@Override
144148
public void call(Collection<SubjectObserver<? super T>> observers) {
145149
state.history.complete(Notification.<T>createOnError(e));
146-
for (SubjectObserver<? super T> o : observers) {
147-
if (caughtUp(o)) {
148-
o.onError(e);
149-
}
150-
}
151150
}
152151
});
152+
if (observers != null) {
153+
for (SubjectObserver<? super T> o : observers) {
154+
if (caughtUp(o)) {
155+
o.onError(e);
156+
}
157+
}
158+
}
153159
}
154160

155161
@Override

rxjava-core/src/main/java/rx/subjects/SubjectSubscriptionManager.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -113,19 +113,20 @@ public void call() {
113113
}
114114

115115
@SuppressWarnings({ "unchecked", "rawtypes" })
116-
protected void terminate(Action1<Collection<SubjectObserver<? super T>>> onTerminate) {
116+
protected Collection<SubjectObserver<? super T>> terminate(Action1<Collection<SubjectObserver<? super T>>> onTerminate) {
117117
State<T> current;
118118
State<T> newState = null;
119119
do {
120120
current = state.get();
121121
if (current.terminated) {
122122
// already terminated so do nothing
123-
return;
123+
return null;
124124
} else {
125125
newState = current.terminate();
126126
}
127127
} while (!state.compareAndSet(current, newState));
128128

129+
Collection<SubjectObserver<? super T>> observerCollection = (Collection)Arrays.asList(newState.observers);
129130
/*
130131
* if we get here then we won setting the state to terminated
131132
* and have a deterministic set of Observers to emit to (concurrent subscribes
@@ -134,11 +135,12 @@ protected void terminate(Action1<Collection<SubjectObserver<? super T>>> onTermi
134135
*/
135136
try {
136137
// had to circumvent type check, we know what the array contains
137-
onTerminate.call((Collection) Arrays.asList(newState.observers));
138+
onTerminate.call(observerCollection);
138139
} finally {
139140
// mark that termination is completed
140141
newState.terminationLatch.countDown();
141142
}
143+
return observerCollection;
142144
}
143145

144146
/**

0 commit comments

Comments
 (0)