17
17
18
18
import static org .junit .Assert .assertEquals ;
19
19
import static org .junit .Assert .assertFalse ;
20
- import static org .junit .Assert .assertNull ;
21
20
import static org .junit .Assert .assertTrue ;
22
21
import static org .junit .Assert .fail ;
23
22
24
23
import java .util .Iterator ;
24
+ import java .util .NoSuchElementException ;
25
25
import java .util .concurrent .ArrayBlockingQueue ;
26
26
import java .util .concurrent .BlockingQueue ;
27
27
import java .util .concurrent .CountDownLatch ;
@@ -70,6 +70,7 @@ private static class NextIterator<T> implements Iterator<T> {
70
70
private final NextObserver <? extends T > observer ;
71
71
private T next ;
72
72
private boolean hasNext = true ;
73
+ private boolean isNextConsumed = true ;
73
74
74
75
private NextIterator (NextObserver <? extends T > observer ) {
75
76
this .observer = observer ;
@@ -80,23 +81,34 @@ public boolean hasNext() {
80
81
// Since an iterator should not be used in different thread,
81
82
// so we do not need any synchronization.
82
83
if (hasNext == false ) {
84
+ // the iterator has reached the end.
83
85
return false ;
84
86
}
87
+ if (isNextConsumed == false ) {
88
+ // next has not been used yet.
89
+ return true ;
90
+ }
91
+ return moveToNext ();
92
+ }
93
+
94
+ private boolean moveToNext () {
85
95
try {
86
96
Notification <? extends T > nextNotification = observer .takeNext ();
87
97
if (nextNotification .isOnNext ()) {
98
+ isNextConsumed = false ;
88
99
next = nextNotification .getValue ();
89
100
return true ;
90
101
}
91
102
// If an observable is completed or fails,
92
- // next always return null and hasNext always return false.
93
- next = null ;
103
+ // hasNext() always return false.
94
104
hasNext = false ;
95
105
if (nextNotification .isOnCompleted ()) {
96
106
return false ;
97
107
}
98
- // onError
99
- throw Exceptions .propagate (nextNotification .getThrowable ());
108
+ if (nextNotification .isOnError ()) {
109
+ throw Exceptions .propagate (nextNotification .getThrowable ());
110
+ }
111
+ throw new IllegalStateException ("Should not reach here" );
100
112
} catch (InterruptedException e ) {
101
113
Thread .currentThread ().interrupt ();
102
114
throw Exceptions .propagate (e );
@@ -105,7 +117,13 @@ public boolean hasNext() {
105
117
106
118
@ Override
107
119
public T next () {
108
- return next ;
120
+ if (hasNext ()) {
121
+ isNextConsumed = true ;
122
+ return next ;
123
+ }
124
+ else {
125
+ throw new NoSuchElementException ("No more elements" );
126
+ }
109
127
}
110
128
111
129
@ Override
@@ -197,11 +215,21 @@ public void testNext() {
197
215
198
216
obs .onCompleted ();
199
217
assertFalse (it .hasNext ());
200
- assertNull (it .next ());
218
+ try {
219
+ it .next ();
220
+ fail ("At the end of an iterator should throw a NoSuchElementException" );
221
+ }
222
+ catch (NoSuchElementException e ){
223
+ }
201
224
202
- // If the observable is completed, hasNext always returns false and next always returns null .
225
+ // If the observable is completed, hasNext always returns false and next always throw a NoSuchElementException .
203
226
assertFalse (it .hasNext ());
204
- assertNull (it .next ());
227
+ try {
228
+ it .next ();
229
+ fail ("At the end of an iterator should throw a NoSuchElementException" );
230
+ }
231
+ catch (NoSuchElementException e ){
232
+ }
205
233
}
206
234
207
235
@ Test
@@ -221,9 +249,14 @@ public void testNextWithError() {
221
249
// successful
222
250
}
223
251
224
- // After the observable fails, hasNext always returns false and next always returns null .
252
+ // After the observable fails, hasNext always returns false and next always throw a NoSuchElementException .
225
253
assertFalse (it .hasNext ());
226
- assertNull (it .next ());
254
+ try {
255
+ it .next ();
256
+ fail ("At the end of an iterator should throw a NoSuchElementException" );
257
+ }
258
+ catch (NoSuchElementException e ){
259
+ }
227
260
}
228
261
229
262
@ Test
@@ -232,11 +265,21 @@ public void testNextWithEmpty() {
232
265
Iterator <String > it = next (obs ).iterator ();
233
266
234
267
assertFalse (it .hasNext ());
235
- assertNull (it .next ());
236
-
237
- // If the observable is completed, hasNext always returns false and next always returns null.
268
+ try {
269
+ it .next ();
270
+ fail ("At the end of an iterator should throw a NoSuchElementException" );
271
+ }
272
+ catch (NoSuchElementException e ){
273
+ }
274
+
275
+ // If the observable is completed, hasNext always returns false and next always throw a NoSuchElementException.
238
276
assertFalse (it .hasNext ());
239
- assertNull (it .next ());
277
+ try {
278
+ it .next ();
279
+ fail ("At the end of an iterator should throw a NoSuchElementException" );
280
+ }
281
+ catch (NoSuchElementException e ){
282
+ }
240
283
}
241
284
242
285
@ Test
@@ -253,9 +296,14 @@ public void testOnError() throws Throwable {
253
296
// successful
254
297
}
255
298
256
- // After the observable fails, hasNext always returns false and next always returns null .
299
+ // After the observable fails, hasNext always returns false and next always throw a NoSuchElementException .
257
300
assertFalse (it .hasNext ());
258
- assertNull (it .next ());
301
+ try {
302
+ it .next ();
303
+ fail ("At the end of an iterator should throw a NoSuchElementException" );
304
+ }
305
+ catch (NoSuchElementException e ){
306
+ }
259
307
}
260
308
261
309
@ Test
@@ -273,9 +321,53 @@ public void testOnErrorInNewThread() {
273
321
// successful
274
322
}
275
323
276
- // After the observable fails, hasNext always returns false and next always returns null .
324
+ // After the observable fails, hasNext always returns false and next always throw a NoSuchElementException .
277
325
assertFalse (it .hasNext ());
278
- assertNull (it .next ());
326
+ try {
327
+ it .next ();
328
+ fail ("At the end of an iterator should throw a NoSuchElementException" );
329
+ }
330
+ catch (NoSuchElementException e ){
331
+ }
332
+ }
333
+
334
+ @ Test
335
+ public void testNextWithOnlyUsingNextMethod () {
336
+ Subject <String , String > obs = PublishSubject .create ();
337
+ Iterator <String > it = next (obs ).iterator ();
338
+ fireOnNextInNewThread (obs , "one" );
339
+ assertEquals ("one" , it .next ());
340
+
341
+ fireOnNextInNewThread (obs , "two" );
342
+ assertEquals ("two" , it .next ());
343
+
344
+ obs .onCompleted ();
345
+ try {
346
+ it .next ();
347
+ fail ("At the end of an iterator should throw a NoSuchElementException" );
348
+ }
349
+ catch (NoSuchElementException e ){
350
+ }
351
+ }
352
+
353
+ @ Test
354
+ public void testNextWithCallingHasNextMultipleTimes () {
355
+ Subject <String , String > obs = PublishSubject .create ();
356
+ Iterator <String > it = next (obs ).iterator ();
357
+ fireOnNextInNewThread (obs , "one" );
358
+ assertTrue (it .hasNext ());
359
+ assertTrue (it .hasNext ());
360
+ assertTrue (it .hasNext ());
361
+ assertTrue (it .hasNext ());
362
+ assertEquals ("one" , it .next ());
363
+
364
+ obs .onCompleted ();
365
+ try {
366
+ it .next ();
367
+ fail ("At the end of an iterator should throw a NoSuchElementException" );
368
+ }
369
+ catch (NoSuchElementException e ){
370
+ }
279
371
}
280
372
281
373
@ SuppressWarnings ("serial" )
@@ -342,6 +434,8 @@ public void run() {
342
434
assertTrue ("expected that c [" + c + "] is higher than or equal to " + COUNT , c >= COUNT );
343
435
344
436
assertTrue (it .hasNext ());
437
+ int d = it .next ();
438
+ assertTrue (d > c );
345
439
346
440
// shut down the thread
347
441
running .set (false );
0 commit comments