@@ -52,6 +52,7 @@ class AsyncResultSetImpl extends ForwardingStructReader
5252 private enum State {
5353 INITIALIZED ,
5454 STREAMING_INITIALIZED ,
55+ STREAMING_IN_PROGRESS ,
5556 /** SYNC indicates that the {@link ResultSet} is used in sync pattern. */
5657 SYNC ,
5758 CONSUMING ,
@@ -468,8 +469,18 @@ public void run() {
468469 // Those result sets will trigger initiateProduceRows() when the first results are received.
469470 // Non-streaming result sets do not trigger this callback, and for those result sets, we
470471 // need to eagerly start the ProduceRowsRunnable.
471- if (!initiateStreaming (AsyncResultSetImpl .this )) {
472- initiateProduceRows ();
472+ synchronized (monitor ) {
473+ if (state == State .STREAMING_IN_PROGRESS
474+ || state == State .RUNNING
475+ || state == State .CONSUMING ) {
476+ return ;
477+ }
478+ if (state == State .STREAMING_INITIALIZED ) {
479+ state = State .STREAMING_IN_PROGRESS ;
480+ }
481+ if (!initiateStreaming (AsyncResultSetImpl .this )) {
482+ initiateProduceRows ();
483+ }
473484 }
474485 } catch (Throwable exception ) {
475486 executionException = SpannerExceptionFactory .asSpannerException (exception );
@@ -499,7 +510,7 @@ public ApiFuture<Void> setCallback(Executor exec, ReadyCallback cb) {
499510
500511 private void initiateProduceRows () {
501512 synchronized (monitor ) {
502- if (this .state == State .STREAMING_INITIALIZED ) {
513+ if (this .state == State .STREAMING_IN_PROGRESS ) {
503514 this .state = State .RUNNING ;
504515 }
505516 produceRowsInitiated = true ;
@@ -649,7 +660,7 @@ public void onStreamMessage(PartialResultSet partialResultSet, boolean bufferIsF
649660 !partialResultSet .getResumeToken ().isEmpty ()
650661 || bufferIsFull
651662 || partialResultSet == GrpcStreamIterator .END_OF_STREAM ;
652- if (startJobThread || state != State .STREAMING_INITIALIZED ) {
663+ if (startJobThread || state != State .STREAMING_IN_PROGRESS ) {
653664 initiateProduceRows ();
654665 }
655666 }
0 commit comments