Skip to content

Commit 8cc2b32

Browse files
committed
chore(spanner): refactor code for lock order
1 parent 4035c84 commit 8cc2b32

File tree

4 files changed

+45
-28
lines changed

4 files changed

+45
-28
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,9 +78,17 @@ public TransactionContextFutureImpl beginAsync() {
7878

7979
private ApiFuture<TransactionContext> internalBeginAsync(boolean firstAttempt) {
8080
txnState = TransactionState.STARTED;
81-
ByteString previousAbortedTransactionId =
82-
!firstAttempt && session.getIsMultiplexed() ? txn.transactionId : null;
83-
txn = session.newTransaction(options, previousAbortedTransactionId);
81+
82+
// Determine the latest transactionId when using a multiplexed session.
83+
ByteString multiplexedSessionPreviousTransactionId = null;
84+
if (session.getIsMultiplexed() && !firstAttempt) {
85+
// Use the current transactionId if available, otherwise fallback to the previous aborted
86+
// transactionId.
87+
multiplexedSessionPreviousTransactionId =
88+
txn.transactionId != null ? txn.transactionId : txn.previousTransactionId;
89+
}
90+
91+
txn = session.newTransaction(options, multiplexedSessionPreviousTransactionId);
8492
if (firstAttempt) {
8593
session.setActive(this);
8694
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ static void throwIfTransactionsPending() {
7070
}
7171

7272
static TransactionOptions createReadWriteTransactionOptions(
73-
Options options, ByteString previousAbortedTransactionId) {
73+
Options options, ByteString previousTransactionId) {
7474
TransactionOptions.Builder transactionOptions = TransactionOptions.newBuilder();
7575
if (options.withExcludeTxnFromChangeStreams() == Boolean.TRUE) {
7676
transactionOptions.setExcludeTxnFromChangeStreams(true);
@@ -79,10 +79,10 @@ static TransactionOptions createReadWriteTransactionOptions(
7979
if (options.withOptimisticLock() == Boolean.TRUE) {
8080
readWrite.setReadLockMode(TransactionOptions.ReadWrite.ReadLockMode.OPTIMISTIC);
8181
}
82-
if (previousAbortedTransactionId != null
83-
&& previousAbortedTransactionId != com.google.protobuf.ByteString.EMPTY) {
82+
if (previousTransactionId != null
83+
&& previousTransactionId != com.google.protobuf.ByteString.EMPTY) {
8484
// TODO(sriharshach): uncomment this when multiplexed session R/W proto is published
85-
// readWrite.setMultiplexedSessionPreviousTransactionId(previousAbortedTransactionId);
85+
// readWrite.setMultiplexedSessionPreviousTransactionId(previousTransactionId);
8686
}
8787
transactionOptions.setReadWrite(readWrite);
8888
return transactionOptions.build();
@@ -436,14 +436,14 @@ ApiFuture<ByteString> beginTransactionAsync(
436436
Options transactionOptions,
437437
boolean routeToLeader,
438438
Map<SpannerRpc.Option, ?> channelHint,
439-
ByteString previousAbortedTransactionId) {
439+
ByteString previousTransactionId) {
440440
final SettableApiFuture<ByteString> res = SettableApiFuture.create();
441441
final ISpan span = tracer.spanBuilder(SpannerImpl.BEGIN_TRANSACTION);
442442
final BeginTransactionRequest request =
443443
BeginTransactionRequest.newBuilder()
444444
.setSession(getName())
445445
.setOptions(
446-
createReadWriteTransactionOptions(transactionOptions, previousAbortedTransactionId))
446+
createReadWriteTransactionOptions(transactionOptions, previousTransactionId))
447447
.build();
448448
final ApiFuture<Transaction> requestFuture;
449449
try (IScope ignore = tracer.withSpan(span)) {
@@ -479,12 +479,12 @@ ApiFuture<ByteString> beginTransactionAsync(
479479
return res;
480480
}
481481

482-
TransactionContextImpl newTransaction(Options options, ByteString previousAbortedTransactionId) {
482+
TransactionContextImpl newTransaction(Options options, ByteString previousTransactionId) {
483483
return TransactionContextImpl.newBuilder()
484484
.setSession(this)
485485
.setOptions(options)
486486
.setTransactionId(null)
487-
.setPreviousAbortedTransactionId(previousAbortedTransactionId)
487+
.setPreviousTransactionId(previousTransactionId)
488488
.setOptions(options)
489489
.setTrackTransactionStarter(spanner.getOptions().isTrackTransactionStarter())
490490
.setRpc(spanner.getRpc())

google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -104,9 +104,15 @@ public TransactionContext resetForRetry() {
104104
try (IScope s = tracer.withSpan(span)) {
105105
boolean useInlinedBegin = txn.transactionId != null;
106106

107-
ByteString previousAbortedTransactionId =
108-
session.getIsMultiplexed() ? txn.transactionId : null;
109-
txn = session.newTransaction(options, previousAbortedTransactionId);
107+
// Determine the latest transactionId when using a multiplexed session.
108+
ByteString multiplexedSessionPreviousTransactionId = null;
109+
if (session.getIsMultiplexed()) {
110+
// Use the current transactionId if available, otherwise fallback to the previous aborted
111+
// transactionId.
112+
multiplexedSessionPreviousTransactionId =
113+
txn.transactionId != null ? txn.transactionId : txn.previousTransactionId;
114+
}
115+
txn = session.newTransaction(options, multiplexedSessionPreviousTransactionId);
110116
if (!useInlinedBegin) {
111117
txn.ensureTxn();
112118
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ static class Builder extends AbstractReadContext.Builder<Builder, TransactionCon
9595
private ByteString transactionId;
9696
// This field is set only when the transaction is created during a retry and uses a
9797
// multiplexed session.
98-
private ByteString previousAbortedTransactionId;
98+
private ByteString previousTransactionId;
9999
private Options options;
100100
private boolean trackTransactionStarter;
101101

@@ -121,8 +121,8 @@ Builder setTrackTransactionStarter(boolean trackTransactionStarter) {
121121
return self();
122122
}
123123

124-
Builder setPreviousAbortedTransactionId(ByteString previousAbortedTransactionId) {
125-
this.previousAbortedTransactionId = previousAbortedTransactionId;
124+
Builder setPreviousTransactionId(ByteString previousTransactionId) {
125+
this.previousTransactionId = previousTransactionId;
126126
return self();
127127
}
128128

@@ -209,7 +209,7 @@ public void removeListener(Runnable listener) {
209209

210210
volatile ByteString transactionId;
211211

212-
ByteString previousAbortedTransactionId;
212+
ByteString previousTransactionId;
213213

214214
private CommitResponse commitResponse;
215215
private final Clock clock;
@@ -226,7 +226,7 @@ private TransactionContextImpl(Builder builder) {
226226
this.channelHint =
227227
getChannelHintOptions(
228228
session.getOptions(), ThreadLocalRandom.current().nextLong(Long.MAX_VALUE));
229-
this.previousAbortedTransactionId = builder.previousAbortedTransactionId;
229+
this.previousTransactionId = builder.previousTransactionId;
230230
}
231231

232232
@Override
@@ -295,10 +295,7 @@ private void createTxnAsync(final SettableApiFuture<Void> res) {
295295
span.addAnnotation("Creating Transaction");
296296
final ApiFuture<ByteString> fut =
297297
session.beginTransactionAsync(
298-
options,
299-
isRouteToLeader(),
300-
getTransactionChannelHint(),
301-
previousAbortedTransactionId);
298+
options, isRouteToLeader(), getTransactionChannelHint(), previousTransactionId);
302299
fut.addListener(
303300
() -> {
304301
try {
@@ -574,8 +571,7 @@ TransactionSelector getTransactionSelector() {
574571
if (tx == null) {
575572
return TransactionSelector.newBuilder()
576573
.setBegin(
577-
SessionImpl.createReadWriteTransactionOptions(
578-
options, previousAbortedTransactionId))
574+
SessionImpl.createReadWriteTransactionOptions(options, previousTransactionId))
579575
.build();
580576
} else {
581577
// Wait for the transaction to come available. The tx.get() call will fail with an
@@ -1135,9 +1131,16 @@ private <T> T runInternal(final TransactionCallable<T> txCallable) {
11351131
// actually start a transaction.
11361132
useInlinedBegin = txn.transactionId != null;
11371133

1138-
ByteString previousAbortedTransactionId =
1139-
session.getIsMultiplexed() ? txn.transactionId : null;
1140-
txn = session.newTransaction(options, previousAbortedTransactionId);
1134+
// Determine the latest transactionId when using a multiplexed session.
1135+
ByteString multiplexedSessionPreviousTransactionId = null;
1136+
if (session.getIsMultiplexed()) {
1137+
// Use the current transactionId if available, otherwise fallback to the previous
1138+
// transactionId.
1139+
multiplexedSessionPreviousTransactionId =
1140+
txn.transactionId != null ? txn.transactionId : txn.previousTransactionId;
1141+
}
1142+
1143+
txn = session.newTransaction(options, multiplexedSessionPreviousTransactionId);
11411144
}
11421145
checkState(
11431146
isValid, "TransactionRunner has been invalidated by a new operation on the session");

0 commit comments

Comments
 (0)