Skip to content

Commit 078b687

Browse files
committed
use non-locking state machine based on atomic reference
1 parent 61aea25 commit 078b687

File tree

2 files changed

+182
-72
lines changed

2 files changed

+182
-72
lines changed

rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java

Lines changed: 90 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -15,103 +15,123 @@
1515
*/
1616
package rx.subscriptions;
1717

18+
import static java.util.Arrays.asList;
19+
import static java.util.Collections.unmodifiableSet;
20+
1821
import java.util.ArrayList;
1922
import java.util.Collection;
20-
import java.util.List;
21-
import java.util.concurrent.ConcurrentHashMap;
22-
import java.util.concurrent.atomic.AtomicBoolean;
23+
import java.util.HashSet;
24+
import java.util.Set;
25+
import java.util.concurrent.atomic.AtomicReference;
2326

2427
import rx.Subscription;
2528
import rx.util.CompositeException;
2629

2730
/**
28-
* Subscription that represents a group of Subscriptions that are unsubscribed together.
31+
* Subscription that represents a group of Subscriptions that are unsubscribed
32+
* together.
2933
*
30-
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.disposables.compositedisposable(v=vs.103).aspx">Rx.Net equivalent CompositeDisposable</a>
34+
* @see <a
35+
* href="http://msdn.microsoft.com/en-us/library/system.reactive.disposables.compositedisposable(v=vs.103).aspx">Rx.Net
36+
* equivalent CompositeDisposable</a>
3137
*/
3238
public class CompositeSubscription implements Subscription {
39+
private static final Set<Subscription> MUTATE_STATE = unmodifiableSet(new HashSet<Subscription>());
40+
private static final Set<Subscription> UNSUBSCRIBED_STATE = unmodifiableSet(new HashSet<Subscription>());
41+
42+
private final AtomicReference<Set<Subscription>> reference = new AtomicReference<Set<Subscription>>();
3343

34-
/*
35-
* The reason 'synchronized' is used on 'add' and 'unsubscribe' is because AtomicBoolean/ConcurrentLinkedQueue are both being modified so it needs to be done atomically.
36-
*
37-
* TODO evaluate whether use of synchronized is a performance issue here and if it's worth using an atomic state machine or other non-locking approach
38-
*/
39-
private AtomicBoolean unsubscribed = new AtomicBoolean(false);
40-
private final ConcurrentHashMap<Subscription, Boolean> subscriptions = new ConcurrentHashMap<Subscription, Boolean>();
41-
42-
public CompositeSubscription(List<Subscription> subscriptions) {
43-
for (Subscription s : subscriptions) {
44-
this.subscriptions.put(s, Boolean.TRUE);
45-
}
44+
public CompositeSubscription(final Subscription... subscriptions) {
45+
reference.set(new HashSet<Subscription>(asList(subscriptions)));
4646
}
4747

48-
public CompositeSubscription(Subscription... subscriptions) {
49-
for (Subscription s : subscriptions) {
50-
this.subscriptions.put(s, Boolean.TRUE);
51-
}
48+
public boolean isUnsubscribed() {
49+
return reference.get() == UNSUBSCRIBED_STATE;
5250
}
5351

54-
/**
55-
* Remove and unsubscribe all subscriptions but do not unsubscribe the outer CompositeSubscription.
56-
*/
57-
public void clear() {
58-
Collection<Throwable> es = null;
59-
for (Subscription s : subscriptions.keySet()) {
60-
try {
52+
public void add(final Subscription s) {
53+
do {
54+
final Set<Subscription> existing = reference.get();
55+
if (existing == UNSUBSCRIBED_STATE) {
6156
s.unsubscribe();
62-
this.subscriptions.remove(s);
63-
} catch (Throwable e) {
64-
if (es == null) {
65-
es = new ArrayList<Throwable>();
66-
}
67-
es.add(e);
57+
break;
6858
}
69-
}
70-
if (es != null) {
71-
throw new CompositeException("Failed to unsubscribe to 1 or more subscriptions.", es);
72-
}
73-
}
7459

75-
/**
76-
* Remove the {@link Subscription} and unsubscribe it.
77-
*
78-
* @param s
79-
*/
80-
public void remove(Subscription s) {
81-
this.subscriptions.remove(s);
82-
// also unsubscribe from it: http://msdn.microsoft.com/en-us/library/system.reactive.disposables.compositedisposable.remove(v=vs.103).aspx
83-
s.unsubscribe();
60+
if (reference.compareAndSet(existing, MUTATE_STATE)) {
61+
existing.add(s);
62+
reference.set(existing);
63+
break;
64+
}
65+
} while (true);
8466
}
8567

86-
public boolean isUnsubscribed() {
87-
return unsubscribed.get();
68+
public void remove(final Subscription s) {
69+
do {
70+
final Set<Subscription> subscriptions = reference.get();
71+
if (subscriptions == UNSUBSCRIBED_STATE) {
72+
s.unsubscribe();
73+
break;
74+
}
75+
76+
if (reference.compareAndSet(subscriptions, MUTATE_STATE)) {
77+
// also unsubscribe from it:
78+
// http://msdn.microsoft.com/en-us/library/system.reactive.disposables.compositedisposable.remove(v=vs.103).aspx
79+
subscriptions.remove(s);
80+
reference.set(subscriptions);
81+
s.unsubscribe();
82+
break;
83+
}
84+
} while (true);
8885
}
8986

90-
public synchronized void add(Subscription s) {
91-
if (unsubscribed.get()) {
92-
s.unsubscribe();
93-
} else {
94-
subscriptions.put(s, Boolean.TRUE);
95-
}
87+
public void clear() {
88+
do {
89+
final Set<Subscription> subscriptions = reference.get();
90+
if (subscriptions == UNSUBSCRIBED_STATE) {
91+
break;
92+
}
93+
94+
if (reference.compareAndSet(subscriptions, MUTATE_STATE)) {
95+
final Set<Subscription> copy = new HashSet<Subscription>(
96+
subscriptions);
97+
subscriptions.clear();
98+
reference.set(subscriptions);
99+
100+
for (final Subscription subscription : copy) {
101+
subscription.unsubscribe();
102+
}
103+
break;
104+
}
105+
} while (true);
96106
}
97107

98108
@Override
99-
public synchronized void unsubscribe() {
100-
if (unsubscribed.compareAndSet(false, true)) {
101-
Collection<Throwable> es = null;
102-
for (Subscription s : subscriptions.keySet()) {
103-
try {
104-
s.unsubscribe();
105-
} catch (Throwable e) {
106-
if (es == null) {
107-
es = new ArrayList<Throwable>();
109+
public void unsubscribe() {
110+
do {
111+
final Set<Subscription> subscriptions = reference.get();
112+
if (subscriptions == UNSUBSCRIBED_STATE) {
113+
break;
114+
}
115+
116+
if (subscriptions == MUTATE_STATE) {
117+
continue;
118+
}
119+
120+
if (reference.compareAndSet(subscriptions, UNSUBSCRIBED_STATE)) {
121+
final Collection<Throwable> es = new ArrayList<Throwable>();
122+
for (final Subscription s : subscriptions) {
123+
try {
124+
s.unsubscribe();
125+
} catch (final Throwable e) {
126+
es.add(e);
108127
}
109-
es.add(e);
110128
}
129+
if (es.isEmpty()) {
130+
break;
131+
}
132+
throw new CompositeException(
133+
"Failed to unsubscribe to 1 or more subscriptions.", es);
111134
}
112-
if (es != null) {
113-
throw new CompositeException("Failed to unsubscribe to 1 or more subscriptions.", es);
114-
}
115-
}
135+
} while (true);
116136
}
117137
}

rxjava-core/src/test/java/rx/subscriptions/CompositeSubscriptionTest.java

Lines changed: 92 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,14 @@
1515
*/
1616
package rx.subscriptions;
1717

18-
import static org.junit.Assert.*;
19-
18+
import static org.junit.Assert.assertEquals;
19+
import static org.junit.Assert.assertFalse;
20+
import static org.junit.Assert.assertTrue;
21+
import static org.junit.Assert.fail;
22+
23+
import java.util.ArrayList;
24+
import java.util.List;
25+
import java.util.concurrent.CountDownLatch;
2026
import java.util.concurrent.atomic.AtomicInteger;
2127

2228
import org.junit.Test;
@@ -51,6 +57,48 @@ public void unsubscribe() {
5157
assertEquals(2, counter.get());
5258
}
5359

60+
@Test(timeout = 1000)
61+
public void shouldUnsubscribeAll() throws InterruptedException {
62+
final AtomicInteger counter = new AtomicInteger();
63+
final CompositeSubscription s = new CompositeSubscription();
64+
65+
final int count = 10;
66+
final CountDownLatch start = new CountDownLatch(1);
67+
for (int i = 0; i < count; i++) {
68+
s.add(new Subscription() {
69+
70+
@Override
71+
public void unsubscribe() {
72+
counter.incrementAndGet();
73+
}
74+
});
75+
}
76+
77+
final List<Thread> threads = new ArrayList<Thread>();
78+
for (int i = 0; i < count; i++) {
79+
final Thread t = new Thread() {
80+
@Override
81+
public void run() {
82+
try {
83+
start.await();
84+
s.unsubscribe();
85+
} catch (final InterruptedException e) {
86+
fail(e.getMessage());
87+
}
88+
}
89+
};
90+
t.start();
91+
threads.add(t);
92+
}
93+
94+
start.countDown();
95+
for (final Thread t : threads) {
96+
t.join();
97+
}
98+
99+
assertEquals(count, counter.get());
100+
}
101+
54102
@Test
55103
public void testException() {
56104
final AtomicInteger counter = new AtomicInteger();
@@ -144,4 +192,46 @@ public void unsubscribe() {
144192
// we should have only unsubscribed once
145193
assertEquals(1, counter.get());
146194
}
195+
196+
@Test(timeout = 1000)
197+
public void testUnsubscribeIdempotenceConcurrently()
198+
throws InterruptedException {
199+
final AtomicInteger counter = new AtomicInteger();
200+
final CompositeSubscription s = new CompositeSubscription();
201+
202+
final int count = 10;
203+
final CountDownLatch start = new CountDownLatch(1);
204+
s.add(new Subscription() {
205+
206+
@Override
207+
public void unsubscribe() {
208+
counter.incrementAndGet();
209+
}
210+
});
211+
212+
final List<Thread> threads = new ArrayList<Thread>();
213+
for (int i = 0; i < count; i++) {
214+
final Thread t = new Thread() {
215+
@Override
216+
public void run() {
217+
try {
218+
start.await();
219+
s.unsubscribe();
220+
} catch (final InterruptedException e) {
221+
fail(e.getMessage());
222+
}
223+
}
224+
};
225+
t.start();
226+
threads.add(t);
227+
}
228+
229+
start.countDown();
230+
for (final Thread t : threads) {
231+
t.join();
232+
}
233+
234+
// we should have only unsubscribed once
235+
assertEquals(1, counter.get());
236+
}
147237
}

0 commit comments

Comments
 (0)