Skip to content

Commit fc7f077

Browse files
committed
OperatorRefCount
1 parent 95e0636 commit fc7f077

File tree

4 files changed

+66
-69
lines changed

4 files changed

+66
-69
lines changed

rxjava-core/src/main/java/rx/observables/ConnectableObservable.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
import rx.Observable;
1919
import rx.Subscriber;
2020
import rx.Subscription;
21-
import rx.operators.OperationRefCount;
21+
import rx.operators.OperatorRefCount;
2222

2323
/**
2424
* A ConnectableObservable resembles an ordinary {@link Observable}, except that it does not begin
@@ -44,6 +44,7 @@ protected ConnectableObservable(OnSubscribe<T> onSubscribe) {
4444
/**
4545
* Call a ConnectableObservable's connect() method to instruct it to begin emitting the
4646
* items from its underlying {@link Observable} to its {@link Subscriber}s.
47+
* @return the subscription representing the connection
4748
*/
4849
public abstract Subscription connect();
4950

@@ -54,6 +55,6 @@ protected ConnectableObservable(OnSubscribe<T> onSubscribe) {
5455
* @return a {@link Observable}
5556
*/
5657
public Observable<T> refCount() {
57-
return Observable.create(OperationRefCount.refCount(this));
58+
return create(new OperatorRefCount<T>(this));
5859
}
5960
}

rxjava-core/src/main/java/rx/operators/OperationRefCount.java

Lines changed: 0 additions & 67 deletions
This file was deleted.
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package rx.operators;
17+
18+
import rx.Observable.OnSubscribe;
19+
import rx.Subscriber;
20+
import rx.Subscription;
21+
import rx.functions.Action0;
22+
import rx.observables.ConnectableObservable;
23+
import rx.subscriptions.Subscriptions;
24+
25+
/**
26+
* Returns an observable sequence that stays connected to the source as long
27+
* as there is at least one subscription to the observable sequence.
28+
* @param <T> the value type
29+
*/
30+
public final class OperatorRefCount<T> implements OnSubscribe<T> {
31+
final ConnectableObservable<? extends T> source;
32+
final Object guard;
33+
/** Guarded by guard. */
34+
int count;
35+
/** Guarded by guard. */
36+
Subscription connection;
37+
public OperatorRefCount(ConnectableObservable<? extends T> source) {
38+
this.source = source;
39+
this.guard = new Object();
40+
}
41+
42+
@Override
43+
public void call(Subscriber<? super T> t1) {
44+
t1.add(Subscriptions.create(new Action0() {
45+
@Override
46+
public void call() {
47+
synchronized (guard) {
48+
if (--count == 0) {
49+
connection.unsubscribe();
50+
connection = null;
51+
}
52+
}
53+
}
54+
}));
55+
source.unsafeSubscribe(t1);
56+
synchronized (guard) {
57+
if (count++ == 0) {
58+
connection = source.connect();
59+
}
60+
}
61+
}
62+
}

rxjava-core/src/test/java/rx/RefCountTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ public void call() {
5656
}
5757
});
5858
Observable<Integer> refCounted = observable.publish().refCount();
59+
@SuppressWarnings("unchecked")
5960
Observer<Integer> observer = mock(Observer.class);
6061
Subscription first = refCounted.subscribe(observer);
6162
assertEquals(1, subscriptionCount.get());

0 commit comments

Comments
 (0)