Skip to content

Commit c3ee348

Browse files
Refactor CompositeSubscription
- simplified state machine - removed busy spin state
1 parent 109572d commit c3ee348

File tree

1 file changed

+84
-94
lines changed

1 file changed

+84
-94
lines changed

rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java

Lines changed: 84 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,10 @@
1515
*/
1616
package rx.subscriptions;
1717

18-
import static java.util.Arrays.asList;
19-
import static java.util.Collections.unmodifiableSet;
20-
2118
import java.util.ArrayList;
19+
import java.util.Arrays;
2220
import java.util.Collection;
23-
import java.util.Collections;
24-
import java.util.HashSet;
25-
import java.util.Set;
21+
import java.util.List;
2622
import java.util.concurrent.atomic.AtomicReference;
2723

2824
import rx.Subscription;
@@ -35,103 +31,116 @@
3531
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.disposables.compositedisposable(v=vs.103).aspx">Rx.Net equivalent CompositeDisposable</a>
3632
*/
3733
public class CompositeSubscription implements Subscription {
38-
/** Sentinel to indicate a thread is modifying the subscription set. */
39-
private static final Set<Subscription> MUTATE_SENTINEL = unmodifiableSet(Collections.<Subscription> emptySet());
40-
/** Sentinel to indicate the entire CompositeSubscription has been unsubscribed. */
41-
private static final Set<Subscription> UNSUBSCRIBED_SENTINEL = unmodifiableSet(Collections.<Subscription> emptySet());
42-
/** The reference to the set of subscriptions. */
43-
private final AtomicReference<Set<Subscription>> reference = new AtomicReference<Set<Subscription>>();
34+
35+
private final AtomicReference<State> state = new AtomicReference<State>();
36+
37+
private static final class State {
38+
final boolean isUnsubscribed;
39+
final List<Subscription> subscriptions;
40+
41+
State(boolean u, List<Subscription> s) {
42+
this.isUnsubscribed = u;
43+
this.subscriptions = s;
44+
}
45+
46+
State unsubscribe() {
47+
return new State(true, subscriptions);
48+
}
49+
50+
State add(Subscription s) {
51+
List<Subscription> newSubscriptions = new ArrayList<Subscription>();
52+
newSubscriptions.addAll(subscriptions);
53+
newSubscriptions.add(s);
54+
return new State(isUnsubscribed, newSubscriptions);
55+
}
56+
57+
State remove(Subscription s) {
58+
List<Subscription> newSubscriptions = new ArrayList<Subscription>();
59+
newSubscriptions.addAll(subscriptions);
60+
newSubscriptions.remove(s); // only first occurrence
61+
return new State(isUnsubscribed, newSubscriptions);
62+
}
63+
64+
State clear() {
65+
return new State(isUnsubscribed, new ArrayList<Subscription>());
66+
}
67+
}
4468

4569
public CompositeSubscription(final Subscription... subscriptions) {
46-
reference.set(new HashSet<Subscription>(asList(subscriptions)));
70+
state.set(new State(false, Arrays.asList(subscriptions)));
4771
}
4872

4973
public boolean isUnsubscribed() {
50-
return reference.get() == UNSUBSCRIBED_SENTINEL;
74+
return state.get().isUnsubscribed;
5175
}
5276

5377
public void add(final Subscription s) {
78+
State current;
79+
State newState;
5480
do {
55-
final Set<Subscription> existing = reference.get();
56-
if (existing == UNSUBSCRIBED_SENTINEL) {
81+
current = state.get();
82+
if (current.isUnsubscribed) {
5783
s.unsubscribe();
58-
break;
59-
}
60-
61-
if (existing == MUTATE_SENTINEL) {
62-
continue;
84+
return;
85+
} else {
86+
newState = current.add(s);
6387
}
64-
65-
if (reference.compareAndSet(existing, MUTATE_SENTINEL)) {
66-
existing.add(s);
67-
reference.set(existing);
68-
break;
69-
}
70-
} while (true);
88+
} while (!state.compareAndSet(current, newState));
7189
}
7290

7391
public void remove(final Subscription s) {
92+
State current;
93+
State newState;
7494
do {
75-
final Set<Subscription> subscriptions = reference.get();
76-
if (subscriptions == UNSUBSCRIBED_SENTINEL) {
77-
s.unsubscribe();
78-
break;
79-
}
80-
81-
if (subscriptions == MUTATE_SENTINEL) {
82-
continue;
95+
current = state.get();
96+
if (current.isUnsubscribed) {
97+
return;
98+
} else {
99+
newState = current.remove(s);
83100
}
84-
85-
if (reference.compareAndSet(subscriptions, MUTATE_SENTINEL)) {
86-
// also unsubscribe from it:
87-
// http://msdn.microsoft.com/en-us/library/system.reactive.disposables.compositedisposable.remove(v=vs.103).aspx
88-
subscriptions.remove(s);
89-
reference.set(subscriptions);
90-
s.unsubscribe();
91-
break;
92-
}
93-
} while (true);
101+
} while (!state.compareAndSet(current, newState));
102+
// if we removed successfully we then need to call unsubscribe on it
103+
s.unsubscribe();
94104
}
95105

96106
public void clear() {
107+
State current;
108+
State newState;
97109
do {
98-
final Set<Subscription> subscriptions = reference.get();
99-
if (subscriptions == UNSUBSCRIBED_SENTINEL) {
100-
break;
101-
}
102-
103-
if (subscriptions == MUTATE_SENTINEL) {
104-
continue;
110+
current = state.get();
111+
if (current.isUnsubscribed) {
112+
return;
113+
} else {
114+
newState = current.clear();
105115
}
116+
} while (!state.compareAndSet(current, newState));
117+
// if we cleared successfully we then need to call unsubscribe on all previous
118+
// current is now "previous"
119+
unsubscribeFromAll(current.subscriptions);
120+
}
106121

107-
if (reference.compareAndSet(subscriptions, MUTATE_SENTINEL)) {
108-
final Set<Subscription> copy = new HashSet<Subscription>(
109-
subscriptions);
110-
subscriptions.clear();
111-
reference.set(subscriptions);
112-
113-
unsubscribeAll(copy);
114-
break;
122+
@Override
123+
public void unsubscribe() {
124+
State current;
125+
State newState;
126+
do {
127+
current = state.get();
128+
if (current.isUnsubscribed) {
129+
return;
130+
} else {
131+
newState = current.unsubscribe();
115132
}
116-
} while (true);
133+
} while (!state.compareAndSet(current, newState));
134+
// current is now "previous"
135+
unsubscribeFromAll(current.subscriptions);
117136
}
118137

119-
/**
120-
* Unsubscribe from the collection of subscriptions.
121-
* <p>
122-
* Exceptions thrown by any of the {@code unsubscribe()} methods are
123-
* collected into a {@link CompositeException} and thrown once
124-
* all unsubscriptions have been attempted.
125-
*
126-
* @param subs
127-
* the collection of subscriptions
128-
*/
129-
private void unsubscribeAll(Collection<Subscription> subs) {
138+
private static void unsubscribeFromAll(Collection<Subscription> subscriptions) {
130139
final Collection<Throwable> es = new ArrayList<Throwable>();
131-
for (final Subscription s : subs) {
140+
for (Subscription s : subscriptions) {
132141
try {
133142
s.unsubscribe();
134-
} catch (final Throwable e) {
143+
} catch (Throwable e) {
135144
es.add(e);
136145
}
137146
}
@@ -140,23 +149,4 @@ private void unsubscribeAll(Collection<Subscription> subs) {
140149
"Failed to unsubscribe to 1 or more subscriptions.", es);
141150
}
142151
}
143-
144-
@Override
145-
public void unsubscribe() {
146-
do {
147-
final Set<Subscription> subscriptions = reference.get();
148-
if (subscriptions == UNSUBSCRIBED_SENTINEL) {
149-
break;
150-
}
151-
152-
if (subscriptions == MUTATE_SENTINEL) {
153-
continue;
154-
}
155-
156-
if (reference.compareAndSet(subscriptions, UNSUBSCRIBED_SENTINEL)) {
157-
unsubscribeAll(subscriptions);
158-
break;
159-
}
160-
} while (true);
161-
}
162152
}

0 commit comments

Comments
 (0)