Skip to content

Commit c728d77

Browse files
committed
Make unsubscribe + reference modification an atomic operation
1 parent d1706bc commit c728d77

File tree

1 file changed

+42
-21
lines changed

1 file changed

+42
-21
lines changed

rxjava-core/src/main/java/rx/subscriptions/SerialSubscription.java

Lines changed: 42 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@
1919

2020
import java.util.concurrent.atomic.AtomicBoolean;
2121
import java.util.concurrent.atomic.AtomicReference;
22+
import java.util.concurrent.locks.Lock;
23+
import java.util.concurrent.locks.ReadWriteLock;
24+
import java.util.concurrent.locks.ReentrantReadWriteLock;
2225

2326
import rx.Subscription;
2427

@@ -29,25 +32,43 @@
2932
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.disposables.serialdisposable(v=vs.103).aspx">Rx.Net equivalent SerialDisposable</a>
3033
*/
3134
public class SerialSubscription implements Subscription {
32-
private final AtomicBoolean unsubscribed = new AtomicBoolean();
33-
private final AtomicReference<Subscription> reference = new AtomicReference<Subscription>(empty());
34-
35-
@Override
36-
public void unsubscribe() {
37-
if (unsubscribed.compareAndSet(false, true)) {
38-
reference.getAndSet(empty()).unsubscribe();
39-
}
40-
}
41-
42-
public void setSubscription(final Subscription subscription) {
43-
if (unsubscribed.get()) {
44-
subscription.unsubscribe();
45-
} else {
46-
reference.getAndSet(subscription == null ? empty() : subscription).unsubscribe();
47-
}
48-
}
49-
50-
public Subscription getSubscription() {
51-
return reference.get();
52-
}
35+
private final AtomicBoolean unsubscribed = new AtomicBoolean();
36+
private final AtomicReference<Subscription> reference = new AtomicReference<Subscription>(empty());
37+
private final Lock read;
38+
private final Lock write;
39+
40+
public SerialSubscription() {
41+
final ReadWriteLock lock = new ReentrantReadWriteLock();
42+
read = lock.readLock();
43+
write = lock.writeLock();
44+
}
45+
46+
@Override
47+
public void unsubscribe() {
48+
write.lock();
49+
try {
50+
if (unsubscribed.compareAndSet(false, true)) {
51+
reference.getAndSet(empty()).unsubscribe();
52+
}
53+
} finally {
54+
write.unlock();
55+
}
56+
}
57+
58+
public void setSubscription(final Subscription subscription) {
59+
read.lock();
60+
try {
61+
if (unsubscribed.get()) {
62+
subscription.unsubscribe();
63+
} else {
64+
reference.getAndSet(subscription).unsubscribe();
65+
}
66+
} finally {
67+
read.unlock();
68+
}
69+
}
70+
71+
public Subscription getSubscription() {
72+
return reference.get();
73+
}
5374
}

0 commit comments

Comments
 (0)