Skip to content

Commit d1706bc

Browse files
committed
Use lock free strategy
Reduce contention by using CAS (Compare And Swap) operations to replace subscription
1 parent 43420f8 commit d1706bc

File tree

2 files changed

+120
-90
lines changed

2 files changed

+120
-90
lines changed

rxjava-core/src/main/java/rx/subscriptions/SerialSubscription.java

Lines changed: 23 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,11 @@
1515
*/
1616
package rx.subscriptions;
1717

18+
import static rx.subscriptions.Subscriptions.empty;
19+
20+
import java.util.concurrent.atomic.AtomicBoolean;
21+
import java.util.concurrent.atomic.AtomicReference;
22+
1823
import rx.Subscription;
1924

2025
/**
@@ -24,47 +29,25 @@
2429
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.disposables.serialdisposable(v=vs.103).aspx">Rx.Net equivalent SerialDisposable</a>
2530
*/
2631
public class SerialSubscription implements Subscription {
27-
private boolean unsubscribed;
28-
private Subscription subscription;
29-
private final Object gate = new Object();
32+
private final AtomicBoolean unsubscribed = new AtomicBoolean();
33+
private final AtomicReference<Subscription> reference = new AtomicReference<Subscription>(empty());
3034

31-
@Override
32-
public void unsubscribe() {
33-
Subscription toUnsubscribe = null;
34-
synchronized (gate) {
35-
if (!unsubscribed) {
36-
if (subscription != null) {
37-
toUnsubscribe = subscription;
38-
subscription = null;
39-
}
40-
unsubscribed = true;
41-
}
42-
}
43-
if (toUnsubscribe != null) {
44-
toUnsubscribe.unsubscribe();
45-
}
46-
}
35+
@Override
36+
public void unsubscribe() {
37+
if (unsubscribed.compareAndSet(false, true)) {
38+
reference.getAndSet(empty()).unsubscribe();
39+
}
40+
}
4741

48-
public Subscription getSubscription() {
49-
synchronized (gate) {
50-
return subscription;
51-
}
52-
}
42+
public void setSubscription(final Subscription subscription) {
43+
if (unsubscribed.get()) {
44+
subscription.unsubscribe();
45+
} else {
46+
reference.getAndSet(subscription == null ? empty() : subscription).unsubscribe();
47+
}
48+
}
5349

54-
public void setSubscription(Subscription subscription) {
55-
Subscription toUnsubscribe = null;
56-
synchronized (gate) {
57-
if (!unsubscribed) {
58-
if (this.subscription != null) {
59-
toUnsubscribe = this.subscription;
60-
}
61-
this.subscription = subscription;
62-
} else {
63-
toUnsubscribe = subscription;
64-
}
65-
}
66-
if (toUnsubscribe != null) {
67-
toUnsubscribe.unsubscribe();
68-
}
69-
}
50+
public Subscription getSubscription() {
51+
return reference.get();
52+
}
7053
}

rxjava-core/src/test/java/rx/subscriptions/SerialSubscriptionTests.java

Lines changed: 97 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,13 @@
1515
*/
1616
package rx.subscriptions;
1717

18-
import static org.mockito.Mockito.*;
18+
import static org.junit.Assert.fail;
19+
import static org.mockito.Mockito.mock;
20+
import static org.mockito.Mockito.verify;
21+
22+
import java.util.ArrayList;
23+
import java.util.List;
24+
import java.util.concurrent.CountDownLatch;
1925

2026
import org.junit.Before;
2127
import org.junit.Test;
@@ -24,53 +30,94 @@
2430
import rx.Subscription;
2531

