@@ -71,6 +71,7 @@ abstract class ResumableStreamIterator extends AbstractIterator<PartialResultSet
7171 private CloseableIterator <PartialResultSet > stream ;
7272 private ByteString resumeToken ;
7373 private boolean finished ;
74+ private final Object monitor = new Object ();
7475 /**
7576 * Indicates whether it is currently safe to retry RPCs. This will be {@code false} if we have
7677 * reached the maximum buffer size without seeing a restart token; in this case, we will drain the
@@ -318,17 +319,19 @@ && prepareIteratorForRetryOnDifferentGrpcChannel()) {
318319 }
319320
320321 private void startGrpcStreaming () {
321- System .out .println ("Starting Stream " + stream + " " + streamMessageListener );
322- if (stream == null ) {
323- span .addAnnotation (
324- "Starting/Resuming stream" ,
325- "ResumeToken" ,
326- resumeToken == null ? "null" : resumeToken .toStringUtf8 ());
327- try (IScope scope = tracer .withSpan (span )) {
328- // When start a new stream set the Span as current to make the gRPC Span a child of
329- // this Span.
330- System .out .println ("Creating Stream " + stream + " " + streamMessageListener );
331- stream = checkNotNull (startStream (resumeToken , streamMessageListener ));
322+ synchronized (monitor ) {
323+ System .out .println ("Starting Stream " + stream + " " + streamMessageListener );
324+ if (stream == null ) {
325+ span .addAnnotation (
326+ "Starting/Resuming stream" ,
327+ "ResumeToken" ,
328+ resumeToken == null ? "null" : resumeToken .toStringUtf8 ());
329+ try (IScope scope = tracer .withSpan (span )) {
330+ // When start a new stream set the Span as current to make the gRPC Span a child of
331+ // this Span.
332+ System .out .println ("Creating Stream " + stream + " " + streamMessageListener );
333+ stream = checkNotNull (startStream (resumeToken , streamMessageListener ));
334+ }
332335 }
333336 }
334337 }
0 commit comments