Skip to content

Commit 63b67d5

Browse files
committed
fix awaitTerminalEventAndUnsubscribeOnTimeout
1 parent 32179c6 commit 63b67d5

File tree

2 files changed

+40
-4
lines changed

2 files changed

+40
-4
lines changed

src/main/java/rx/observers/TestSubscriber.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,7 @@ public void awaitTerminalEvent(long timeout, TimeUnit unit) {
281281
* Blocks until this {@link Subscriber} receives a notification that the {@code Observable} is complete
282282
* (either an {@code onCompleted} or {@code onError} notification), or until a timeout expires; if the
283283
* Subscriber is interrupted before either of these events take place, this method unsubscribes the
284-
* Subscriber from the Observable).
284+
* Subscriber from the Observable). If timeout expires then the Subscriber is unsubscribed from the Observable.
285285
*
286286
* @param timeout
287287
* the duration of the timeout
@@ -290,8 +290,12 @@ public void awaitTerminalEvent(long timeout, TimeUnit unit) {
290290
*/
291291
public void awaitTerminalEventAndUnsubscribeOnTimeout(long timeout, TimeUnit unit) {
292292
try {
293-
awaitTerminalEvent(timeout, unit);
294-
} catch (RuntimeException e) {
293+
boolean result = latch.await(timeout, unit);
294+
if (!result) {
295+
// timeout occurred
296+
unsubscribe();
297+
}
298+
} catch (InterruptedException e) {
295299
unsubscribe();
296300
}
297301
}

src/test/java/rx/observers/TestSubscriberTest.java

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,24 @@
1616
package rx.observers;
1717

1818
import static org.junit.Assert.assertEquals;
19+
import static org.junit.Assert.assertTrue;
1920
import static org.mockito.Mockito.inOrder;
2021
import static org.mockito.Mockito.mock;
2122
import static org.mockito.Mockito.times;
2223

2324
import java.util.Arrays;
25+
import java.util.concurrent.TimeUnit;
26+
import java.util.concurrent.atomic.AtomicBoolean;
2427

28+
import org.junit.Assert;
2529
import org.junit.Rule;
2630
import org.junit.Test;
2731
import org.junit.rules.ExpectedException;
2832
import org.mockito.InOrder;
2933

3034
import rx.Observable;
3135
import rx.Observer;
36+
import rx.functions.Action0;
3237
import rx.subjects.PublishSubject;
3338

3439
public class TestSubscriberTest {
@@ -124,8 +129,35 @@ public void testWrappingMockWhenUnsubscribeInvolved() {
124129
@Test
125130
public void testAssertError() {
126131
RuntimeException e = new RuntimeException("Oops");
127-
TestSubscriber subscriber = new TestSubscriber();
132+
TestSubscriber<Object> subscriber = new TestSubscriber<Object>();
128133
Observable.error(e).subscribe(subscriber);
129134
subscriber.assertError(e);
130135
}
136+
137+
@Test
138+
public void testAwaitTerminalEventWithDuration() {
139+
TestSubscriber<Object> ts = new TestSubscriber<Object>();
140+
Observable.just(1).subscribe(ts);
141+
ts.awaitTerminalEvent(1, TimeUnit.SECONDS);
142+
ts.assertTerminalEvent();
143+
}
144+
145+
@Test
146+
public void testAwaitTerminalEventWithDurationAndUnsubscribeOnTimeout() {
147+
TestSubscriber<Object> ts = new TestSubscriber<Object>();
148+
final AtomicBoolean unsub = new AtomicBoolean(false);
149+
Observable.just(1)
150+
//
151+
.doOnUnsubscribe(new Action0() {
152+
@Override
153+
public void call() {
154+
unsub.set(true);
155+
}
156+
})
157+
//
158+
.delay(1000, TimeUnit.MILLISECONDS).subscribe(ts);
159+
ts.awaitTerminalEventAndUnsubscribeOnTimeout(100, TimeUnit.MILLISECONDS);
160+
assertTrue(unsub.get());
161+
}
162+
131163
}

0 commit comments

Comments
 (0)