Skip to content

Commit 109572d

Browse files
Refactor RefCountSubscription
- simplified logic - remove unnecessary busy spins and state changes
1 parent dccbc6b commit 109572d

File tree

1 file changed

+56
-73
lines changed

1 file changed

+56
-73
lines changed

rxjava-core/src/main/java/rx/subscriptions/RefCountSubscription.java

Lines changed: 56 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@
1616
package rx.subscriptions;
1717

1818
import java.util.concurrent.atomic.AtomicBoolean;
19-
import java.util.concurrent.atomic.AtomicInteger;
2019
import java.util.concurrent.atomic.AtomicReference;
20+
2121
import rx.Subscription;
2222

2323
/**
@@ -27,21 +27,33 @@
2727
* @see <a href='http://msdn.microsoft.com/en-us/library/system.reactive.disposables.refcountdisposable.aspx'>MSDN RefCountDisposable</a>
2828
*/
2929
public class RefCountSubscription implements Subscription {
30-
/** The state for the atomic operations. */
31-
private enum State {
32-
ACTIVE,
33-
MUTATING,
34-
UNSUBSCRIBED
35-
}
36-
3730
/** The reference to the actual subscription. */
38-
private volatile Subscription main;
39-
/** The current state. */
40-
private final AtomicReference<State> state = new AtomicReference<State>();
41-
/** Counts the number of sub-subscriptions. */
42-
private final AtomicInteger count = new AtomicInteger();
43-
/** Indicate the request to unsubscribe from the main. */
44-
private final AtomicBoolean mainDone = new AtomicBoolean();
31+
private final Subscription actual;
32+
/** Counts the number of subscriptions (1 parent + multiple children) */
33+
private final AtomicReference<State> state = new AtomicReference<State>(new State(false, 0));
34+
35+
private static final class State {
36+
final boolean isUnsubscribed;
37+
final int children;
38+
39+
State(boolean u, int c) {
40+
this.isUnsubscribed = u;
41+
this.children = c;
42+
}
43+
44+
State addChild() {
45+
return new State(isUnsubscribed, children + 1);
46+
}
47+
48+
State removeChild() {
49+
return new State(isUnsubscribed, children - 1);
50+
}
51+
52+
State unsubscribe() {
53+
return new State(true, children);
54+
}
55+
56+
}
4557

4658
/**
4759
* Create a RefCountSubscription by wrapping the given non-null Subscription.
@@ -52,87 +64,52 @@ public RefCountSubscription(Subscription s) {
5264
if (s == null) {
5365
throw new IllegalArgumentException("s");
5466
}
55-
this.main = s;
67+
this.actual = s;
5668
}
5769

5870
/**
5971
* Returns a new sub-subscription.
6072
*/
6173
public Subscription getSubscription() {
74+
State current;
75+
State newState;
6276
do {
63-
State s = state.get();
64-
if (s == State.UNSUBSCRIBED) {
77+
current = state.get();
78+
if (current.isUnsubscribed) {
6579
return Subscriptions.empty();
80+
} else {
81+
newState = current.addChild();
6682
}
67-
if (s == State.MUTATING) {
68-
continue;
69-
}
70-
if (state.compareAndSet(s, State.MUTATING)) {
71-
count.incrementAndGet();
72-
state.set(State.ACTIVE);
73-
return new InnerSubscription();
74-
}
75-
} while (true);
83+
} while (!state.compareAndSet(current, newState));
84+
85+
return new InnerSubscription();
7686
}
7787

7888
/**
7989
* Check if this subscription is already unsubscribed.
8090
*/
8191
public boolean isUnsubscribed() {
82-
return state.get() == State.UNSUBSCRIBED;
92+
return state.get().isUnsubscribed;
8393
}
8494

8595
@Override
8696
public void unsubscribe() {
97+
State current;
98+
State newState;
8799
do {
88-
State s = state.get();
89-
if (s == State.UNSUBSCRIBED) {
100+
current = state.get();
101+
if (current.isUnsubscribed) {
90102
return;
91103
}
92-
if (s == State.MUTATING) {
93-
continue;
94-
}
95-
if (state.compareAndSet(s, State.MUTATING)) {
96-
if (mainDone.compareAndSet(false, true) && count.get() == 0) {
97-
terminate();
98-
return;
99-
}
100-
state.set(State.ACTIVE);
101-
break;
102-
}
103-
} while (true);
104-
}
105-
106-
/**
107-
* Terminate this subscription by unsubscribing from main and setting the
108-
* state to UNSUBSCRIBED.
109-
*/
110-
private void terminate() {
111-
state.set(State.UNSUBSCRIBED);
112-
Subscription r = main;
113-
main = null;
114-
r.unsubscribe();
104+
newState = current.unsubscribe();
105+
} while (!state.compareAndSet(current, newState));
106+
unsubscribeActualIfApplicable(newState);
115107
}
116108

117-
/** Remove an inner subscription. */
118-
void innerDone() {
119-
do {
120-
State s = state.get();
121-
if (s == State.UNSUBSCRIBED) {
122-
return;
123-
}
124-
if (s == State.MUTATING) {
125-
continue;
126-
}
127-
if (state.compareAndSet(s, State.MUTATING)) {
128-
if (count.decrementAndGet() == 0 && mainDone.get()) {
129-
terminate();
130-
return;
131-
}
132-
state.set(State.ACTIVE);
133-
break;
134-
}
135-
} while (true);
109+
private void unsubscribeActualIfApplicable(State state) {
110+
if (state.isUnsubscribed && state.children == 0) {
111+
actual.unsubscribe();
112+
}
136113
}
137114

138115
/** The individual sub-subscriptions. */
@@ -142,7 +119,13 @@ class InnerSubscription implements Subscription {
142119
@Override
143120
public void unsubscribe() {
144121
if (innerDone.compareAndSet(false, true)) {
145-
innerDone();
122+
State current;
123+
State newState;
124+
do {
125+
current = state.get();
126+
newState = current.removeChild();
127+
} while (!state.compareAndSet(current, newState));
128+
unsubscribeActualIfApplicable(newState);
146129
}
147130
}
148131
};

0 commit comments

Comments
 (0)