Skip to content

Commit 0202e63

Browse files
committed
Fixed testSingleSourceManyIterators
1 parent 1573df9 commit 0202e63

File tree

2 files changed

+19
-9
lines changed

2 files changed

+19
-9
lines changed

rxjava-core/src/main/java/rx/operators/OperationNext.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@ public Iterator<T> iterator() {
4848

4949
}
5050

51-
private static class NextIterator<T> implements Iterator<T> {
51+
// test needs to access the observer.waiting flag non-blockingly.
52+
/* private */static final class NextIterator<T> implements Iterator<T> {
5253

5354
private final NextObserver<? extends T> observer;
5455
private T next;
@@ -60,6 +61,12 @@ private NextIterator(NextObserver<? extends T> observer) {
6061
this.observer = observer;
6162
}
6263

64+
// in tests, set the waiting flag without blocking for the next value to
65+
// allow lockstepping instead of multi-threading
66+
void setWaiting(boolean value) {
67+
observer.waiting.set(value);
68+
}
69+
6370
@Override
6471
public boolean hasNext() {
6572
if (error != null) {

rxjava-core/src/test/java/rx/operators/OperationNextTest.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -296,24 +296,27 @@ public void run() {
296296
System.out.println("a: " + a + " b: " + b + " c: " + c);
297297
}
298298

299-
@Test(timeout = 8000)
299+
@Test /* (timeout = 8000) */
300300
public void testSingleSourceManyIterators() throws InterruptedException {
301-
BlockingObservable<Long> source = Observable.interval(200, TimeUnit.MILLISECONDS).take(10).toBlockingObservable();
301+
PublishSubject<Long> ps = PublishSubject.create();
302+
BlockingObservable<Long> source = ps.take(10).toBlockingObservable();
302303

303304
Iterable<Long> iter = source.next();
304305

305306
for (int j = 0; j < 3; j++) {
306-
Iterator<Long> it = iter.iterator();
307+
OperationNext.NextIterator<Long> it = (OperationNext.NextIterator<Long>)iter.iterator();
307308

308-
for (int i = 0; i < 9; i++) {
309+
for (long i = 0; i < 9; i++) {
309310
// hasNext has to set the waiting to true, otherwise, all onNext will be skipped
311+
it.setWaiting(true);
312+
ps.onNext(i);
310313
Assert.assertEquals(true, it.hasNext());
311-
Assert.assertEquals(Long.valueOf(i), it.next());
314+
Assert.assertEquals(j + "th iteration", Long.valueOf(i), it.next());
312315
}
316+
it.setWaiting(true);
317+
ps.onNext(9L);
313318

314-
Thread.sleep(400);
315-
316-
Assert.assertEquals(false, it.hasNext());
319+
Assert.assertEquals(j + "th iteration", false, it.hasNext());
317320
}
318321

319322
}

0 commit comments

Comments
 (0)