Skip to content

Commit 9897ca8

Browse files
committed
Lock free strategy using a sentinel (Thanks to akarnokd)
1 parent c728d77 commit 9897ca8

File tree

2 files changed

+113
-117
lines changed

2 files changed

+113
-117
lines changed

rxjava-core/src/main/java/rx/subscriptions/SerialSubscription.java

Lines changed: 23 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,7 @@
1717

1818
import static rx.subscriptions.Subscriptions.empty;
1919

20-
import java.util.concurrent.atomic.AtomicBoolean;
2120
import java.util.concurrent.atomic.AtomicReference;
22-
import java.util.concurrent.locks.Lock;
23-
import java.util.concurrent.locks.ReadWriteLock;
24-
import java.util.concurrent.locks.ReentrantReadWriteLock;
2521

2622
import rx.Subscription;
2723

@@ -32,40 +28,40 @@
3228
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.disposables.serialdisposable(v=vs.103).aspx">Rx.Net equivalent SerialDisposable</a>
3329
*/
3430
public class SerialSubscription implements Subscription {
35-
private final AtomicBoolean unsubscribed = new AtomicBoolean();
3631
private final AtomicReference<Subscription> reference = new AtomicReference<Subscription>(empty());
37-
private final Lock read;
38-
private final Lock write;
3932

40-
public SerialSubscription() {
41-
final ReadWriteLock lock = new ReentrantReadWriteLock();
42-
read = lock.readLock();
43-
write = lock.writeLock();
44-
}
33+
private static final Subscription UNSUBSCRIBED = new Subscription() {
34+
@Override
35+
public void unsubscribe() {
36+
}
37+
};
4538

4639
@Override
4740
public void unsubscribe() {
48-
write.lock();
49-
try {
50-
if (unsubscribed.compareAndSet(false, true)) {
51-
reference.getAndSet(empty()).unsubscribe();
41+
do {
42+
final Subscription current = reference.get();
43+
if (current == UNSUBSCRIBED) {
44+
break;
5245
}
53-
} finally {
54-
write.unlock();
55-
}
46+
if (reference.compareAndSet(current, UNSUBSCRIBED)) {
47+
current.unsubscribe();
48+
break;
49+
}
50+
} while (true);
5651
}
5752

5853
public void setSubscription(final Subscription subscription) {
59-
read.lock();
60-
try {
61-
if (unsubscribed.get()) {
54+
do {
55+
final Subscription current = reference.get();
56+
if (current == UNSUBSCRIBED) {
6257
subscription.unsubscribe();
63-
} else {
64-
reference.getAndSet(subscription).unsubscribe();
58+
break;
6559
}
66-
} finally {
67-
read.unlock();
68-
}
60+
if (reference.compareAndSet(current, subscription)) {
61+
current.unsubscribe();
62+
break;
63+
}
64+
} while (true);
6965
}
7066

7167
public Subscription getSubscription() {

rxjava-core/src/test/java/rx/subscriptions/SerialSubscriptionTests.java

Lines changed: 90 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -30,94 +30,94 @@
3030
import rx.Subscription;
3131

3232
public class SerialSubscriptionTests {
33-
private SerialSubscription serialSubscription;
34-
35-
@Before
36-
public void setUp() {
37-
MockitoAnnotations.initMocks(this);
38-
39-
serialSubscription = new SerialSubscription();
40-
}
41-
42-
@Test
43-
public void unsubscribingWithoutUnderlyingDoesNothing() {
44-
serialSubscription.unsubscribe();
45-
}
46-
47-
@Test
48-
public void unsubscribingWithSingleUnderlyingUnsubscribes() {
49-
Subscription underlying = mock(Subscription.class);
50-
serialSubscription.setSubscription(underlying);
51-
underlying.unsubscribe();
52-
verify(underlying).unsubscribe();
53-
}
54-
55-
@Test
56-
public void replacingFirstUnderlyingCausesUnsubscription() {
57-
Subscription first = mock(Subscription.class);
58-
serialSubscription.setSubscription(first);
59-
Subscription second = mock(Subscription.class);
60-
serialSubscription.setSubscription(second);
61-
verify(first).unsubscribe();
62-
}
63-
64-
@Test
65-
public void whenUnsubscribingSecondUnderlyingUnsubscribed() {
66-
Subscription first = mock(Subscription.class);
67-
serialSubscription.setSubscription(first);
68-
Subscription second = mock(Subscription.class);
69-
serialSubscription.setSubscription(second);
70-
serialSubscription.unsubscribe();
71-
verify(second).unsubscribe();
72-
}
73-
74-
@Test
75-
public void settingUnderlyingWhenUnsubscribedCausesImmediateUnsubscription() {
76-
serialSubscription.unsubscribe();
77-
Subscription underlying = mock(Subscription.class);
78-
serialSubscription.setSubscription(underlying);
79-
verify(underlying).unsubscribe();
80-
}
81-
82-
@Test(timeout = 1000)
83-
public void settingUnderlyingWhenUnsubscribedCausesImmediateUnsubscriptionConcurrently()
84-
throws InterruptedException {
85-
final Subscription firstSet = mock(Subscription.class);
86-
serialSubscription.setSubscription(firstSet);
87-
88-
final CountDownLatch start = new CountDownLatch(1);
89-
90-
final int count = 10;
91-
final CountDownLatch end = new CountDownLatch(count);
92-
93-
final List<Thread> threads = new ArrayList<Thread>();
94-
for (int i = 0 ; i < count ; i++) {
95-
final Thread t = new Thread() {
96-
@Override
97-
public void run() {
98-
try {
99-
start.await();
100-
serialSubscription.unsubscribe();
101-
} catch (InterruptedException e) {
102-
fail(e.getMessage());
103-
} finally {
104-
end.countDown();
105-
}
106-
}
107-
};
108-
t.start();
109-
threads.add(t);
110-
}
111-
112-
final Subscription underlying = mock(Subscription.class);
113-
start.countDown();
114-
serialSubscription.setSubscription(underlying);
115-
end.await();
116-
verify(firstSet).unsubscribe();
117-
verify(underlying).unsubscribe();
118-
119-
for (final Thread t : threads) {
120-
t.interrupt();
121-
}
122-
}
33+
private SerialSubscription serialSubscription;
34+
35+
@Before
36+
public void setUp() {
37+
MockitoAnnotations.initMocks(this);
38+
39+
serialSubscription = new SerialSubscription();
40+
}
41+
42+
@Test
43+
public void unsubscribingWithoutUnderlyingDoesNothing() {
44+
serialSubscription.unsubscribe();
45+
}
46+
47+
@Test
48+
public void unsubscribingWithSingleUnderlyingUnsubscribes() {
49+
Subscription underlying = mock(Subscription.class);
50+
serialSubscription.setSubscription(underlying);
51+
underlying.unsubscribe();
52+
verify(underlying).unsubscribe();
53+
}
54+
55+
@Test
56+
public void replacingFirstUnderlyingCausesUnsubscription() {
57+
Subscription first = mock(Subscription.class);
58+
serialSubscription.setSubscription(first);
59+
Subscription second = mock(Subscription.class);
60+
serialSubscription.setSubscription(second);
61+
verify(first).unsubscribe();
62+
}
63+
64+
@Test
65+
public void whenUnsubscribingSecondUnderlyingUnsubscribed() {
66+
Subscription first = mock(Subscription.class);
67+
serialSubscription.setSubscription(first);
68+
Subscription second = mock(Subscription.class);
69+
serialSubscription.setSubscription(second);
70+
serialSubscription.unsubscribe();
71+
verify(second).unsubscribe();
72+
}
73+
74+
@Test
75+
public void settingUnderlyingWhenUnsubscribedCausesImmediateUnsubscription() {
76+
serialSubscription.unsubscribe();
77+
Subscription underlying = mock(Subscription.class);
78+
serialSubscription.setSubscription(underlying);
79+
verify(underlying).unsubscribe();
80+
}
81+
82+
@Test(timeout = 1000)
83+
public void settingUnderlyingWhenUnsubscribedCausesImmediateUnsubscriptionConcurrently()
84+
throws InterruptedException {
85+
final Subscription firstSet = mock(Subscription.class);
86+
serialSubscription.setSubscription(firstSet);
87+
88+
final CountDownLatch start = new CountDownLatch(1);
89+
90+
final int count = 10;
91+
final CountDownLatch end = new CountDownLatch(count);
92+
93+
final List<Thread> threads = new ArrayList<Thread>();
94+
for (int i = 0 ; i < count ; i++) {
95+
final Thread t = new Thread() {
96+
@Override
97+
public void run() {
98+
try {
99+
start.await();
100+
serialSubscription.unsubscribe();
101+
} catch (InterruptedException e) {
102+
fail(e.getMessage());
103+
} finally {
104+
end.countDown();
105+
}
106+
}
107+
};
108+
t.start();
109+
threads.add(t);
110+
}
111+
112+
final Subscription underlying = mock(Subscription.class);
113+
start.countDown();
114+
serialSubscription.setSubscription(underlying);
115+
end.await();
116+
verify(firstSet).unsubscribe();
117+
verify(underlying).unsubscribe();
118+
119+
for (final Thread t : threads) {
120+
t.interrupt();
121+
}
122+
}
123123
}

0 commit comments

Comments
 (0)