Skip to content

Commit e4d9a60

Browse files
Merge branch 'SubjectDeadlockFix320' of github.com:akarnokd/RxJava into pull-972
Conflicts: rxjava-core/src/main/java/rx/operators/OperatorCache.java Updated OperatorCache to use `unsafeSubscribe` to fix merge conflicts.
2 parents 7a85774 + ceff938 commit e4d9a60

File tree

9 files changed

+262
-154
lines changed

9 files changed

+262
-154
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@
5656
import rx.operators.OperationAsObservable;
5757
import rx.operators.OperationAverage;
5858
import rx.operators.OperationBuffer;
59-
import rx.operators.OperationCache;
6059
import rx.operators.OperationCombineLatest;
6160
import rx.operators.OperationConcat;
6261
import rx.operators.OperationDebounce;
@@ -105,6 +104,7 @@
105104
import rx.operators.OperationUsing;
106105
import rx.operators.OperationWindow;
107106
import rx.operators.OperatorAmb;
107+
import rx.operators.OperatorCache;
108108
import rx.operators.OperatorCast;
109109
import rx.operators.OperatorDoOnEach;
110110
import rx.operators.OperatorFilter;
@@ -3864,7 +3864,7 @@ public final <B> Observable<List<T>> buffer(Observable<B> boundary, int initialC
38643864
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#wiki-cache">RxJava Wiki: cache()</a>
38653865
*/
38663866
public final Observable<T> cache() {
3867-
return create(OperationCache.cache(this));
3867+
return create(new OperatorCache<T>(this));
38683868
}
38693869

38703870
/**

rxjava-core/src/main/java/rx/operators/OperationCache.java renamed to rxjava-core/src/main/java/rx/operators/OperatorCache.java

Lines changed: 30 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,11 @@
1818
import java.util.concurrent.atomic.AtomicBoolean;
1919

2020
import rx.Observable;
21-
import rx.Observable.OnSubscribeFunc;
22-
import rx.Observer;
23-
import rx.Subscription;
21+
import rx.Observable.OnSubscribe;
22+
import rx.Subscriber;
2423
import rx.observers.Subscribers;
2524
import rx.subjects.ReplaySubject;
25+
import rx.subjects.Subject;
2626

2727
/**
2828
* This method has similar behavior to {@link Observable#replay()} except that this auto-subscribes
@@ -36,30 +36,35 @@
3636
* NOTE: You sacrifice the ability to unsubscribe from the origin when you use this operator, so be
3737
* careful not to use this operator on Observables that emit infinite or very large numbers of
3838
* items, as this will use up memory.
39+
*
40+
* @param <T>
41+
* the cached value type
3942
*/
40-
public class OperationCache {
41-
42-
public static <T> OnSubscribeFunc<T> cache(final Observable<? extends T> source) {
43-
return new OnSubscribeFunc<T>() {
44-
45-
final AtomicBoolean subscribed = new AtomicBoolean(false);
46-
private final ReplaySubject<T> cache = ReplaySubject.create();
43+
public final class OperatorCache<T> implements OnSubscribe<T> {
44+
protected final Observable<? extends T> source;
45+
protected final Subject<? super T, ? extends T> cache;
46+
protected final AtomicBoolean sourceSubscribed;
4747

48-
@Override
49-
public Subscription onSubscribe(Observer<? super T> observer) {
50-
if (subscribed.compareAndSet(false, true)) {
51-
// subscribe to the source once
52-
source.unsafeSubscribe(Subscribers.from(cache));
53-
/*
54-
* Note that we will never unsubscribe from 'source' as we want to receive and cache all of its values.
55-
*
56-
* This means this should never be used on an infinite or very large sequence, similar to toList().
57-
*/
58-
}
48+
public OperatorCache(Observable<? extends T> source) {
49+
this(source, ReplaySubject.<T> create());
50+
}
5951

60-
return cache.unsafeSubscribe(Subscribers.from(observer));
61-
}
52+
/* accessible to tests */OperatorCache(Observable<? extends T> source, Subject<? super T, ? extends T> cache) {
53+
this.source = source;
54+
this.cache = cache;
55+
this.sourceSubscribed = new AtomicBoolean();
56+
}
6257

63-
};
58+
@Override
59+
public void call(Subscriber<? super T> s) {
60+
if (sourceSubscribed.compareAndSet(false, true)) {
61+
source.unsafeSubscribe(Subscribers.from(cache));
62+
/*
63+
* Note that we will never unsubscribe from 'source' as we want to receive and cache all of its values.
64+
*
65+
* This means this should never be used on an infinite or very large sequence, similar to toList().
66+
*/
67+
}
68+
cache.unsafeSubscribe(s);
6469
}
65-
}
70+
}

rxjava-core/src/main/java/rx/subjects/AsyncSubject.java

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -105,35 +105,40 @@ protected AsyncSubject(OnSubscribe<T> onSubscribe, SubjectSubscriptionManager<T>
105105

106106
@Override
107107
public void onCompleted() {
108+
Collection<SubjectObserver<? super T>> observers =
108109
subscriptionManager.terminate(new Action1<Collection<SubjectObserver<? super T>>>() {
109110

110111
@Override
111112
public void call(Collection<SubjectObserver<? super T>> observers) {
112-
for (Observer<? super T> o : observers) {
113-
emitValueToObserver(lastNotification.get(), o);
114-
}
115113
}
116114
});
115+
if (observers != null) {
116+
for (Observer<? super T> o : observers) {
117+
emitValueToObserver(lastNotification.get(), o);
118+
}
119+
}
117120
}
118121

119122
@Override
120123
public void onError(final Throwable e) {
124+
Collection<SubjectObserver<? super T>> observers =
121125
subscriptionManager.terminate(new Action1<Collection<SubjectObserver<? super T>>>() {
122-
123126
@Override
124127
public void call(Collection<SubjectObserver<? super T>> observers) {
125-
lastNotification.set(new Notification<T>(e));
126-
for (Observer<? super T> o : observers) {
127-
emitValueToObserver(lastNotification.get(), o);
128-
}
128+
lastNotification.set(Notification.<T>createOnError(e));
129129
}
130130
});
131+
if (observers != null) {
132+
for (Observer<? super T> o : observers) {
133+
emitValueToObserver(lastNotification.get(), o);
134+
}
135+
}
131136

132137
}
133138

134139
@Override
135140
public void onNext(T v) {
136-
lastNotification.set(new Notification<T>(v));
141+
lastNotification.set(Notification.createOnNext(v));
137142
}
138143

139144
}

rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -144,39 +144,44 @@ protected BehaviorSubject(OnSubscribe<T> onSubscribe, SubjectSubscriptionManager
144144

145145
@Override
146146
public void onCompleted() {
147+
Collection<SubjectObserver<? super T>> observers =
147148
subscriptionManager.terminate(new Action1<Collection<SubjectObserver<? super T>>>() {
148149

149150
@Override
150151
public void call(Collection<SubjectObserver<? super T>> observers) {
151-
lastNotification.set(new Notification<T>());
152-
for (Observer<? super T> o : observers) {
153-
o.onCompleted();
154-
}
152+
lastNotification.set(Notification.<T>createOnCompleted());
155153
}
156154
});
155+
if (observers != null) {
156+
for (Observer<? super T> o : observers) {
157+
o.onCompleted();
158+
}
159+
}
157160
}
158161

159162
@Override
160163
public void onError(final Throwable e) {
164+
Collection<SubjectObserver<? super T>> observers =
161165
subscriptionManager.terminate(new Action1<Collection<SubjectObserver<? super T>>>() {
162166

163167
@Override
164168
public void call(Collection<SubjectObserver<? super T>> observers) {
165-
lastNotification.set(new Notification<T>(e));
166-
for (Observer<? super T> o : observers) {
167-
o.onError(e);
168-
}
169+
lastNotification.set(Notification.<T>createOnError(e));
169170
}
170171
});
171-
172+
if (observers != null) {
173+
for (Observer<? super T> o : observers) {
174+
o.onError(e);
175+
}
176+
}
172177
}
173178

174179
@Override
175180
public void onNext(T v) {
176181
// do not overwrite a terminal notification
177182
// so new subscribers can get them
178183
if (lastNotification.get().isOnNext()) {
179-
lastNotification.set(new Notification<T>(v));
184+
lastNotification.set(Notification.createOnNext(v));
180185
for (Observer<? super T> o : subscriptionManager.rawSnapshot()) {
181186
o.onNext(v);
182187
}

rxjava-core/src/main/java/rx/subjects/PublishSubject.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -95,31 +95,35 @@ protected PublishSubject(OnSubscribe<T> onSubscribe, SubjectSubscriptionManager<
9595

9696
@Override
9797
public void onCompleted() {
98+
Collection<SubjectObserver<? super T>> observers =
9899
subscriptionManager.terminate(new Action1<Collection<SubjectObserver<? super T>>>() {
99-
100100
@Override
101101
public void call(Collection<SubjectObserver<? super T>> observers) {
102102
lastNotification.set(Notification.<T> createOnCompleted());
103-
for (Observer<? super T> o : observers) {
104-
o.onCompleted();
105-
}
106103
}
107104
});
105+
if (observers != null) {
106+
for (Observer<? super T> o : observers) {
107+
o.onCompleted();
108+
}
109+
}
108110
}
109111

110112
@Override
111113
public void onError(final Throwable e) {
114+
Collection<SubjectObserver<? super T>> observers =
112115
subscriptionManager.terminate(new Action1<Collection<SubjectObserver<? super T>>>() {
113116

114117
@Override
115118
public void call(Collection<SubjectObserver<? super T>> observers) {
116119
lastNotification.set(Notification.<T>createOnError(e));
117-
for (Observer<? super T> o : observers) {
118-
o.onError(e);
119-
}
120120
}
121121
});
122-
122+
if (observers != null) {
123+
for (Observer<? super T> o : observers) {
124+
o.onError(e);
125+
}
126+
}
123127
}
124128

125129
@Override

rxjava-core/src/main/java/rx/subjects/ReplaySubject.java

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -122,34 +122,40 @@ protected ReplaySubject(OnSubscribe<T> onSubscribe, SubjectSubscriptionManager<T
122122

123123
@Override
124124
public void onCompleted() {
125+
Collection<SubjectObserver<? super T>> observers =
125126
subscriptionManager.terminate(new Action1<Collection<SubjectObserver<? super T>>>() {
126127

127128
@Override
128129
public void call(Collection<SubjectObserver<? super T>> observers) {
129130
state.history.complete(Notification.<T>createOnCompleted());
130-
for (SubjectObserver<? super T> o : observers) {
131-
if (caughtUp(o)) {
132-
o.onCompleted();
133-
}
134-
}
135131
}
136132
});
133+
if (observers != null) {
134+
for (SubjectObserver<? super T> o : observers) {
135+
if (caughtUp(o)) {
136+
o.onCompleted();
137+
}
138+
}
139+
}
137140
}
138141

139142
@Override
140143
public void onError(final Throwable e) {
144+
Collection<SubjectObserver<? super T>> observers =
141145
subscriptionManager.terminate(new Action1<Collection<SubjectObserver<? super T>>>() {
142146

143147
@Override
144148
public void call(Collection<SubjectObserver<? super T>> observers) {
145149
state.history.complete(Notification.<T>createOnError(e));
146-
for (SubjectObserver<? super T> o : observers) {
147-
if (caughtUp(o)) {
148-
o.onError(e);
149-
}
150-
}
151150
}
152151
});
152+
if (observers != null) {
153+
for (SubjectObserver<? super T> o : observers) {
154+
if (caughtUp(o)) {
155+
o.onError(e);
156+
}
157+
}
158+
}
153159
}
154160

155161
@Override

rxjava-core/src/main/java/rx/subjects/SubjectSubscriptionManager.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -113,19 +113,20 @@ public void call() {
113113
}
114114

115115
@SuppressWarnings({ "unchecked", "rawtypes" })
116-
protected void terminate(Action1<Collection<SubjectObserver<? super T>>> onTerminate) {
116+
protected Collection<SubjectObserver<? super T>> terminate(Action1<Collection<SubjectObserver<? super T>>> onTerminate) {
117117
State<T> current;
118118
State<T> newState = null;
119119
do {
120120
current = state.get();
121121
if (current.terminated) {
122122
// already terminated so do nothing
123-
return;
123+
return null;
124124
} else {
125125
newState = current.terminate();
126126
}
127127
} while (!state.compareAndSet(current, newState));
128128

129+
Collection<SubjectObserver<? super T>> observerCollection = (Collection)Arrays.asList(newState.observers);
129130
/*
130131
* if we get here then we won setting the state to terminated
131132
* and have a deterministic set of Observers to emit to (concurrent subscribes
@@ -134,11 +135,12 @@ protected void terminate(Action1<Collection<SubjectObserver<? super T>>> onTermi
134135
*/
135136
try {
136137
// had to circumvent type check, we know what the array contains
137-
onTerminate.call((Collection) Arrays.asList(newState.observers));
138+
onTerminate.call(observerCollection);
138139
} finally {
139140
// mark that termination is completed
140141
newState.terminationLatch.countDown();
141142
}
143+
return observerCollection;
142144
}
143145

144146
/**

0 commit comments

Comments
 (0)