Skip to content

Commit baf2b5a

Browse files
fix(spanner): unflake unit tests
1 parent 6225efa commit baf2b5a

File tree

4 files changed

+10
-2
lines changed

4 files changed

+10
-2
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -797,7 +797,6 @@ CloseableIterator<PartialResultSet> startStream(
797797
isRouteToLeader());
798798
session.markUsed(clock.instant());
799799
stream.setCall(call, request.getTransaction().hasBegin());
800-
call.request(prefetchChunks);
801800
return stream;
802801
}
803802

@@ -992,7 +991,6 @@ CloseableIterator<PartialResultSet> startStream(
992991
isRouteToLeader());
993992
session.markUsed(clock.instant());
994993
stream.setCall(call, /* withBeginTransaction = */ builder.getTransaction().hasBegin());
995-
call.request(prefetchChunks);
996994
return stream;
997995
}
998996

google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,8 @@ interface CloseableIterator<T> extends Iterator<T> {
158158
default boolean initiateStreaming(AsyncResultSet.StreamMessageListener streamMessageListener) {
159159
return false;
160160
}
161+
162+
default void requestPrefetchChunks() {};
161163
}
162164

163165
static double valueProtoToFloat64(com.google.protobuf.Value proto) {

google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcStreamIterator.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ class GrpcStreamIterator extends AbstractIterator<PartialResultSet>
3939
implements CloseableIterator<PartialResultSet> {
4040
private static final Logger logger = Logger.getLogger(GrpcStreamIterator.class.getName());
4141
static final PartialResultSet END_OF_STREAM = PartialResultSet.newBuilder().build();
42+
private final int prefetchChunks;
4243
private AsyncResultSet.StreamMessageListener streamMessageListener;
4344

4445
private final ConsumerImpl consumer;
@@ -60,6 +61,7 @@ class GrpcStreamIterator extends AbstractIterator<PartialResultSet>
6061
GrpcStreamIterator(
6162
Statement statement, int prefetchChunks, boolean cancelQueryWhenClientIsClosed) {
6263
this.statement = statement;
64+
this.prefetchChunks = prefetchChunks;
6365
this.consumer = new ConsumerImpl(cancelQueryWhenClientIsClosed);
6466
// One extra to allow for END_OF_STREAM message.
6567
this.stream = new LinkedBlockingQueue<>(prefetchChunks + 1);
@@ -102,6 +104,11 @@ public void close(@Nullable String message) {
102104
}
103105
}
104106

107+
@Override
108+
public void requestPrefetchChunks() {
109+
call.request(prefetchChunks);
110+
}
111+
105112
@Override
106113
public boolean isWithBeginTransaction() {
107114
return withBeginTransaction;

google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResumableStreamIterator.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -326,6 +326,7 @@ private void startGrpcStreaming() {
326326
// When start a new stream set the Span as current to make the gRPC Span a child of
327327
// this Span.
328328
stream = checkNotNull(startStream(resumeToken, streamMessageListener));
329+
stream.requestPrefetchChunks();
329330
}
330331
}
331332
}

0 commit comments

Comments
 (0)