Skip to content

Commit 6cc1646

Browse files
authored
1.x: fix subscribe(Action1 [, Action1]) to report isUnsubscribed (#4716)
1 parent 1ac5834 commit 6cc1646

File tree

3 files changed

+77
-32
lines changed

3 files changed

+77
-32
lines changed

src/main/java/rx/Single.java

Lines changed: 12 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1535,19 +1535,7 @@ public final Single<T> onErrorResumeNext(final Func1<Throwable, ? extends Single
15351535
* @see <a href="http://reactivex.io/documentation/operators/subscribe.html">ReactiveX operators documentation: Subscribe</a>
15361536
*/
15371537
public final Subscription subscribe() {
1538-
return subscribe(new SingleSubscriber<T>() {
1539-
1540-
@Override
1541-
public final void onError(Throwable e) {
1542-
throw new OnErrorNotImplementedException(e);
1543-
}
1544-
1545-
@Override
1546-
public final void onSuccess(T args) {
1547-
// do nothing
1548-
}
1549-
1550-
});
1538+
return subscribe(Actions.empty(), Actions.errorNotImplemented());
15511539
}
15521540

15531541
/**
@@ -1567,23 +1555,7 @@ public final void onSuccess(T args) {
15671555
* @see <a href="http://reactivex.io/documentation/operators/subscribe.html">ReactiveX operators documentation: Subscribe</a>
15681556
*/
15691557
public final Subscription subscribe(final Action1<? super T> onSuccess) {
1570-
if (onSuccess == null) {
1571-
throw new IllegalArgumentException("onSuccess can not be null");
1572-
}
1573-
1574-
return subscribe(new SingleSubscriber<T>() {
1575-
1576-
@Override
1577-
public final void onError(Throwable e) {
1578-
throw new OnErrorNotImplementedException(e);
1579-
}
1580-
1581-
@Override
1582-
public final void onSuccess(T args) {
1583-
onSuccess.call(args);
1584-
}
1585-
1586-
});
1558+
return subscribe(onSuccess, Actions.errorNotImplemented());
15871559
}
15881560

15891561
/**
@@ -1617,12 +1589,20 @@ public final Subscription subscribe(final Action1<? super T> onSuccess, final Ac
16171589

16181590
@Override
16191591
public final void onError(Throwable e) {
1620-
onError.call(e);
1592+
try {
1593+
onError.call(e);
1594+
} finally {
1595+
unsubscribe();
1596+
}
16211597
}
16221598

16231599
@Override
16241600
public final void onSuccess(T args) {
1625-
onSuccess.call(args);
1601+
try {
1602+
onSuccess.call(args);
1603+
} finally {
1604+
unsubscribe();
1605+
}
16261606
}
16271607

16281608
});

src/main/java/rx/functions/Actions.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
*/
1616
package rx.functions;
1717

18+
import rx.exceptions.OnErrorNotImplementedException;
19+
1820
/**
1921
* Utility class for the Action interfaces.
2022
*/
@@ -566,4 +568,20 @@ public void call(T t) {
566568
action.call();
567569
}
568570
}
571+
572+
enum NotImplemented implements Action1<Throwable> {
573+
INSTANCE;
574+
@Override
575+
public void call(Throwable t) {
576+
throw new OnErrorNotImplementedException(t);
577+
}
578+
}
579+
580+
/**
581+
* Returns an action which throws OnErrorNotImplementedException.
582+
* @return the the shared action
583+
*/
584+
public static Action1<Throwable> errorNotImplemented() {
585+
return NotImplemented.INSTANCE;
586+
}
569587
}

src/test/java/rx/SingleTest.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2136,4 +2136,51 @@ public void call(final Notification<? extends Integer> notification) {
21362136

21372137
assertEquals(1, atomicInteger.get());
21382138
}
2139+
2140+
@Test
2141+
public void isUnsubscribedAfterSuccess() {
2142+
PublishSubject<Integer> ps = PublishSubject.create();
2143+
2144+
final int[] calls = { 0 };
2145+
2146+
Subscription s = ps.toSingle().subscribe(new Action1<Integer>() {
2147+
@Override
2148+
public void call(Integer t) {
2149+
calls[0]++;
2150+
}
2151+
});
2152+
2153+
assertFalse(s.isUnsubscribed());
2154+
2155+
ps.onNext(1);
2156+
ps.onCompleted();
2157+
2158+
assertTrue(s.isUnsubscribed());
2159+
2160+
assertEquals(1, calls[0]);
2161+
}
2162+
2163+
@Test
2164+
public void isUnsubscribedAfterError() {
2165+
PublishSubject<Integer> ps = PublishSubject.create();
2166+
2167+
final int[] calls = { 0 };
2168+
2169+
Action1<Integer> a = Actions.empty();
2170+
2171+
Subscription s = ps.toSingle().subscribe(a, new Action1<Throwable>() {
2172+
@Override
2173+
public void call(Throwable t) {
2174+
calls[0]++;
2175+
}
2176+
});
2177+
2178+
assertFalse(s.isUnsubscribed());
2179+
2180+
ps.onError(new TestException());
2181+
2182+
assertTrue(s.isUnsubscribed());
2183+
2184+
assertEquals(1, calls[0]);
2185+
}
21392186
}

0 commit comments

Comments
 (0)