2323
2424import io .reactivex .exceptions .TestException ;
2525import io .reactivex .functions .Function ;
26- import io .reactivex .observers .TestObserver ;
26+ import io .reactivex .observers .* ;
2727import io .reactivex .plugins .RxJavaPlugins ;
2828import io .reactivex .schedulers .Schedulers ;
2929import io .reactivex .subscribers .TestSubscriber ;
3030
3131public class XFlatMapTest {
3232
3333 @ Rule
34- public Retry retry = new Retry (3 , 1000 , true );
34+ public Retry retry = new Retry (5 , 1000 , true );
3535
3636 static final int SLEEP_AFTER_CANCEL = 500 ;
3737
@@ -40,12 +40,23 @@ public class XFlatMapTest {
4040 void sleep () throws Exception {
4141 cb .await ();
4242 try {
43+ long before = System .currentTimeMillis ();
4344 Thread .sleep (5000 );
45+ throw new IllegalStateException ("Was not interrupted in time?! " + (System .currentTimeMillis () - before ));
4446 } catch (InterruptedException ex ) {
4547 // ignored here
4648 }
4749 }
4850
51+ void beforeCancelSleep (BaseTestConsumer <?, ?> ts ) throws Exception {
52+ long before = System .currentTimeMillis ();
53+ Thread .sleep (50 );
54+ if (System .currentTimeMillis () - before > 100 ) {
55+ ts .dispose ();
56+ throw new IllegalStateException ("Overslept?" + (System .currentTimeMillis () - before ));
57+ }
58+ }
59+
4960 @ Test
5061 public void flowableFlowable () throws Exception {
5162 List <Throwable > errors = TestHelper .trackPluginErrors ();
@@ -63,7 +74,7 @@ public Publisher<Integer> apply(Integer v) throws Exception {
6374
6475 cb .await ();
6576
66- Thread . sleep ( 50 );
77+ beforeCancelSleep ( ts );
6778
6879 ts .cancel ();
6980
@@ -94,7 +105,7 @@ public Single<Integer> apply(Integer v) throws Exception {
94105
95106 cb .await ();
96107
97- Thread . sleep ( 50 );
108+ beforeCancelSleep ( ts );
98109
99110 ts .cancel ();
100111
@@ -125,7 +136,7 @@ public Maybe<Integer> apply(Integer v) throws Exception {
125136
126137 cb .await ();
127138
128- Thread . sleep ( 50 );
139+ beforeCancelSleep ( ts );
129140
130141 ts .cancel ();
131142
@@ -156,7 +167,7 @@ public Completable apply(Integer v) throws Exception {
156167
157168 cb .await ();
158169
159- Thread . sleep ( 50 );
170+ beforeCancelSleep ( ts );
160171
161172 ts .cancel ();
162173
@@ -188,7 +199,7 @@ public Completable apply(Integer v) throws Exception {
188199
189200 cb .await ();
190201
191- Thread . sleep ( 50 );
202+ beforeCancelSleep ( ts );
192203
193204 ts .cancel ();
194205
@@ -219,7 +230,7 @@ public Observable<Integer> apply(Integer v) throws Exception {
219230
220231 cb .await ();
221232
222- Thread . sleep ( 50 );
233+ beforeCancelSleep ( ts );
223234
224235 ts .cancel ();
225236
@@ -250,7 +261,7 @@ public Single<Integer> apply(Integer v) throws Exception {
250261
251262 cb .await ();
252263
253- Thread . sleep ( 50 );
264+ beforeCancelSleep ( ts );
254265
255266 ts .cancel ();
256267
@@ -281,7 +292,7 @@ public Maybe<Integer> apply(Integer v) throws Exception {
281292
282293 cb .await ();
283294
284- Thread . sleep ( 50 );
295+ beforeCancelSleep ( ts );
285296
286297 ts .cancel ();
287298
@@ -312,7 +323,7 @@ public Completable apply(Integer v) throws Exception {
312323
313324 cb .await ();
314325
315- Thread . sleep ( 50 );
326+ beforeCancelSleep ( ts );
316327
317328 ts .cancel ();
318329
@@ -344,7 +355,7 @@ public Completable apply(Integer v) throws Exception {
344355
345356 cb .await ();
346357
347- Thread . sleep ( 50 );
358+ beforeCancelSleep ( ts );
348359
349360 ts .cancel ();
350361
@@ -375,7 +386,7 @@ public Single<Integer> apply(Integer v) throws Exception {
375386
376387 cb .await ();
377388
378- Thread . sleep ( 50 );
389+ beforeCancelSleep ( ts );
379390
380391 ts .cancel ();
381392
@@ -406,7 +417,7 @@ public Maybe<Integer> apply(Integer v) throws Exception {
406417
407418 cb .await ();
408419
409- Thread . sleep ( 50 );
420+ beforeCancelSleep ( ts );
410421
411422 ts .cancel ();
412423
@@ -437,7 +448,7 @@ public Completable apply(Integer v) throws Exception {
437448
438449 cb .await ();
439450
440- Thread . sleep ( 50 );
451+ beforeCancelSleep ( ts );
441452
442453 ts .cancel ();
443454
@@ -469,7 +480,7 @@ public Completable apply(Integer v) throws Exception {
469480
470481 cb .await ();
471482
472- Thread . sleep ( 50 );
483+ beforeCancelSleep ( ts );
473484
474485 ts .cancel ();
475486
@@ -500,7 +511,7 @@ public Single<Integer> apply(Integer v) throws Exception {
500511
501512 cb .await ();
502513
503- Thread . sleep ( 50 );
514+ beforeCancelSleep ( ts );
504515
505516 ts .cancel ();
506517
@@ -531,7 +542,7 @@ public Maybe<Integer> apply(Integer v) throws Exception {
531542
532543 cb .await ();
533544
534- Thread . sleep ( 50 );
545+ beforeCancelSleep ( ts );
535546
536547 ts .cancel ();
537548
@@ -562,7 +573,7 @@ public Completable apply(Integer v) throws Exception {
562573
563574 cb .await ();
564575
565- Thread . sleep ( 50 );
576+ beforeCancelSleep ( ts );
566577
567578 ts .cancel ();
568579
@@ -594,7 +605,7 @@ public Completable apply(Integer v) throws Exception {
594605
595606 cb .await ();
596607
597- Thread . sleep ( 50 );
608+ beforeCancelSleep ( ts );
598609
599610 ts .cancel ();
600611
0 commit comments