1515 */
1616package rx .internal .operators ;
1717
18+ import java .util .concurrent .atomic .AtomicBoolean ;
1819import java .util .concurrent .atomic .AtomicInteger ;
1920import java .util .concurrent .locks .ReadWriteLock ;
2021import java .util .concurrent .locks .ReentrantReadWriteLock ;
2526import rx .functions .Action0 ;
2627import rx .functions .Action1 ;
2728import rx .observables .ConnectableObservable ;
29+ import rx .subscriptions .CompositeSubscription ;
2830import rx .subscriptions .Subscriptions ;
2931
3032/**
3638 */
3739public final class OnSubscribeRefCount <T > implements OnSubscribe <T > {
3840
39- private ConnectableObservable <? extends T > source ;
40- private volatile Subscription baseSubscription ;
41- private AtomicInteger subscriptionCount = new AtomicInteger (0 );
41+ private final ConnectableObservable <? extends T > source ;
42+ private volatile CompositeSubscription baseSubscription = new CompositeSubscription () ;
43+ private final AtomicInteger subscriptionCount = new AtomicInteger (0 );
4244
4345 /**
4446 * Ensures that subscribers wait for the first subscription to be assigned
@@ -59,44 +61,58 @@ public OnSubscribeRefCount(ConnectableObservable<? extends T> source) {
5961 @ Override
6062 public void call (final Subscriber <? super T > subscriber ) {
6163
62- // ensure secondary subscriptions wait for baseSubscription to be set by
63- // first subscription
64- lock .writeLock ().lock ();
65-
6664 if (subscriptionCount .incrementAndGet () == 1 ) {
67- // need to use this overload of connect to ensure that
68- // baseSubscription is set in the case that source is a synchronous
69- // Observable
70- source .connect (new Action1 <Subscription >() {
71- @ Override
72- public void call (Subscription subscription ) {
73- baseSubscription = subscription ;
74-
75- // handle unsubscribing from the base subscription
76- subscriber .add (disconnect ());
7765
78- // ready to subscribe to source so do it
79- source .unsafeSubscribe (subscriber );
66+ // ensure secondary subscriptions wait for baseSubscription to be
67+ // set by first subscription
68+ lock .writeLock ().lock ();
8069
81- // release the write lock
70+ final AtomicBoolean writeLocked = new AtomicBoolean (true );
71+
72+ try {
73+ // need to use this overload of connect to ensure that
74+ // baseSubscription is set in the case that source is a synchronous
75+ // Observable
76+ source .connect (new Action1 <Subscription >() {
77+ @ Override
78+ public void call (Subscription subscription ) {
79+
80+ try {
81+ baseSubscription .add (subscription );
82+
83+ // handle unsubscribing from the base subscription
84+ subscriber .add (disconnect ());
85+
86+ // ready to subscribe to source so do it
87+ source .unsafeSubscribe (subscriber );
88+ } finally {
89+ // release the write lock
90+ lock .writeLock ().unlock ();
91+ writeLocked .set (false );
92+ }
93+ }
94+ });
95+ } finally {
96+ // need to cover the case where the source is subscribed to
97+ // outside of this class thus preventing the above Action1
98+ // being called
99+ if (writeLocked .get ()) {
100+ // Action1 was not called
82101 lock .writeLock ().unlock ();
83102 }
84- });
103+ }
85104 } else {
86- // release the write lock
87- lock .writeLock ().unlock ();
88-
89- // wait till baseSubscription set
90105 lock .readLock ().lock ();
91-
92- // handle unsubscribing from the base subscription
93- subscriber .add (disconnect ());
94-
95- // ready to subscribe to source so do it
96- source .unsafeSubscribe (subscriber );
97-
98- //release the read lock
99- lock .readLock ().unlock ();
106+ try {
107+ // handle unsubscribing from the base subscription
108+ subscriber .add (disconnect ());
109+
110+ // ready to subscribe to source so do it
111+ source .unsafeSubscribe (subscriber );
112+ } finally {
113+ // release the read lock
114+ lock .readLock ().unlock ();
115+ }
100116 }
101117
102118 }
@@ -107,6 +123,10 @@ private Subscription disconnect() {
107123 public void call () {
108124 if (subscriptionCount .decrementAndGet () == 0 ) {
109125 baseSubscription .unsubscribe ();
126+
127+ // need a new baseSubscription because once unsubscribed
128+ // stays that way
129+ baseSubscription = new CompositeSubscription ();
110130 }
111131 }
112132 });
0 commit comments