@@ -52,6 +52,7 @@ class AsyncResultSetImpl extends ForwardingStructReader
5252 private enum State {
5353 INITIALIZED ,
5454 STREAMING_INITIALIZED ,
55+ STREAMING_IN_PROGRESS ,
5556 /** SYNC indicates that the {@link ResultSet} is used in sync pattern. */
5657 SYNC ,
5758 CONSUMING ,
@@ -468,8 +469,16 @@ public void run() {
468469 // Those result sets will trigger initiateProduceRows() when the first results are received.
469470 // Non-streaming result sets do not trigger this callback, and for those result sets, we
470471 // need to eagerly start the ProduceRowsRunnable.
471- if (!initiateStreaming (AsyncResultSetImpl .this )) {
472- initiateProduceRows ();
472+ synchronized (monitor ) {
473+ if (state == State .STREAMING_IN_PROGRESS || state == State .RUNNING || state == State .CONSUMING ) {
474+ return ;
475+ }
476+ if (state == State .STREAMING_INITIALIZED ) {
477+ state = State .STREAMING_IN_PROGRESS ;
478+ }
479+ if (!initiateStreaming (AsyncResultSetImpl .this )) {
480+ initiateProduceRows ();
481+ }
473482 }
474483 } catch (Throwable exception ) {
475484 executionException = SpannerExceptionFactory .asSpannerException (exception );
@@ -499,7 +508,7 @@ public ApiFuture<Void> setCallback(Executor exec, ReadyCallback cb) {
499508
500509 private void initiateProduceRows () {
501510 synchronized (monitor ) {
502- if (this .state == State .STREAMING_INITIALIZED ) {
511+ if (this .state == State .STREAMING_IN_PROGRESS ) {
503512 this .state = State .RUNNING ;
504513 }
505514 produceRowsInitiated = true ;
@@ -649,7 +658,7 @@ public void onStreamMessage(PartialResultSet partialResultSet, boolean bufferIsF
649658 !partialResultSet .getResumeToken ().isEmpty ()
650659 || bufferIsFull
651660 || partialResultSet == GrpcStreamIterator .END_OF_STREAM ;
652- if (startJobThread || state != State .STREAMING_INITIALIZED ) {
661+ if (startJobThread || state != State .STREAMING_IN_PROGRESS ) {
653662 initiateProduceRows ();
654663 }
655664 }
0 commit comments