Skip to content

Commit 7e27aca

Browse files
chore(spanner): unflake unit tests (#3584)
1 parent 6225efa commit 7e27aca

File tree

4 files changed

+14
-2
lines changed

4 files changed

+14
-2
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -797,7 +797,6 @@ CloseableIterator<PartialResultSet> startStream(
797797
isRouteToLeader());
798798
session.markUsed(clock.instant());
799799
stream.setCall(call, request.getTransaction().hasBegin());
800-
call.request(prefetchChunks);
801800
return stream;
802801
}
803802

@@ -992,7 +991,6 @@ CloseableIterator<PartialResultSet> startStream(
992991
isRouteToLeader());
993992
session.markUsed(clock.instant());
994993
stream.setCall(call, /* withBeginTransaction = */ builder.getTransaction().hasBegin());
995-
call.request(prefetchChunks);
996994
return stream;
997995
}
998996

google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,9 @@ interface CloseableIterator<T> extends Iterator<T> {
158158
default boolean initiateStreaming(AsyncResultSet.StreamMessageListener streamMessageListener) {
159159
return false;
160160
}
161+
162+
/** it requests the initial prefetch chunks from gRPC stream */
163+
default void requestPrefetchChunks() {};
161164
}
162165

163166
static double valueProtoToFloat64(com.google.protobuf.Value proto) {

google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcStreamIterator.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.google.cloud.spanner;
1818

19+
import com.google.api.core.InternalApi;
1920
import com.google.api.gax.rpc.ApiCallContext;
2021
import com.google.cloud.spanner.AbstractResultSet.CloseableIterator;
2122
import com.google.cloud.spanner.spi.v1.SpannerRpc;
@@ -39,6 +40,7 @@ class GrpcStreamIterator extends AbstractIterator<PartialResultSet>
3940
implements CloseableIterator<PartialResultSet> {
4041
private static final Logger logger = Logger.getLogger(GrpcStreamIterator.class.getName());
4142
static final PartialResultSet END_OF_STREAM = PartialResultSet.newBuilder().build();
43+
private final int prefetchChunks;
4244
private AsyncResultSet.StreamMessageListener streamMessageListener;
4345

4446
private final ConsumerImpl consumer;
@@ -60,6 +62,7 @@ class GrpcStreamIterator extends AbstractIterator<PartialResultSet>
6062
GrpcStreamIterator(
6163
Statement statement, int prefetchChunks, boolean cancelQueryWhenClientIsClosed) {
6264
this.statement = statement;
65+
this.prefetchChunks = prefetchChunks;
6366
this.consumer = new ConsumerImpl(cancelQueryWhenClientIsClosed);
6467
// One extra to allow for END_OF_STREAM message.
6568
this.stream = new LinkedBlockingQueue<>(prefetchChunks + 1);
@@ -102,6 +105,13 @@ public void close(@Nullable String message) {
102105
}
103106
}
104107

108+
@Override
109+
@InternalApi
110+
public void requestPrefetchChunks() {
111+
Preconditions.checkState(call != null, "The StreamingCall object is not initialized");
112+
call.request(prefetchChunks);
113+
}
114+
105115
@Override
106116
public boolean isWithBeginTransaction() {
107117
return withBeginTransaction;

google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResumableStreamIterator.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -326,6 +326,7 @@ private void startGrpcStreaming() {
326326
// When start a new stream set the Span as current to make the gRPC Span a child of
327327
// this Span.
328328
stream = checkNotNull(startStream(resumeToken, streamMessageListener));
329+
stream.requestPrefetchChunks();
329330
}
330331
}
331332
}

0 commit comments

Comments
 (0)