Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ DatabaseClient getMultiplexedSession() {

@VisibleForTesting
DatabaseClient getMultiplexedSessionForRW() {
if (this.useMultiplexedSessionForRW) {
if (canUseMultiplexedSessionsForRW()) {
return getMultiplexedSession();
}
return getSession();
Expand All @@ -107,6 +107,12 @@ private boolean canUseMultiplexedSessions() {
&& this.multiplexedSessionDatabaseClient.isMultiplexedSessionsSupported();
}

private boolean canUseMultiplexedSessionsForRW() {
return this.useMultiplexedSessionForRW
&& this.multiplexedSessionDatabaseClient != null
&& this.multiplexedSessionDatabaseClient.isMultiplexedSessionsForRWSupported();
}

@Override
public Dialect getDialect() {
return pool.getDialect();
Expand All @@ -129,7 +135,7 @@ public CommitResponse writeWithOptions(
throws SpannerException {
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, options);
try (IScope s = tracer.withSpan(span)) {
if (this.useMultiplexedSessionForRW && getMultiplexedSessionDatabaseClient() != null) {
if (canUseMultiplexedSessionsForRW() && getMultiplexedSessionDatabaseClient() != null) {
return getMultiplexedSessionDatabaseClient().writeWithOptions(mutations, options);
}
return runWithSessionRetry(session -> session.writeWithOptions(mutations, options));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.google.cloud.spanner;

import static com.google.cloud.spanner.SessionImpl.NO_CHANNEL_HINT;
import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
Expand All @@ -27,6 +28,10 @@
import com.google.cloud.spanner.SpannerException.ResourceNotFoundException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.spanner.v1.BeginTransactionRequest;
import com.google.spanner.v1.RequestOptions;
import com.google.spanner.v1.Transaction;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
Expand Down Expand Up @@ -92,6 +97,9 @@ void onError(SpannerException spannerException) {
// synchronizing, as it does not really matter exactly which error is set.
this.client.resourceNotFoundException.set((ResourceNotFoundException) spannerException);
}
// Mark multiplexed sessions for RW as unimplemented and fall back to regular sessions if
// UNIMPLEMENTED is returned.
this.client.maybeMarkUnimplementedForRW(spannerException);
}

@Override
Expand Down Expand Up @@ -164,6 +172,13 @@ public void close() {
/** The current multiplexed session that is used by this client. */
private final AtomicReference<ApiFuture<SessionReference>> multiplexedSessionReference;

@VisibleForTesting
/**
* The Transaction response returned by the BeginTransaction request with read-write when a
* multiplexed session is created during client initialization.
*/
private final SettableApiFuture<Transaction> readWriteBeginTransactionReferenceFuture;

/** The expiration date/time of the current multiplexed session. */
private final AtomicReference<Instant> expirationDate;

Expand All @@ -190,6 +205,12 @@ public void close() {
*/
private final AtomicBoolean unimplemented = new AtomicBoolean(false);

/**
* This flag is set to true if the server return UNIMPLEMENTED when a read-write transaction is
* executed on a multiplexed session. TODO: Remove once this is guaranteed to be available.
*/
@VisibleForTesting final AtomicBoolean unimplementedForRW = new AtomicBoolean(false);

MultiplexedSessionDatabaseClient(SessionClient sessionClient) {
this(sessionClient, Clock.systemUTC());
}
Expand Down Expand Up @@ -217,6 +238,7 @@ public void close() {
this.tracer = sessionClient.getSpanner().getTracer();
final SettableApiFuture<SessionReference> initialSessionReferenceFuture =
SettableApiFuture.create();
this.readWriteBeginTransactionReferenceFuture = SettableApiFuture.create();
this.multiplexedSessionReference = new AtomicReference<>(initialSessionReferenceFuture);
this.sessionClient.asyncCreateMultiplexedSession(
new SessionConsumer() {
Expand All @@ -226,6 +248,16 @@ public void onSessionReady(SessionImpl session) {
// only start the maintainer if we actually managed to create a session in the first
// place.
maintainer.start();

// initiate a begin transaction request to verify if read-write transactions are
// supported using multiplexed sessions.
if (sessionClient
.getSpanner()
.getOptions()
.getSessionPoolOptions()
.getUseMultiplexedSessionForRW()) {
verifyBeginTransactionWithRWOnMultiplexedSession(session.getName());
}
}

@Override
Expand Down Expand Up @@ -267,6 +299,59 @@ private void maybeMarkUnimplemented(Throwable t) {
}
}

private void maybeMarkUnimplementedForRW(SpannerException spannerException) {
if (spannerException.getErrorCode() == ErrorCode.UNIMPLEMENTED
// && spannerException.getCause().getMessage().contains("Transaction type read_write not
// supported with multiplexed sessions")
) {
unimplementedForRW.set(true);
}
}

private void verifyBeginTransactionWithRWOnMultiplexedSession(String sessionName) {
// annotate the explict BeginTransactionRequest with a transaction tag
// "multiplexed-rw-background-begin-txn" to avoid storing this request on mock spanner.
// this is to safeguard other mock spanner tests whose BeginTransaction request count will
// otherwise increase by 1. Modifying the unit tests do not seem valid since this code is
// temporary and will be removed once the read-write on multiplexed session looks stable at
// backend.
BeginTransactionRequest.Builder requestBuilder =
BeginTransactionRequest.newBuilder()
.setSession(sessionName)
.setOptions(
SessionImpl.createReadWriteTransactionOptions(
Options.fromTransactionOptions(), /* previousTransactionId = */ null))
.setRequestOptions(
RequestOptions.newBuilder()
.setTransactionTag("multiplexed-rw-background-begin-txn")
.build());
final BeginTransactionRequest request = requestBuilder.build();
final ApiFuture<Transaction> requestFuture;
requestFuture =
sessionClient
.getSpanner()
.getRpc()
.beginTransactionAsync(request, /* options = */ null, /* routeToLeader = */ true);
requestFuture.addListener(
() -> {
try {
Transaction txn = requestFuture.get();
if (txn.getId().isEmpty()) {
throw newSpannerException(
ErrorCode.INTERNAL, "Missing id in transaction\n" + sessionName);
}
readWriteBeginTransactionReferenceFuture.set(txn);
} catch (Exception e) {
SpannerException spannerException = SpannerExceptionFactory.newSpannerException(e);
// Mark multiplexed sessions for RW as unimplemented and fall back to regular sessions
// if UNIMPLEMENTED is returned.
maybeMarkUnimplementedForRW(spannerException);
readWriteBeginTransactionReferenceFuture.setException(e);
}
},
MoreExecutors.directExecutor());
}

boolean isValid() {
return resourceNotFoundException.get() == null;
}
Expand All @@ -283,6 +368,10 @@ boolean isMultiplexedSessionsSupported() {
return !this.unimplemented.get();
}

boolean isMultiplexedSessionsForRWSupported() {
return !this.unimplementedForRW.get();
}

void close() {
synchronized (this) {
if (!this.isClosed) {
Expand All @@ -308,6 +397,17 @@ SessionReference getCurrentSessionReference() {
}
}

@VisibleForTesting
Transaction getReadWriteBeginTransactionReference() {
try {
return this.readWriteBeginTransactionReferenceFuture.get();
} catch (ExecutionException executionException) {
throw SpannerExceptionFactory.asSpannerException(executionException.getCause());
} catch (InterruptedException interruptedException) {
throw SpannerExceptionFactory.propagateInterrupt(interruptedException);
}
}

/**
* Returns true if the multiplexed session has been created. This client can be used before the
* session has been created, and will in that case use a delayed transaction that contains a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1872,7 +1872,16 @@ private Transaction getTemporaryTransactionOrNull(TransactionSelector tx) {
@Override
public void beginTransaction(
BeginTransactionRequest request, StreamObserver<Transaction> responseObserver) {
requests.add(request);
// Skip storing the explicit BeginTransactionRequest used to verify read-write transaction
// server availability on multiplexed sessions.
// This code will be removed once read-write multiplexed sessions are stable on the backend,
// hence the temporary trade-off.
if (!request
.getRequestOptions()
.getTransactionTag()
.equals("multiplexed-rw-background-begin-txn")) {
requests.add(request);
}
Preconditions.checkNotNull(request.getSession());
Session session = getSession(request.getSession());
if (session == null) {
Expand Down
Loading
Loading