Skip to content

Commit 144f706

Browse files
committed
chore(spanner): track previous transaction id incase of retry during aborted transaction
1 parent 254b2f5 commit 144f706

File tree

4 files changed

+49
-11
lines changed

4 files changed

+49
-11
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import com.google.common.base.MoreObjects;
2929
import com.google.common.base.Preconditions;
3030
import com.google.common.util.concurrent.MoreExecutors;
31+
import com.google.protobuf.ByteString;
3132

3233
/** Implementation of {@link AsyncTransactionManager}. */
3334
final class AsyncTransactionManagerImpl
@@ -77,7 +78,9 @@ public TransactionContextFutureImpl beginAsync() {
7778

7879
private ApiFuture<TransactionContext> internalBeginAsync(boolean firstAttempt) {
7980
txnState = TransactionState.STARTED;
80-
txn = session.newTransaction(options);
81+
ByteString previousAbortedTransactionId =
82+
!firstAttempt && session.getIsMultiplexed() ? txn.transactionId : null;
83+
txn = session.newTransaction(options, previousAbortedTransactionId);
8184
if (firstAttempt) {
8285
session.setActive(this);
8386
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,8 @@ static void throwIfTransactionsPending() {
6969
}
7070
}
7171

72-
static TransactionOptions createReadWriteTransactionOptions(Options options) {
72+
static TransactionOptions createReadWriteTransactionOptions(
73+
Options options, ByteString previousAbortedTransactionId) {
7374
TransactionOptions.Builder transactionOptions = TransactionOptions.newBuilder();
7475
if (options.withExcludeTxnFromChangeStreams() == Boolean.TRUE) {
7576
transactionOptions.setExcludeTxnFromChangeStreams(true);
@@ -78,6 +79,11 @@ static TransactionOptions createReadWriteTransactionOptions(Options options) {
7879
if (options.withOptimisticLock() == Boolean.TRUE) {
7980
readWrite.setReadLockMode(TransactionOptions.ReadWrite.ReadLockMode.OPTIMISTIC);
8081
}
82+
if (previousAbortedTransactionId != null
83+
&& previousAbortedTransactionId != com.google.protobuf.ByteString.EMPTY) {
84+
// TODO(sriharshach): uncomment this when multiplexed session R/W proto is published
85+
// readWrite.setMultiplexedSessionPreviousTransactionId(previousAbortedTransactionId);
86+
}
8187
transactionOptions.setReadWrite(readWrite);
8288
return transactionOptions.build();
8389
}
@@ -427,13 +433,17 @@ public void close() {
427433
}
428434

429435
ApiFuture<ByteString> beginTransactionAsync(
430-
Options transactionOptions, boolean routeToLeader, Map<SpannerRpc.Option, ?> channelHint) {
436+
Options transactionOptions,
437+
boolean routeToLeader,
438+
Map<SpannerRpc.Option, ?> channelHint,
439+
ByteString previousAbortedTransactionId) {
431440
final SettableApiFuture<ByteString> res = SettableApiFuture.create();
432441
final ISpan span = tracer.spanBuilder(SpannerImpl.BEGIN_TRANSACTION);
433442
final BeginTransactionRequest request =
434443
BeginTransactionRequest.newBuilder()
435444
.setSession(getName())
436-
.setOptions(createReadWriteTransactionOptions(transactionOptions))
445+
.setOptions(
446+
createReadWriteTransactionOptions(transactionOptions, previousAbortedTransactionId))
437447
.build();
438448
final ApiFuture<Transaction> requestFuture;
439449
try (IScope ignore = tracer.withSpan(span)) {
@@ -469,11 +479,12 @@ ApiFuture<ByteString> beginTransactionAsync(
469479
return res;
470480
}
471481

472-
TransactionContextImpl newTransaction(Options options) {
482+
TransactionContextImpl newTransaction(Options options, ByteString previousAbortedTransactionId) {
473483
return TransactionContextImpl.newBuilder()
474484
.setSession(this)
475485
.setOptions(options)
476486
.setTransactionId(null)
487+
.setPreviousAbortedTransactionId(previousAbortedTransactionId)
477488
.setOptions(options)
478489
.setTrackTransactionStarter(spanner.getOptions().isTrackTransactionStarter())
479490
.setRpc(spanner.getRpc())

google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.google.cloud.spanner.Options.TransactionOption;
2121
import com.google.cloud.spanner.SessionImpl.SessionTransaction;
2222
import com.google.common.base.Preconditions;
23+
import com.google.protobuf.ByteString;
2324

2425
/** Implementation of {@link TransactionManager}. */
2526
final class TransactionManagerImpl implements TransactionManager, SessionTransaction {
@@ -53,7 +54,7 @@ public void setSpan(ISpan span) {
5354
public TransactionContext begin() {
5455
Preconditions.checkState(txn == null, "begin can only be called once");
5556
try (IScope s = tracer.withSpan(span)) {
56-
txn = session.newTransaction(options);
57+
txn = session.newTransaction(options, null);
5758
session.setActive(this);
5859
txnState = TransactionState.STARTED;
5960
return txn;
@@ -102,7 +103,10 @@ public TransactionContext resetForRetry() {
102103
}
103104
try (IScope s = tracer.withSpan(span)) {
104105
boolean useInlinedBegin = txn.transactionId != null;
105-
txn = session.newTransaction(options);
106+
107+
ByteString previousAbortedTransactionId =
108+
session.getIsMultiplexed() ? txn.transactionId : null;
109+
txn = session.newTransaction(options, previousAbortedTransactionId);
106110
if (!useInlinedBegin) {
107111
txn.ensureTxn();
108112
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,9 @@ static class Builder extends AbstractReadContext.Builder<Builder, TransactionCon
9393

9494
private Clock clock = new Clock();
9595
private ByteString transactionId;
96+
// This field is set only when the transaction is created during a retry and uses a
97+
// multiplexed session.
98+
private ByteString previousAbortedTransactionId;
9699
private Options options;
97100
private boolean trackTransactionStarter;
98101

@@ -118,6 +121,11 @@ Builder setTrackTransactionStarter(boolean trackTransactionStarter) {
118121
return self();
119122
}
120123

124+
Builder setPreviousAbortedTransactionId(ByteString previousAbortedTransactionId) {
125+
this.previousAbortedTransactionId = previousAbortedTransactionId;
126+
return self();
127+
}
128+
121129
@Override
122130
TransactionContextImpl build() {
123131
Preconditions.checkState(this.options != null, "Options must be set");
@@ -201,6 +209,8 @@ public void removeListener(Runnable listener) {
201209

202210
volatile ByteString transactionId;
203211

212+
final ByteString previousAbortedTransactionId;
213+
204214
private CommitResponse commitResponse;
205215
private final Clock clock;
206216

@@ -216,6 +226,7 @@ private TransactionContextImpl(Builder builder) {
216226
this.channelHint =
217227
getChannelHintOptions(
218228
session.getOptions(), ThreadLocalRandom.current().nextLong(Long.MAX_VALUE));
229+
this.previousAbortedTransactionId = builder.previousAbortedTransactionId;
219230
}
220231

221232
@Override
@@ -283,7 +294,11 @@ ApiFuture<Void> ensureTxnAsync() {
283294
private void createTxnAsync(final SettableApiFuture<Void> res) {
284295
span.addAnnotation("Creating Transaction");
285296
final ApiFuture<ByteString> fut =
286-
session.beginTransactionAsync(options, isRouteToLeader(), getTransactionChannelHint());
297+
session.beginTransactionAsync(
298+
options,
299+
isRouteToLeader(),
300+
getTransactionChannelHint(),
301+
previousAbortedTransactionId);
287302
fut.addListener(
288303
() -> {
289304
try {
@@ -558,7 +573,9 @@ TransactionSelector getTransactionSelector() {
558573
}
559574
if (tx == null) {
560575
return TransactionSelector.newBuilder()
561-
.setBegin(SessionImpl.createReadWriteTransactionOptions(options))
576+
.setBegin(
577+
SessionImpl.createReadWriteTransactionOptions(
578+
options, previousAbortedTransactionId))
562579
.build();
563580
} else {
564581
// Wait for the transaction to come available. The tx.get() call will fail with an
@@ -1079,7 +1096,7 @@ public TransactionRunner allowNestedTransaction() {
10791096
TransactionRunnerImpl(SessionImpl session, TransactionOption... options) {
10801097
this.session = session;
10811098
this.options = Options.fromTransactionOptions(options);
1082-
this.txn = session.newTransaction(this.options);
1099+
this.txn = session.newTransaction(this.options, null);
10831100
this.tracer = session.getTracer();
10841101
}
10851102

@@ -1117,7 +1134,10 @@ private <T> T runInternal(final TransactionCallable<T> txCallable) {
11171134
// Do not inline the BeginTransaction during a retry if the initial attempt did not
11181135
// actually start a transaction.
11191136
useInlinedBegin = txn.transactionId != null;
1120-
txn = session.newTransaction(options);
1137+
1138+
ByteString previousAbortedTransactionId =
1139+
session.getIsMultiplexed() ? txn.transactionId : null;
1140+
txn = session.newTransaction(options, previousAbortedTransactionId);
11211141
}
11221142
checkState(
11231143
isValid, "TransactionRunner has been invalidated by a new operation on the session");

0 commit comments

Comments
 (0)