@@ -65,10 +65,11 @@ abstract class ResumableStreamIterator extends AbstractIterator<PartialResultSet
6565 private static final Logger logger = Logger .getLogger (ResumableStreamIterator .class .getName ());
6666 private BackOff backOff ;
6767 private final LinkedList <PartialResultSet > buffer = new LinkedList <>();
68+ private final Object monitor = new Object ();
6869 private final int maxBufferSize ;
6970 private final ISpan span ;
7071 private final TraceWrapper tracer ;
71- private CloseableIterator <PartialResultSet > stream ;
72+ private volatile CloseableIterator <PartialResultSet > stream ;
7273 private ByteString resumeToken ;
7374 private boolean finished ;
7475 /**
@@ -317,15 +318,17 @@ && prepareIteratorForRetryOnDifferentGrpcChannel()) {
317318 }
318319
319320 private void startGrpcStreaming () {
320- if (stream == null ) {
321- span .addAnnotation (
322- "Starting/Resuming stream" ,
323- "ResumeToken" ,
324- resumeToken == null ? "null" : resumeToken .toStringUtf8 ());
325- try (IScope scope = tracer .withSpan (span )) {
326- // When start a new stream set the Span as current to make the gRPC Span a child of
327- // this Span.
328- stream = checkNotNull (startStream (resumeToken , streamMessageListener ));
321+ synchronized (monitor ) {
322+ if (stream == null ) {
323+ span .addAnnotation (
324+ "Starting/Resuming stream" ,
325+ "ResumeToken" ,
326+ resumeToken == null ? "null" : resumeToken .toStringUtf8 ());
327+ try (IScope scope = tracer .withSpan (span )) {
328+ // When start a new stream set the Span as current to make the gRPC Span a child of
329+ // this Span.
330+ stream = checkNotNull (startStream (resumeToken , streamMessageListener ));
331+ }
329332 }
330333 }
331334 }
0 commit comments