Skip to content

Commit 64d2d15

Browse files
authored
2.x: fix CI load sensitive BlockingNextTests retry with backoff (#5088)
2.x: fix CI load sensitive BlockingNextTests, XFlatMapTest
1 parent 0bf1ec0 commit 64d2d15

File tree

3 files changed

+123
-94
lines changed

3 files changed

+123
-94
lines changed

src/test/java/io/reactivex/XFlatMapTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public class XFlatMapTest {
3737
void sleep() throws Exception {
3838
cb.await();
3939
try {
40-
Thread.sleep(1000);
40+
Thread.sleep(5000);
4141
} catch (InterruptedException ex) {
4242
// ignored here
4343
}

src/test/java/io/reactivex/internal/operators/flowable/BlockingFlowableNextTest.java

Lines changed: 61 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.reactivestreams.*;
2424

2525
import io.reactivex.*;
26+
import io.reactivex.disposables.SerialDisposable;
2627
import io.reactivex.exceptions.TestException;
2728
import io.reactivex.internal.operators.flowable.BlockingFlowableNext.NextSubscriber;
2829
import io.reactivex.internal.subscriptions.BooleanSubscription;
@@ -228,67 +229,81 @@ public void testNextWithCallingHasNextMultipleTimes() {
228229
*/
229230
@Test
230231
public void testNoBufferingOrBlockingOfSequence() throws Throwable {
231-
final CountDownLatch finished = new CountDownLatch(1);
232-
final int COUNT = 30;
233-
final CountDownLatch timeHasPassed = new CountDownLatch(COUNT);
234-
final AtomicBoolean running = new AtomicBoolean(true);
235-
final AtomicInteger count = new AtomicInteger(0);
236-
final Flowable<Integer> obs = Flowable.unsafeCreate(new Publisher<Integer>() {
237-
238-
@Override
239-
public void subscribe(final Subscriber<? super Integer> o) {
240-
o.onSubscribe(new BooleanSubscription());
241-
new Thread(new Runnable() {
232+
int repeat = 0;
233+
for (;;) {
234+
final SerialDisposable task = new SerialDisposable();
235+
try {
236+
final CountDownLatch finished = new CountDownLatch(1);
237+
final int COUNT = 30;
238+
final CountDownLatch timeHasPassed = new CountDownLatch(COUNT);
239+
final AtomicBoolean running = new AtomicBoolean(true);
240+
final AtomicInteger count = new AtomicInteger(0);
241+
final Flowable<Integer> obs = Flowable.unsafeCreate(new Publisher<Integer>() {
242242

243243
@Override
244-
public void run() {
245-
try {
246-
while (running.get()) {
247-
o.onNext(count.incrementAndGet());
248-
timeHasPassed.countDown();
244+
public void subscribe(final Subscriber<? super Integer> o) {
245+
o.onSubscribe(new BooleanSubscription());
246+
task.replace(Schedulers.single().scheduleDirect(new Runnable() {
247+
248+
@Override
249+
public void run() {
250+
try {
251+
while (running.get() && !task.isDisposed()) {
252+
o.onNext(count.incrementAndGet());
253+
timeHasPassed.countDown();
254+
}
255+
o.onComplete();
256+
} catch (Throwable e) {
257+
o.onError(e);
258+
} finally {
259+
finished.countDown();
260+
}
249261
}
250-
o.onComplete();
251-
} catch (Throwable e) {
252-
o.onError(e);
253-
} finally {
254-
finished.countDown();
255-
}
262+
}));
256263
}
257-
}).start();
258-
}
259264

260-
});
265+
});
261266

262-
Iterator<Integer> it = obs.blockingNext().iterator();
267+
Iterator<Integer> it = obs.blockingNext().iterator();
263268

264-
assertTrue(it.hasNext());
265-
int a = it.next();
266-
assertTrue(it.hasNext());
267-
int b = it.next();
268-
// we should have a different value
269-
assertTrue("a and b should be different", a != b);
269+
assertTrue(it.hasNext());
270+
int a = it.next();
271+
assertTrue(it.hasNext());
272+
int b = it.next();
273+
// we should have a different value
274+
assertTrue("a and b should be different", a != b);
270275

271-
// wait for some time (if times out we are blocked somewhere so fail ... set very high for very slow, constrained machines)
272-
timeHasPassed.await(8000, TimeUnit.MILLISECONDS);
276+
// wait for some time (if times out we are blocked somewhere so fail ... set very high for very slow, constrained machines)
277+
timeHasPassed.await(8000, TimeUnit.MILLISECONDS);
273278

274-
assertTrue(it.hasNext());
275-
int c = it.next();
279+
assertTrue(it.hasNext());
280+
int c = it.next();
276281

277-
assertTrue("c should not just be the next in sequence", c != (b + 1));
278-
assertTrue("expected that c [" + c + "] is higher than or equal to " + COUNT, c >= COUNT);
282+
assertTrue("c should not just be the next in sequence", c != (b + 1));
283+
assertTrue("expected that c [" + c + "] is higher than or equal to " + COUNT, c >= COUNT);
279284

280-
assertTrue(it.hasNext());
281-
int d = it.next();
282-
assertTrue(d > c);
285+
assertTrue(it.hasNext());
286+
int d = it.next();
287+
assertTrue(d > c);
283288

284-
// shut down the thread
285-
running.set(false);
289+
// shut down the thread
290+
running.set(false);
286291

287-
finished.await();
292+
finished.await();
288293

289-
assertFalse(it.hasNext());
294+
assertFalse(it.hasNext());
290295

291-
System.out.println("a: " + a + " b: " + b + " c: " + c);
296+
System.out.println("a: " + a + " b: " + b + " c: " + c);
297+
break;
298+
} catch (AssertionError ex) {
299+
if (++repeat == 3) {
300+
throw ex;
301+
}
302+
Thread.sleep((int)(1000 * Math.pow(2, repeat - 1)));
303+
} finally {
304+
task.dispose();
305+
}
306+
}
292307
}
293308

294309
@Test /* (timeout = 8000) */

src/test/java/io/reactivex/internal/operators/observable/BlockingObservableNextTest.java

Lines changed: 61 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import io.reactivex.*;
2525
import io.reactivex.Observable;
2626
import io.reactivex.Observer;
27-
import io.reactivex.disposables.Disposables;
27+
import io.reactivex.disposables.*;
2828
import io.reactivex.exceptions.TestException;
2929
import io.reactivex.internal.operators.observable.BlockingObservableNext.NextObserver;
3030
import io.reactivex.plugins.RxJavaPlugins;
@@ -234,67 +234,81 @@ public void testNextWithCallingHasNextMultipleTimes() {
234234
*/
235235
@Test
236236
public void testNoBufferingOrBlockingOfSequence() throws Throwable {
237-
final CountDownLatch finished = new CountDownLatch(1);
238-
final int COUNT = 30;
239-
final CountDownLatch timeHasPassed = new CountDownLatch(COUNT);
240-
final AtomicBoolean running = new AtomicBoolean(true);
241-
final AtomicInteger count = new AtomicInteger(0);
242-
final Observable<Integer> obs = Observable.unsafeCreate(new ObservableSource<Integer>() {
243-
244-
@Override
245-
public void subscribe(final Observer<? super Integer> o) {
246-
o.onSubscribe(Disposables.empty());
247-
new Thread(new Runnable() {
237+
int repeat = 0;
238+
for (;;) {
239+
final SerialDisposable task = new SerialDisposable();
240+
try {
241+
final CountDownLatch finished = new CountDownLatch(1);
242+
final int COUNT = 30;
243+
final CountDownLatch timeHasPassed = new CountDownLatch(COUNT);
244+
final AtomicBoolean running = new AtomicBoolean(true);
245+
final AtomicInteger count = new AtomicInteger(0);
246+
final Observable<Integer> obs = Observable.unsafeCreate(new ObservableSource<Integer>() {
248247

249248
@Override
250-
public void run() {
251-
try {
252-
while (running.get()) {
253-
o.onNext(count.incrementAndGet());
254-
timeHasPassed.countDown();
249+
public void subscribe(final Observer<? super Integer> o) {
250+
o.onSubscribe(Disposables.empty());
251+
task.replace(Schedulers.single().scheduleDirect(new Runnable() {
252+
253+
@Override
254+
public void run() {
255+
try {
256+
while (running.get() && !task.isDisposed()) {
257+
o.onNext(count.incrementAndGet());
258+
timeHasPassed.countDown();
259+
}
260+
o.onComplete();
261+
} catch (Throwable e) {
262+
o.onError(e);
263+
} finally {
264+
finished.countDown();
265+
}
255266
}
256-
o.onComplete();
257-
} catch (Throwable e) {
258-
o.onError(e);
259-
} finally {
260-
finished.countDown();
261-
}
267+
}));
262268
}
263-
}).start();
264-
}
265269

266-
});
270+
});
267271

268-
Iterator<Integer> it = next(obs).iterator();
272+
Iterator<Integer> it = next(obs).iterator();
269273

270-
assertTrue(it.hasNext());
271-
int a = it.next();
272-
assertTrue(it.hasNext());
273-
int b = it.next();
274-
// we should have a different value
275-
assertTrue("a and b should be different", a != b);
274+
assertTrue(it.hasNext());
275+
int a = it.next();
276+
assertTrue(it.hasNext());
277+
int b = it.next();
278+
// we should have a different value
279+
assertTrue("a and b should be different", a != b);
276280

277-
// wait for some time (if times out we are blocked somewhere so fail ... set very high for very slow, constrained machines)
278-
timeHasPassed.await(8000, TimeUnit.MILLISECONDS);
281+
// wait for some time (if times out we are blocked somewhere so fail ... set very high for very slow, constrained machines)
282+
timeHasPassed.await(8000, TimeUnit.MILLISECONDS);
279283

280-
assertTrue(it.hasNext());
281-
int c = it.next();
284+
assertTrue(it.hasNext());
285+
int c = it.next();
282286

283-
assertTrue("c should not just be the next in sequence", c != (b + 1));
284-
assertTrue("expected that c [" + c + "] is higher than or equal to " + COUNT, c >= COUNT);
287+
assertTrue("c should not just be the next in sequence", c != (b + 1));
288+
assertTrue("expected that c [" + c + "] is higher than or equal to " + COUNT, c >= COUNT);
285289

286-
assertTrue(it.hasNext());
287-
int d = it.next();
288-
assertTrue(d > c);
290+
assertTrue(it.hasNext());
291+
int d = it.next();
292+
assertTrue(d > c);
289293

290-
// shut down the thread
291-
running.set(false);
294+
// shut down the thread
295+
running.set(false);
292296

293-
finished.await();
297+
finished.await();
294298

295-
assertFalse(it.hasNext());
299+
assertFalse(it.hasNext());
296300

297-
System.out.println("a: " + a + " b: " + b + " c: " + c);
301+
System.out.println("a: " + a + " b: " + b + " c: " + c);
302+
break;
303+
} catch (AssertionError ex) {
304+
if (++repeat == 3) {
305+
throw ex;
306+
}
307+
Thread.sleep((int)(1000 * Math.pow(2, repeat - 1)));
308+
} finally {
309+
task.dispose();
310+
}
311+
}
298312
}
299313

300314
@Test /* (timeout = 8000) */

0 commit comments

Comments
 (0)