Skip to content

Commit 4233c2a

Browse files
blocking synchronous next
Fixes #624
1 parent 73b7518 commit 4233c2a

File tree

2 files changed

+28
-31
lines changed

2 files changed

+28
-31
lines changed

rxjava-core/src/main/java/rx/internal/operators/BlockingOperatorNext.java

Lines changed: 14 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.NoSuchElementException;
2020
import java.util.concurrent.ArrayBlockingQueue;
2121
import java.util.concurrent.BlockingQueue;
22+
import java.util.concurrent.atomic.AtomicBoolean;
2223
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
2324

2425
import rx.Notification;
@@ -46,11 +47,7 @@ public static <T> Iterable<T> next(final Observable<? extends T> items) {
4647
@Override
4748
public Iterator<T> iterator() {
4849
NextObserver<T> nextObserver = new NextObserver<T>();
49-
final NextIterator<T> nextIterator = new NextIterator<T>(nextObserver);
50-
51-
items.materialize().subscribe(nextObserver);
52-
53-
return nextIterator;
50+
return new NextIterator<T>(items, nextObserver);
5451
}
5552
};
5653

@@ -59,28 +56,19 @@ public Iterator<T> iterator() {
5956
// test needs to access the observer.waiting flag non-blockingly.
6057
/* private */static final class NextIterator<T> implements Iterator<T> {
6158

62-
private final NextObserver<? extends T> observer;
59+
private final NextObserver<T> observer;
60+
private final Observable<? extends T> items;
6361
private T next;
6462
private boolean hasNext = true;
6563
private boolean isNextConsumed = true;
6664
private Throwable error = null;
65+
private boolean started = false;
6766

68-
private NextIterator(NextObserver<? extends T> observer) {
67+
private NextIterator(Observable<? extends T> items, NextObserver<T> observer) {
68+
this.items = items;
6969
this.observer = observer;
7070
}
7171

72-
73-
// in tests, set the waiting flag without blocking for the next value to
74-
// allow lockstepping instead of multi-threading
75-
/**
76-
* In tests, set the waiting flag without blocking for the next value to
77-
* allow lockstepping instead of multi-threading
78-
* @param value use 1 to enter into the waiting state
79-
*/
80-
void setWaiting(int value) {
81-
observer.setWaiting(value);
82-
}
83-
8472
@Override
8573
public boolean hasNext() {
8674
if (error != null) {
@@ -102,6 +90,13 @@ public boolean hasNext() {
10290

10391
private boolean moveToNext() {
10492
try {
93+
if (!started) {
94+
started = true;
95+
// if not started, start now
96+
observer.setWaiting(1);
97+
items.materialize().subscribe(observer);
98+
}
99+
105100
Notification<? extends T> nextNotification = observer.takeNext();
106101
if (nextNotification.isOnNext()) {
107102
isNextConsumed = false;

rxjava-core/src/test/java/rx/internal/operators/BlockingOperatorNextTest.java

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import rx.internal.operators.BlockingOperatorNext;
3838
import rx.observables.BlockingObservable;
3939
import rx.schedulers.Schedulers;
40+
import rx.subjects.BehaviorSubject;
4041
import rx.subjects.PublishSubject;
4142
import rx.subjects.Subject;
4243

@@ -295,26 +296,27 @@ public void run() {
295296

296297
@Test /* (timeout = 8000) */
297298
public void testSingleSourceManyIterators() throws InterruptedException {
298-
PublishSubject<Long> ps = PublishSubject.create();
299-
BlockingObservable<Long> source = ps.take(10).toBlocking();
299+
Observable<Long> o = Observable.interval(10, TimeUnit.MILLISECONDS);
300+
PublishSubject<Void> terminal = PublishSubject.create();
301+
BlockingObservable<Long> source = o.takeUntil(terminal).toBlocking();
300302

301303
Iterable<Long> iter = source.next();
302304

303305
for (int j = 0; j < 3; j++) {
304306
BlockingOperatorNext.NextIterator<Long> it = (BlockingOperatorNext.NextIterator<Long>)iter.iterator();
305307

306-
for (long i = 0; i < 9; i++) {
307-
// hasNext has to set the waiting to true, otherwise, all onNext will be skipped
308-
it.setWaiting(1);
309-
ps.onNext(i);
308+
for (long i = 0; i < 10; i++) {
310309
Assert.assertEquals(true, it.hasNext());
311-
Assert.assertEquals(j + "th iteration", Long.valueOf(i), it.next());
310+
Assert.assertEquals(j + "th iteration next", Long.valueOf(i), it.next());
312311
}
313-
it.setWaiting(1);
314-
ps.onNext(9L);
315-
316-
Assert.assertEquals(j + "th iteration", false, it.hasNext());
312+
terminal.onNext(null);
317313
}
318-
314+
}
315+
316+
@Test
317+
public void testSynchronousNext() {
318+
assertEquals(1, BehaviorSubject.create(1).take(1).toBlocking().single().intValue());
319+
assertEquals(2, BehaviorSubject.create(2).toBlocking().toIterable().iterator().next().intValue());
320+
assertEquals(3, BehaviorSubject.create(3).toBlocking().next().iterator().next().intValue());
319321
}
320322
}

0 commit comments

Comments
 (0)