1515 */
1616package rx .internal .operators ;
1717
18- import java .util .ArrayList ;
19- import java .util .List ;
20- import java .util .Map ;
21- import java . util . WeakHashMap ;
18+ import java .util .concurrent . atomic . AtomicBoolean ;
19+ import java .util .concurrent . atomic . AtomicInteger ;
20+ import java .util .concurrent . locks . ReentrantLock ;
21+
2222import rx .Observable .OnSubscribe ;
2323import rx .Subscriber ;
2424import rx .Subscription ;
2525import rx .functions .Action0 ;
26+ import rx .functions .Action1 ;
2627import rx .observables .ConnectableObservable ;
28+ import rx .subscriptions .CompositeSubscription ;
2729import rx .subscriptions .Subscriptions ;
2830
2931/**
30- * Returns an observable sequence that stays connected to the source as long
31- * as there is at least one subscription to the observable sequence.
32- * @param <T> the value type
32+ * Returns an observable sequence that stays connected to the source as long as
33+ * there is at least one subscription to the observable sequence.
34+ *
35+ * @param <T>
36+ * the value type
3337 */
3438public final class OnSubscribeRefCount <T > implements OnSubscribe <T > {
35- final ConnectableObservable <? extends T > source ;
36- final Object guard ;
37- /** Guarded by guard. */
38- int index ;
39- /** Guarded by guard. */
40- boolean emitting ;
41- /** Guarded by guard. If true, indicates a connection request, false indicates a disconnect request. */
42- List < Token > queue ;
43- /** Manipulated while in the serialized section. */
44- int count ;
45- /** Manipulated while in the serialized section. */
46- Subscription connection ;
47- /** Manipulated while in the serialized section. */
48- final Map < Token , Object > connectionStatus ;
49- /** Occupied indicator. */
50- private static final Object OCCUPIED = new Object ();
39+
40+ private final ConnectableObservable <? extends T > source ;
41+ private volatile CompositeSubscription baseSubscription = new CompositeSubscription ();
42+ private final AtomicInteger subscriptionCount = new AtomicInteger ( 0 ) ;
43+
44+ /**
45+ * Use this lock for every subscription and disconnect action.
46+ */
47+ private final ReentrantLock lock = new ReentrantLock ();
48+
49+ /**
50+ * Constructor.
51+ *
52+ * @param source
53+ * observable to apply ref count to
54+ */
5155 public OnSubscribeRefCount (ConnectableObservable <? extends T > source ) {
5256 this .source = source ;
53- this .guard = new Object ();
54- this .connectionStatus = new WeakHashMap <Token , Object >();
5557 }
5658
5759 @ Override
58- public void call (Subscriber <? super T > t1 ) {
59- int id ;
60- synchronized (guard ) {
61- id = ++index ;
62- }
63- final Token t = new Token (id );
64- t1 .add (Subscriptions .create (new Action0 () {
65- @ Override
66- public void call () {
67- disconnect (t );
68- }
69- }));
70- source .unsafeSubscribe (t1 );
71- connect (t );
72- }
73- private void connect (Token id ) {
74- List <Token > localQueue ;
75- synchronized (guard ) {
76- if (emitting ) {
77- if (queue == null ) {
78- queue = new ArrayList <Token >();
79- }
80- queue .add (id );
81- return ;
82- }
83-
84- localQueue = queue ;
85- queue = null ;
86- emitting = true ;
87- }
88- boolean once = true ;
89- do {
90- drain (localQueue );
91- if (once ) {
92- once = false ;
93- doConnect (id );
94- }
95- synchronized (guard ) {
96- localQueue = queue ;
97- queue = null ;
98- if (localQueue == null ) {
99- emitting = false ;
100- return ;
101- }
102- }
103- } while (true );
104- }
105- private void disconnect (Token id ) {
106- List <Token > localQueue ;
107- synchronized (guard ) {
108- if (emitting ) {
109- if (queue == null ) {
110- queue = new ArrayList <Token >();
111- }
112- queue .add (id .toDisconnect ()); // negative value indicates disconnect
113- return ;
114- }
115-
116- localQueue = queue ;
117- queue = null ;
118- emitting = true ;
119- }
120- boolean once = true ;
121- do {
122- drain (localQueue );
123- if (once ) {
124- once = false ;
125- doDisconnect (id );
126- }
127- synchronized (guard ) {
128- localQueue = queue ;
129- queue = null ;
130- if (localQueue == null ) {
131- emitting = false ;
132- return ;
60+ public void call (final Subscriber <? super T > subscriber ) {
61+
62+ lock .lock ();
63+ if (subscriptionCount .incrementAndGet () == 1 ) {
64+
65+ final AtomicBoolean writeLocked = new AtomicBoolean (true );
66+
67+ try {
68+ // need to use this overload of connect to ensure that
69+ // baseSubscription is set in the case that source is a
70+ // synchronous Observable
71+ source .connect (onSubscribe (subscriber , writeLocked ));
72+ } finally {
73+ // need to cover the case where the source is subscribed to
74+ // outside of this class thus preventing the above Action1
75+ // being called
76+ if (writeLocked .get ()) {
77+ // Action1 was not called
78+ lock .unlock ();
13379 }
13480 }
135- } while (true );
136- }
137- private void drain (List <Token > localQueue ) {
138- if (localQueue == null ) {
139- return ;
140- }
141- int n = localQueue .size ();
142- for (int i = 0 ; i < n ; i ++) {
143- Token id = localQueue .get (i );
144- if (id .isDisconnect ()) {
145- doDisconnect (id );
146- } else {
147- doConnect (id );
148- }
149- }
150- }
151- private void doConnect (Token id ) {
152- // this method is called only once per id
153- // if add succeeds, id was not yet disconnected
154- if (connectionStatus .put (id , OCCUPIED ) == null ) {
155- if (count ++ == 0 ) {
156- connection = source .connect ();
157- }
15881 } else {
159- // connection exists due to disconnect, just remove
160- connectionStatus .remove (id );
161- }
162- }
163- private void doDisconnect (Token id ) {
164- // this method is called only once per id
165- // if remove succeeds, id was connected
166- if (connectionStatus .remove (id ) != null ) {
167- if (--count == 0 ) {
168- connection .unsubscribe ();
169- connection = null ;
82+ try {
83+ // handle unsubscribing from the base subscription
84+ subscriber .add (disconnect ());
85+
86+ // ready to subscribe to source so do it
87+ source .unsafeSubscribe (subscriber );
88+ } finally {
89+ // release the read lock
90+ lock .unlock ();
17091 }
171- } else {
172- // mark id as if connected
173- connectionStatus .put (id , OCCUPIED );
17492 }
93+
17594 }
176- /** Token that represens a connection request or a disconnection request. */
177- private static final class Token {
178- final int id ;
179- public Token (int id ) {
180- this .id = id ;
181- }
18295
183- @ Override
184- public boolean equals (Object obj ) {
185- if (obj == null ) {
186- return false ;
187- }
188- if (obj .getClass () != getClass ()) {
189- return false ;
96+ private Action1 <Subscription > onSubscribe (final Subscriber <? super T > subscriber ,
97+ final AtomicBoolean writeLocked ) {
98+ return new Action1 <Subscription >() {
99+ @ Override
100+ public void call (Subscription subscription ) {
101+
102+ try {
103+ baseSubscription .add (subscription );
104+
105+ // handle unsubscribing from the base subscription
106+ subscriber .add (disconnect ());
107+
108+ // ready to subscribe to source so do it
109+ source .unsafeSubscribe (subscriber );
110+ } finally {
111+ // release the write lock
112+ lock .unlock ();
113+ writeLocked .set (false );
114+ }
190115 }
191- int other = ((Token )obj ).id ;
192- return id == other || -id == other ;
193- }
116+ };
117+ }
194118
195- @ Override
196- public int hashCode () {
197- return id < 0 ? -id : id ;
198- }
199- public boolean isDisconnect () {
200- return id < 0 ;
201- }
202- public Token toDisconnect () {
203- if (id < 0 ) {
204- return this ;
119+ private Subscription disconnect () {
120+ return Subscriptions .create (new Action0 () {
121+ @ Override
122+ public void call () {
123+ lock .lock ();
124+ try {
125+ if (subscriptionCount .decrementAndGet () == 0 ) {
126+ baseSubscription .unsubscribe ();
127+ // need a new baseSubscription because once
128+ // unsubscribed stays that way
129+ baseSubscription = new CompositeSubscription ();
130+ }
131+ } finally {
132+ lock .unlock ();
133+ }
205134 }
206- return new Token (-id );
207- }
135+ });
208136 }
209- }
137+ }
0 commit comments