Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,16 @@ public TransactionContextFutureImpl beginAsync() {
private ApiFuture<TransactionContext> internalBeginAsync(boolean firstAttempt) {
txnState = TransactionState.STARTED;

boolean isMutationsOnlyTransaction = false;

// Determine the latest transactionId when using a multiplexed session.
ByteString multiplexedSessionPreviousTransactionId = ByteString.EMPTY;
if (txn != null && session.getIsMultiplexed() && !firstAttempt) {
// Use the current transactionId if available, otherwise fallback to the previous aborted
// transactionId.
multiplexedSessionPreviousTransactionId =
txn.transactionId != null ? txn.transactionId : txn.getPreviousTransactionId();
isMutationsOnlyTransaction = txn.mutationsOnlyTransaction;
}

txn =
Expand All @@ -99,7 +102,17 @@ private ApiFuture<TransactionContext> internalBeginAsync(boolean firstAttempt) {
}
final SettableApiFuture<TransactionContext> res = SettableApiFuture.create();
final ApiFuture<Void> fut;
if (firstAttempt) {

/*
If the transaction contains only mutations and is using a multiplexed session, perform a
`BeginTransaction` after the user operation completes during a retry.

This ensures that a random mutation from the mutations list is chosen when invoking
`BeginTransaction`. If `BeginTransaction` is performed before the user operation,
the mutations are not sent, and the precommit token is not received, resulting in
an INVALID_ARGUMENT error (missing precommit token) during commit.
*/
if (firstAttempt || isMutationsOnlyTransaction) {
fut = ApiFutures.immediateFuture(null);
} else {
fut = txn.ensureTxnAsync();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,10 +253,15 @@ public void onSessionReady(SessionImpl session) {
// initiate a begin transaction request to verify if read-write transactions are
// supported using multiplexed sessions.
if (sessionClient
.getSpanner()
.getOptions()
.getSessionPoolOptions()
.getUseMultiplexedSessionForRW()) {
.getSpanner()
.getOptions()
.getSessionPoolOptions()
.getUseMultiplexedSessionForRW()
&& !sessionClient
.getSpanner()
.getOptions()
.getSessionPoolOptions()
.getSkipVerifyBeginTransactionForMuxRW()) {
verifyBeginTransactionWithRWOnMultiplexedSessionAsync(session.getName());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public class SessionPoolOptions {

// TODO: Change to use java.time.Duration.
private final Duration multiplexedSessionMaintenanceDuration;
private final boolean skipVerifyingBeginTransactionForMuxRW;

private SessionPoolOptions(Builder builder) {
// minSessions > maxSessions is only possible if the user has only set a value for maxSessions.
Expand Down Expand Up @@ -132,6 +133,7 @@ private SessionPoolOptions(Builder builder) {
? useMultiplexedSessionFromEnvVariablePartitionedOps
: builder.useMultiplexedSessionPartitionedOps;
this.multiplexedSessionMaintenanceDuration = builder.multiplexedSessionMaintenanceDuration;
this.skipVerifyingBeginTransactionForMuxRW = builder.skipVerifyingBeginTransactionForMuxRW;
}

@Override
Expand Down Expand Up @@ -169,8 +171,10 @@ public boolean equals(Object o) {
&& Objects.equals(this.useMultiplexedSession, other.useMultiplexedSession)
&& Objects.equals(this.useMultiplexedSessionForRW, other.useMultiplexedSessionForRW)
&& Objects.equals(
this.multiplexedSessionMaintenanceDuration,
other.multiplexedSessionMaintenanceDuration);
this.multiplexedSessionMaintenanceDuration, other.multiplexedSessionMaintenanceDuration)
&& Objects.equals(
this.skipVerifyingBeginTransactionForMuxRW,
other.skipVerifyingBeginTransactionForMuxRW);
}

@Override
Expand Down Expand Up @@ -199,7 +203,8 @@ public int hashCode() {
this.poolMaintainerClock,
this.useMultiplexedSession,
this.useMultiplexedSessionForRW,
this.multiplexedSessionMaintenanceDuration);
this.multiplexedSessionMaintenanceDuration,
this.skipVerifyingBeginTransactionForMuxRW);
}

public Builder toBuilder() {
Expand Down Expand Up @@ -392,6 +397,12 @@ Duration getMultiplexedSessionMaintenanceDuration() {
return multiplexedSessionMaintenanceDuration;
}

@VisibleForTesting
@InternalApi
boolean getSkipVerifyBeginTransactionForMuxRW() {
return skipVerifyingBeginTransactionForMuxRW;
}

public static Builder newBuilder() {
return new Builder();
}
Expand Down Expand Up @@ -607,6 +618,7 @@ public static class Builder {

private Duration multiplexedSessionMaintenanceDuration = Duration.ofDays(7);
private Clock poolMaintainerClock = Clock.INSTANCE;
private boolean skipVerifyingBeginTransactionForMuxRW = false;

private static Position getReleaseToPositionFromSystemProperty() {
// NOTE: This System property is a beta feature. Support for it can be removed in the future.
Expand Down Expand Up @@ -650,6 +662,7 @@ private Builder(SessionPoolOptions options) {
this.useMultiplexedSessionPartitionedOps = options.useMultiplexedSessionForPartitionedOps;
this.multiplexedSessionMaintenanceDuration = options.multiplexedSessionMaintenanceDuration;
this.poolMaintainerClock = options.poolMaintainerClock;
this.skipVerifyingBeginTransactionForMuxRW = options.skipVerifyingBeginTransactionForMuxRW;
}

/**
Expand Down Expand Up @@ -872,6 +885,13 @@ Builder setMultiplexedSessionMaintenanceDuration(
return this;
}

@VisibleForTesting
Builder setSkipVerifyingBeginTransactionForMuxRW(
boolean skipVerifyingBeginTransactionForMuxRW) {
this.skipVerifyingBeginTransactionForMuxRW = skipVerifyingBeginTransactionForMuxRW;
return this;
}

/**
* Sets whether the client should automatically execute a background query to detect the dialect
* that is used by the database or not. Set this option to true if you do not know what the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,16 @@ public TransactionContext resetForRetry() {
"resetForRetry can only be called if the previous attempt" + " aborted");
}
try (IScope s = tracer.withSpan(span)) {
boolean useInlinedBegin = txn.transactionId != null;
/*
If the transaction contains only mutations and is using a multiplexed session, perform a
`BeginTransaction` after the user operation completes during a retry.

This ensures that a random mutation from the mutations list is chosen when invoking
`BeginTransaction`. If `BeginTransaction` is performed before the user operation,
the mutations are not sent, and the precommit token is not received, resulting in
an INVALID_ARGUMENT error (missing precommit token) during commit.
*/
boolean useInlinedBegin = txn.mutationsOnlyTransaction || txn.transactionId != null;

// Determine the latest transactionId when using a multiplexed session.
ByteString multiplexedSessionPreviousTransactionId = ByteString.EMPTY;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,9 @@ public void removeListener(Runnable listener) {

private final Map<SpannerRpc.Option, ?> channelHint;

// This field indicates whether the read-write transaction contains only mutation operations.
boolean mutationsOnlyTransaction = false;

private TransactionContextImpl(Builder builder) {
super(builder);
this.transactionId = builder.transactionId;
Expand Down Expand Up @@ -402,6 +405,11 @@ ApiFuture<CommitResponse> commitAsync() {
synchronized (lock) {
if (transactionIdFuture == null && transactionId == null && runningAsyncOperations == 0) {
finishOps = SettableApiFuture.create();
// At this point, it is ensured that the transaction contains only mutations. Adding a
// safeguard to apply this only for multiplexed sessions.
if (session.getIsMultiplexed()) {
mutationsOnlyTransaction = true;
}
createTxnAsync(finishOps, randomMutation);
} else {
finishOps = finishedAsyncOperations;
Expand Down Expand Up @@ -1229,7 +1237,16 @@ private <T> T runInternal(final TransactionCallable<T> txCallable) {
if (attempt.get() > 0) {
// Do not inline the BeginTransaction during a retry if the initial attempt did not
// actually start a transaction.
useInlinedBegin = txn.transactionId != null;
/*
If the transaction contains only mutations and is using a multiplexed session, perform a
`BeginTransaction` after the user operation completes during a retry.

This ensures that a random mutation from the mutations list is chosen when invoking
`BeginTransaction`. If `BeginTransaction` is performed before the user operation,
the mutations are not sent, and the precommit token is not received, resulting in
an INVALID_ARGUMENT error (missing precommit token) during commit.
*/
useInlinedBegin = txn.mutationsOnlyTransaction || txn.transactionId != null;

// Determine the latest transactionId when using a multiplexed session.
ByteString multiplexedSessionPreviousTransactionId = ByteString.EMPTY;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1914,7 +1914,8 @@ private Transaction beginTransaction(
}
if (session.getMultiplexed()
&& options.getModeCase() == ModeCase.READ_WRITE
&& mutationKey != null) {
&& mutationKey != null
&& mutationKey != com.google.spanner.v1.Mutation.getDefaultInstance()) {
// Mutation only case in a read-write transaction.
builder.setPrecommitToken(getTransactionPrecommitToken(transactionId));
}
Expand Down Expand Up @@ -2013,6 +2014,14 @@ public void commit(CommitRequest request, StreamObserver<CommitResponse> respons
return;
}
sessionLastUsed.put(session.getName(), Instant.now());
if (session.getMultiplexed()
&& !request.hasPrecommitToken()
&& !request.hasSingleUseTransaction()) {
throw Status.INVALID_ARGUMENT
.withDescription(
"A Commit request for a read-write transaction on a multiplexed session must specify a precommit token.")
.asRuntimeException();
}
try {
commitExecutionTime.simulateExecutionTime(exceptions, stickyGlobalExceptions, freezeLock);
// Find or start a transaction
Expand Down
Loading
Loading