Skip to content

Commit e5d0ca4

Browse files
authored
Merge branch 'main' into mux-rw-5
2 parents 12b7c04 + f370394 commit e5d0ca4

12 files changed

+406
-30
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ If you are using Maven without the BOM, add this to your dependencies:
4141
<dependency>
4242
<groupId>com.google.cloud</groupId>
4343
<artifactId>google-cloud-spanner</artifactId>
44-
<version>6.78.0</version>
44+
<version>6.79.0</version>
4545
</dependency>
4646

4747
```

generation_config.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
gapic_generator_version: 2.47.0
2-
googleapis_commitish: 7a0dc39843032039878e26add4fed272a2c36807
2+
googleapis_commitish: a26064a9cc78d4518b8a9fd2ea78891edad4d87d
33
libraries_bom_version: 26.48.0
44
libraries:
55
- api_shortname: spanner

google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import com.google.common.base.MoreObjects;
2929
import com.google.common.base.Preconditions;
3030
import com.google.common.util.concurrent.MoreExecutors;
31+
import com.google.protobuf.ByteString;
3132

3233
/** Implementation of {@link AsyncTransactionManager}. */
3334
final class AsyncTransactionManagerImpl
@@ -80,7 +81,19 @@ public TransactionContextFutureImpl beginAsync() {
8081

8182
private ApiFuture<TransactionContext> internalBeginAsync(boolean firstAttempt) {
8283
txnState = TransactionState.STARTED;
83-
txn = session.newTransaction(options);
84+
85+
// Determine the latest transactionId when using a multiplexed session.
86+
ByteString multiplexedSessionPreviousTransactionId = ByteString.EMPTY;
87+
if (txn != null && session.getIsMultiplexed() && !firstAttempt) {
88+
// Use the current transactionId if available, otherwise fallback to the previous aborted
89+
// transactionId.
90+
multiplexedSessionPreviousTransactionId =
91+
txn.transactionId != null ? txn.transactionId : txn.getPreviousTransactionId();
92+
}
93+
94+
txn =
95+
session.newTransaction(
96+
options, /* previousTransactionId = */ multiplexedSessionPreviousTransactionId);
8497
if (firstAttempt) {
8598
session.setActive(this);
8699
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ class DatabaseClientImpl implements DatabaseClient {
5959
/* useMultiplexedSessionBlindWrite = */ false,
6060
/* multiplexedSessionDatabaseClient = */ null,
6161
tracer,
62-
false);
62+
/* useMultiplexedSessionForRW = */ false);
6363
}
6464

6565
DatabaseClientImpl(

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,8 @@ static void throwIfTransactionsPending() {
6969
}
7070
}
7171

72-
static TransactionOptions createReadWriteTransactionOptions(Options options) {
72+
static TransactionOptions createReadWriteTransactionOptions(
73+
Options options, ByteString previousTransactionId) {
7374
TransactionOptions.Builder transactionOptions = TransactionOptions.newBuilder();
7475
if (options.withExcludeTxnFromChangeStreams() == Boolean.TRUE) {
7576
transactionOptions.setExcludeTxnFromChangeStreams(true);
@@ -78,6 +79,10 @@ static TransactionOptions createReadWriteTransactionOptions(Options options) {
7879
if (options.withOptimisticLock() == Boolean.TRUE) {
7980
readWrite.setReadLockMode(TransactionOptions.ReadWrite.ReadLockMode.OPTIMISTIC);
8081
}
82+
if (previousTransactionId != null
83+
&& previousTransactionId != com.google.protobuf.ByteString.EMPTY) {
84+
readWrite.setMultiplexedSessionPreviousTransactionId(previousTransactionId);
85+
}
8186
transactionOptions.setReadWrite(readWrite);
8287
return transactionOptions.build();
8388
}
@@ -427,13 +432,17 @@ public void close() {
427432
}
428433

429434
ApiFuture<ByteString> beginTransactionAsync(
430-
Options transactionOptions, boolean routeToLeader, Map<SpannerRpc.Option, ?> channelHint) {
435+
Options transactionOptions,
436+
boolean routeToLeader,
437+
Map<SpannerRpc.Option, ?> channelHint,
438+
ByteString previousTransactionId) {
431439
final SettableApiFuture<ByteString> res = SettableApiFuture.create();
432440
final ISpan span = tracer.spanBuilder(SpannerImpl.BEGIN_TRANSACTION);
433441
final BeginTransactionRequest request =
434442
BeginTransactionRequest.newBuilder()
435443
.setSession(getName())
436-
.setOptions(createReadWriteTransactionOptions(transactionOptions))
444+
.setOptions(
445+
createReadWriteTransactionOptions(transactionOptions, previousTransactionId))
437446
.build();
438447
final ApiFuture<Transaction> requestFuture;
439448
try (IScope ignore = tracer.withSpan(span)) {
@@ -469,11 +478,12 @@ ApiFuture<ByteString> beginTransactionAsync(
469478
return res;
470479
}
471480

472-
TransactionContextImpl newTransaction(Options options) {
481+
TransactionContextImpl newTransaction(Options options, ByteString previousTransactionId) {
473482
return TransactionContextImpl.newBuilder()
474483
.setSession(this)
475484
.setOptions(options)
476485
.setTransactionId(null)
486+
.setPreviousTransactionId(previousTransactionId)
477487
.setOptions(options)
478488
.setTrackTransactionStarter(spanner.getOptions().isTrackTransactionStarter())
479489
.setRpc(spanner.getRpc())

google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.google.cloud.spanner.Options.TransactionOption;
2121
import com.google.cloud.spanner.SessionImpl.SessionTransaction;
2222
import com.google.common.base.Preconditions;
23+
import com.google.protobuf.ByteString;
2324

2425
/** Implementation of {@link TransactionManager}. */
2526
final class TransactionManagerImpl implements TransactionManager, SessionTransaction {
@@ -53,7 +54,7 @@ public void setSpan(ISpan span) {
5354
public TransactionContext begin() {
5455
Preconditions.checkState(txn == null, "begin can only be called once");
5556
try (IScope s = tracer.withSpan(span)) {
56-
txn = session.newTransaction(options);
57+
txn = session.newTransaction(options, /* previousTransactionId = */ ByteString.EMPTY);
5758
session.setActive(this);
5859
txnState = TransactionState.STARTED;
5960
return txn;
@@ -102,7 +103,18 @@ public TransactionContext resetForRetry() {
102103
}
103104
try (IScope s = tracer.withSpan(span)) {
104105
boolean useInlinedBegin = txn.transactionId != null;
105-
txn = session.newTransaction(options);
106+
107+
// Determine the latest transactionId when using a multiplexed session.
108+
ByteString multiplexedSessionPreviousTransactionId = ByteString.EMPTY;
109+
if (session.getIsMultiplexed()) {
110+
// Use the current transactionId if available, otherwise fallback to the previous aborted
111+
// transactionId.
112+
multiplexedSessionPreviousTransactionId =
113+
txn.transactionId != null ? txn.transactionId : txn.getPreviousTransactionId();
114+
}
115+
txn =
116+
session.newTransaction(
117+
options, /* previousTransactionId = */ multiplexedSessionPreviousTransactionId);
106118
if (!useInlinedBegin) {
107119
txn.ensureTxn();
108120
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,9 @@ static class Builder extends AbstractReadContext.Builder<Builder, TransactionCon
9494

9595
private Clock clock = new Clock();
9696
private ByteString transactionId;
97+
// This field is set only when the transaction is created during a retry and uses a
98+
// multiplexed session.
99+
private ByteString previousTransactionId;
97100
private Options options;
98101
private boolean trackTransactionStarter;
99102

@@ -119,6 +122,11 @@ Builder setTrackTransactionStarter(boolean trackTransactionStarter) {
119122
return self();
120123
}
121124

125+
Builder setPreviousTransactionId(ByteString previousTransactionId) {
126+
this.previousTransactionId = previousTransactionId;
127+
return self();
128+
}
129+
122130
@Override
123131
TransactionContextImpl build() {
124132
Preconditions.checkState(this.options != null, "Options must be set");
@@ -207,6 +215,8 @@ public void removeListener(Runnable listener) {
207215

208216
volatile ByteString transactionId;
209217

218+
final ByteString previousTransactionId;
219+
210220
private CommitResponse commitResponse;
211221
private final Clock clock;
212222

@@ -222,6 +232,7 @@ private TransactionContextImpl(Builder builder) {
222232
this.channelHint =
223233
getChannelHintOptions(
224234
session.getOptions(), ThreadLocalRandom.current().nextLong(Long.MAX_VALUE));
235+
this.previousTransactionId = builder.previousTransactionId;
225236
}
226237

227238
@Override
@@ -252,6 +263,10 @@ private void decreaseAsyncOperations() {
252263
}
253264
}
254265

266+
ByteString getPreviousTransactionId() {
267+
return this.previousTransactionId;
268+
}
269+
255270
@Override
256271
public void close() {
257272
// Only mark the context as closed, but do not end the tracer span, as that is done by the
@@ -289,7 +304,8 @@ ApiFuture<Void> ensureTxnAsync() {
289304
private void createTxnAsync(final SettableApiFuture<Void> res) {
290305
span.addAnnotation("Creating Transaction");
291306
final ApiFuture<ByteString> fut =
292-
session.beginTransactionAsync(options, isRouteToLeader(), getTransactionChannelHint());
307+
session.beginTransactionAsync(
308+
options, isRouteToLeader(), getTransactionChannelHint(), getPreviousTransactionId());
293309
fut.addListener(
294310
() -> {
295311
try {
@@ -568,7 +584,9 @@ TransactionSelector getTransactionSelector() {
568584
}
569585
if (tx == null) {
570586
return TransactionSelector.newBuilder()
571-
.setBegin(SessionImpl.createReadWriteTransactionOptions(options))
587+
.setBegin(
588+
SessionImpl.createReadWriteTransactionOptions(
589+
options, getPreviousTransactionId()))
572590
.build();
573591
} else {
574592
// Wait for the transaction to come available. The tx.get() call will fail with an
@@ -1128,7 +1146,7 @@ public TransactionRunner allowNestedTransaction() {
11281146
TransactionRunnerImpl(SessionImpl session, TransactionOption... options) {
11291147
this.session = session;
11301148
this.options = Options.fromTransactionOptions(options);
1131-
this.txn = session.newTransaction(this.options);
1149+
this.txn = session.newTransaction(this.options, /* previousTransactionId = */ ByteString.EMPTY);
11321150
this.tracer = session.getTracer();
11331151
}
11341152

@@ -1167,7 +1185,19 @@ private <T> T runInternal(final TransactionCallable<T> txCallable) {
11671185
// Do not inline the BeginTransaction during a retry if the initial attempt did not
11681186
// actually start a transaction.
11691187
useInlinedBegin = txn.transactionId != null;
1170-
txn = session.newTransaction(options);
1188+
1189+
// Determine the latest transactionId when using a multiplexed session.
1190+
ByteString multiplexedSessionPreviousTransactionId = ByteString.EMPTY;
1191+
if (session.getIsMultiplexed()) {
1192+
// Use the current transactionId if available, otherwise fallback to the previous
1193+
// transactionId.
1194+
multiplexedSessionPreviousTransactionId =
1195+
txn.transactionId != null ? txn.transactionId : txn.getPreviousTransactionId();
1196+
}
1197+
1198+
txn =
1199+
session.newTransaction(
1200+
options, /* previousTransactionId = */ multiplexedSessionPreviousTransactionId);
11711201
}
11721202
checkState(
11731203
isValid, "TransactionRunner has been invalidated by a new operation on the session");

google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncTransactionManagerImplTest.java

Lines changed: 70 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,18 @@
1616

1717
package com.google.cloud.spanner;
1818

19+
import static org.junit.Assert.assertThrows;
20+
import static org.mockito.ArgumentMatchers.any;
21+
import static org.mockito.ArgumentMatchers.eq;
22+
import static org.mockito.Mockito.clearInvocations;
23+
import static org.mockito.Mockito.doThrow;
1924
import static org.mockito.Mockito.mock;
2025
import static org.mockito.Mockito.verify;
2126
import static org.mockito.Mockito.when;
2227

2328
import com.google.api.core.ApiFutures;
2429
import com.google.cloud.Timestamp;
30+
import com.google.protobuf.ByteString;
2531
import io.opentelemetry.api.trace.Span;
2632
import io.opentelemetry.context.Scope;
2733
import org.junit.Test;
@@ -42,7 +48,7 @@ public void testCommitReturnsCommitStats() {
4248
when(oTspan.makeCurrent()).thenReturn(mock(Scope.class));
4349
try (AsyncTransactionManagerImpl manager =
4450
new AsyncTransactionManagerImpl(session, span, Options.commitStats())) {
45-
when(session.newTransaction(Options.fromTransactionOptions(Options.commitStats())))
51+
when(session.newTransaction(eq(Options.fromTransactionOptions(Options.commitStats())), any()))
4652
.thenReturn(transaction);
4753
when(transaction.ensureTxnAsync()).thenReturn(ApiFutures.immediateFuture(null));
4854
Timestamp commitTimestamp = Timestamp.ofTimeMicroseconds(1);
@@ -54,4 +60,67 @@ public void testCommitReturnsCommitStats() {
5460
verify(transaction).commitAsync();
5561
}
5662
}
63+
64+
@Test
65+
public void testRetryUsesPreviousTransactionIdOnMultiplexedSession() {
66+
// Set up mock transaction IDs
67+
final ByteString mockTransactionId = ByteString.copyFromUtf8("mockTransactionId");
68+
final ByteString mockPreviousTransactionId =
69+
ByteString.copyFromUtf8("mockPreviousTransactionId");
70+
71+
Span oTspan = mock(Span.class);
72+
ISpan span = new OpenTelemetrySpan(oTspan);
73+
when(oTspan.makeCurrent()).thenReturn(mock(Scope.class));
74+
// Mark the session as multiplexed.
75+
when(session.getIsMultiplexed()).thenReturn(true);
76+
77+
// Initialize a mock transaction with transactionId = null, previousTransactionId = null.
78+
transaction = mock(TransactionRunnerImpl.TransactionContextImpl.class);
79+
when(transaction.ensureTxnAsync()).thenReturn(ApiFutures.immediateFuture(null));
80+
when(session.newTransaction(eq(Options.fromTransactionOptions(Options.commitStats())), any()))
81+
.thenReturn(transaction);
82+
83+
// Simulate an ABORTED error being thrown when `commitAsync()` is called.
84+
doThrow(SpannerExceptionFactory.newSpannerException(ErrorCode.ABORTED, ""))
85+
.when(transaction)
86+
.commitAsync();
87+
88+
try (AsyncTransactionManagerImpl manager =
89+
new AsyncTransactionManagerImpl(session, span, Options.commitStats())) {
90+
manager.beginAsync();
91+
92+
// Verify that for the first transaction attempt, the `previousTransactionId` is
93+
// ByteString.EMPTY.
94+
// This is because no transaction has been previously aborted at this point.
95+
verify(session)
96+
.newTransaction(Options.fromTransactionOptions(Options.commitStats()), ByteString.EMPTY);
97+
assertThrows(AbortedException.class, manager::commitAsync);
98+
clearInvocations(session);
99+
100+
// Mock the transaction object to contain transactionID=null and
101+
// previousTransactionId=mockPreviousTransactionId
102+
when(transaction.getPreviousTransactionId()).thenReturn(mockPreviousTransactionId);
103+
manager.resetForRetryAsync();
104+
// Verify that in the first retry attempt, the `previousTransactionId`
105+
// (mockPreviousTransactionId) is passed to the new transaction.
106+
// This allows Spanner to retry the transaction using the ID of the aborted transaction.
107+
verify(session)
108+
.newTransaction(
109+
Options.fromTransactionOptions(Options.commitStats()), mockPreviousTransactionId);
110+
assertThrows(AbortedException.class, manager::commitAsync);
111+
clearInvocations(session);
112+
113+
// Mock the transaction object to contain transactionID=mockTransactionId and
114+
// previousTransactionId=mockPreviousTransactionId and transactionID = null
115+
transaction.transactionId = mockTransactionId;
116+
manager.resetForRetryAsync();
117+
// Verify that the latest `transactionId` (mockTransactionId) is used in the retry.
118+
// This ensures the retry logic is working as expected with the latest transaction ID.
119+
verify(session)
120+
.newTransaction(Options.fromTransactionOptions(Options.commitStats()), mockTransactionId);
121+
122+
when(transaction.rollbackAsync()).thenReturn(ApiFutures.immediateFuture(null));
123+
manager.closeAsync();
124+
}
125+
}
57126
}

0 commit comments

Comments
 (0)