1818
1919import com .google .api .core .ApiFuture ;
2020import com .google .api .core .ApiFutures ;
21+ import com .google .api .core .ListenableFutureToApiFuture ;
2122import com .google .api .core .SettableApiFuture ;
2223import com .google .api .gax .core .ExecutorProvider ;
2324import com .google .cloud .spanner .AbstractReadContext .ListenableAsyncResultSet ;
2829import com .google .common .collect .ImmutableList ;
2930import com .google .common .util .concurrent .ListeningScheduledExecutorService ;
3031import com .google .common .util .concurrent .MoreExecutors ;
31- import com .google .spanner .v1 .PartialResultSet ;
3232import com .google .spanner .v1 .ResultSetMetadata ;
3333import com .google .spanner .v1 .ResultSetStats ;
3434import java .util .Collection ;
3535import java .util .LinkedList ;
3636import java .util .List ;
3737import java .util .concurrent .BlockingDeque ;
38+ import java .util .concurrent .Callable ;
3839import java .util .concurrent .CountDownLatch ;
3940import java .util .concurrent .ExecutionException ;
4041import java .util .concurrent .Executor ;
4445import java .util .logging .Logger ;
4546
4647/** Default implementation for {@link AsyncResultSet}. */
47- class AsyncResultSetImpl extends ForwardingStructReader
48- implements ListenableAsyncResultSet , AsyncResultSet .StreamMessageListener {
48+ class AsyncResultSetImpl extends ForwardingStructReader implements ListenableAsyncResultSet {
4949 private static final Logger log = Logger .getLogger (AsyncResultSetImpl .class .getName ());
5050
5151 /** State of an {@link AsyncResultSetImpl}. */
5252 private enum State {
5353 INITIALIZED ,
54- STREAMING_INITIALIZED ,
5554 /** SYNC indicates that the {@link ResultSet} is used in sync pattern. */
5655 SYNC ,
5756 CONSUMING ,
@@ -116,15 +115,12 @@ private enum State {
116115
117116 private State state = State .INITIALIZED ;
118117
119- /** This variable indicates that produce rows thread is initiated */
120- private volatile boolean produceRowsInitiated ;
121-
122118 /**
123119 * This variable indicates whether all the results from the underlying result set have been read.
124120 */
125121 private volatile boolean finished ;
126122
127- private volatile SettableApiFuture <Void > result ;
123+ private volatile ApiFuture <Void > result ;
128124
129125 /**
130126 * This variable indicates whether {@link #tryNext()} has returned {@link CursorState#DONE} or a
@@ -333,12 +329,12 @@ public void run() {
333329 private final CallbackRunnable callbackRunnable = new CallbackRunnable ();
334330
335331 /**
336- * {@link ProduceRowsRunnable } reads data from the underlying {@link ResultSet}, places these in
332+ * {@link ProduceRowsCallable } reads data from the underlying {@link ResultSet}, places these in
337333 * the buffer and dispatches the {@link CallbackRunnable} when data is ready to be consumed.
338334 */
339- private class ProduceRowsRunnable implements Runnable {
335+ private class ProduceRowsCallable implements Callable < Void > {
340336 @ Override
341- public void run () {
337+ public Void call () throws Exception {
342338 boolean stop = false ;
343339 boolean hasNext = false ;
344340 try {
@@ -397,17 +393,12 @@ public void run() {
397393 }
398394 // Call the callback if there are still rows in the buffer that need to be processed.
399395 while (!stop ) {
400- try {
401- waitIfPaused ();
402- startCallbackIfNecessary ();
403- // Make sure we wait until the callback runner has actually finished.
404- consumingLatch .await ();
405- synchronized (monitor ) {
406- stop = cursorReturnedDoneOrException ;
407- }
408- } catch (Throwable e ) {
409- result .setException (e );
410- return ;
396+ waitIfPaused ();
397+ startCallbackIfNecessary ();
398+ // Make sure we wait until the callback runner has actually finished.
399+ consumingLatch .await ();
400+ synchronized (monitor ) {
401+ stop = cursorReturnedDoneOrException ;
411402 }
412403 }
413404 } finally {
@@ -419,14 +410,14 @@ public void run() {
419410 }
420411 synchronized (monitor ) {
421412 if (executionException != null ) {
422- result .setException (executionException );
423- } else if (state == State .CANCELLED ) {
424- result .setException (CANCELLED_EXCEPTION );
425- } else {
426- result .set (null );
413+ throw executionException ;
414+ }
415+ if (state == State .CANCELLED ) {
416+ throw CANCELLED_EXCEPTION ;
427417 }
428418 }
429419 }
420+ return null ;
430421 }
431422
432423 private void waitIfPaused () throws InterruptedException {
@@ -458,26 +449,6 @@ private void startCallbackWithBufferLatchIfNecessary(int bufferLatch) {
458449 }
459450 }
460451
461- private class InitiateStreamingRunnable implements Runnable {
462-
463- @ Override
464- public void run () {
465- try {
466- // This method returns true if the underlying result set is a streaming result set (e.g. a
467- // GrpcResultSet).
468- // Those result sets will trigger initiateProduceRows() when the first results are received.
469- // Non-streaming result sets do not trigger this callback, and for those result sets, we
470- // need to eagerly start the ProduceRowsRunnable.
471- if (!initiateStreaming (AsyncResultSetImpl .this )) {
472- initiateProduceRows ();
473- }
474- } catch (Throwable exception ) {
475- executionException = SpannerExceptionFactory .asSpannerException (exception );
476- initiateProduceRows ();
477- }
478- }
479- }
480-
481452 /** Sets the callback for this {@link AsyncResultSet}. */
482453 @ Override
483454 public ApiFuture <Void > setCallback (Executor exec , ReadyCallback cb ) {
@@ -487,24 +458,16 @@ public ApiFuture<Void> setCallback(Executor exec, ReadyCallback cb) {
487458 this .state == State .INITIALIZED , "callback may not be set multiple times" );
488459
489460 // Start to fetch data and buffer these.
490- this .result = SettableApiFuture .create ();
491- this .state = State .STREAMING_INITIALIZED ;
492- this .service .execute (new InitiateStreamingRunnable ());
461+ this .result =
462+ new ListenableFutureToApiFuture <>(this .service .submit (new ProduceRowsCallable ()));
493463 this .executor = MoreExecutors .newSequentialExecutor (Preconditions .checkNotNull (exec ));
494464 this .callback = Preconditions .checkNotNull (cb );
465+ this .state = State .RUNNING ;
495466 pausedLatch .countDown ();
496467 return result ;
497468 }
498469 }
499470
500- private void initiateProduceRows () {
501- if (this .state == State .STREAMING_INITIALIZED ) {
502- this .state = State .RUNNING ;
503- }
504- produceRowsInitiated = true ;
505- this .service .execute (new ProduceRowsRunnable ());
506- }
507-
508471 Future <Void > getResult () {
509472 return result ;
510473 }
@@ -615,10 +578,6 @@ public ResultSetMetadata getMetadata() {
615578 return delegateResultSet .get ().getMetadata ();
616579 }
617580
618- boolean initiateStreaming (StreamMessageListener streamMessageListener ) {
619- return StreamingUtil .initiateStreaming (delegateResultSet .get (), streamMessageListener );
620- }
621-
622581 @ Override
623582 protected void checkValidState () {
624583 synchronized (monitor ) {
@@ -634,22 +593,4 @@ public Struct getCurrentRowAsStruct() {
634593 checkValidState ();
635594 return currentRow ;
636595 }
637-
638- @ Override
639- public void onStreamMessage (PartialResultSet partialResultSet , boolean bufferIsFull ) {
640- synchronized (monitor ) {
641- if (produceRowsInitiated ) {
642- return ;
643- }
644- // if PartialResultSet contains a resume token or buffer size is full, or
645- // we have reached the end of the stream, we can start the thread.
646- boolean startJobThread =
647- !partialResultSet .getResumeToken ().isEmpty ()
648- || bufferIsFull
649- || partialResultSet == GrpcStreamIterator .END_OF_STREAM ;
650- if (startJobThread || state != State .STREAMING_INITIALIZED ) {
651- initiateProduceRows ();
652- }
653- }
654- }
655596}
0 commit comments