Skip to content

Commit d3a2f68

Browse files
Merge pull request #1291 from mattrjacobs/check-unsubscribe-within-observable-from-future
Check unsubscribe within observable from future
2 parents 6a9ff72 + e285a7b commit d3a2f68

File tree

2 files changed

+77
-0
lines changed

2 files changed

+77
-0
lines changed

rxjava-core/src/main/java/rx/operators/OnSubscribeToObservableFuture.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,10 @@ public void call() {
6262
}
6363
}));
6464
try {
65+
//don't block or propagate CancellationException if already unsubscribed
66+
if (subscriber.isUnsubscribed()) {
67+
return;
68+
}
6569
T value = (unit == null) ? (T) that.get() : (T) that.get(time, unit);
6670
subscriber.onNext(value);
6771
subscriber.onCompleted();
@@ -71,6 +75,10 @@ public void call() {
7175
// since it's already subscribed.
7276
// If the Future is canceled in other place, CancellationException will be still
7377
// passed to the final Subscriber.
78+
if (subscriber.isUnsubscribed()) {
79+
//refuse to emit onError if already unsubscribed
80+
return;
81+
}
7482
subscriber.onError(e);
7583
}
7684
}

rxjava-core/src/test/java/rx/operators/OnSubscribeToObservableFutureTest.java

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,21 +15,30 @@
1515
*/
1616
package rx.operators;
1717

18+
import static org.junit.Assert.assertEquals;
1819
import static org.mockito.Mockito.any;
1920
import static org.mockito.Mockito.mock;
2021
import static org.mockito.Mockito.never;
2122
import static org.mockito.Mockito.times;
2223
import static org.mockito.Mockito.verify;
2324
import static org.mockito.Mockito.when;
2425

26+
import java.util.concurrent.CancellationException;
27+
import java.util.concurrent.ExecutionException;
2528
import java.util.concurrent.Future;
29+
import java.util.concurrent.TimeUnit;
30+
import java.util.concurrent.TimeoutException;
31+
import java.util.concurrent.atomic.AtomicBoolean;
2632

2733
import org.junit.Test;
2834

2935
import rx.Observable;
3036
import rx.Observer;
37+
import rx.Subscriber;
3138
import rx.Subscription;
3239
import rx.observers.TestObserver;
40+
import rx.observers.TestSubscriber;
41+
import rx.schedulers.Schedulers;
3342

3443
public class OnSubscribeToObservableFutureTest {
3544

@@ -68,4 +77,64 @@ public void testFailure() throws Exception {
6877
verify(o, times(1)).onError(e);
6978
verify(future, times(1)).cancel(true);
7079
}
80+
81+
@Test
82+
public void testCancelledBeforeSubscribe() throws Exception {
83+
Future<Object> future = mock(Future.class);
84+
CancellationException e = new CancellationException("unit test synthetic cancellation");
85+
when(future.get()).thenThrow(e);
86+
Observer<Object> o = mock(Observer.class);
87+
88+
TestSubscriber<Object> testSubscriber = new TestSubscriber<Object>(o);
89+
testSubscriber.unsubscribe();
90+
Subscription sub = Observable.from(future).subscribe(testSubscriber);
91+
assertEquals(0, testSubscriber.getOnErrorEvents().size());
92+
assertEquals(0, testSubscriber.getOnCompletedEvents().size());
93+
}
94+
95+
@Test
96+
public void testCancellationDuringFutureGet() throws Exception {
97+
Future<Object> future = new Future<Object>() {
98+
private AtomicBoolean isCancelled = new AtomicBoolean(false);
99+
private AtomicBoolean isDone = new AtomicBoolean(false);
100+
101+
@Override
102+
public boolean cancel(boolean mayInterruptIfRunning) {
103+
isCancelled.compareAndSet(false, true);
104+
return true;
105+
}
106+
107+
@Override
108+
public boolean isCancelled() {
109+
return isCancelled.get();
110+
}
111+
112+
@Override
113+
public boolean isDone() {
114+
return isCancelled() || isDone.get();
115+
}
116+
117+
@Override
118+
public Object get() throws InterruptedException, ExecutionException {
119+
Thread.sleep(500);
120+
isDone.compareAndSet(false, true);
121+
return "foo";
122+
}
123+
124+
@Override
125+
public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
126+
return get();
127+
}
128+
};
129+
130+
Observer<Object> o = mock(Observer.class);
131+
132+
TestSubscriber<Object> testSubscriber = new TestSubscriber<Object>(o);
133+
Observable<Object> futureObservable = Observable.from(future);
134+
Subscription sub = futureObservable.subscribeOn(Schedulers.computation()).subscribe(testSubscriber);
135+
sub.unsubscribe();
136+
assertEquals(0, testSubscriber.getOnErrorEvents().size());
137+
assertEquals(0, testSubscriber.getOnCompletedEvents().size());
138+
assertEquals(0, testSubscriber.getOnNextEvents().size());
139+
}
71140
}

0 commit comments

Comments
 (0)