Skip to content

Commit d807292

Browse files
committed
1.x: SyncOnSubscribeTest.testConcurrentRequests give more time.
The test failed on Travis and locally if my machine was under heavy load without interacting with the mock. This change gives more time in the inner await and reports the exception instead of itself throwing.
1 parent 3e2b3b1 commit d807292

File tree

1 file changed

+37
-52
lines changed

1 file changed

+37
-52
lines changed

src/test/java/rx/observables/SyncOnSubscribeTest.java

Lines changed: 37 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -16,57 +16,25 @@
1616

1717
package rx.observables;
1818

19-
import static org.junit.Assert.assertEquals;
20-
import static org.junit.Assert.assertFalse;
21-
import static org.junit.Assert.assertNull;
22-
import static org.junit.Assert.assertTrue;
23-
import static org.mockito.Matchers.any;
24-
import static org.mockito.Matchers.isA;
25-
import static org.mockito.Mockito.inOrder;
26-
import static org.mockito.Mockito.mock;
27-
import static org.mockito.Mockito.never;
28-
import static org.mockito.Mockito.times;
29-
import static org.mockito.Mockito.verify;
30-
31-
import java.util.ArrayList;
32-
import java.util.Arrays;
33-
import java.util.Iterator;
34-
import java.util.List;
35-
import java.util.Map;
36-
import java.util.concurrent.BrokenBarrierException;
37-
import java.util.concurrent.Callable;
38-
import java.util.concurrent.ConcurrentHashMap;
39-
import java.util.concurrent.CountDownLatch;
40-
import java.util.concurrent.CyclicBarrier;
41-
import java.util.concurrent.ExecutionException;
42-
import java.util.concurrent.ExecutorService;
43-
import java.util.concurrent.Executors;
44-
import java.util.concurrent.Future;
45-
import java.util.concurrent.TimeUnit;
46-
import java.util.concurrent.atomic.AtomicBoolean;
47-
import java.util.concurrent.atomic.AtomicInteger;
48-
import java.util.concurrent.atomic.AtomicReference;
19+
import static org.junit.Assert.*;
20+
import static org.mockito.Matchers.*;
21+
import static org.mockito.Mockito.*;
22+
23+
import java.util.*;
24+
import java.util.concurrent.*;
25+
import java.util.concurrent.atomic.*;
4926

5027
import org.junit.Test;
51-
import org.mockito.InOrder;
52-
import org.mockito.Matchers;
53-
import org.mockito.Mockito;
28+
import org.mockito.*;
5429

30+
import rx.*;
5531
import rx.Observable;
56-
import rx.Observable.OnSubscribe;
57-
import rx.Observable.Operator;
32+
import rx.Observable.*;
5833
import rx.Observer;
59-
import rx.Producer;
60-
import rx.Subscriber;
6134
import rx.exceptions.TestException;
62-
import rx.functions.Action0;
63-
import rx.functions.Action1;
64-
import rx.functions.Action2;
65-
import rx.functions.Func0;
66-
import rx.functions.Func2;
35+
import rx.functions.*;
6736
import rx.observers.TestSubscriber;
68-
import rx.schedulers.Schedulers;
69-
import rx.schedulers.TestScheduler;
37+
import rx.schedulers.*;
7038

7139
/**
7240
* Test if SyncOnSubscribe adheres to the usual unsubscription and backpressure contracts.
@@ -489,6 +457,16 @@ public Integer call(Integer state, Observer<? super Integer> observer) {
489457
verify(onUnSubscribe, times(1)).call(any(Integer.class));
490458
}
491459

460+
@Test
461+
public void testConcurrentRequestsLoop() throws InterruptedException {
462+
for (int i = 0; i < 100; i++) {
463+
if (i % 10 == 0) {
464+
System.out.println("testConcurrentRequestsLoop >> " + i);
465+
}
466+
testConcurrentRequests();
467+
}
468+
}
469+
492470
@Test
493471
public void testConcurrentRequests() throws InterruptedException {
494472
final int count1 = 1000;
@@ -514,12 +492,20 @@ public Integer call(Integer state, Observer<? super Integer> observer) {
514492
l2.countDown();
515493
// wait until the 2nd request returns then proceed
516494
try {
517-
if (!l1.await(1, TimeUnit.SECONDS))
518-
throw new IllegalStateException();
519-
} catch (InterruptedException e) {}
495+
if (!l1.await(2, TimeUnit.SECONDS)) {
496+
observer.onError(new TimeoutException());
497+
return state + 1;
498+
}
499+
} catch (InterruptedException e) {
500+
observer.onError(e);
501+
return state + 1;
502+
}
520503
observer.onNext(state);
521-
if (state == finalCount)
504+
505+
if (state == finalCount) {
522506
observer.onCompleted();
507+
}
508+
523509
return state + 1;
524510
}},
525511
onUnSubscribe);
@@ -532,10 +518,9 @@ public Integer call(Integer state, Observer<? super Integer> observer) {
532518
Observable.create(os).subscribeOn(Schedulers.newThread()).subscribe(ts);
533519

534520
// wait until the first request has started processing
535-
try {
536-
if (!l2.await(1, TimeUnit.SECONDS))
537-
throw new IllegalStateException();
538-
} catch (InterruptedException e) {}
521+
if (!l2.await(2, TimeUnit.SECONDS)) {
522+
fail("SyncOnSubscribe failed to countDown in time");
523+
}
539524
// make a concurrent request, this should return
540525
ts.requestMore(count2);
541526
// unblock the 1st thread to proceed fulfilling requests

0 commit comments

Comments
 (0)