1818
1919import com .google .api .core .ApiFuture ;
2020import com .google .api .core .ApiFutures ;
21- import com .google .api .core .ListenableFutureToApiFuture ;
2221import com .google .api .core .SettableApiFuture ;
2322import com .google .api .gax .core .ExecutorProvider ;
2423import com .google .cloud .spanner .AbstractReadContext .ListenableAsyncResultSet ;
2928import com .google .common .collect .ImmutableList ;
3029import com .google .common .util .concurrent .ListeningScheduledExecutorService ;
3130import com .google .common .util .concurrent .MoreExecutors ;
31+ import com .google .spanner .v1 .PartialResultSet ;
3232import com .google .spanner .v1 .ResultSetMetadata ;
3333import com .google .spanner .v1 .ResultSetStats ;
3434import java .util .Collection ;
3535import java .util .LinkedList ;
3636import java .util .List ;
37- import java .util .concurrent .BlockingDeque ;
38- import java .util .concurrent .Callable ;
39- import java .util .concurrent .CountDownLatch ;
40- import java .util .concurrent .ExecutionException ;
41- import java .util .concurrent .Executor ;
42- import java .util .concurrent .Future ;
43- import java .util .concurrent .LinkedBlockingDeque ;
37+ import java .util .concurrent .*;
4438import java .util .logging .Level ;
4539import java .util .logging .Logger ;
4640
4741/** Default implementation for {@link AsyncResultSet}. */
48- class AsyncResultSetImpl extends ForwardingStructReader implements ListenableAsyncResultSet {
42+ class AsyncResultSetImpl extends ForwardingStructReader
43+ implements ListenableAsyncResultSet , AsyncResultSet .StreamMessageListener {
4944 private static final Logger log = Logger .getLogger (AsyncResultSetImpl .class .getName ());
5045
5146 /** State of an {@link AsyncResultSetImpl}. */
5247 private enum State {
5348 INITIALIZED ,
49+ IN_PROGRESS ,
5450 /** SYNC indicates that the {@link ResultSet} is used in sync pattern. */
5551 SYNC ,
5652 CONSUMING ,
@@ -115,12 +111,15 @@ private enum State {
115111
116112 private State state = State .INITIALIZED ;
117113
114+ /** This variable indicates that produce rows thread is initiated */
115+ private volatile boolean produceRowsInitiated ;
116+
118117 /**
119118 * This variable indicates whether all the results from the underlying result set have been read.
120119 */
121120 private volatile boolean finished ;
122121
123- private volatile ApiFuture <Void > result ;
122+ private volatile SettableApiFuture <Void > result ;
124123
125124 /**
126125 * This variable indicates whether {@link #tryNext()} has returned {@link CursorState#DONE} or a
@@ -329,12 +328,12 @@ public void run() {
329328 private final CallbackRunnable callbackRunnable = new CallbackRunnable ();
330329
331330 /**
332- * {@link ProduceRowsCallable } reads data from the underlying {@link ResultSet}, places these in
331+ * {@link ProduceRowsRunnable } reads data from the underlying {@link ResultSet}, places these in
333332 * the buffer and dispatches the {@link CallbackRunnable} when data is ready to be consumed.
334333 */
335- private class ProduceRowsCallable implements Callable < Void > {
334+ private class ProduceRowsRunnable implements Runnable {
336335 @ Override
337- public Void call () throws Exception {
336+ public void run () {
338337 boolean stop = false ;
339338 boolean hasNext = false ;
340339 try {
@@ -393,12 +392,17 @@ public Void call() throws Exception {
393392 }
394393 // Call the callback if there are still rows in the buffer that need to be processed.
395394 while (!stop ) {
396- waitIfPaused ();
397- startCallbackIfNecessary ();
398- // Make sure we wait until the callback runner has actually finished.
399- consumingLatch .await ();
400- synchronized (monitor ) {
401- stop = cursorReturnedDoneOrException ;
395+ try {
396+ waitIfPaused ();
397+ startCallbackIfNecessary ();
398+ // Make sure we wait until the callback runner has actually finished.
399+ consumingLatch .await ();
400+ synchronized (monitor ) {
401+ stop = cursorReturnedDoneOrException ;
402+ }
403+ } catch (InterruptedException e ) {
404+ result .setException (e );
405+ return ;
402406 }
403407 }
404408 } finally {
@@ -410,14 +414,16 @@ public Void call() throws Exception {
410414 }
411415 synchronized (monitor ) {
412416 if (executionException != null ) {
417+ result .setException (executionException );
413418 throw executionException ;
414419 }
415420 if (state == State .CANCELLED ) {
421+ result .setException (CANCELLED_EXCEPTION );
416422 throw CANCELLED_EXCEPTION ;
417423 }
418424 }
419425 }
420- return null ;
426+ result . set ( null ) ;
421427 }
422428
423429 private void waitIfPaused () throws InterruptedException {
@@ -449,6 +455,21 @@ private void startCallbackWithBufferLatchIfNecessary(int bufferLatch) {
449455 }
450456 }
451457
458+ private class InitiateStreamingRunnable implements Runnable {
459+
460+ @ Override
461+ public void run () {
462+ try {
463+ if (!initiateStreaming (AsyncResultSetImpl .this )) {
464+ initiateProduceRows ();
465+ }
466+ } catch (SpannerException e ) {
467+ executionException = e ;
468+ initiateProduceRows ();
469+ }
470+ }
471+ }
472+
452473 /** Sets the callback for this {@link AsyncResultSet}. */
453474 @ Override
454475 public ApiFuture <Void > setCallback (Executor exec , ReadyCallback cb ) {
@@ -458,16 +479,24 @@ public ApiFuture<Void> setCallback(Executor exec, ReadyCallback cb) {
458479 this .state == State .INITIALIZED , "callback may not be set multiple times" );
459480
460481 // Start to fetch data and buffer these.
461- this .result =
462- new ListenableFutureToApiFuture <>(this .service .submit (new ProduceRowsCallable ()));
482+ this .result = SettableApiFuture .create ();
483+ this .state = State .IN_PROGRESS ;
484+ this .service .execute (new InitiateStreamingRunnable ());
463485 this .executor = MoreExecutors .newSequentialExecutor (Preconditions .checkNotNull (exec ));
464486 this .callback = Preconditions .checkNotNull (cb );
465- this .state = State .RUNNING ;
466487 pausedLatch .countDown ();
467488 return result ;
468489 }
469490 }
470491
492+ private void initiateProduceRows () {
493+ this .service .execute (new ProduceRowsRunnable ());
494+ if (this .state == State .IN_PROGRESS ) {
495+ this .state = State .RUNNING ;
496+ }
497+ produceRowsInitiated = true ;
498+ }
499+
471500 Future <Void > getResult () {
472501 return result ;
473502 }
@@ -578,6 +607,11 @@ public ResultSetMetadata getMetadata() {
578607 return delegateResultSet .get ().getMetadata ();
579608 }
580609
610+ @ Override
611+ public boolean initiateStreaming (StreamMessageListener streamMessageListener ) {
612+ return delegateResultSet .get ().initiateStreaming (streamMessageListener );
613+ }
614+
581615 @ Override
582616 protected void checkValidState () {
583617 synchronized (monitor ) {
@@ -593,4 +627,28 @@ public Struct getCurrentRowAsStruct() {
593627 checkValidState ();
594628 return currentRow ;
595629 }
630+
631+ @ Override
632+ public void onStreamMessage (
633+ PartialResultSet partialResultSet ,
634+ int prefetchChunks ,
635+ int currentBufferSize ,
636+ StreamMessageRequestor streamMessageRequestor ) {
637+ synchronized (monitor ) {
638+ if (produceRowsInitiated ) {
639+ return ;
640+ }
641+ // if PartialResultSet contains resume token or buffer size is more than configured size or
642+ // we have reached end of stream, we can start the thread
643+ boolean startJobThread =
644+ !partialResultSet .getResumeToken ().isEmpty ()
645+ || currentBufferSize >= prefetchChunks
646+ || partialResultSet == GrpcStreamIterator .END_OF_STREAM ;
647+ if (startJobThread || state != State .IN_PROGRESS ) {
648+ initiateProduceRows ();
649+ } else {
650+ streamMessageRequestor .requestMessages (1 );
651+ }
652+ }
653+ }
596654}
0 commit comments