Skip to content

Commit adfd072

Browse files
committed
chore(Spanner): fix mutation only case ibn rw using mux with aborted errors
1 parent 8d295c4 commit adfd072

File tree

7 files changed

+357
-11
lines changed

7 files changed

+357
-11
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,13 +82,16 @@ public TransactionContextFutureImpl beginAsync() {
8282
private ApiFuture<TransactionContext> internalBeginAsync(boolean firstAttempt) {
8383
txnState = TransactionState.STARTED;
8484

85+
boolean isMutationsOnlyTransaction = false;
86+
8587
// Determine the latest transactionId when using a multiplexed session.
8688
ByteString multiplexedSessionPreviousTransactionId = ByteString.EMPTY;
8789
if (txn != null && session.getIsMultiplexed() && !firstAttempt) {
8890
// Use the current transactionId if available, otherwise fallback to the previous aborted
8991
// transactionId.
9092
multiplexedSessionPreviousTransactionId =
9193
txn.transactionId != null ? txn.transactionId : txn.getPreviousTransactionId();
94+
isMutationsOnlyTransaction = txn.mutationsOnlyTransaction;
9295
}
9396

9497
txn =
@@ -99,7 +102,17 @@ private ApiFuture<TransactionContext> internalBeginAsync(boolean firstAttempt) {
99102
}
100103
final SettableApiFuture<TransactionContext> res = SettableApiFuture.create();
101104
final ApiFuture<Void> fut;
102-
if (firstAttempt) {
105+
106+
/*
107+
If the transaction contains only mutations and is using a multiplexed session, perform a
108+
`BeginTransaction` after the user operation completes during a retry.
109+
110+
This ensures that a random mutation from the mutations list is chosen when invoking
111+
`BeginTransaction`. If `BeginTransaction` is performed before the user operation,
112+
the mutations are not sent, and the precommit token is not received, resulting in
113+
an INVALID_ARGUMENT error (missing precommit token) during commit.
114+
*/
115+
if (firstAttempt || isMutationsOnlyTransaction) {
103116
fut = ApiFutures.immediateFuture(null);
104117
} else {
105118
fut = txn.ensureTxnAsync();

google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -253,10 +253,15 @@ public void onSessionReady(SessionImpl session) {
253253
// initiate a begin transaction request to verify if read-write transactions are
254254
// supported using multiplexed sessions.
255255
if (sessionClient
256-
.getSpanner()
257-
.getOptions()
258-
.getSessionPoolOptions()
259-
.getUseMultiplexedSessionForRW()) {
256+
.getSpanner()
257+
.getOptions()
258+
.getSessionPoolOptions()
259+
.getUseMultiplexedSessionForRW()
260+
&& !sessionClient
261+
.getSpanner()
262+
.getOptions()
263+
.getSessionPoolOptions()
264+
.getSkipVerifyBeginTransactionForMuxRW()) {
260265
verifyBeginTransactionWithRWOnMultiplexedSessionAsync(session.getName());
261266
}
262267
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ public class SessionPoolOptions {
8383

8484
// TODO: Change to use java.time.Duration.
8585
private final Duration multiplexedSessionMaintenanceDuration;
86+
private final boolean skipVerifyingBeginTransactionForMuxRW;
8687

8788
private SessionPoolOptions(Builder builder) {
8889
// minSessions > maxSessions is only possible if the user has only set a value for maxSessions.
@@ -132,6 +133,7 @@ private SessionPoolOptions(Builder builder) {
132133
? useMultiplexedSessionFromEnvVariablePartitionedOps
133134
: builder.useMultiplexedSessionPartitionedOps;
134135
this.multiplexedSessionMaintenanceDuration = builder.multiplexedSessionMaintenanceDuration;
136+
this.skipVerifyingBeginTransactionForMuxRW = builder.skipVerifyingBeginTransactionForMuxRW;
135137
}
136138

137139
@Override
@@ -169,8 +171,10 @@ public boolean equals(Object o) {
169171
&& Objects.equals(this.useMultiplexedSession, other.useMultiplexedSession)
170172
&& Objects.equals(this.useMultiplexedSessionForRW, other.useMultiplexedSessionForRW)
171173
&& Objects.equals(
172-
this.multiplexedSessionMaintenanceDuration,
173-
other.multiplexedSessionMaintenanceDuration);
174+
this.multiplexedSessionMaintenanceDuration, other.multiplexedSessionMaintenanceDuration)
175+
&& Objects.equals(
176+
this.skipVerifyingBeginTransactionForMuxRW,
177+
other.skipVerifyingBeginTransactionForMuxRW);
174178
}
175179

176180
@Override
@@ -199,7 +203,8 @@ public int hashCode() {
199203
this.poolMaintainerClock,
200204
this.useMultiplexedSession,
201205
this.useMultiplexedSessionForRW,
202-
this.multiplexedSessionMaintenanceDuration);
206+
this.multiplexedSessionMaintenanceDuration,
207+
this.skipVerifyingBeginTransactionForMuxRW);
203208
}
204209

205210
public Builder toBuilder() {
@@ -392,6 +397,12 @@ Duration getMultiplexedSessionMaintenanceDuration() {
392397
return multiplexedSessionMaintenanceDuration;
393398
}
394399

400+
@VisibleForTesting
401+
@InternalApi
402+
boolean getSkipVerifyBeginTransactionForMuxRW() {
403+
return skipVerifyingBeginTransactionForMuxRW;
404+
}
405+
395406
public static Builder newBuilder() {
396407
return new Builder();
397408
}
@@ -607,6 +618,7 @@ public static class Builder {
607618

608619
private Duration multiplexedSessionMaintenanceDuration = Duration.ofDays(7);
609620
private Clock poolMaintainerClock = Clock.INSTANCE;
621+
private boolean skipVerifyingBeginTransactionForMuxRW = false;
610622

611623
private static Position getReleaseToPositionFromSystemProperty() {
612624
// NOTE: This System property is a beta feature. Support for it can be removed in the future.
@@ -650,6 +662,7 @@ private Builder(SessionPoolOptions options) {
650662
this.useMultiplexedSessionPartitionedOps = options.useMultiplexedSessionForPartitionedOps;
651663
this.multiplexedSessionMaintenanceDuration = options.multiplexedSessionMaintenanceDuration;
652664
this.poolMaintainerClock = options.poolMaintainerClock;
665+
this.skipVerifyingBeginTransactionForMuxRW = options.skipVerifyingBeginTransactionForMuxRW;
653666
}
654667

655668
/**
@@ -872,6 +885,13 @@ Builder setMultiplexedSessionMaintenanceDuration(
872885
return this;
873886
}
874887

888+
@VisibleForTesting
889+
Builder setSkipVerifyingBeginTransactionForMuxRW(
890+
boolean skipVerifyingBeginTransactionForMuxRW) {
891+
this.skipVerifyingBeginTransactionForMuxRW = skipVerifyingBeginTransactionForMuxRW;
892+
return this;
893+
}
894+
875895
/**
876896
* Sets whether the client should automatically execute a background query to detect the dialect
877897
* that is used by the database or not. Set this option to true if you do not know what the

google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,16 @@ public TransactionContext resetForRetry() {
102102
"resetForRetry can only be called if the previous attempt" + " aborted");
103103
}
104104
try (IScope s = tracer.withSpan(span)) {
105-
boolean useInlinedBegin = txn.transactionId != null;
105+
/*
106+
If the transaction contains only mutations and is using a multiplexed session, perform a
107+
`BeginTransaction` after the user operation completes during a retry.
108+
109+
This ensures that a random mutation from the mutations list is chosen when invoking
110+
`BeginTransaction`. If `BeginTransaction` is performed before the user operation,
111+
the mutations are not sent, and the precommit token is not received, resulting in
112+
an INVALID_ARGUMENT error (missing precommit token) during commit.
113+
*/
114+
boolean useInlinedBegin = txn.mutationsOnlyTransaction || txn.transactionId != null;
106115

107116
// Determine the latest transactionId when using a multiplexed session.
108117
ByteString multiplexedSessionPreviousTransactionId = ByteString.EMPTY;

google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,9 @@ public void removeListener(Runnable listener) {
222222

223223
private final Map<SpannerRpc.Option, ?> channelHint;
224224

225+
// This field indicates whether the read-write transaction contains only mutation operations.
226+
boolean mutationsOnlyTransaction = false;
227+
225228
private TransactionContextImpl(Builder builder) {
226229
super(builder);
227230
this.transactionId = builder.transactionId;
@@ -402,6 +405,11 @@ ApiFuture<CommitResponse> commitAsync() {
402405
synchronized (lock) {
403406
if (transactionIdFuture == null && transactionId == null && runningAsyncOperations == 0) {
404407
finishOps = SettableApiFuture.create();
408+
// At this point, it is ensured that the transaction contains only mutations. Adding a
409+
// safeguard to apply this only for multiplexed sessions.
410+
if (session.getIsMultiplexed()) {
411+
mutationsOnlyTransaction = true;
412+
}
405413
createTxnAsync(finishOps, randomMutation);
406414
} else {
407415
finishOps = finishedAsyncOperations;
@@ -1229,7 +1237,16 @@ private <T> T runInternal(final TransactionCallable<T> txCallable) {
12291237
if (attempt.get() > 0) {
12301238
// Do not inline the BeginTransaction during a retry if the initial attempt did not
12311239
// actually start a transaction.
1232-
useInlinedBegin = txn.transactionId != null;
1240+
/*
1241+
If the transaction contains only mutations and is using a multiplexed session, perform a
1242+
`BeginTransaction` after the user operation completes during a retry.
1243+
1244+
This ensures that a random mutation from the mutations list is chosen when invoking
1245+
`BeginTransaction`. If `BeginTransaction` is performed before the user operation,
1246+
the mutations are not sent, and the precommit token is not received, resulting in
1247+
an INVALID_ARGUMENT error (missing precommit token) during commit.
1248+
*/
1249+
useInlinedBegin = txn.mutationsOnlyTransaction || txn.transactionId != null;
12331250

12341251
// Determine the latest transactionId when using a multiplexed session.
12351252
ByteString multiplexedSessionPreviousTransactionId = ByteString.EMPTY;

google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1914,7 +1914,8 @@ private Transaction beginTransaction(
19141914
}
19151915
if (session.getMultiplexed()
19161916
&& options.getModeCase() == ModeCase.READ_WRITE
1917-
&& mutationKey != null) {
1917+
&& mutationKey != null
1918+
&& mutationKey != com.google.spanner.v1.Mutation.getDefaultInstance()) {
19181919
// Mutation only case in a read-write transaction.
19191920
builder.setPrecommitToken(getTransactionPrecommitToken(transactionId));
19201921
}
@@ -2013,6 +2014,12 @@ public void commit(CommitRequest request, StreamObserver<CommitResponse> respons
20132014
return;
20142015
}
20152016
sessionLastUsed.put(session.getName(), Instant.now());
2017+
if (session.getMultiplexed() && !request.hasPrecommitToken()) {
2018+
throw Status.INVALID_ARGUMENT
2019+
.withDescription(
2020+
"A Commit request for a read-write transaction on a multiplexed session must specify a precommit token.")
2021+
.asRuntimeException();
2022+
}
20162023
try {
20172024
commitExecutionTime.simulateExecutionTime(exceptions, stickyGlobalExceptions, freezeLock);
20182025
// Find or start a transaction

0 commit comments

Comments
 (0)