@@ -71,13 +71,18 @@ private static class NextIterator<T> implements Iterator<T> {
7171 private T next ;
7272 private boolean hasNext = true ;
7373 private boolean isNextConsumed = true ;
74+ private Throwable error = null ;
7475
7576 private NextIterator (NextObserver <? extends T > observer ) {
7677 this .observer = observer ;
7778 }
7879
7980 @ Override
8081 public boolean hasNext () {
82+ if (error != null ) {
83+ // If any error has already been thrown, throw it again.
84+ throw Exceptions .propagate (error );
85+ }
8186 // Since an iterator should not be used in different thread,
8287 // so we do not need any synchronization.
8388 if (hasNext == false ) {
@@ -106,17 +111,23 @@ private boolean moveToNext() {
106111 return false ;
107112 }
108113 if (nextNotification .isOnError ()) {
109- throw Exceptions .propagate (nextNotification .getThrowable ());
114+ error = nextNotification .getThrowable ();
115+ throw Exceptions .propagate (error );
110116 }
111117 throw new IllegalStateException ("Should not reach here" );
112118 } catch (InterruptedException e ) {
113119 Thread .currentThread ().interrupt ();
114- throw Exceptions .propagate (e );
120+ error = e ;
121+ throw Exceptions .propagate (error );
115122 }
116123 }
117124
118125 @ Override
119126 public T next () {
127+ if (error != null ) {
128+ // If any error has already been thrown, throw it again.
129+ throw Exceptions .propagate (error );
130+ }
120131 if (hasNext ()) {
121132 isNextConsumed = true ;
122133 return next ;
@@ -246,17 +257,9 @@ public void testNextWithError() {
246257 fail ("Expected an TestException" );
247258 }
248259 catch (TestException e ) {
249- // successful
250260 }
251261
252- // After the observable fails, hasNext always returns false and next always throw a NoSuchElementException.
253- assertFalse (it .hasNext ());
254- try {
255- it .next ();
256- fail ("At the end of an iterator should throw a NoSuchElementException" );
257- }
258- catch (NoSuchElementException e ){
259- }
262+ assertErrorAfterObservableFail (it );
260263 }
261264
262265 @ Test
@@ -296,14 +299,7 @@ public void testOnError() throws Throwable {
296299 // successful
297300 }
298301
299- // After the observable fails, hasNext always returns false and next always throw a NoSuchElementException.
300- assertFalse (it .hasNext ());
301- try {
302- it .next ();
303- fail ("At the end of an iterator should throw a NoSuchElementException" );
304- }
305- catch (NoSuchElementException e ){
306- }
302+ assertErrorAfterObservableFail (it );
307303 }
308304
309305 @ Test
@@ -321,13 +317,22 @@ public void testOnErrorInNewThread() {
321317 // successful
322318 }
323319
324- // After the observable fails, hasNext always returns false and next always throw a NoSuchElementException.
325- assertFalse (it .hasNext ());
320+ assertErrorAfterObservableFail (it );
321+ }
322+
323+ private void assertErrorAfterObservableFail (Iterator <String > it ) {
324+ // After the observable fails, hasNext and next always throw the exception.
325+ try {
326+ it .hasNext ();
327+ fail ("hasNext should throw a TestException" );
328+ }
329+ catch (TestException e ){
330+ }
326331 try {
327332 it .next ();
328- fail ("At the end of an iterator should throw a NoSuchElementException " );
333+ fail ("next should throw a TestException " );
329334 }
330- catch (NoSuchElementException e ){
335+ catch (TestException e ){
331336 }
332337 }
333338
0 commit comments