Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,17 @@ class DelayedMultiplexedSessionTransaction extends AbstractMultiplexedSessionDat
private final ISpan span;

private final ApiFuture<SessionReference> sessionFuture;
private final SessionPool sessionPool;

DelayedMultiplexedSessionTransaction(
MultiplexedSessionDatabaseClient client,
ISpan span,
ApiFuture<SessionReference> sessionFuture) {
ApiFuture<SessionReference> sessionFuture,
SessionPool sessionPool) {
this.client = client;
this.span = span;
this.sessionFuture = sessionFuture;
this.sessionPool = sessionPool;
}

@Override
Expand Down Expand Up @@ -189,7 +192,12 @@ public TransactionRunner readWriteTransaction(TransactionOption... options) {
this.sessionFuture,
sessionReference ->
new MultiplexedSessionTransaction(
client, span, sessionReference, NO_CHANNEL_HINT, /* singleUse = */ false)
client,
span,
sessionReference,
NO_CHANNEL_HINT,
/* singleUse = */ false,
this.sessionPool)
.readWriteTransaction(options),
MoreExecutors.directExecutor()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.cloud.spanner.Options.TransactionOption;
import com.google.cloud.spanner.Options.UpdateOption;
import com.google.cloud.spanner.SessionClient.SessionConsumer;
import com.google.cloud.spanner.SessionPool.SessionPoolTransactionRunner;
import com.google.cloud.spanner.SpannerException.ResourceNotFoundException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
Expand All @@ -52,6 +53,89 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

/**
* {@link TransactionRunner} that automatically handles "UNIMPLEMENTED" errors with the message
* "Transaction type read_write not supported with multiplexed sessions" by switching from a
* multiplexed session to a regular session and then restarts the transaction.
*/
class MultiplexedSessionTransactionRunner implements TransactionRunner {
private final SessionPool sessionPool;
private final TransactionRunnerImpl transactionRunnerForMultiplexedSession;
private SessionPoolTransactionRunner transactionRunnerForRegularSession;
private final TransactionOption[] options;
private boolean isUsingMultiplexedSession = true;

public MultiplexedSessionTransactionRunner(
SessionImpl multiplexedSession, SessionPool sessionPool, TransactionOption... options) {
this.sessionPool = sessionPool;
this.transactionRunnerForMultiplexedSession =
new TransactionRunnerImpl(
multiplexedSession, options); // Uses multiplexed session initially
multiplexedSession.setActive(this.transactionRunnerForMultiplexedSession);
this.options = options;
}

private TransactionRunner getRunner() {
if (this.isUsingMultiplexedSession) {
return this.transactionRunnerForMultiplexedSession;
} else {
if (this.transactionRunnerForRegularSession == null) {
this.transactionRunnerForRegularSession =
new SessionPoolTransactionRunner<>(
sessionPool.getSession(),
sessionPool.getPooledSessionReplacementHandler(),
options);
}
return this.transactionRunnerForRegularSession;
}
}

@Override
public <T> T run(TransactionCallable<T> callable) {
while (true) {
try {
return getRunner().run(callable);
} catch (SpannerException e) {
if (e.getErrorCode() == ErrorCode.UNIMPLEMENTED
&& verifyUnimplementedErrorMessageForRWMux(e)) {
this.isUsingMultiplexedSession = false; // Fallback to regular session
} else {
throw e; // Other errors propagate
}
}
}
}

@Override
public Timestamp getCommitTimestamp() {
return getRunner().getCommitTimestamp();
}

@Override
public CommitResponse getCommitResponse() {
return getRunner().getCommitResponse();
}

@Override
public TransactionRunner allowNestedTransaction() {
getRunner().allowNestedTransaction();
return this;
}

private boolean verifyUnimplementedErrorMessageForRWMux(SpannerException spannerException) {
if (spannerException.getCause() == null) {
return false;
}
if (spannerException.getCause().getMessage() == null) {
return false;
}
return spannerException
.getCause()
.getMessage()
.contains("Transaction type read_write not supported with multiplexed sessions");
}
}

