Skip to content

Commit 79535dc

Browse files
Decouple Dispose Function for Using
As per discussion at #1466
1 parent c7baab8 commit 79535dc

File tree

3 files changed

+90
-29
lines changed

3 files changed

+90
-29
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2906,12 +2906,45 @@ public final static Observable<Long> timer(long delay, TimeUnit unit, Scheduler
29062906
* the factory function to create a resource object that depends on the Observable
29072907
* @param observableFactory
29082908
* the factory function to create an Observable
2909+
* @param disposeAction
2910+
* the function that will dispose of the resource
29092911
* @return the Observable whose lifetime controls the lifetime of the dependent resource object
29102912
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#using">RxJava wiki: using</a>
29112913
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229585.aspx">MSDN: Observable.Using</a>
29122914
*/
2913-
public final static <T, Resource extends Subscription> Observable<T> using(Func0<Resource> resourceFactory, Func1<Resource, ? extends Observable<? extends T>> observableFactory) {
2914-
return create(new OnSubscribeUsing<T, Resource>(resourceFactory, observableFactory));
2915+
public final static <T, Resource> Observable<T> using(
2916+
final Func0<Resource> resourceFactory,
2917+
final Func1<? super Resource, ? extends Observable<? extends T>> observableFactory,
2918+
final Action1<? super Resource> disposeAction) {
2919+
return create(new OnSubscribeUsing<T, Resource>(resourceFactory, observableFactory, disposeAction));
2920+
}
2921+
2922+
/**
2923+
* Constructs an Observable that creates a dependent resource object.
2924+
* <p>
2925+
* <img width="640" height="400" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/using.png" alt="">
2926+
* <dl>
2927+
* <dt><b>Scheduler:</b></dt>
2928+
* <dd>{@code using} does not operate by default on a particular {@link Scheduler}.</dd>
2929+
* </dl>
2930+
*
2931+
* @param resourceFactory
2932+
* the factory function to create a resource object that depends on the Observable
2933+
* @param observableFactory
2934+
* the factory function to create an Observable
2935+
* @return the Observable whose lifetime controls the lifetime of the dependent resource object
2936+
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#using">RxJava wiki: using</a>
2937+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229585.aspx">MSDN: Observable.Using</a>
2938+
*/
2939+
public final static <T, Resource extends Subscription> Observable<T> using(Func0<Resource> resourceFactory, Func1<? super Resource, ? extends Observable<? extends T>> observableFactory) {
2940+
return create(new OnSubscribeUsing<T, Resource>(resourceFactory, observableFactory, new Action1<Resource>() {
2941+
2942+
@Override
2943+
public void call(Resource r) {
2944+
r.unsubscribe();
2945+
}
2946+
2947+
}));
29152948
}
29162949

29172950
/**

rxjava-core/src/main/java/rx/internal/operators/OnSubscribeUsing.java

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,34 +18,46 @@
1818
import rx.Observable;
1919
import rx.Observable.OnSubscribe;
2020
import rx.Subscriber;
21-
import rx.Subscription;
21+
import rx.functions.Action0;
22+
import rx.functions.Action1;
2223
import rx.functions.Func0;
2324
import rx.functions.Func1;
25+
import rx.subscriptions.Subscriptions;
2426

2527
/**
2628
* Constructs an observable sequence that depends on a resource object.
2729
*/
28-
public final class OnSubscribeUsing<T, Resource extends Subscription> implements OnSubscribe<T> {
30+
public final class OnSubscribeUsing<T, Resource> implements OnSubscribe<T> {
2931

3032
private final Func0<Resource> resourceFactory;
31-
private final Func1<Resource, ? extends Observable<? extends T>> observableFactory;
33+
private final Func1<? super Resource, ? extends Observable<? extends T>> observableFactory;
34+
private final Action1<? super Resource> dispose;
3235

33-
public OnSubscribeUsing(Func0<Resource> resourceFactory, Func1<Resource, ? extends Observable<? extends T>> observableFactory) {
36+
public OnSubscribeUsing(Func0<Resource> resourceFactory,
37+
Func1<? super Resource, ? extends Observable<? extends T>> observableFactory,
38+
Action1<? super Resource> dispose) {
3439
this.resourceFactory = resourceFactory;
3540
this.observableFactory = observableFactory;
41+
this.dispose = dispose;
3642
}
3743

3844
public void call(Subscriber<? super T> subscriber) {
39-
Resource resource = null;
4045
try {
41-
resource = resourceFactory.call();
42-
subscriber.add(resource);
46+
final Resource resource = resourceFactory.call();
47+
subscriber.add(Subscriptions.create(new Action0() {
48+
49+
@Override
50+
public void call() {
51+
dispose.call(resource);
52+
}
53+
54+
}));
4355
Observable<? extends T> observable = observableFactory.call(resource);
4456
observable.subscribe(subscriber);
4557
} catch (Throwable e) {
46-
if (resource != null) {
47-
resource.unsubscribe();
48-
}
58+
// eagerly call unsubscribe since this operator is specifically about resource management
59+
subscriber.unsubscribe();
60+
// then propagate error
4961
subscriber.onError(e);
5062
}
5163
}

rxjava-core/src/test/java/rx/internal/operators/OnSubscribeUsingTest.java

Lines changed: 33 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -32,18 +32,36 @@
3232
import rx.Subscription;
3333
import rx.exceptions.TestException;
3434
import rx.functions.Action0;
35+
import rx.functions.Action1;
3536
import rx.functions.Func0;
3637
import rx.functions.Func1;
3738
import rx.subscriptions.Subscriptions;
3839

3940
public class OnSubscribeUsingTest {
4041

41-
private static interface Resource extends Subscription {
42+
private static interface Resource {
4243
public String getTextFromWeb();
4344

45+
public void dispose();
46+
}
47+
48+
private static class DisposeAction implements Action1<Resource> {
49+
4450
@Override
45-
public void unsubscribe();
51+
public void call(Resource r) {
52+
r.dispose();
53+
}
54+
4655
}
56+
57+
private final Action1<Subscription> disposeSubscription = new Action1<Subscription>() {
58+
59+
@Override
60+
public void call(Subscription s) {
61+
s.unsubscribe();
62+
}
63+
64+
};
4765

4866
@Test
4967
public void testUsing() {
@@ -66,7 +84,7 @@ public Observable<String> call(Resource resource) {
6684

6785
@SuppressWarnings("unchecked")
6886
Observer<String> observer = (Observer<String>) mock(Observer.class);
69-
Observable<String> observable = Observable.using(resourceFactory, observableFactory);
87+
Observable<String> observable = Observable.using(resourceFactory, observableFactory, new DisposeAction());
7088
observable.subscribe(observer);
7189

7290
InOrder inOrder = inOrder(observer);
@@ -76,7 +94,7 @@ public Observable<String> call(Resource resource) {
7694
inOrder.verifyNoMoreInteractions();
7795

7896
// The resouce should be closed
79-
verify(resource, times(1)).unsubscribe();
97+
verify(resource, times(1)).dispose();
8098
}
8199

82100
@Test
@@ -97,14 +115,10 @@ public String getTextFromWeb() {
97115
}
98116
return "Nothing";
99117
}
100-
118+
101119
@Override
102-
public void unsubscribe() {
103-
}
104-
105-
@Override
106-
public boolean isUnsubscribed() {
107-
return false;
120+
public void dispose() {
121+
// do nothing
108122
}
109123

110124
};
@@ -120,7 +134,7 @@ public Observable<String> call(Resource resource) {
120134

121135
@SuppressWarnings("unchecked")
122136
Observer<String> observer = (Observer<String>) mock(Observer.class);
123-
Observable<String> observable = Observable.using(resourceFactory, observableFactory);
137+
Observable<String> observable = Observable.using(resourceFactory, observableFactory, new DisposeAction());
124138
observable.subscribe(observer);
125139
observable.subscribe(observer);
126140

@@ -151,8 +165,8 @@ public Observable<Integer> call(Subscription subscription) {
151165
return Observable.empty();
152166
}
153167
};
154-
155-
Observable.using(resourceFactory, observableFactory).toBlocking().last();
168+
169+
Observable.using(resourceFactory, observableFactory, disposeSubscription).toBlocking().last();
156170
}
157171

158172
@Test
@@ -171,9 +185,9 @@ public Observable<Integer> call(Subscription subscription) {
171185
throw new TestException();
172186
}
173187
};
174-
188+
175189
try {
176-
Observable.using(resourceFactory, observableFactory).toBlocking().last();
190+
Observable.using(resourceFactory, observableFactory, disposeSubscription).toBlocking().last();
177191
fail("Should throw a TestException when the observableFactory throws it");
178192
} catch (TestException e) {
179193
// Make sure that unsubscribe is called so that users can close
@@ -203,9 +217,11 @@ public void call(Subscriber<? super Integer> t1) {
203217
});
204218
}
205219
};
220+
221+
206222

207223
try {
208-
Observable.using(resourceFactory, observableFactory).toBlocking().last();
224+
Observable.using(resourceFactory, observableFactory, disposeSubscription).toBlocking().last();
209225
fail("Should throw a TestException when the observableFactory throws it");
210226
} catch (TestException e) {
211227
// Make sure that unsubscribe is called so that users can close

0 commit comments

Comments
 (0)