1818
1919import  com .google .api .core .ApiFuture ;
2020import  com .google .api .core .ApiFutures ;
21- import  com .google .api .core .ListenableFutureToApiFuture ;
2221import  com .google .api .core .SettableApiFuture ;
2322import  com .google .api .gax .core .ExecutorProvider ;
2423import  com .google .cloud .spanner .AbstractReadContext .ListenableAsyncResultSet ;
2928import  com .google .common .collect .ImmutableList ;
3029import  com .google .common .util .concurrent .ListeningScheduledExecutorService ;
3130import  com .google .common .util .concurrent .MoreExecutors ;
31+ import  com .google .spanner .v1 .PartialResultSet ;
3232import  com .google .spanner .v1 .ResultSetMetadata ;
3333import  com .google .spanner .v1 .ResultSetStats ;
34+ 
3435import  java .util .Collection ;
3536import  java .util .LinkedList ;
3637import  java .util .List ;
37- import  java .util .concurrent .BlockingDeque ;
38- import  java .util .concurrent .Callable ;
39- import  java .util .concurrent .CountDownLatch ;
40- import  java .util .concurrent .ExecutionException ;
41- import  java .util .concurrent .Executor ;
42- import  java .util .concurrent .Future ;
43- import  java .util .concurrent .LinkedBlockingDeque ;
38+ import  java .util .concurrent .*;
4439import  java .util .logging .Level ;
4540import  java .util .logging .Logger ;
4641
4742/** Default implementation for {@link AsyncResultSet}. */ 
48- class  AsyncResultSetImpl  extends  ForwardingStructReader  implements  ListenableAsyncResultSet  {
43+ class  AsyncResultSetImpl  extends  ForwardingStructReader  implements  ListenableAsyncResultSet ,  AsyncResultSet . StreamMessageListener  {
4944  private  static  final  Logger  log  = Logger .getLogger (AsyncResultSetImpl .class .getName ());
5045
46+ 
5147  /** State of an {@link AsyncResultSetImpl}. */ 
5248  private  enum  State  {
5349    INITIALIZED ,
50+     IN_PROGRESS ,
5451    /** SYNC indicates that the {@link ResultSet} is used in sync pattern. */ 
5552    SYNC ,
5653    CONSUMING ,
@@ -120,7 +117,7 @@ private enum State {
120117   */ 
121118  private  volatile  boolean  finished ;
122119
123-   private  volatile  ApiFuture <Void > result ;
120+   private  volatile  SettableApiFuture <Void > result ;
124121
125122  /** 
126123   * This variable indicates whether {@link #tryNext()} has returned {@link CursorState#DONE} or a 
@@ -329,12 +326,12 @@ public void run() {
329326  private  final  CallbackRunnable  callbackRunnable  = new  CallbackRunnable ();
330327
331328  /** 
332-    * {@link ProduceRowsCallable } reads data from the underlying {@link ResultSet}, places these in 
329+    * {@link ProduceRowsRunnable } reads data from the underlying {@link ResultSet}, places these in 
333330   * the buffer and dispatches the {@link CallbackRunnable} when data is ready to be consumed. 
334331   */ 
335-   private  class  ProduceRowsCallable  implements  Callable < Void >  {
332+   private  class  ProduceRowsRunnable  implements  Runnable  {
336333    @ Override 
337-     public  Void   call ()  throws   Exception  {
334+     public  void   run ()  {
338335      boolean  stop  = false ;
339336      boolean  hasNext  = false ;
340337      try  {
@@ -393,12 +390,17 @@ public Void call() throws Exception {
393390        }
394391        // Call the callback if there are still rows in the buffer that need to be processed. 
395392        while  (!stop ) {
396-           waitIfPaused ();
397-           startCallbackIfNecessary ();
398-           // Make sure we wait until the callback runner has actually finished. 
399-           consumingLatch .await ();
400-           synchronized  (monitor ) {
401-             stop  = cursorReturnedDoneOrException ;
393+           try  {
394+             waitIfPaused ();
395+             startCallbackIfNecessary ();
396+             // Make sure we wait until the callback runner has actually finished. 
397+             consumingLatch .await ();
398+             synchronized  (monitor ) {
399+               stop  = cursorReturnedDoneOrException ;
400+             }
401+           } catch  (InterruptedException  e ) {
402+             result .setException (e );
403+             return ;
402404          }
403405        }
404406      } finally  {
@@ -410,14 +412,16 @@ public Void call() throws Exception {
410412        }
411413        synchronized  (monitor ) {
412414          if  (executionException  != null ) {
415+             result .setException (executionException );
413416            throw  executionException ;
414417          }
415418          if  (state  == State .CANCELLED ) {
419+             result .setException (CANCELLED_EXCEPTION );
416420            throw  CANCELLED_EXCEPTION ;
417421          }
418422        }
419423      }
420-       return   null ;
424+       result . set ( null ) ;
421425    }
422426
423427    private  void  waitIfPaused () throws  InterruptedException  {
@@ -449,6 +453,21 @@ private void startCallbackWithBufferLatchIfNecessary(int bufferLatch) {
449453    }
450454  }
451455
456+   private  class  InitiateStreamingRunnable  implements  Runnable  {
457+ 
458+     @ Override 
459+     public  void  run () {
460+       try  {
461+         if (!initiateStreaming (AsyncResultSetImpl .this )) {
462+           initiateProduceRows ();
463+         }
464+       } catch  (SpannerException  e ) {
465+         executionException  = e ;
466+         initiateProduceRows ();
467+       }
468+     }
469+   }
470+ 
452471  /** Sets the callback for this {@link AsyncResultSet}. */ 
453472  @ Override 
454473  public  ApiFuture <Void > setCallback (Executor  exec , ReadyCallback  cb ) {
@@ -458,16 +477,21 @@ public ApiFuture<Void> setCallback(Executor exec, ReadyCallback cb) {
458477          this .state  == State .INITIALIZED , "callback may not be set multiple times" );
459478
460479      // Start to fetch data and buffer these. 
461-       this .result  =
462-           new  ListenableFutureToApiFuture <>(this .service .submit (new  ProduceRowsCallable ()));
480+       this .result  = SettableApiFuture .create ();
481+       this .state  = State .IN_PROGRESS ;
482+       this .service .execute (new  InitiateStreamingRunnable ());
463483      this .executor  = MoreExecutors .newSequentialExecutor (Preconditions .checkNotNull (exec ));
464484      this .callback  = Preconditions .checkNotNull (cb );
465-       this .state  = State .RUNNING ;
466485      pausedLatch .countDown ();
467486      return  result ;
468487    }
469488  }
470489
490+   private  void  initiateProduceRows () {
491+     this .service .execute (new  ProduceRowsRunnable ());
492+     this .state  = State .RUNNING ;
493+   }
494+ 
471495  Future <Void > getResult () {
472496    return  result ;
473497  }
@@ -480,6 +504,7 @@ public void cancel() {
480504          "cannot cancel a result set without a callback" );
481505      state  = State .CANCELLED ;
482506      pausedLatch .countDown ();
507+       this .result .setException (CANCELLED_EXCEPTION );
483508    }
484509  }
485510
@@ -578,6 +603,11 @@ public ResultSetMetadata getMetadata() {
578603    return  delegateResultSet .get ().getMetadata ();
579604  }
580605
606+   @ Override 
607+   public  boolean  initiateStreaming (StreamMessageListener  streamMessageListener ) {
608+     return  delegateResultSet .get ().initiateStreaming (streamMessageListener );
609+   }
610+ 
581611  @ Override 
582612  protected  void  checkValidState () {
583613    synchronized  (monitor ) {
@@ -593,4 +623,21 @@ public Struct getCurrentRowAsStruct() {
593623    checkValidState ();
594624    return  currentRow ;
595625  }
626+ 
627+   @ Override 
628+   public  void  onStreamMessage (PartialResultSet  partialResultSet , int  prefetchChunks , int  currentBufferSize , StreamMessageRequestor  streamMessageRequestor ) {
629+     synchronized  (monitor ) {
630+       if  (state  == State .IN_PROGRESS ) {
631+         // if PartialResultSet contains resume token or buffer size is more than configured size or we have reached 
632+         // end of stream, we can start the thread 
633+         boolean  startJobThread  = !partialResultSet .getResumeToken ().isEmpty ()
634+                 || currentBufferSize  > prefetchChunks  || partialResultSet  == GrpcStreamIterator .END_OF_STREAM ;
635+         if  (startJobThread ){
636+           initiateProduceRows ();
637+         } else  {
638+           streamMessageRequestor .requestMessages (1 );
639+         }
640+       }
641+     }
642+   }
596643}
0 commit comments