Skip to content

Commit 25cfede

Browse files
fix
1 parent 34b007c commit 25cfede

File tree

1 file changed

+50
-0
lines changed

1 file changed

+50
-0
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import com.google.cloud.Tuple;
5656
import com.google.cloud.grpc.GrpcTransportOptions;
5757
import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory;
58+
import com.google.cloud.spanner.AsyncResultSet.StreamMessageListener;
5859
import com.google.cloud.spanner.Options.QueryOption;
5960
import com.google.cloud.spanner.Options.ReadOption;
6061
import com.google.cloud.spanner.Options.TransactionOption;
@@ -266,6 +267,55 @@ private ResultSet wrap(final CachedResultSetSupplier resultSetSupplier) {
266267
return new ForwardingResultSet(resultSetSupplier) {
267268
private boolean beforeFirst = true;
268269

270+
@Override
271+
public boolean initiateStreaming(StreamMessageListener streamMessageListener) {
272+
while (true) {
273+
try {
274+
return internalInitiateStreaming(streamMessageListener);
275+
} catch (SessionNotFoundException e) {
276+
while (true) {
277+
// Keep the replace-if-possible outside the try-block to let the exception bubble up
278+
// if it's too late to replace the session.
279+
replaceSessionIfPossible(e);
280+
try {
281+
replaceDelegate(resultSetSupplier.reload());
282+
break;
283+
} catch (SessionNotFoundException snfe) {
284+
e = snfe;
285+
// retry on yet another session.
286+
}
287+
}
288+
}
289+
}
290+
}
291+
292+
private boolean internalInitiateStreaming(final StreamMessageListener streamMessageListener) {
293+
try {
294+
boolean ret = super.initiateStreaming(streamMessageListener);
295+
if (beforeFirst) {
296+
synchronized (lock) {
297+
session.get().markUsed();
298+
beforeFirst = false;
299+
sessionUsedForQuery = true;
300+
}
301+
}
302+
if (!ret && isSingleUse) {
303+
close();
304+
}
305+
return ret;
306+
} catch (SessionNotFoundException e) {
307+
throw e;
308+
} catch (SpannerException e) {
309+
synchronized (lock) {
310+
if (!closed && isSingleUse) {
311+
session.get().setLastException(e);
312+
AutoClosingReadContext.this.close();
313+
}
314+
}
315+
throw e;
316+
}
317+
}
318+
269319
@Override
270320
public boolean next() throws SpannerException {
271321
while (true) {

0 commit comments

Comments
 (0)