Skip to content

Commit 782909f

Browse files
committed
Fixed testOnErrorViaHasNext in issue #383
1 parent 8e44bcb commit 782909f

File tree

1 file changed

+8
-1
lines changed

1 file changed

+8
-1
lines changed

rxjava-core/src/main/java/rx/operators/OperationNext.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ public void remove() {
104104
private static class NextObserver<T> implements Observer<Notification<? extends T>> {
105105
private final BlockingQueue<Notification<? extends T>> buf = new ArrayBlockingQueue<Notification<? extends T>>(1);
106106
private final AtomicBoolean waiting = new AtomicBoolean(false);
107+
private volatile boolean completed = false;
107108

108109
@Override
109110
public void onCompleted() {
@@ -139,7 +140,11 @@ public void await() {
139140
public boolean isCompleted(boolean rethrowExceptionIfExists) {
140141
Notification<? extends T> lastItem = buf.peek();
141142
if (lastItem == null) {
142-
return false;
143+
// Fixed issue #383 testOnErrorViaHasNext fails sometimes.
144+
// If the buf is empty, there are two cases:
145+
// 1. The next item has not been emitted yet.
146+
// 2. The error or completed notification is removed in takeNext method.
147+
return completed;
143148
}
144149

145150
if (lastItem.isOnError()) {
@@ -157,10 +162,12 @@ public T takeNext() throws InterruptedException {
157162
Notification<? extends T> next = buf.take();
158163

159164
if (next.isOnError()) {
165+
completed = true;
160166
throw Exceptions.propagate(next.getThrowable());
161167
}
162168

163169
if (next.isOnCompleted()) {
170+
completed = true;
164171
throw new IllegalStateException("Observable is completed");
165172
}
166173

0 commit comments

Comments
 (0)