1717
1818import java .util .concurrent .atomic .AtomicBoolean ;
1919import java .util .concurrent .atomic .AtomicInteger ;
20- import java .util .concurrent .locks .ReadWriteLock ;
21- import java .util .concurrent .locks .ReentrantReadWriteLock ;
20+ import java .util .concurrent .locks .ReentrantLock ;
2221
2322import rx .Observable .OnSubscribe ;
2423import rx .Subscriber ;
@@ -43,10 +42,9 @@ public final class OnSubscribeRefCount<T> implements OnSubscribe<T> {
4342 private final AtomicInteger subscriptionCount = new AtomicInteger (0 );
4443
4544 /**
46- * Ensures that subscribers wait for the first subscription to be assigned
47- * to baseSubcription before being subscribed themselves.
45+ * Use this lock for every subscription and disconnect action.
4846 */
49- private final ReadWriteLock lock = new ReentrantReadWriteLock ();
47+ private final ReentrantLock lock = new ReentrantLock ();
5048
5149 /**
5250 * Constructor.
@@ -61,48 +59,26 @@ public OnSubscribeRefCount(ConnectableObservable<? extends T> source) {
6159 @ Override
6260 public void call (final Subscriber <? super T > subscriber ) {
6361
62+ lock .lock ();
6463 if (subscriptionCount .incrementAndGet () == 1 ) {
6564
66- // ensure secondary subscriptions wait for baseSubscription to be
67- // set by first subscription
68- lock .writeLock ().lock ();
69-
7065 final AtomicBoolean writeLocked = new AtomicBoolean (true );
71-
66+
7267 try {
7368 // need to use this overload of connect to ensure that
74- // baseSubscription is set in the case that source is a synchronous
75- // Observable
76- source .connect (new Action1 <Subscription >() {
77- @ Override
78- public void call (Subscription subscription ) {
79-
80- try {
81- baseSubscription .add (subscription );
82-
83- // handle unsubscribing from the base subscription
84- subscriber .add (disconnect ());
85-
86- // ready to subscribe to source so do it
87- source .unsafeSubscribe (subscriber );
88- } finally {
89- // release the write lock
90- lock .writeLock ().unlock ();
91- writeLocked .set (false );
92- }
93- }
94- });
69+ // baseSubscription is set in the case that source is a
70+ // synchronous Observable
71+ source .connect (onSubscribe (subscriber , writeLocked ));
9572 } finally {
9673 // need to cover the case where the source is subscribed to
97- // outside of this class thus preventing the above Action1
74+ // outside of this class thus preventing the above Action1
9875 // being called
9976 if (writeLocked .get ()) {
10077 // Action1 was not called
101- lock .writeLock (). unlock ();
78+ lock .unlock ();
10279 }
10380 }
10481 } else {
105- lock .readLock ().lock ();
10682 try {
10783 // handle unsubscribing from the base subscription
10884 subscriber .add (disconnect ());
@@ -111,22 +87,49 @@ public void call(Subscription subscription) {
11187 source .unsafeSubscribe (subscriber );
11288 } finally {
11389 // release the read lock
114- lock .readLock (). unlock ();
90+ lock .unlock ();
11591 }
11692 }
11793
11894 }
11995
96+ private Action1 <Subscription > onSubscribe (final Subscriber <? super T > subscriber ,
97+ final AtomicBoolean writeLocked ) {
98+ return new Action1 <Subscription >() {
99+ @ Override
100+ public void call (Subscription subscription ) {
101+
102+ try {
103+ baseSubscription .add (subscription );
104+
105+ // handle unsubscribing from the base subscription
106+ subscriber .add (disconnect ());
107+
108+ // ready to subscribe to source so do it
109+ source .unsafeSubscribe (subscriber );
110+ } finally {
111+ // release the write lock
112+ lock .unlock ();
113+ writeLocked .set (false );
114+ }
115+ }
116+ };
117+ }
118+
120119 private Subscription disconnect () {
121120 return Subscriptions .create (new Action0 () {
122121 @ Override
123122 public void call () {
124- if (subscriptionCount .decrementAndGet () == 0 ) {
125- baseSubscription .unsubscribe ();
126-
127- // need a new baseSubscription because once unsubscribed
128- // stays that way
129- baseSubscription = new CompositeSubscription ();
123+ lock .lock ();
124+ try {
125+ if (subscriptionCount .decrementAndGet () == 0 ) {
126+ baseSubscription .unsubscribe ();
127+ // need a new baseSubscription because once
128+ // unsubscribed stays that way
129+ baseSubscription = new CompositeSubscription ();
130+ }
131+ } finally {
132+ lock .unlock ();
130133 }
131134 }
132135 });
0 commit comments