Skip to content

Commit 9ec1715

Browse files
author
John Marks
committed
RefCount work in progress
1 parent 257a744 commit 9ec1715

File tree

8 files changed

+171
-1
lines changed

8 files changed

+171
-1
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
This package "rx.operators" is for internal implementation details and can change at any time.
2+
3+
It is excluded from the public Javadocs (http://netflix.github.io/RxJava/javadoc/) and should not be relied upon by any code.
4+
5+
In short, changes to public signatures of these classes will not be accounted for in the versioning of RxJava.
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
<!--
2+
3+
Copyright 2013 Netflix, Inc.
4+
5+
Licensed under the Apache License, Version 2.0 (the "License");
6+
you may not use this file except in compliance with the License.
7+
You may obtain a copy of the License at
8+
9+
http://www.apache.org/licenses/LICENSE-2.0
10+
11+
Unless required by applicable law or agreed to in writing, software
12+
distributed under the License is distributed on an "AS IS" BASIS,
13+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
See the License for the specific language governing permissions and
15+
limitations under the License.
16+
17+
-->
18+
<body>
19+
<p>Operators that allow composing Observables to transform and
20+
manipulate data in an asynchronous, functional and thread-safe manner.</p>
21+
<p>The operators are all exposed via the ObservableExtensions class</p>
22+
23+
</body>

classes/test/rxjava-core/README.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
This test folder only contains performance and functional/integration style tests.
2+
3+
The unit tests themselves are embedded as inner classes of the Java code (such as here: [rxjava-core/src/main/java/rx/operators](https://github.com/Netflix/RxJava/tree/master/rxjava-core/src/main/java/rx/operators)).
4+
5+
* For an explanation of this design choice see
6+
Ben J. Christensen's [JUnit Tests as Inner Classes](http://benjchristensen.com/2011/10/23/junit-tests-as-inner-classes/).
7+
8+
Also, each of the language adaptors has a /src/test/ folder which further testing (see Groovy for an example: [language-adaptors/rxjava-groovy/src/test](https://github.com/Netflix/RxJava/tree/master/language-adaptors/rxjava-groovy/src/test)).
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
This package "rx.operators" is for internal implementation details and can change at any time.
2+
3+
It is excluded from the public Javadocs (http://netflix.github.io/RxJava/javadoc/) and should not be relied upon by any code.
4+
5+
In short, changes to public signatures of these classes will not be accounted for in the versioning of RxJava.
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
<!--
2+
3+
Copyright 2013 Netflix, Inc.
4+
5+
Licensed under the Apache License, Version 2.0 (the "License");
6+
you may not use this file except in compliance with the License.
7+
You may obtain a copy of the License at
8+
9+
http://www.apache.org/licenses/LICENSE-2.0
10+
11+
Unless required by applicable law or agreed to in writing, software
12+
distributed under the License is distributed on an "AS IS" BASIS,
13+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
See the License for the specific language governing permissions and
15+
limitations under the License.
16+
17+
-->
18+
<body>
19+
<p>Operators that allow composing Observables to transform and
20+
manipulate data in an asynchronous, functional and thread-safe manner.</p>
21+
<p>The operators are all exposed via the ObservableExtensions class</p>
22+
23+
</body>

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4392,7 +4392,7 @@ public BlockingObservable<T> toBlockingObservable() {
43924392
*
43934393
* NOTE: If strong reasons for not depending on package names comes up then the implementation of this method can change to looking for a marker interface.
43944394
*
4395-
* @param f
4395+
* @param o
43964396
* @return {@code true} if the given function is an internal implementation, and {@code false} otherwise.
43974397
*/
43984398
private boolean isInternalImplementation(Object o) {

rxjava-core/src/main/java/rx/observables/ConnectableObservable.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import rx.Observable;
1919
import rx.Observer;
2020
import rx.Subscription;
21+
import rx.operators.OperationRefCount;
2122
import rx.util.functions.Func1;
2223

2324
/**
@@ -47,4 +48,23 @@ protected ConnectableObservable(Func1<Observer<T>, Subscription> onSubscribe) {
4748
*/
4849
public abstract Subscription connect();
4950

51+
/**
52+
* Returns an observable sequence that stays connected to the source as long
53+
* as there is at least one subscription to the observable sequence.
54+
* @return a {@link Observable}
55+
*/
56+
public Observable<T> refCount() {
57+
return refCount(this);
58+
}
59+
60+
/**
61+
* Returns an observable sequence that stays connected to the source as long
62+
* as there is at least one subscription to the observable sequence.
63+
* @param that
64+
* a {@link ConnectableObservable}
65+
* @return a {@link Observable}
66+
*/
67+
public static <T> Observable<T> refCount(ConnectableObservable<T> that) {
68+
return Observable.create(OperationRefCount.refCount(that));
69+
}
5070
}
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import org.junit.Before;
19+
import org.junit.Test;
20+
import org.mockito.MockitoAnnotations;
21+
import rx.Observable;
22+
import rx.Observer;
23+
import rx.Subscription;
24+
import rx.observables.ConnectableObservable;
25+
import rx.subscriptions.Subscriptions;
26+
import rx.util.functions.Func1;
27+
28+
import static org.mockito.Mockito.*;
29+
30+
/**
31+
* Returns an observable sequence that stays connected to the source as long
32+
* as there is at least one subscription to the observable sequence.
33+
*/
34+
public final class OperationRefCount<T> {
35+
public static <T> Func1<Observer<T>, Subscription> refCount(ConnectableObservable<T> connectableObservable) {
36+
return new RefCount<T>(connectableObservable);
37+
}
38+
39+
private static class RefCount<T> implements Func1<Observer<T>, Subscription> {
40+
private final ConnectableObservable<T> innerConnectableObservable;
41+
42+
public RefCount(ConnectableObservable<T> innerConnectableObservable) {
43+
this.innerConnectableObservable = innerConnectableObservable;
44+
}
45+
46+
@Override
47+
public Subscription call(Observer<T> observer) {
48+
throw new UnsupportedOperationException();
49+
}
50+
}
51+
52+
public static class UnitTest {
53+
54+
@Before
55+
public void setUp() {
56+
MockitoAnnotations.initMocks(this);
57+
}
58+
59+
@Test
60+
public void subscriptionToUnderlyingOnFirstSubscription() {
61+
ConnectableObservable<Integer> connectable = mock(ConnectableObservable.class);
62+
Observable<Integer> refCounted = Observable.create(refCount(connectable));
63+
Observer<Integer> observer = mock(Observer.class);
64+
when(connectable.subscribe(observer)).thenReturn(Subscriptions.empty());
65+
when(connectable.connect()).thenReturn(Subscriptions.empty());
66+
refCounted.subscribe(observer);
67+
verify(connectable, times(1)).subscribe(observer);
68+
verify(connectable, times(1)).connect();
69+
}
70+
71+
@Test
72+
public void noSubscriptionToUnderlyingOnSecondSubscription() {
73+
74+
}
75+
76+
@Test
77+
public void unsubscriptionFromUnderlyingOnLastUnsubscription() {
78+
79+
}
80+
81+
@Test
82+
public void noUnsubscriptionFromUnderlyingOnFirstUnsubscription() {
83+
84+
}
85+
}
86+
}

0 commit comments

Comments
 (0)