Skip to content

Commit 210df42

Browse files
Simplify Subscription/Observer State
1 parent e4d9a60 commit 210df42

File tree

1 file changed

+25
-45
lines changed

1 file changed

+25
-45
lines changed

rxjava-core/src/main/java/rx/subjects/SubjectSubscriptionManager.java

Lines changed: 25 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,8 @@
2323
import rx.Observable.OnSubscribe;
2424
import rx.Observer;
2525
import rx.Subscriber;
26-
import rx.Subscription;
2726
import rx.functions.Action0;
2827
import rx.functions.Action1;
29-
import rx.operators.SafeObservableSubscription;
3028
import rx.subscriptions.Subscriptions;
3129

3230
/* package */class SubjectSubscriptionManager<T> {
@@ -39,10 +37,11 @@
3937
* Always runs at the beginning of 'subscribe' regardless of terminal state.
4038
* @param onTerminated
4139
* Only runs if Subject is in terminal state and the Observer ends up not being registered.
42-
* @param onUnsubscribe called after the child subscription is removed from the state
40+
* @param onUnsubscribe
41+
* called after the child subscription is removed from the state
4342
* @return
4443
*/
45-
public OnSubscribe<T> getOnSubscribeFunc(final Action1<SubjectObserver<? super T>> onSubscribe,
44+
public OnSubscribe<T> getOnSubscribeFunc(final Action1<SubjectObserver<? super T>> onSubscribe,
4645
final Action1<SubjectObserver<? super T>> onTerminated,
4746
final Action1<SubjectObserver<? super T>> onUnsubscribe) {
4847
return new OnSubscribe<T>() {
@@ -73,10 +72,9 @@ public void call(Subscriber<? super T> actualOperator) {
7372
}
7473
break;
7574
} else {
76-
final SafeObservableSubscription subscription = new SafeObservableSubscription();
77-
actualOperator.add(subscription); // add to parent if the Subject itself is unsubscribed
7875
addedObserver = true;
79-
subscription.wrap(Subscriptions.create(new Action0() {
76+
// add to parent if the Subject itself is unsubscribed
77+
actualOperator.add(Subscriptions.create(new Action0() {
8078

8179
@Override
8280
public void call() {
@@ -85,19 +83,19 @@ public void call() {
8583
do {
8684
current = state.get();
8785
// on unsubscribe remove it from the map of outbound observers to notify
88-
newState = current.removeObserver(subscription);
86+
newState = current.removeObserver(observer);
8987
} while (!state.compareAndSet(current, newState));
9088
if (onUnsubscribe != null) {
9189
onUnsubscribe.call(observer);
9290
}
9391
}
9492
}));
95-
if (subscription.isUnsubscribed()) {
96-
addedObserver = false;
93+
if (actualOperator.isUnsubscribed()) {
94+
// we've been unsubscribed while working so return and do nothing
9795
return;
9896
}
9997
// on subscribe add it to the map of outbound observers to notify
100-
newState = current.addObserver(subscription, observer);
98+
newState = current.addObserver(observer);
10199
}
102100
} while (!state.compareAndSet(current, newState));
103101

@@ -126,7 +124,7 @@ protected Collection<SubjectObserver<? super T>> terminate(Action1<Collection<Su
126124
}
127125
} while (!state.compareAndSet(current, newState));
128126

129-
Collection<SubjectObserver<? super T>> observerCollection = (Collection)Arrays.asList(newState.observers);
127+
Collection<SubjectObserver<? super T>> observerCollection = (Collection) Arrays.asList(newState.observers);
130128
/*
131129
* if we get here then we won setting the state to terminated
132130
* and have a deterministic set of Observers to emit to (concurrent subscribes
@@ -158,97 +156,79 @@ public SubjectObserver<Object>[] rawSnapshot() {
158156
protected static class State<T> {
159157
final boolean terminated;
160158
final CountDownLatch terminationLatch;
161-
final Subscription[] subscriptions;
162159
final SubjectObserver[] observers;
163160
// to avoid lots of empty arrays
164-
final Subscription[] EMPTY_S = new Subscription[0];
165-
// to avoid lots of empty arrays
166161
final SubjectObserver[] EMPTY_O = new SubjectObserver[0];
167162

168-
private State(boolean isTerminated, CountDownLatch terminationLatch,
169-
Subscription[] subscriptions, SubjectObserver[] observers) {
163+
private State(boolean isTerminated, CountDownLatch terminationLatch, SubjectObserver[] observers) {
170164
this.terminationLatch = terminationLatch;
171165
this.terminated = isTerminated;
172-
this.subscriptions = subscriptions;
173166
this.observers = observers;
174167
}
175168

176169
State() {
177170
this.terminated = false;
178171
this.terminationLatch = null;
179-
this.subscriptions = EMPTY_S;
180172
this.observers = EMPTY_O;
181173
}
182174

183175
public State<T> terminate() {
184176
if (terminated) {
185177
throw new IllegalStateException("Already terminated.");
186178
}
187-
return new State<T>(true, new CountDownLatch(1), subscriptions, observers);
179+
return new State<T>(true, new CountDownLatch(1), observers);
188180
}
189181

190-
public State<T> addObserver(Subscription s, SubjectObserver<? super T> observer) {
182+
public State<T> addObserver(SubjectObserver<? super T> observer) {
191183
int n = this.observers.length;
192184

193-
Subscription[] newsubscriptions = Arrays.copyOf(this.subscriptions, n + 1);
194185
SubjectObserver[] newobservers = Arrays.copyOf(this.observers, n + 1);
195186

196-
newsubscriptions[n] = s;
197187
newobservers[n] = observer;
198188

199-
return createNewWith(newsubscriptions, newobservers);
189+
return createNewWith(newobservers);
200190
}
201191

202-
private State<T> createNewWith(Subscription[] newsubscriptions, SubjectObserver[] newobservers) {
203-
return new State<T>(terminated, terminationLatch, newsubscriptions, newobservers);
192+
private State<T> createNewWith(SubjectObserver[] newobservers) {
193+
return new State<T>(terminated, terminationLatch, newobservers);
204194
}
205195

206-
public State<T> removeObserver(Subscription s) {
196+
public State<T> removeObserver(SubjectObserver<? super T> o) {
207197
// we are empty, nothing to remove
208198
if (this.observers.length == 0) {
209199
return this;
210-
} else
211-
if (this.observers.length == 1) {
212-
if (this.subscriptions[0].equals(s)) {
213-
return createNewWith(EMPTY_S, EMPTY_O);
214-
}
215-
return this;
216200
}
201+
217202
int n = this.observers.length - 1;
218203
int copied = 0;
219-
Subscription[] newsubscriptions = new Subscription[n];
220204
SubjectObserver[] newobservers = new SubjectObserver[n];
221205

222-
for (int i = 0; i < this.subscriptions.length; i++) {
223-
Subscription s0 = this.subscriptions[i];
224-
if (!s0.equals(s)) {
206+
for (int i = 0; i < this.observers.length; i++) {
207+
SubjectObserver s0 = this.observers[i];
208+
if (!s0.equals(o)) {
225209
if (copied == n) {
226210
// if s was not found till the end of the iteration
227211
// we return ourselves since no modification should
228212
// have happened
229213
return this;
230214
}
231-
newsubscriptions[copied] = s0;
232-
newobservers[copied] = this.observers[i];
215+
newobservers[copied] = s0;
233216
copied++;
234217
}
235218
}
236219

237220
if (copied == 0) {
238-
return createNewWith(EMPTY_S, EMPTY_O);
221+
return createNewWith(EMPTY_O);
239222
}
240223
// if somehow copied less than expected, truncate the arrays
241224
// if s is unique, this should never happen
242225
if (copied < n) {
243-
Subscription[] newsubscriptions2 = new Subscription[copied];
244-
System.arraycopy(newsubscriptions, 0, newsubscriptions2, 0, copied);
245-
246226
SubjectObserver[] newobservers2 = new SubjectObserver[copied];
247227
System.arraycopy(newobservers, 0, newobservers2, 0, copied);
248228

249-
return createNewWith(newsubscriptions2, newobservers2);
229+
return createNewWith(newobservers2);
250230
}
251-
return createNewWith(newsubscriptions, newobservers);
231+
return createNewWith(newobservers);
252232
}
253233
}
254234

0 commit comments

Comments
 (0)