@@ -52,6 +52,7 @@ class AsyncResultSetImpl extends ForwardingStructReader
5252 private enum State {
5353 INITIALIZED ,
5454 STREAMING_INITIALIZED ,
55+ STREAMING_STARTED ,
5556 /** SYNC indicates that the {@link ResultSet} is used in sync pattern. */
5657 SYNC ,
5758 CONSUMING ,
@@ -469,10 +470,10 @@ public void run() {
469470 // Non-streaming result sets do not trigger this callback, and for those result sets, we
470471 // need to eagerly start the ProduceRowsRunnable.
471472 synchronized (monitor ) {
472- if (state == State .STREAMING_INITIALIZED ) {
473+ if (state == State .STREAMING_STARTED || state == State . RUNNING || state == State . CONSUMING ) {
473474 return ;
474475 }
475- state = State .STREAMING_INITIALIZED ;
476+ state = State .STREAMING_STARTED ;
476477 System .out .printf ("State: %s\n " , state );
477478 if (!initiateStreaming (AsyncResultSetImpl .this )) {
478479 initiateProduceRows ();
@@ -495,6 +496,7 @@ public ApiFuture<Void> setCallback(Executor exec, ReadyCallback cb) {
495496
496497 // Start to fetch data and buffer these.
497498 this .result = SettableApiFuture .create ();
499+ this .state = State .STREAMING_INITIALIZED ;
498500 this .service .execute (new InitiateStreamingRunnable ());
499501 this .executor = MoreExecutors .newSequentialExecutor (Preconditions .checkNotNull (exec ));
500502 this .callback = Preconditions .checkNotNull (cb );
@@ -505,7 +507,7 @@ public ApiFuture<Void> setCallback(Executor exec, ReadyCallback cb) {
505507
506508 private void initiateProduceRows () {
507509 synchronized (monitor ) {
508- if (this .state == State .STREAMING_INITIALIZED ) {
510+ if (this .state == State .STREAMING_STARTED ) {
509511 this .state = State .RUNNING ;
510512 }
511513 produceRowsInitiated = true ;
0 commit comments