4444import java .util .concurrent .CountDownLatch ;
4545import java .util .concurrent .Executor ;
4646import java .util .concurrent .TimeUnit ;
47- import java .util .concurrent .atomic .AtomicReference ;
4847import java .util .logging .Level ;
4948import java .util .logging .Logger ;
5049import javax .annotation .Nullable ;
@@ -71,8 +70,7 @@ abstract class ResumableStreamIterator extends AbstractIterator<PartialResultSet
7170 private final int maxBufferSize ;
7271 private final ISpan span ;
7372 private final TraceWrapper tracer ;
74- private AtomicReference <CloseableIterator <PartialResultSet >> streamCache =
75- new AtomicReference <>();
73+ private volatile CloseableIterator <PartialResultSet > stream ;
7674 private ByteString resumeToken ;
7775 private boolean finished ;
7876 /**
@@ -215,16 +213,16 @@ boolean prepareIteratorForRetryOnDifferentGrpcChannel() {
215213
216214 @ Override
217215 public void close (@ Nullable String message ) {
218- if (streamCache . get () != null ) {
219- streamCache . get () .close (message );
216+ if (stream != null ) {
217+ stream .close (message );
220218 span .end ();
221- streamCache . set ( null ) ;
219+ stream = null ;
222220 }
223221 }
224222
225223 @ Override
226224 public boolean isWithBeginTransaction () {
227- return streamCache . get () != null && streamCache . get () .isWithBeginTransaction ();
225+ return stream != null && stream .isWithBeginTransaction ();
228226 }
229227
230228 @ Override
@@ -248,8 +246,8 @@ protected PartialResultSet computeNext() {
248246 return buffer .pop ();
249247 }
250248 try {
251- if (streamCache . get () .hasNext ()) {
252- PartialResultSet next = streamCache . get () .next ();
249+ if (stream .hasNext ()) {
250+ PartialResultSet next = stream .next ();
253251 boolean hasResumeToken = !next .getResumeToken ().isEmpty ();
254252 if (hasResumeToken ) {
255253 resumeToken = next .getResumeToken ();
@@ -283,7 +281,7 @@ protected PartialResultSet computeNext() {
283281 buffer .removeLast ();
284282 }
285283 assert buffer .isEmpty () || buffer .getLast ().getResumeToken ().equals (resumeToken );
286- streamCache . set ( null ) ;
284+ stream = null ;
287285 try (IScope s = tracer .withSpan (span )) {
288286 long delay = spannerException .getRetryDelayInMillis ();
289287 if (delay != -1 ) {
@@ -304,7 +302,7 @@ protected PartialResultSet computeNext() {
304302 if (translated instanceof RetryOnDifferentGrpcChannelException ) {
305303 if (++numAttemptsOnOtherChannel < errorHandler .getMaxAttempts ()
306304 && prepareIteratorForRetryOnDifferentGrpcChannel ()) {
307- streamCache . set ( null ) ;
305+ stream = null ;
308306 continue ;
309307 }
310308 }
@@ -325,7 +323,7 @@ private void startGrpcStreaming() {
325323 System .out .printf (
326324 "[%s][%s] Inside startGrpcStreaming\n " ,
327325 OffsetDateTime .now (), Thread .currentThread ().getName ());
328- if (streamCache . get () == null ) {
326+ if (stream == null ) {
329327 span .addAnnotation (
330328 "Starting/Resuming stream" ,
331329 "ResumeToken" ,
@@ -336,7 +334,7 @@ private void startGrpcStreaming() {
336334 System .out .printf (
337335 "[%s][%s] Inside creating stream\n " ,
338336 OffsetDateTime .now (), Thread .currentThread ().getName ());
339- streamCache . set ( checkNotNull (startStream (resumeToken , streamMessageListener ) ));
337+ stream = checkNotNull (startStream (resumeToken , streamMessageListener ));
340338 }
341339 }
342340 // }
0 commit comments