Skip to content

Commit d3d2757

Browse files
author
John Marks
committed
RefCount work in progress
1 parent 9ec1715 commit d3d2757

File tree

1 file changed

+66
-4
lines changed

1 file changed

+66
-4
lines changed

rxjava-core/src/main/java/rx/operators/OperationRefCount.java

Lines changed: 66 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,15 @@
1717

1818
import org.junit.Before;
1919
import org.junit.Test;
20+
import org.junit.runner.RunWith;
21+
import org.junit.runners.JUnit4;
2022
import org.mockito.MockitoAnnotations;
2123
import rx.Observable;
2224
import rx.Observer;
2325
import rx.Subscription;
2426
import rx.observables.ConnectableObservable;
2527
import rx.subscriptions.Subscriptions;
28+
import rx.util.functions.Action0;
2629
import rx.util.functions.Func1;
2730

2831
import static org.mockito.Mockito.*;
@@ -38,17 +41,38 @@ public static <T> Func1<Observer<T>, Subscription> refCount(ConnectableObservabl
3841

3942
private static class RefCount<T> implements Func1<Observer<T>, Subscription> {
4043
private final ConnectableObservable<T> innerConnectableObservable;
44+
private final Object gate = new Object();
45+
private int count = 0;
46+
private Subscription connection = null;
4147

4248
public RefCount(ConnectableObservable<T> innerConnectableObservable) {
4349
this.innerConnectableObservable = innerConnectableObservable;
4450
}
4551

4652
@Override
4753
public Subscription call(Observer<T> observer) {
48-
throw new UnsupportedOperationException();
54+
final Subscription subscription = innerConnectableObservable.subscribe(observer);
55+
synchronized (gate) {
56+
if (count++ == 0) {
57+
connection = innerConnectableObservable.connect();
58+
}
59+
}
60+
return Subscriptions.create(new Action0() {
61+
@Override
62+
public void call() {
63+
synchronized (gate) {
64+
if (--count == 0) {
65+
connection.unsubscribe();
66+
connection = null;
67+
}
68+
}
69+
subscription.unsubscribe();
70+
}
71+
});
4972
}
5073
}
5174

75+
@RunWith(JUnit4.class)
5276
public static class UnitTest {
5377

5478
@Before
@@ -58,8 +82,10 @@ public void setUp() {
5882

5983
@Test
6084
public void subscriptionToUnderlyingOnFirstSubscription() {
85+
@SuppressWarnings("unchecked")
6186
ConnectableObservable<Integer> connectable = mock(ConnectableObservable.class);
6287
Observable<Integer> refCounted = Observable.create(refCount(connectable));
88+
@SuppressWarnings("unchecked")
6389
Observer<Integer> observer = mock(Observer.class);
6490
when(connectable.subscribe(observer)).thenReturn(Subscriptions.empty());
6591
when(connectable.connect()).thenReturn(Subscriptions.empty());
@@ -70,17 +96,53 @@ public void subscriptionToUnderlyingOnFirstSubscription() {
7096

7197
@Test
7298
public void noSubscriptionToUnderlyingOnSecondSubscription() {
73-
99+
@SuppressWarnings("unchecked")
100+
ConnectableObservable<Integer> connectable = mock(ConnectableObservable.class);
101+
Observable<Integer> refCounted = Observable.create(refCount(connectable));
102+
@SuppressWarnings("unchecked")
103+
Observer<Integer> observer = mock(Observer.class);
104+
when(connectable.subscribe(observer)).thenReturn(Subscriptions.empty());
105+
when(connectable.connect()).thenReturn(Subscriptions.empty());
106+
refCounted.subscribe(observer);
107+
refCounted.subscribe(observer);
108+
verify(connectable, times(2)).subscribe(observer);
109+
verify(connectable, times(1)).connect();
74110
}
75111

76112
@Test
77113
public void unsubscriptionFromUnderlyingOnLastUnsubscription() {
78-
114+
@SuppressWarnings("unchecked")
115+
ConnectableObservable<Integer> connectable = mock(ConnectableObservable.class);
116+
Observable<Integer> refCounted = Observable.create(refCount(connectable));
117+
@SuppressWarnings("unchecked")
118+
Observer<Integer> observer = mock(Observer.class);
119+
Subscription underlying = mock(Subscription.class);
120+
when(connectable.subscribe(observer)).thenReturn(underlying);
121+
Subscription connection = mock(Subscription.class);
122+
when(connectable.connect()).thenReturn(connection);
123+
Subscription first = refCounted.subscribe(observer);
124+
first.unsubscribe();
125+
verify(underlying, times(1)).unsubscribe();
126+
verify(connection, times(1)).unsubscribe();
79127
}
80128

81129
@Test
82130
public void noUnsubscriptionFromUnderlyingOnFirstUnsubscription() {
83-
131+
@SuppressWarnings("unchecked")
132+
ConnectableObservable<Integer> connectable = mock(ConnectableObservable.class);
133+
Observable<Integer> refCounted = Observable.create(refCount(connectable));
134+
@SuppressWarnings("unchecked")
135+
Observer<Integer> observer = mock(Observer.class);
136+
Subscription underlying = mock(Subscription.class);
137+
when(connectable.subscribe(observer)).thenReturn(underlying);
138+
Subscription connection = mock(Subscription.class);
139+
when(connectable.connect()).thenReturn(connection);
140+
Subscription first = refCounted.subscribe(observer);
141+
Subscription second = refCounted.subscribe(observer);
142+
first.unsubscribe();
143+
second.unsubscribe();
144+
verify(underlying, times(2)).unsubscribe();
145+
verify(connection, times(1)).unsubscribe();
84146
}
85147
}
86148
}

0 commit comments

Comments
 (0)