Skip to content

Commit 45fa1ba

Browse files
committed
chore(x-goog-spanner-request-id): assert expectations in tests for retries + aborts
Updates #3537 chore(x-goog-spanner-request-id): add BeginTransaction+ResumableStreamIterator Plumbs x-goog-spanner-request-id into BeginTransaction and ResumableStreamIterator and for PartitionedDmlTransaction. Updates #3537 Get more tests reveal needs for plumbing Add requestId to PartitionQuery + PartitionRead Propagate requestId into some more SpannerException values Use session.getChannel() and assert for results More debugging + fix up sorting comparator Fix more channelId TODOs Fix up withNthRequest for deterministic checks for BatchCreateSessions More retrofits More updates for tests More validity checks Add debugs to assert behavior of outgoing headers
1 parent 71cda80 commit 45fa1ba

File tree

6 files changed

+252
-37
lines changed

6 files changed

+252
-37
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ long executeStreamingPartitionedUpdate(
9393
final Duration remainingTimeout = tryUpdateTimeout(timeout, stopwatch);
9494

9595
try {
96+
System.out.println("\033[31mreqIdPump: " + reqId + "\033[00m");
9697
ServerStream<PartialResultSet> stream =
9798
rpc.executeStreamingPartitionedDml(
9899
request, reqId.withOptions(session.getOptions()), remainingTimeout);

google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResumableStreamIterator.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,12 @@ private void backoffSleep(Context context, long backoffMillis) throws SpannerExc
198198

199199
public void ensureNonNullXGoogRequestId() {
200200
if (this.xGoogRequestId == null) {
201+
System.out.println(
202+
"\033[34mXGoogRequestId.ensureNonNull: "
203+
+ this.xGoogRequestId
204+
+ " for:: "
205+
+ System.identityHashCode(this)
206+
+ "\033[00m");
201207
this.xGoogRequestId =
202208
this.xGoogRequestIdCreator.nextRequestId(1 /*TODO: infer channelId*/, 1 /*attempt*/);
203209
}
@@ -206,6 +212,11 @@ public void ensureNonNullXGoogRequestId() {
206212
public void incrementXGoogRequestIdAttempt() {
207213
this.ensureNonNullXGoogRequestId();
208214
this.xGoogRequestId.incrementAttempt();
215+
System.out.println(
216+
"\033[35mincrementXGoogAttempt: "
217+
+ this.xGoogRequestId
218+
+ " :: "
219+
+ System.identityHashCode(this));
209220
}
210221

211222
private enum DirectExecutor implements Executor {

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ interface SessionConsumer {
190190
// SessionClient is created long before a DatabaseClientImpl is created,
191191
// as batch sessions are firstly created then later attached to each Client.
192192
private static final AtomicInteger NTH_ID = new AtomicInteger(0);
193-
private final int nthId = NTH_ID.incrementAndGet();
193+
private final int nthId;
194194
private final AtomicInteger nthRequest = new AtomicInteger(0);
195195

196196
@GuardedBy("this")
@@ -205,6 +205,7 @@ interface SessionConsumer {
205205
this.executorFactory = executorFactory;
206206
this.executor = executorFactory.get();
207207
this.commonAttributes = spanner.getTracer().createCommonAttributes(db);
208+
this.nthId = NTH_ID.incrementAndGet();
208209
}
209210

210211
@Override

google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2043,7 +2043,7 @@ <ReqT, RespT> GrpcCallContext newCallContext(
20432043
}
20442044
if (options != null) {
20452045
// TODO(@odeke-em): Infer the affinity if it doesn't match up with in the request-id.
2046-
context = withRequestId(context, options);
2046+
context = withRequestId(context, options, method);
20472047
}
20482048
context = context.withExtraHeaders(metadataProvider.newExtraHeaders(resource, projectName));
20492049
if (routeToLeader && leaderAwareRoutingEnabled) {
@@ -2061,15 +2061,40 @@ <ReqT, RespT> GrpcCallContext newCallContext(
20612061
if (configurator != null) {
20622062
apiCallContextFromContext = configurator.configure(context, request, method);
20632063
}
2064+
2065+
// Debug the call headers before this.
2066+
Map<String, List<String>> hdrs = context.getExtraHeaders();
2067+
if (method.getFullMethodName().compareTo("google.spanner.v1.Spanner/DeleteSession") != 0) {
2068+
System.out.println(
2069+
"\033[32mextraHeaders going out for " + method.getFullMethodName() + "\033[00m");
2070+
for (Map.Entry<String, List<String>> entry : hdrs.entrySet()) {
2071+
System.out.println(
2072+
"\t\033[36mcall.Key: " + entry.getKey() + ":: " + entry.getValue() + "\033[00m");
2073+
}
2074+
}
20642075
return (GrpcCallContext) context.merge(apiCallContextFromContext);
20652076
}
20662077

2067-
GrpcCallContext withRequestId(GrpcCallContext context, Map<SpannerRpc.Option, ?> options) {
2078+
<ReqT, RespT> GrpcCallContext withRequestId(
2079+
GrpcCallContext context,
2080+
Map<SpannerRpc.Option, ?> options,
2081+
MethodDescriptor<ReqT, RespT> method) {
20682082
XGoogSpannerRequestId reqId = (XGoogSpannerRequestId) options.get(Option.REQUEST_ID);
20692083
if (reqId == null) {
20702084
return context;
20712085
}
20722086

2087+
String methodName = method.getFullMethodName();
2088+
if (methodName.compareTo("google.spanner.v1.Spanner/ExecuteStreamingSql") == 0) {
2089+
System.out.println(
2090+
"\033[36mGapiSpannerRpc.withRequestId: "
2091+
+ reqId
2092+
+ " for: "
2093+
+ method.getFullMethodName()
2094+
+ " "
2095+
+ System.identityHashCode(context)
2096+
+ "\033[00m");
2097+
}
20732098
Map<String, List<String>> withReqId =
20742099
ImmutableMap.of(
20752100
XGoogSpannerRequestId.REQUEST_HEADER_KEY.name(),

0 commit comments

Comments
 (0)