Skip to content

Commit 2d11791

Browse files
committed
2 new tests for OnSubscribeToObservableFuture
1) Creating an Observable from an already-cancelled Future should emit nothing (equivalent to Observable.never()) (it currently emits onError(CancellationException) 2) Creating an Observable from a Future then unsubscribeing immediately before Future.get() unblocks should be an Observable.never(). This test already passes.
1 parent 428c0b5 commit 2d11791

File tree

1 file changed

+69
-0
lines changed

1 file changed

+69
-0
lines changed

rxjava-core/src/test/java/rx/operators/OnSubscribeToObservableFutureTest.java

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,21 +15,30 @@
1515
*/
1616
package rx.operators;
1717

18+
import static org.junit.Assert.assertEquals;
1819
import static org.mockito.Mockito.any;
1920
import static org.mockito.Mockito.mock;
2021
import static org.mockito.Mockito.never;
2122
import static org.mockito.Mockito.times;
2223
import static org.mockito.Mockito.verify;
2324
import static org.mockito.Mockito.when;
2425

26+
import java.util.concurrent.CancellationException;
27+
import java.util.concurrent.ExecutionException;
2528
import java.util.concurrent.Future;
29+
import java.util.concurrent.TimeUnit;
30+
import java.util.concurrent.TimeoutException;
31+
import java.util.concurrent.atomic.AtomicBoolean;
2632

2733
import org.junit.Test;
2834

2935
import rx.Observable;
3036
import rx.Observer;
37+
import rx.Subscriber;
3138
import rx.Subscription;
3239
import rx.observers.TestObserver;
40+
import rx.observers.TestSubscriber;
41+
import rx.schedulers.Schedulers;
3342

3443
public class OnSubscribeToObservableFutureTest {
3544

@@ -68,4 +77,64 @@ public void testFailure() throws Exception {
6877
verify(o, times(1)).onError(e);
6978
verify(future, times(1)).cancel(true);
7079
}
80+
81+
@Test
82+
public void testCancelledBeforeSubscribe() throws Exception {
83+
Future<Object> future = mock(Future.class);
84+
CancellationException e = new CancellationException("unit test synthetic cancellation");
85+
when(future.get()).thenThrow(e);
86+
Observer<Object> o = mock(Observer.class);
87+
88+
TestSubscriber<Object> testSubscriber = new TestSubscriber<Object>(o);
89+
testSubscriber.unsubscribe();
90+
Subscription sub = Observable.from(future).subscribe(testSubscriber);
91+
assertEquals(0, testSubscriber.getOnErrorEvents().size());
92+
assertEquals(0, testSubscriber.getOnCompletedEvents().size());
93+
}
94+
95+
@Test
96+
public void testCancellationDuringFutureGet() throws Exception {
97+
Future<Object> future = new Future<Object>() {
98+
private AtomicBoolean isCancelled = new AtomicBoolean(false);
99+
private AtomicBoolean isDone = new AtomicBoolean(false);
100+
101+
@Override
102+
public boolean cancel(boolean mayInterruptIfRunning) {
103+
isCancelled.compareAndSet(false, true);
104+
return true;
105+
}
106+
107+
@Override
108+
public boolean isCancelled() {
109+
return isCancelled.get();
110+
}
111+
112+
@Override
113+
public boolean isDone() {
114+
return isCancelled() || isDone.get();
115+
}
116+
117+
@Override
118+
public Object get() throws InterruptedException, ExecutionException {
119+
Thread.sleep(500);
120+
isDone.compareAndSet(false, true);
121+
return "foo";
122+
}
123+
124+
@Override
125+
public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
126+
return get();
127+
}
128+
};
129+
130+
Observer<Object> o = mock(Observer.class);
131+
132+
TestSubscriber<Object> testSubscriber = new TestSubscriber<Object>(o);
133+
Observable<Object> futureObservable = Observable.from(future);
134+
Subscription sub = futureObservable.subscribeOn(Schedulers.computation()).subscribe(testSubscriber);
135+
sub.unsubscribe();
136+
assertEquals(0, testSubscriber.getOnErrorEvents().size());
137+
assertEquals(0, testSubscriber.getOnCompletedEvents().size());
138+
assertEquals(0, testSubscriber.getOnNextEvents().size());
139+
}
71140
}

0 commit comments

Comments
 (0)