Skip to content

Commit c4e3a6c

Browse files
committed
Blocked 'hasNext' instead of 'next' until any notification arrives
1 parent f67efa7 commit c4e3a6c

File tree

1 file changed

+120
-109
lines changed

1 file changed

+120
-109
lines changed

rxjava-core/src/main/java/rx/operators/OperationNext.java

Lines changed: 120 additions & 109 deletions
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,16 @@
1515
*/
1616
package rx.operators;
1717

18-
import static org.junit.Assert.*;
18+
import static org.junit.Assert.assertEquals;
19+
import static org.junit.Assert.assertFalse;
20+
import static org.junit.Assert.assertNull;
21+
import static org.junit.Assert.assertTrue;
22+
import static org.junit.Assert.fail;
1923

2024
import java.util.Iterator;
2125
import java.util.concurrent.ArrayBlockingQueue;
2226
import java.util.concurrent.BlockingQueue;
23-
import java.util.concurrent.Callable;
2427
import java.util.concurrent.CountDownLatch;
25-
import java.util.concurrent.ExecutionException;
26-
import java.util.concurrent.ExecutorService;
27-
import java.util.concurrent.Executors;
28-
import java.util.concurrent.Future;
2928
import java.util.concurrent.TimeUnit;
3029
import java.util.concurrent.atomic.AtomicBoolean;
3130
import java.util.concurrent.atomic.AtomicInteger;
@@ -37,6 +36,7 @@
3736
import rx.Observable.OnSubscribeFunc;
3837
import rx.Observer;
3938
import rx.Subscription;
39+
import rx.concurrency.Schedulers;
4040
import rx.subjects.PublishSubject;
4141
import rx.subjects.Subject;
4242
import rx.subscriptions.Subscriptions;
@@ -68,31 +68,44 @@ public Iterator<T> iterator() {
6868
private static class NextIterator<T> implements Iterator<T> {
6969

7070
private final NextObserver<? extends T> observer;
71+
private T next;
72+
private boolean hasNext = true;
7173

7274
private NextIterator(NextObserver<? extends T> observer) {
7375
this.observer = observer;
7476
}
7577

7678
@Override
7779
public boolean hasNext() {
78-
return !observer.isCompleted(false);
79-
}
80-
81-
@Override
82-
public T next() {
83-
if (observer.isCompleted(true)) {
84-
throw new IllegalStateException("Observable is completed");
80+
// Since an iterator should not be used in different thread,
81+
// so we do not need any synchronization.
82+
if(hasNext == false) {
83+
return false;
8584
}
86-
87-
observer.await();
88-
8985
try {
90-
return observer.takeNext();
86+
Notification<? extends T> nextNotification = observer.takeNext();
87+
if(nextNotification.isOnNext()) {
88+
next = nextNotification.getValue();
89+
return true;
90+
}
91+
// If an observable is completed or fails,
92+
// next always return null and hasNext always return false.
93+
next = null;
94+
hasNext = false;
95+
if(nextNotification.isOnCompleted()) {
96+
return false;
97+
}
98+
// onError
99+
throw Exceptions.propagate(nextNotification.getThrowable());
91100
} catch (InterruptedException e) {
92101
Thread.currentThread().interrupt();
93102
throw Exceptions.propagate(e);
94103
}
104+
}
95105

106+
@Override
107+
public T next() {
108+
return next;
96109
}
97110

98111
@Override
@@ -104,7 +117,6 @@ public void remove() {
104117
private static class NextObserver<T> implements Observer<Notification<? extends T>> {
105118
private final BlockingQueue<Notification<? extends T>> buf = new ArrayBlockingQueue<Notification<? extends T>>(1);
106119
private final AtomicBoolean waiting = new AtomicBoolean(false);
107-
private volatile boolean completed = false;
108120

109121
@Override
110122
public void onCompleted() {
@@ -125,146 +137,145 @@ public void onNext(Notification<? extends T> args) {
125137
Notification<? extends T> concurrentItem = buf.poll();
126138

127139
// in case if we won race condition with onComplete/onError method
128-
if (!concurrentItem.isOnNext()) {
140+
if (concurrentItem != null && !concurrentItem.isOnNext()) {
129141
toOffer = concurrentItem;
130142
}
131143
}
132144
}
133145

134146
}
135147

136-
public void await() {
148+
public Notification<? extends T> takeNext() throws InterruptedException {
137149
waiting.set(true);
150+
return buf.take();
138151
}
139152

140-
public boolean isCompleted(boolean rethrowExceptionIfExists) {
141-
Notification<? extends T> lastItem = buf.peek();
142-
if (lastItem == null) {
143-
// Fixed issue #383 testOnErrorViaHasNext fails sometimes.
144-
// If the buf is empty, there are two cases:
145-
// 1. The next item has not been emitted yet.
146-
// 2. The error or completed notification is removed in takeNext method.
147-
return completed;
148-
}
153+
}
149154

150-
if (lastItem.isOnError()) {
151-
if (rethrowExceptionIfExists) {
152-
throw Exceptions.propagate(lastItem.getThrowable());
153-
} else {
154-
return true;
155-
}
156-
}
155+
public static class UnitTest {
157156

158-
return lastItem.isOnCompleted();
157+
private void fireOnNextInNewThread(final Subject<String, String> o, final String value) {
158+
new Thread() {
159+
@Override
160+
public void run() {
161+
try {
162+
Thread.sleep(500);
163+
} catch (InterruptedException e) {
164+
// ignore
165+
}
166+
o.onNext(value);
167+
}
168+
}.start();
159169
}
160170

161-
public T takeNext() throws InterruptedException {
162-
Notification<? extends T> next = buf.take();
163-
164-
if (next.isOnError()) {
165-
completed = true;
166-
throw Exceptions.propagate(next.getThrowable());
167-
}
168-
169-
if (next.isOnCompleted()) {
170-
completed = true;
171-
throw new IllegalStateException("Observable is completed");
172-
}
173-
174-
return next.getValue();
175-
171+
private void fireOnErrorInNewThread(final Subject<String, String> o) {
172+
new Thread() {
173+
@Override
174+
public void run() {
175+
try {
176+
Thread.sleep(500);
177+
} catch (InterruptedException e) {
178+
// ignore
179+
}
180+
o.onError(new TestException());
181+
}
182+
}.start();
176183
}
177184

178-
}
179-
180-
public static class UnitTest {
181-
private final ExecutorService executor = Executors.newSingleThreadExecutor();
182185

183186
@Test
184-
public void testNext() throws Throwable {
187+
public void testNext() {
185188
Subject<String, String> obs = PublishSubject.create();
186-
187189
Iterator<String> it = next(obs).iterator();
188-
189-
assertTrue(it.hasNext());
190-
191-
Future<String> next = nextAsync(it);
192-
Thread.sleep(100);
193-
obs.onNext("one");
194-
assertEquals("one", next.get());
195-
190+
fireOnNextInNewThread(obs, "one");
196191
assertTrue(it.hasNext());
192+
assertEquals("one", it.next());
197193

198-
next = nextAsync(it);
199-
Thread.sleep(100);
200-
obs.onNext("two");
201-
assertEquals("two", next.get());
202-
194+
fireOnNextInNewThread(obs, "two");
203195
assertTrue(it.hasNext());
196+
assertEquals("two", it.next());
204197

205198
obs.onCompleted();
199+
assertFalse(it.hasNext());
200+
assertNull(it.next());
206201

202+
// If the observable is completed, hasNext always returns false and next always returns null.
207203
assertFalse(it.hasNext());
204+
assertNull(it.next());
208205
}
209206

210-
@Test(expected = TestException.class)
211-
public void testOnError() throws Throwable {
207+
@Test
208+
public void testNextWithError() {
212209
Subject<String, String> obs = PublishSubject.create();
213-
214210
Iterator<String> it = next(obs).iterator();
215-
211+
fireOnNextInNewThread(obs, "one");
216212
assertTrue(it.hasNext());
213+
assertEquals("one", it.next());
217214

218-
Future<String> next = nextAsync(it);
219-
Thread.sleep(100);
220-
obs.onNext("one");
221-
assertEquals("one", next.get());
222-
223-
assertTrue(it.hasNext());
224-
225-
next = nextAsync(it);
226-
Thread.sleep(100);
227-
obs.onError(new TestException());
228-
215+
fireOnErrorInNewThread(obs);
229216
try {
230-
next.get();
231-
} catch (ExecutionException e) {
232-
throw e.getCause();
217+
it.hasNext();
218+
fail("Expected an TestException");
219+
}
220+
catch(TestException e) {
221+
// successful
233222
}
223+
224+
// After the observable fails, hasNext always returns false and next always returns null.
225+
assertFalse(it.hasNext());
226+
assertNull(it.next());
234227
}
235228

236229
@Test
237-
public void testOnErrorViaHasNext() throws InterruptedException, ExecutionException {
238-
Subject<String, String> obs = PublishSubject.create();
230+
public void testNextWithEmpty() {
231+
Observable<String> obs = Observable.<String>empty().observeOn(Schedulers.newThread());
232+
Iterator<String> it = next(obs).iterator();
239233

240-
Iterator<String> it = next(obs).iterator();
241-
242-
assertTrue(it.hasNext());
234+
assertFalse(it.hasNext());
235+
assertNull(it.next());
243236

244-
Future<String> next = nextAsync(it);
245-
Thread.sleep(100);
246-
obs.onNext("one");
247-
assertEquals("one", next.get());
237+
// If the observable is completed, hasNext always returns false and next always returns null.
238+
assertFalse(it.hasNext());
239+
assertNull(it.next());
240+
}
248241

249-
assertTrue(it.hasNext());
242+
@Test
243+
public void testOnError() throws Throwable {
244+
Subject<String, String> obs = PublishSubject.create();
245+
Iterator<String> it = next(obs).iterator();
250246

251-
next = nextAsync(it);
252-
Thread.sleep(100);
253247
obs.onError(new TestException());
248+
try {
249+
it.hasNext();
250+
fail("Expected an TestException");
251+
}
252+
catch(TestException e) {
253+
// successful
254+
}
254255

255-
// this should not throw an exception but instead just return false
256+
// After the observable fails, hasNext always returns false and next always returns null.
256257
assertFalse(it.hasNext());
258+
assertNull(it.next());
257259
}
258260

259-
private Future<String> nextAsync(final Iterator<String> it) {
261+
@Test
262+
public void testOnErrorInNewThread() {
263+
Subject<String, String> obs = PublishSubject.create();
264+
Iterator<String> it = next(obs).iterator();
260265

261-
return executor.submit(new Callable<String>() {
266+
fireOnErrorInNewThread(obs);
262267

263-
@Override
264-
public String call() throws Exception {
265-
return it.next();
266-
}
267-
});
268+
try {
269+
it.hasNext();
270+
fail("Expected an TestException");
271+
}
272+
catch(TestException e) {
273+
// successful
274+
}
275+
276+
// After the observable fails, hasNext always returns false and next always returns null.
277+
assertFalse(it.hasNext());
278+
assertNull(it.next());
268279
}
269280

270281
@SuppressWarnings("serial")

0 commit comments

Comments
 (0)