3232import rx .Subscription ;
3333import rx .exceptions .TestException ;
3434import rx .functions .Action0 ;
35+ import rx .functions .Action1 ;
3536import rx .functions .Func0 ;
3637import rx .functions .Func1 ;
3738import rx .subscriptions .Subscriptions ;
3839
3940public class OnSubscribeUsingTest {
4041
41- private static interface Resource extends Subscription {
42+ private static interface Resource {
4243 public String getTextFromWeb ();
4344
45+ public void dispose ();
46+ }
47+
48+ private static class DisposeAction implements Action1 <Resource > {
49+
4450 @ Override
45- public void unsubscribe ();
51+ public void call (Resource r ) {
52+ r .dispose ();
53+ }
54+
4655 }
56+
57+ private final Action1 <Subscription > disposeSubscription = new Action1 <Subscription >() {
58+
59+ @ Override
60+ public void call (Subscription s ) {
61+ s .unsubscribe ();
62+ }
63+
64+ };
4765
4866 @ Test
4967 public void testUsing () {
@@ -66,7 +84,7 @@ public Observable<String> call(Resource resource) {
6684
6785 @ SuppressWarnings ("unchecked" )
6886 Observer <String > observer = (Observer <String >) mock (Observer .class );
69- Observable <String > observable = Observable .using (resourceFactory , observableFactory );
87+ Observable <String > observable = Observable .using (resourceFactory , observableFactory , new DisposeAction () );
7088 observable .subscribe (observer );
7189
7290 InOrder inOrder = inOrder (observer );
@@ -76,7 +94,7 @@ public Observable<String> call(Resource resource) {
7694 inOrder .verifyNoMoreInteractions ();
7795
7896 // The resouce should be closed
79- verify (resource , times (1 )).unsubscribe ();
97+ verify (resource , times (1 )).dispose ();
8098 }
8199
82100 @ Test
@@ -97,14 +115,10 @@ public String getTextFromWeb() {
97115 }
98116 return "Nothing" ;
99117 }
100-
118+
101119 @ Override
102- public void unsubscribe () {
103- }
104-
105- @ Override
106- public boolean isUnsubscribed () {
107- return false ;
120+ public void dispose () {
121+ // do nothing
108122 }
109123
110124 };
@@ -120,7 +134,7 @@ public Observable<String> call(Resource resource) {
120134
121135 @ SuppressWarnings ("unchecked" )
122136 Observer <String > observer = (Observer <String >) mock (Observer .class );
123- Observable <String > observable = Observable .using (resourceFactory , observableFactory );
137+ Observable <String > observable = Observable .using (resourceFactory , observableFactory , new DisposeAction () );
124138 observable .subscribe (observer );
125139 observable .subscribe (observer );
126140
@@ -151,8 +165,8 @@ public Observable<Integer> call(Subscription subscription) {
151165 return Observable .empty ();
152166 }
153167 };
154-
155- Observable .using (resourceFactory , observableFactory ).toBlocking ().last ();
168+
169+ Observable .using (resourceFactory , observableFactory , disposeSubscription ).toBlocking ().last ();
156170 }
157171
158172 @ Test
@@ -171,9 +185,9 @@ public Observable<Integer> call(Subscription subscription) {
171185 throw new TestException ();
172186 }
173187 };
174-
188+
175189 try {
176- Observable .using (resourceFactory , observableFactory ).toBlocking ().last ();
190+ Observable .using (resourceFactory , observableFactory , disposeSubscription ).toBlocking ().last ();
177191 fail ("Should throw a TestException when the observableFactory throws it" );
178192 } catch (TestException e ) {
179193 // Make sure that unsubscribe is called so that users can close
@@ -203,9 +217,11 @@ public void call(Subscriber<? super Integer> t1) {
203217 });
204218 }
205219 };
220+
221+
206222
207223 try {
208- Observable .using (resourceFactory , observableFactory ).toBlocking ().last ();
224+ Observable .using (resourceFactory , observableFactory , disposeSubscription ).toBlocking ().last ();
209225 fail ("Should throw a TestException when the observableFactory throws it" );
210226 } catch (TestException e ) {
211227 // Make sure that unsubscribe is called so that users can close
0 commit comments