2632
public class SerialSubscriptionTests {
27-
private SerialSubscription serialSubscription;
28-
29-
@Before
30-
public void setUp() {
31-
MockitoAnnotations.initMocks(this);
32-
33-
serialSubscription = new SerialSubscription();
34-
}
35-
36-
@Test
37-
public void unsubscribingWithoutUnderlyingDoesNothing() {
38-
serialSubscription.unsubscribe();
39-
}
40-
41-
@Test
42-
public void unsubscribingWithSingleUnderlyingUnsubscribes() {
43-
Subscription underlying = mock(Subscription.class);
44-
serialSubscription.setSubscription(underlying);
45-
underlying.unsubscribe();
46-
verify(underlying).unsubscribe();
47-
}
48-
49-
@Test
50-
public void replacingFirstUnderlyingCausesUnsubscription() {
51-
Subscription first = mock(Subscription.class);
52-
serialSubscription.setSubscription(first);
53-
Subscription second = mock(Subscription.class);
54-
serialSubscription.setSubscription(second);
55-
verify(first).unsubscribe();
56-
}
57-
58-
@Test
59-
public void whenUnsubscribingSecondUnderlyingUnsubscribed() {
60-
Subscription first = mock(Subscription.class);
61-
serialSubscription.setSubscription(first);
62-
Subscription second = mock(Subscription.class);
63-
serialSubscription.setSubscription(second);
64-
serialSubscription.unsubscribe();
65-
verify(second).unsubscribe();
66-
}
67-
68-
@Test
69-
public void settingUnderlyingWhenUnsubscribedCausesImmediateUnsubscription()
70-
{
71-
serialSubscription.unsubscribe();
72-
Subscription underlying = mock(Subscription.class);
73-
serialSubscription.setSubscription(underlying);
74-
verify(underlying).unsubscribe();
75-
}
33+
private SerialSubscription serialSubscription;
34+
35+
@Before
36+
public void setUp() {
37+
MockitoAnnotations.initMocks(this);
38+
39+
serialSubscription = new SerialSubscription();
40+
}
41+
42+
@Test
43+
public void unsubscribingWithoutUnderlyingDoesNothing() {
44+
serialSubscription.unsubscribe();
45+
}
46+
47+
@Test
48+
public void unsubscribingWithSingleUnderlyingUnsubscribes() {
49+
Subscription underlying = mock(Subscription.class);
50+
serialSubscription.setSubscription(underlying);
51+
underlying.unsubscribe();
52+
verify(underlying).unsubscribe();
53+
}
54+
55+
@Test
56+
public void replacingFirstUnderlyingCausesUnsubscription() {
57+
Subscription first = mock(Subscription.class);
58+
serialSubscription.setSubscription(first);
59+
Subscription second = mock(Subscription.class);
60+
serialSubscription.setSubscription(second);
61+
verify(first).unsubscribe();
62+
}
63+
64+
@Test
65+
public void whenUnsubscribingSecondUnderlyingUnsubscribed() {
66+
Subscription first = mock(Subscription.class);
67+
serialSubscription.setSubscription(first);
68+
Subscription second = mock(Subscription.class);
69+
serialSubscription.setSubscription(second);
70+
serialSubscription.unsubscribe();
71+
verify(second).unsubscribe();
72+
}
73+
74+
@Test
75+
public void settingUnderlyingWhenUnsubscribedCausesImmediateUnsubscription() {
76+
serialSubscription.unsubscribe();
77+
Subscription underlying = mock(Subscription.class);
78+
serialSubscription.setSubscription(underlying);
79+
verify(underlying).unsubscribe();
80+
}
81+
82+
@Test(timeout = 1000)
83+
public void settingUnderlyingWhenUnsubscribedCausesImmediateUnsubscriptionConcurrently()
84+
throws InterruptedException {
85+
final Subscription firstSet = mock(Subscription.class);
86+
serialSubscription.setSubscription(firstSet);
87+
88+
final CountDownLatch start = new CountDownLatch(1);
89+
90+
final int count = 10;
91+
final CountDownLatch end = new CountDownLatch(count);
92+
93+
final List<Thread> threads = new ArrayList<Thread>();
94+
for (int i = 0 ; i < count ; i++) {
95+
final Thread t = new Thread() {
96+
@Override
97+
public void run() {
98+
try {
99+
start.await();
100+
serialSubscription.unsubscribe();
101+
} catch (InterruptedException e) {
102+
fail(e.getMessage());
103+
} finally {
104+
end.countDown();
105+
}
106+
}
107+
};
108+
t.start();
109+
threads.add(t);
110+
}
111+
112+
final Subscription underlying = mock(Subscription.class);
113+
start.countDown();
114+
serialSubscription.setSubscription(underlying);
115+
end.await();
116+
verify(firstSet).unsubscribe();
117+
verify(underlying).unsubscribe();
118+
119+
for (final Thread t : threads) {
120+
t.interrupt();
121+
}
122+
}
76123
}

0 commit comments

Comments
 (0)