Skip to content

Commit 5aae0d0

Browse files
committed
chore(spanner): handle server side kill switch for multiplexed sessions with read-write
1 parent c15339f commit 5aae0d0

File tree

3 files changed

+326
-2
lines changed

3 files changed

+326
-2
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ DatabaseClient getMultiplexedSession() {
9292

9393
@VisibleForTesting
9494
DatabaseClient getMultiplexedSessionForRW() {
95-
if (this.useMultiplexedSessionForRW) {
95+
if (canUseMultiplexedSessionsForRW()) {
9696
return getMultiplexedSession();
9797
}
9898
return getSession();
@@ -107,6 +107,12 @@ private boolean canUseMultiplexedSessions() {
107107
&& this.multiplexedSessionDatabaseClient.isMultiplexedSessionsSupported();
108108
}
109109

110+
private boolean canUseMultiplexedSessionsForRW() {
111+
return this.useMultiplexedSessionForRW
112+
&& this.multiplexedSessionDatabaseClient != null
113+
&& this.multiplexedSessionDatabaseClient.isMultiplexedSessionsForRWSupported();
114+
}
115+
110116
@Override
111117
public Dialect getDialect() {
112118
return pool.getDialect();
@@ -129,7 +135,7 @@ public CommitResponse writeWithOptions(
129135
throws SpannerException {
130136
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, options);
131137
try (IScope s = tracer.withSpan(span)) {
132-
if (this.useMultiplexedSessionForRW && getMultiplexedSessionDatabaseClient() != null) {
138+
if (canUseMultiplexedSessionsForRW() && getMultiplexedSessionDatabaseClient() != null) {
133139
return getMultiplexedSessionDatabaseClient().writeWithOptions(mutations, options);
134140
}
135141
return runWithSessionRetry(session -> session.writeWithOptions(mutations, options));

google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package com.google.cloud.spanner;
1818

1919
import static com.google.cloud.spanner.SessionImpl.NO_CHANNEL_HINT;
20+
import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException;
2021

2122
import com.google.api.core.ApiFuture;
2223
import com.google.api.core.ApiFutures;
@@ -27,6 +28,9 @@
2728
import com.google.cloud.spanner.SpannerException.ResourceNotFoundException;
2829
import com.google.common.annotations.VisibleForTesting;
2930
import com.google.common.base.Preconditions;
31+
import com.google.common.util.concurrent.MoreExecutors;
32+
import com.google.spanner.v1.BeginTransactionRequest;
33+
import com.google.spanner.v1.Transaction;
3034
import java.time.Clock;
3135
import java.time.Duration;
3236
import java.time.Instant;
@@ -92,6 +96,9 @@ void onError(SpannerException spannerException) {
9296
// synchronizing, as it does not really matter exactly which error is set.
9397
this.client.resourceNotFoundException.set((ResourceNotFoundException) spannerException);
9498
}
99+
// Mark multiplexed sessions for RW as unimplemented and fall back to regular sessions if
100+
// UNIMPLEMENTED is returned.
101+
this.client.maybeMarkUnimplementedForRW(spannerException);
95102
}
96103

97104
@Override
@@ -164,6 +171,13 @@ public void close() {
164171
/** The current multiplexed session that is used by this client. */
165172
private final AtomicReference<ApiFuture<SessionReference>> multiplexedSessionReference;
166173

174+
@VisibleForTesting
175+
/**
176+
* The Transaction response returned by the BeginTransaction request with read-write when a
177+
* multiplexed session is created during client initialization.
178+
*/
179+
private final SettableApiFuture<Transaction> readWriteBeginTransactionReferenceFuture;
180+
167181
/** The expiration date/time of the current multiplexed session. */
168182
private final AtomicReference<Instant> expirationDate;
169183

@@ -190,6 +204,12 @@ public void close() {
190204
*/
191205
private final AtomicBoolean unimplemented = new AtomicBoolean(false);
192206

207+
/**
208+
* This flag is set to true if the server return UNIMPLEMENTED when a read-write transaction is
209+
* executed on a multiplexed session. TODO: Remove once this is guaranteed to be available.
210+
*/
211+
@VisibleForTesting final AtomicBoolean unimplementedForRW = new AtomicBoolean(false);
212+
193213
MultiplexedSessionDatabaseClient(SessionClient sessionClient) {
194214
this(sessionClient, Clock.systemUTC());
195215
}
@@ -217,6 +237,7 @@ public void close() {
217237
this.tracer = sessionClient.getSpanner().getTracer();
218238
final SettableApiFuture<SessionReference> initialSessionReferenceFuture =
219239
SettableApiFuture.create();
240+
this.readWriteBeginTransactionReferenceFuture = SettableApiFuture.create();
220241
this.multiplexedSessionReference = new AtomicReference<>(initialSessionReferenceFuture);
221242
this.sessionClient.asyncCreateMultiplexedSession(
222243
new SessionConsumer() {
@@ -226,6 +247,9 @@ public void onSessionReady(SessionImpl session) {
226247
// only start the maintainer if we actually managed to create a session in the first
227248
// place.
228249
maintainer.start();
250+
// initiate a begin transaction request to verify if read-write transactions are
251+
// supported using multiplexed sessions.
252+
verifyBeginTransactionWithRWOnMultiplexedSession(session.getName());
229253
}
230254

231255
@Override
@@ -267,6 +291,51 @@ private void maybeMarkUnimplemented(Throwable t) {
267291
}
268292
}
269293

294+
private void maybeMarkUnimplementedForRW(SpannerException spannerException) {
295+
if (spannerException.getErrorCode() == ErrorCode.UNIMPLEMENTED
296+
// && spannerException.getReason().equalsIgnoreCase("Transaction type read_write not supported
297+
// with multiplexed sessions")
298+
) {
299+
unimplementedForRW.set(true);
300+
}
301+
}
302+
303+
// TODO:
304+
private void verifyBeginTransactionWithRWOnMultiplexedSession(String sessionName) {
305+
BeginTransactionRequest.Builder requestBuilder =
306+
BeginTransactionRequest.newBuilder()
307+
.setSession(sessionName)
308+
.setOptions(
309+
SessionImpl.createReadWriteTransactionOptions(
310+
Options.fromTransactionOptions(), /* previousTransactionId = */ null));
311+
final BeginTransactionRequest request = requestBuilder.build();
312+
final ApiFuture<Transaction> requestFuture;
313+
requestFuture =
314+
sessionClient
315+
.getSpanner()
316+
.getRpc()
317+
.beginTransactionAsync(request, /* options = */ null, /* routeToLeader = */ true);
318+
requestFuture.addListener(
319+
() -> {
320+
try {
321+
Transaction txn = requestFuture.get();
322+
if (txn.getId().isEmpty()) {
323+
throw newSpannerException(
324+
ErrorCode.INTERNAL, "Missing id in transaction\n" + sessionName);
325+
}
326+
readWriteBeginTransactionReferenceFuture.set(txn);
327+
} catch (Exception e) {
328+
SpannerException spannerException = SpannerExceptionFactory.newSpannerException(e);
329+
// Mark multiplexed sessions for RW as unimplemented and fall back to regular sessions
330+
// if
331+
// UNIMPLEMENTED is returned.
332+
maybeMarkUnimplementedForRW(spannerException);
333+
readWriteBeginTransactionReferenceFuture.setException(e);
334+
}
335+
},
336+
MoreExecutors.directExecutor());
337+
}
338+
270339
boolean isValid() {
271340
return resourceNotFoundException.get() == null;
272341
}
@@ -283,6 +352,10 @@ boolean isMultiplexedSessionsSupported() {
283352
return !this.unimplemented.get();
284353
}
285354

355+
boolean isMultiplexedSessionsForRWSupported() {
356+
return !this.unimplementedForRW.get();
357+
}
358+
286359
void close() {
287360
synchronized (this) {
288361
if (!this.isClosed) {
@@ -308,6 +381,17 @@ SessionReference getCurrentSessionReference() {
308381
}
309382
}
310383

384+
@VisibleForTesting
385+
Transaction getReadWriteBeginTransactionReference() {
386+
try {
387+
return this.readWriteBeginTransactionReferenceFuture.get();
388+
} catch (ExecutionException executionException) {
389+
throw SpannerExceptionFactory.asSpannerException(executionException.getCause());
390+
} catch (InterruptedException interruptedException) {
391+
throw SpannerExceptionFactory.propagateInterrupt(interruptedException);
392+
}
393+
}
394+
311395
/**
312396
* Returns true if the multiplexed session has been created. This client can be used before the
313397
* session has been created, and will in that case use a delayed transaction that contains a

0 commit comments

Comments
 (0)