Skip to content

Commit 996e78f

Browse files
Refactor SerialSubscription
- simplified state machine
1 parent 5918265 commit 996e78f

File tree

1 file changed

+59
-26
lines changed

1 file changed

+59
-26
lines changed

rxjava-core/src/main/java/rx/subscriptions/SerialSubscription.java

Lines changed: 59 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@
1515
*/
1616
package rx.subscriptions;
1717

18-
import static rx.subscriptions.Subscriptions.empty;
19-
2018
import java.util.concurrent.atomic.AtomicReference;
2119

2220
import rx.Subscription;
@@ -27,43 +25,78 @@
2725
*
2826
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.disposables.serialdisposable(v=vs.103).aspx">Rx.Net equivalent SerialDisposable</a>
2927
*/
30-
public class SerialSubscription implements Subscription {
31-
private final AtomicReference<Subscription> reference = new AtomicReference<Subscription>(empty());
32-
/** Sentinel for the unsubscribed state. */
33-
private static final Subscription UNSUBSCRIBED_SENTINEL = new Subscription() {
34-
@Override
35-
public void unsubscribe() {
28+
public final class SerialSubscription implements Subscription {
29+
30+
private final AtomicReference<State> state = new AtomicReference<State>(new State(false, Subscriptions.empty()));
31+
32+
private static final class State {
33+
final boolean isUnsubscribed;
34+
final Subscription subscription;
35+
36+
State(boolean u, Subscription s) {
37+
this.isUnsubscribed = u;
38+
this.subscription = s;
39+
}
40+
41+
State unsubscribe() {
42+
return new State(true, subscription);
43+
}
44+
45+
State set(Subscription s) {
46+
return new State(isUnsubscribed, s);
3647
}
37-
};
48+
49+
}
3850

3951
public boolean isUnsubscribed() {
40-
return reference.get() == UNSUBSCRIBED_SENTINEL;
52+
return state.get().isUnsubscribed;
4153
}
4254

4355
@Override
4456
public void unsubscribe() {
45-
Subscription s = reference.getAndSet(UNSUBSCRIBED_SENTINEL);
46-
if (s != null) {
47-
s.unsubscribe();
48-
}
57+
State oldState;
58+
State newState;
59+
do {
60+
oldState = state.get();
61+
if (oldState.isUnsubscribed) {
62+
return;
63+
} else {
64+
newState = oldState.unsubscribe();
65+
}
66+
} while (!state.compareAndSet(oldState, newState));
67+
oldState.subscription.unsubscribe();
68+
}
69+
70+
@Deprecated
71+
public void setSubscription(Subscription s) {
72+
set(s);
4973
}
5074

51-
public void setSubscription(final Subscription subscription) {
75+
public void set(Subscription s) {
76+
if (s == null) {
77+
throw new IllegalArgumentException("Subscription can not be null");
78+
}
79+
State oldState;
80+
State newState;
5281
do {
53-
final Subscription current = reference.get();
54-
if (current == UNSUBSCRIBED_SENTINEL) {
55-
subscription.unsubscribe();
56-
break;
57-
}
58-
if (reference.compareAndSet(current, subscription)) {
59-
current.unsubscribe();
60-
break;
82+
oldState = state.get();
83+
if (oldState.isUnsubscribed) {
84+
s.unsubscribe();
85+
return;
86+
} else {
87+
newState = oldState.set(s);
6188
}
62-
} while (true);
89+
} while (!state.compareAndSet(oldState, newState));
90+
oldState.subscription.unsubscribe();
6391
}
6492

93+
@Deprecated
6594
public Subscription getSubscription() {
66-
final Subscription subscription = reference.get();
67-
return subscription == UNSUBSCRIBED_SENTINEL ? Subscriptions.empty() : subscription;
95+
return get();
6896
}
97+
98+
public Subscription get() {
99+
return state.get().subscription;
100+
}
101+
69102
}

0 commit comments

Comments
 (0)