Skip to content

Commit 3900a55

Browse files
Merge pull request #1283 from benjchristensen/0.18.x-composite-subscription-performance
Reduce Subscription Object Allocation
2 parents 2e0e537 + 6fe35b8 commit 3900a55

File tree

8 files changed

+470
-126
lines changed

8 files changed

+470
-126
lines changed

rxjava-core/src/main/java/rx/Subscriber.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package rx;
1717

18+
import rx.subscriptions.SubscriptionList;
1819
import rx.subscriptions.CompositeSubscription;
1920

2021
/**
@@ -30,17 +31,23 @@
3031
*/
3132
public abstract class Subscriber<T> implements Observer<T>, Subscription {
3233

33-
private final CompositeSubscription cs;
34+
private final SubscriptionList cs;
3435

35-
protected Subscriber(CompositeSubscription cs) {
36+
protected Subscriber(SubscriptionList cs) {
3637
if (cs == null) {
3738
throw new IllegalArgumentException("The CompositeSubscription can not be null");
3839
}
3940
this.cs = cs;
4041
}
42+
43+
@Deprecated
44+
protected Subscriber(CompositeSubscription cs) {
45+
this(new SubscriptionList());
46+
add(cs);
47+
}
4148

4249
protected Subscriber() {
43-
this(new CompositeSubscription());
50+
this(new SubscriptionList());
4451
}
4552

4653
protected Subscriber(Subscriber<?> op) {

rxjava-core/src/main/java/rx/operators/OperatorGroupBy.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import rx.functions.Action0;
2828
import rx.functions.Func1;
2929
import rx.observables.GroupedObservable;
30-
import rx.subscriptions.CompositeSubscription;
3130
import rx.subscriptions.Subscriptions;
3231

3332
/**
@@ -46,9 +45,9 @@ public OperatorGroupBy(final Func1<? super T, ? extends K> keySelector) {
4645

4746
@Override
4847
public Subscriber<? super T> call(final Subscriber<? super GroupedObservable<K, T>> childObserver) {
49-
// a new CompositeSubscription to decouple the subscription as the inner subscriptions need a separate lifecycle
48+
// a new SubscriptionList to decouple the subscription as the inner subscriptions need a separate lifecycle
5049
// and will unsubscribe on this parent if they are all unsubscribed
51-
return new Subscriber<T>(new CompositeSubscription()) {
50+
return new Subscriber<T>() {
5251
private final Map<K, BufferUntilSubscriber<T>> groups = new HashMap<K, BufferUntilSubscriber<T>>();
5352
private final AtomicInteger completionCounter = new AtomicInteger(0);
5453
private final AtomicBoolean completionEmitted = new AtomicBoolean(false);

rxjava-core/src/main/java/rx/operators/OperatorPivot.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,15 +28,15 @@
2828
import rx.Subscriber;
2929
import rx.functions.Action0;
3030
import rx.observables.GroupedObservable;
31-
import rx.subscriptions.CompositeSubscription;
31+
import rx.subscriptions.SubscriptionList;
3232
import rx.subscriptions.Subscriptions;
3333

3434
public class OperatorPivot<K1, K2, T> implements Operator<GroupedObservable<K2, GroupedObservable<K1, T>>, GroupedObservable<K1, GroupedObservable<K2, T>>> {
3535

3636
@Override
3737
public Subscriber<? super GroupedObservable<K1, GroupedObservable<K2, T>>> call(final Subscriber<? super GroupedObservable<K2, GroupedObservable<K1, T>>> child) {
3838
final AtomicReference<State> state = new AtomicReference<State>(State.create());
39-
final OperatorPivot<K1, K2, T>.PivotSubscriber pivotSubscriber = new PivotSubscriber(new CompositeSubscription(), child, state);
39+
final OperatorPivot<K1, K2, T>.PivotSubscriber pivotSubscriber = new PivotSubscriber(new SubscriptionList(), child, state);
4040
child.add(Subscriptions.create(new Action0() {
4141

4242
@Override
@@ -65,12 +65,12 @@ private final class PivotSubscriber extends Subscriber<GroupedObservable<K1, Gro
6565
* needs to decouple the subscription as the inner subscriptions need a separate lifecycle
6666
* and will unsubscribe on this parent if they are all unsubscribed
6767
*/
68-
private final CompositeSubscription parentSubscription;
68+
private final SubscriptionList parentSubscription;
6969
private final Subscriber<? super GroupedObservable<K2, GroupedObservable<K1, T>>> child;
7070
private final AtomicReference<State> state;
7171
private final GroupState<K1, K2, T> groups;
7272

73-
private PivotSubscriber(CompositeSubscription parentSubscription, Subscriber<? super GroupedObservable<K2, GroupedObservable<K1, T>>> child, AtomicReference<State> state) {
73+
private PivotSubscriber(SubscriptionList parentSubscription, Subscriber<? super GroupedObservable<K2, GroupedObservable<K1, T>>> child, AtomicReference<State> state) {
7474
super(parentSubscription);
7575
this.parentSubscription = parentSubscription;
7676
this.child = child;
@@ -159,10 +159,10 @@ private static class GroupState<K1, K2, T> {
159159
private final ConcurrentHashMap<KeyPair<K1, K2>, Inner<K1, K2, T>> innerSubjects = new ConcurrentHashMap<KeyPair<K1, K2>, Inner<K1, K2, T>>();
160160
private final ConcurrentHashMap<K2, Outer<K1, K2, T>> outerSubjects = new ConcurrentHashMap<K2, Outer<K1, K2, T>>();
161161
private final AtomicBoolean completeEmitted = new AtomicBoolean();
162-
private final CompositeSubscription parentSubscription;
162+
private final SubscriptionList parentSubscription;
163163
private final Subscriber<? super GroupedObservable<K2, GroupedObservable<K1, T>>> child;
164164

165-
public GroupState(CompositeSubscription parentSubscription, Subscriber<? super GroupedObservable<K2, GroupedObservable<K1, T>>> child) {
165+
public GroupState(SubscriptionList parentSubscription, Subscriber<? super GroupedObservable<K2, GroupedObservable<K1, T>>> child) {
166166
this.parentSubscription = parentSubscription;
167167
this.child = child;
168168
}

rxjava-core/src/main/java/rx/operators/OperatorTake.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
import rx.Observable.Operator;
1919
import rx.Subscriber;
20-
import rx.subscriptions.CompositeSubscription;
20+
import rx.subscriptions.SubscriptionList;
2121

2222
/**
2323
* Returns an Observable that emits the first <code>num</code> items emitted by the source
@@ -40,7 +40,7 @@ public OperatorTake(int limit) {
4040

4141
@Override
4242
public Subscriber<? super T> call(final Subscriber<? super T> child) {
43-
final CompositeSubscription parent = new CompositeSubscription();
43+
final SubscriptionList parent = new SubscriptionList();
4444
if (limit == 0) {
4545
child.onCompleted();
4646
parent.unsubscribe();

rxjava-core/src/main/java/rx/operators/OperatorUnsubscribeOn.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import rx.Scheduler;
2020
import rx.Subscriber;
2121
import rx.functions.Action0;
22-
import rx.subscriptions.CompositeSubscription;
22+
import rx.subscriptions.SubscriptionList;
2323
import rx.subscriptions.Subscriptions;
2424

2525
/**
@@ -36,7 +36,7 @@ public OperatorUnsubscribeOn(Scheduler scheduler) {
3636

3737
@Override
3838
public Subscriber<? super T> call(final Subscriber<? super T> subscriber) {
39-
final CompositeSubscription parentSubscription = new CompositeSubscription();
39+
final SubscriptionList parentSubscription = new SubscriptionList();
4040
subscriber.add(Subscriptions.create(new Action0() {
4141

4242
@Override

rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java

Lines changed: 53 additions & 110 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,11 @@
1616
package rx.subscriptions;
1717

1818
import java.util.ArrayList;
19+
import java.util.Arrays;
20+
import java.util.Collection;
21+
import java.util.HashSet;
1922
import java.util.List;
20-
import java.util.concurrent.atomic.AtomicReference;
23+
import java.util.Set;
2124

2225
import rx.Subscription;
2326
import rx.exceptions.CompositeException;
@@ -30,154 +33,94 @@
3033
*/
3134
public final class CompositeSubscription implements Subscription {
3235

33-
private final AtomicReference<State> state = new AtomicReference<State>();
34-
35-
/** Empty initial state. */
36-
private static final State CLEAR_STATE;
37-
/** Unsubscribed empty state. */
38-
private static final State CLEAR_STATE_UNSUBSCRIBED;
39-
static {
40-
Subscription[] s0 = new Subscription[0];
41-
CLEAR_STATE = new State(false, s0);
42-
CLEAR_STATE_UNSUBSCRIBED = new State(true, s0);
43-
}
44-
45-
private static final class State {
46-
final boolean isUnsubscribed;
47-
final Subscription[] subscriptions;
48-
49-
State(boolean u, Subscription[] s) {
50-
this.isUnsubscribed = u;
51-
this.subscriptions = s;
52-
}
53-
54-
State unsubscribe() {
55-
return CLEAR_STATE_UNSUBSCRIBED;
56-
}
57-
58-
State add(Subscription s) {
59-
int idx = subscriptions.length;
60-
Subscription[] newSubscriptions = new Subscription[idx + 1];
61-
System.arraycopy(subscriptions, 0, newSubscriptions, 0, idx);
62-
newSubscriptions[idx] = s;
63-
return new State(isUnsubscribed, newSubscriptions);
64-
}
65-
66-
State remove(Subscription s) {
67-
if ((subscriptions.length == 1 && subscriptions[0].equals(s)) || subscriptions.length == 0) {
68-
return clear();
69-
}
70-
Subscription[] newSubscriptions = new Subscription[subscriptions.length - 1];
71-
int idx = 0;
72-
for (Subscription _s : subscriptions) {
73-
if (!_s.equals(s)) {
74-
// was not in this composite
75-
if (idx == newSubscriptions.length) {
76-
return this;
77-
}
78-
newSubscriptions[idx] = _s;
79-
idx++;
80-
}
81-
}
82-
if (idx == 0) {
83-
return clear();
84-
}
85-
// subscription appeared more than once
86-
if (idx < newSubscriptions.length) {
87-
Subscription[] newSub2 = new Subscription[idx];
88-
System.arraycopy(newSubscriptions, 0, newSub2, 0, idx);
89-
return new State(isUnsubscribed, newSub2);
90-
}
91-
return new State(isUnsubscribed, newSubscriptions);
92-
}
93-
94-
State clear() {
95-
return isUnsubscribed ? CLEAR_STATE_UNSUBSCRIBED : CLEAR_STATE;
96-
}
97-
}
36+
private Set<Subscription> subscriptions;
37+
private boolean unsubscribed = false;
9838

9939
public CompositeSubscription() {
100-
state.set(CLEAR_STATE);
10140
}
10241

10342
public CompositeSubscription(final Subscription... subscriptions) {
104-
state.set(new State(false, subscriptions));
43+
this.subscriptions = new HashSet<Subscription>(Arrays.asList(subscriptions));
10544
}
10645

10746
@Override
108-
public boolean isUnsubscribed() {
109-
return state.get().isUnsubscribed;
47+
public synchronized boolean isUnsubscribed() {
48+
return unsubscribed;
11049
}
11150

11251
public void add(final Subscription s) {
113-
State oldState;
114-
State newState;
115-
do {
116-
oldState = state.get();
117-
if (oldState.isUnsubscribed) {
118-
s.unsubscribe();
119-
return;
52+
Subscription unsubscribe = null;
53+
synchronized (this) {
54+
if (unsubscribed) {
55+
unsubscribe = s;
12056
} else {
121-
newState = oldState.add(s);
57+
if (subscriptions == null) {
58+
subscriptions = new HashSet<Subscription>(4);
59+
}
60+
subscriptions.add(s);
12261
}
123-
} while (!state.compareAndSet(oldState, newState));
62+
}
63+
if (unsubscribe != null) {
64+
// call after leaving the synchronized block so we're not holding a lock while executing this
65+
unsubscribe.unsubscribe();
66+
}
12467
}
12568

12669
public void remove(final Subscription s) {
127-
State oldState;
128-
State newState;
129-
do {
130-
oldState = state.get();
131-
if (oldState.isUnsubscribed) {
70+
boolean unsubscribe = false;
71+
synchronized (this) {
72+
if (unsubscribed || subscriptions == null) {
13273
return;
133-
} else {
134-
newState = oldState.remove(s);
13574
}
136-
} while (!state.compareAndSet(oldState, newState));
137-
// if we removed successfully we then need to call unsubscribe on it
138-
s.unsubscribe();
75+
unsubscribe = subscriptions.remove(s);
76+
}
77+
if (unsubscribe) {
78+
// if we removed successfully we then need to call unsubscribe on it (outside of the lock)
79+
s.unsubscribe();
80+
}
13981
}
14082

14183
public void clear() {
142-
State oldState;
143-
State newState;
144-
do {
145-
oldState = state.get();
146-
if (oldState.isUnsubscribed) {
84+
Collection<Subscription> unsubscribe = null;
85+
synchronized (this) {
86+
if (unsubscribed || subscriptions == null) {
14787
return;
14888
} else {
149-
newState = oldState.clear();
89+
unsubscribe = subscriptions;
90+
subscriptions = null;
15091
}
151-
} while (!state.compareAndSet(oldState, newState));
152-
// if we cleared successfully we then need to call unsubscribe on all previous
153-
unsubscribeFromAll(oldState.subscriptions);
92+
}
93+
unsubscribeFromAll(unsubscribe);
15494
}
15595

15696
@Override
15797
public void unsubscribe() {
158-
State oldState;
159-
State newState;
160-
do {
161-
oldState = state.get();
162-
if (oldState.isUnsubscribed) {
98+
synchronized (this) {
99+
if (unsubscribed) {
163100
return;
164-
} else {
165-
newState = oldState.unsubscribe();
166101
}
167-
} while (!state.compareAndSet(oldState, newState));
168-
unsubscribeFromAll(oldState.subscriptions);
102+
unsubscribed = true;
103+
}
104+
// we will only get here once
105+
unsubscribeFromAll(subscriptions);
169106
}
170107

171-
private static void unsubscribeFromAll(Subscription[] subscriptions) {
172-
final List<Throwable> es = new ArrayList<Throwable>();
108+
private static void unsubscribeFromAll(Collection<Subscription> subscriptions) {
109+
if (subscriptions == null) {
110+
return;
111+
}
112+
List<Throwable> es = null;
173113
for (Subscription s : subscriptions) {
174114
try {
175115
s.unsubscribe();
176116
} catch (Throwable e) {
117+
if (es == null) {
118+
es = new ArrayList<Throwable>();
119+
}
177120
es.add(e);
178121
}
179122
}
180-
if (!es.isEmpty()) {
123+
if (es != null) {
181124
if (es.size() == 1) {
182125
Throwable t = es.get(0);
183126
if (t instanceof RuntimeException) {

0 commit comments

Comments
 (0)