Skip to content

Commit 4e4c165

Browse files
Fix Subject SubscriptionManager
Remove possibility of infinite loop
1 parent 0ac1e6c commit 4e4c165

File tree

1 file changed

+11
-9
lines changed

1 file changed

+11
-9
lines changed

rxjava-core/src/main/java/rx/subjects/SubjectSubscriptionManager.java

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import rx.Observer;
2525
import rx.Subscription;
2626
import rx.operators.SafeObservableSubscription;
27+
import rx.subscriptions.Subscriptions;
2728
import rx.util.functions.Action1;
2829

2930
/* package */class SubjectSubscriptionManager<T> {
@@ -51,10 +52,12 @@ public void call(Observer<? super T> actualObserver) {
5152
State<T> current;
5253
State<T> newState = null;
5354
boolean addedObserver = false;
55+
Subscription s;
5456
do {
5557
current = state.get();
5658
if (current.terminated) {
5759
// we are terminated so don't need to do anything
60+
s = Subscriptions.empty();
5861
addedObserver = false;
5962
// break out and don't try to modify state
6063
newState = current;
@@ -67,24 +70,22 @@ public void call(Observer<? super T> actualObserver) {
6770
}
6871
break;
6972
} else {
70-
final SafeObservableSubscription subscription = new SafeObservableSubscription();
71-
actualObserver.add(subscription); // add to parent if the Subject itself is unsubscribed
7273
addedObserver = true;
73-
subscription.wrap(new Subscription() {
74+
s = new Subscription() {
7475
@Override
7576
public void unsubscribe() {
7677
State<T> current;
7778
State<T> newState;
7879
do {
7980
current = state.get();
8081
// on unsubscribe remove it from the map of outbound observers to notify
81-
newState = current.removeObserver(subscription);
82+
newState = current.removeObserver(this);
8283
} while (!state.compareAndSet(current, newState));
8384
}
84-
});
85+
};
8586

8687
// on subscribe add it to the map of outbound observers to notify
87-
newState = current.addObserver(subscription, observer);
88+
newState = current.addObserver(s, observer);
8889
}
8990
} while (!state.compareAndSet(current, newState));
9091

@@ -94,12 +95,13 @@ public void unsubscribe() {
9495
if (newState.terminated && !addedObserver) {
9596
onTerminated.call(observer);
9697
}
98+
99+
actualObserver.add(s);
97100
}
98101

99102
};
100103
}
101104

102-
@SuppressWarnings({ "unchecked", "rawtypes" })
103105
protected void terminate(Action1<Collection<SubjectObserver<? super T>>> onTerminate) {
104106
State<T> current;
105107
State<T> newState = null;
@@ -134,7 +136,6 @@ protected void terminate(Action1<Collection<SubjectObserver<? super T>>> onTermi
134136
*
135137
* @return the array of current observers
136138
*/
137-
@SuppressWarnings("unchecked")
138139
public SubjectObserver<Object>[] rawSnapshot() {
139140
return state.get().observers;
140141
}
@@ -231,6 +232,7 @@ protected static class SubjectObserver<T> extends Observer<T> {
231232
protected volatile boolean caughtUp = false;
232233

233234
SubjectObserver(Observer<? super T> actual) {
235+
super(actual);
234236
this.actual = actual;
235237
}
236238

@@ -251,4 +253,4 @@ public void onNext(T v) {
251253

252254
}
253255

254-
}
256+
}

0 commit comments

Comments
 (0)