/**
* {@link DatabaseClient} implementation that uses a single multiplexed session to execute
* transactions.
Expand All @@ -75,18 +159,30 @@ static class MultiplexedSessionTransaction extends SessionImpl {
private final int singleUseChannelHint;

private boolean done;
private final SessionPool pool;

MultiplexedSessionTransaction(
MultiplexedSessionDatabaseClient client,
ISpan span,
SessionReference sessionReference,
int singleUseChannelHint,
boolean singleUse) {
this(client, span, sessionReference, singleUseChannelHint, singleUse, null);
}

MultiplexedSessionTransaction(
MultiplexedSessionDatabaseClient client,
ISpan span,
SessionReference sessionReference,
int singleUseChannelHint,
boolean singleUse,
SessionPool pool) {
super(client.sessionClient.getSpanner(), sessionReference, singleUseChannelHint);
this.client = client;
this.singleUse = singleUse;
this.singleUseChannelHint = singleUseChannelHint;
this.client.numSessionsAcquired.incrementAndGet();
this.pool = pool;
setCurrentSpan(span);
}

Expand Down Expand Up @@ -134,6 +230,11 @@ public CommitResponse writeAtLeastOnceWithOptions(
return response;
}

@Override
public TransactionRunner readWriteTransaction(TransactionOption... options) {
return new MultiplexedSessionTransactionRunner(this, pool, options);
}

@Override
void onTransactionDone() {
boolean markedDone = false;
Expand Down Expand Up @@ -225,6 +326,8 @@ public void close() {
*/
@VisibleForTesting final AtomicBoolean unimplementedForPartitionedOps = new AtomicBoolean(false);

private SessionPool pool;

MultiplexedSessionDatabaseClient(SessionClient sessionClient) {
this(sessionClient, Clock.systemUTC());
}
Expand Down Expand Up @@ -299,6 +402,10 @@ public void onSessionCreateFailure(Throwable t, int createFailureForSessionCount
initialSessionReferenceFuture);
}

void setPool(SessionPool pool) {
this.pool = pool;
}

private static void maybeWaitForSessionCreation(
SessionPoolOptions sessionPoolOptions, ApiFuture<SessionReference> future) {
Duration waitDuration = sessionPoolOptions.getWaitForMinSessions();
Expand Down Expand Up @@ -489,7 +596,8 @@ private MultiplexedSessionTransaction createDirectMultiplexedSessionTransaction(
// any special handling of such errors.
multiplexedSessionReference.get().get(),
singleUse ? getSingleUseChannelHint() : NO_CHANNEL_HINT,
singleUse);
singleUse,
this.pool);
} catch (ExecutionException executionException) {
throw SpannerExceptionFactory.asSpannerException(executionException.getCause());
} catch (InterruptedException interruptedException) {
Expand All @@ -499,7 +607,7 @@ private MultiplexedSessionTransaction createDirectMultiplexedSessionTransaction(

private DelayedMultiplexedSessionTransaction createDelayedMultiplexSessionTransaction() {
return new DelayedMultiplexedSessionTransaction(
this, tracer.getCurrentSpan(), multiplexedSessionReference.get());
this, tracer.getCurrentSpan(), multiplexedSessionReference.get(), this.pool);
}

private int getSingleUseChannelHint() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1004,15 +1004,15 @@ public TransactionState getState() {
* {@link TransactionRunner} that automatically handles {@link SessionNotFoundException}s by
* replacing the underlying session and then restarts the transaction.
*/
private static final class SessionPoolTransactionRunner<I extends SessionFuture>
static final class SessionPoolTransactionRunner<I extends SessionFuture>
implements TransactionRunner {

private I session;
private final SessionReplacementHandler<I> sessionReplacementHandler;
private final TransactionOption[] options;
private TransactionRunner runner;

private SessionPoolTransactionRunner(
SessionPoolTransactionRunner(
I session,
SessionReplacementHandler<I> sessionReplacementHandler,
TransactionOption... options) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,12 @@ DatabaseClientImpl createDatabaseClient(
boolean useMultiplexedSessionPartitionedOps,
boolean useMultiplexedSessionForRW,
Attributes commonAttributes) {
if (multiplexedSessionClient != null) {
// Set the session pool in the multiplexed session client.
// This is required to handle fallback to regular sessions for in-progress transactions that
// use multiplexed sessions but fail with UNIMPLEMENTED errors.
multiplexedSessionClient.setPool(pool);
}
return new DatabaseClientImpl(
clientId,
pool,
Expand Down
Loading
Loading