@@ -104,7 +104,7 @@ public void tryNextNotAllowed() {
104104 new AsyncResultSetImpl (
105105 mockedProvider , mock (ResultSet .class ), AsyncResultSetImpl .DEFAULT_BUFFER_SIZE )) {
106106 rs .setCallback (mock (Executor .class ), mock (ReadyCallback .class ));
107- IllegalStateException e = assertThrows (IllegalStateException .class , () -> rs . tryNext () );
107+ IllegalStateException e = assertThrows (IllegalStateException .class , rs :: tryNext );
108108 assertThat (e .getMessage ()).contains ("tryNext may only be called from a DataReady callback." );
109109 }
110110 }
@@ -152,7 +152,7 @@ public void toListAsync() throws InterruptedException, ExecutionException {
152152 }
153153
154154 @ Test
155- public void toListAsyncPropagatesError () throws InterruptedException {
155+ public void toListAsyncPropagatesError () {
156156 ExecutorService executor = Executors .newFixedThreadPool (1 );
157157 ResultSet delegate = mock (ResultSet .class );
158158 when (delegate .next ())
@@ -326,10 +326,7 @@ public void testCallbackIsNotCalledWhilePaused() throws InterruptedException, Ex
326326 @ Override
327327 public Boolean answer (InvocationOnMock invocation ) throws Throwable {
328328 row ++;
329- if (row > simulatedRows ) {
330- return false ;
331- }
332- return true ;
329+ return row <= simulatedRows ;
333330 }
334331 });
335332 when (delegate .getCurrentRowAsStruct ()).thenReturn (mock (Struct .class ));
@@ -345,17 +342,17 @@ public Boolean answer(InvocationOnMock invocation) throws Throwable {
345342 assertFalse (paused .get ());
346343 callbackCounter .incrementAndGet ();
347344 try {
348- while ( true ) {
349- switch ( resultSet . tryNext ()) {
350- case OK :
351- paused . set ( true );
352- queue . put ( new Object ()) ;
353- return CallbackResponse . PAUSE ;
354- case DONE :
355- return CallbackResponse . DONE ;
356- case NOT_READY :
357- return CallbackResponse . CONTINUE ;
358- }
345+ switch ( resultSet . tryNext () ) {
346+ case OK :
347+ paused . set ( true );
348+ queue . put ( new Object () );
349+ return CallbackResponse . PAUSE ;
350+ case DONE :
351+ return CallbackResponse . DONE ;
352+ case NOT_READY :
353+ return CallbackResponse . CONTINUE ;
354+ default :
355+ throw new IllegalStateException ();
359356 }
360357 } catch (InterruptedException e ) {
361358 throw SpannerExceptionFactory .propagateInterrupt (e );
@@ -384,9 +381,8 @@ public Boolean answer(InvocationOnMock invocation) throws Throwable {
384381 }
385382
386383 @ Test
387- public void testCallbackIsNotCalledWhilePausedAndCanceled ()
388- throws InterruptedException , ExecutionException {
389- Executor executor = Executors .newSingleThreadExecutor ();
384+ public void testCallbackIsNotCalledWhilePausedAndCanceled () {
385+ ExecutorService executor = Executors .newSingleThreadExecutor ();
390386 StreamingResultSet delegate = mock (StreamingResultSet .class );
391387
392388 final AtomicInteger callbackCounter = new AtomicInteger ();
@@ -414,6 +410,8 @@ public void testCallbackIsNotCalledWhilePausedAndCanceled()
414410 SpannerException exception = assertThrows (SpannerException .class , () -> get (callbackResult ));
415411 assertEquals (ErrorCode .CANCELLED , exception .getErrorCode ());
416412 assertEquals (1 , callbackCounter .get ());
413+ } finally {
414+ executor .shutdown ();
417415 }
418416 }
419417
0 commit comments