Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public void setSpan(ISpan span) {

@Override
public void close() {
closeAsync();
SpannerApiFutures.get(closeAsync());
}

@Override
Expand Down Expand Up @@ -183,6 +183,10 @@ public ApiFuture<Void> rollbackAsync() {

@Override
public TransactionContextFuture resetForRetryAsync() {
if (txn == null || !txn.isAborted() && txnState != TransactionState.ABORTED) {
throw new IllegalStateException(
"resetForRetry can only be called if the previous attempt aborted");
}
return new TransactionContextFutureImpl(this, internalBeginAsync(false));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,67 +60,4 @@ public void testCommitReturnsCommitStats() {
verify(transaction).commitAsync();
}
}

@Test
public void testRetryUsesPreviousTransactionIdOnMultiplexedSession() {
Copy link
Contributor Author

@harshachinta harshachinta Feb 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removing this unit test since it is not possible to change the txnstate to ABORTED of the TransactionManager using unit tests. This is needed since we are calling resetForRetryAsync, which will fail if transactionstate is not ABORTED.

We have same tests covered in mock spanner

public class MultiplexedSessionDatabaseClientMockServerTest extends AbstractMockServerTest {

// Set up mock transaction IDs
final ByteString mockTransactionId = ByteString.copyFromUtf8("mockTransactionId");
final ByteString mockPreviousTransactionId =
ByteString.copyFromUtf8("mockPreviousTransactionId");

Span oTspan = mock(Span.class);
ISpan span = new OpenTelemetrySpan(oTspan);
when(oTspan.makeCurrent()).thenReturn(mock(Scope.class));
// Mark the session as multiplexed.
when(session.getIsMultiplexed()).thenReturn(true);

// Initialize a mock transaction with transactionId = null, previousTransactionId = null.
transaction = mock(TransactionRunnerImpl.TransactionContextImpl.class);
when(transaction.ensureTxnAsync()).thenReturn(ApiFutures.immediateFuture(null));
when(session.newTransaction(eq(Options.fromTransactionOptions(Options.commitStats())), any()))
.thenReturn(transaction);

// Simulate an ABORTED error being thrown when `commitAsync()` is called.
doThrow(SpannerExceptionFactory.newSpannerException(ErrorCode.ABORTED, ""))
.when(transaction)
.commitAsync();

try (AsyncTransactionManagerImpl manager =
new AsyncTransactionManagerImpl(session, span, Options.commitStats())) {
manager.beginAsync();

// Verify that for the first transaction attempt, the `previousTransactionId` is
// ByteString.EMPTY.
// This is because no transaction has been previously aborted at this point.
verify(session)
.newTransaction(Options.fromTransactionOptions(Options.commitStats()), ByteString.EMPTY);
assertThrows(AbortedException.class, manager::commitAsync);
clearInvocations(session);

// Mock the transaction object to contain transactionID=null and
// previousTransactionId=mockPreviousTransactionId
when(transaction.getPreviousTransactionId()).thenReturn(mockPreviousTransactionId);
manager.resetForRetryAsync();
// Verify that in the first retry attempt, the `previousTransactionId`
// (mockPreviousTransactionId) is passed to the new transaction.
// This allows Spanner to retry the transaction using the ID of the aborted transaction.
verify(session)
.newTransaction(
Options.fromTransactionOptions(Options.commitStats()), mockPreviousTransactionId);
assertThrows(AbortedException.class, manager::commitAsync);
clearInvocations(session);

// Mock the transaction object to contain transactionID=mockTransactionId and
// previousTransactionId=mockPreviousTransactionId and transactionID = null
transaction.transactionId = mockTransactionId;
manager.resetForRetryAsync();
// Verify that the latest `transactionId` (mockTransactionId) is used in the retry.
// This ensures the retry logic is working as expected with the latest transaction ID.
verify(session)
.newTransaction(Options.fromTransactionOptions(Options.commitStats()), mockTransactionId);

when(transaction.rollbackAsync()).thenReturn(ApiFutures.immediateFuture(null));
manager.closeAsync();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,12 @@ public void runAsync() throws Exception {
},
executor);
assertThat(insertCount.get()).isEqualTo(1L);
if (env.getTestHelper().getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()) {
// The runAsync() method should only be called once on the runner.
// However, due to a bug in regular sessions, it can be executed multiple times on the same
// runner.
runner = client.runAsync();
}
ApiFuture<Long> deleteCount =
runner.runAsync(
txn ->
Expand Down Expand Up @@ -299,6 +305,12 @@ public void runAsyncBatchUpdate() throws Exception {
},
executor);
assertThat(insertCount.get()).asList().containsExactly(1L, 1L, 1L);
if (env.getTestHelper().getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()) {
// The runAsync() method should only be called once on the runner.
// However, due to a bug in regular sessions, it can be executed multiple times on the same
// runner.
runner = client.runAsync();
}
ApiFuture<long[]> deleteCount =
runner.runAsync(
txn ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.google.cloud.spanner.it;

import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assume.assumeFalse;

import com.google.api.gax.longrunning.OperationFuture;
import com.google.cloud.spanner.Database;
Expand Down Expand Up @@ -84,6 +85,10 @@ public void dropTable() throws Exception {

@Test
public void noStatementsInRequest() {
// TODO(sriharshach): Remove this skip once backend support empty transactions to commit.
assumeFalse(
"Skipping for multiplexed sessions since it does not allow empty transactions to commit",
isUsingMultiplexedSessionsForRW());
final TransactionCallable<long[]> callable =
transaction -> {
List<Statement> stmts = new ArrayList<>();
Expand Down Expand Up @@ -252,4 +257,8 @@ public void largeBatchDml_withNonParameterisedStatements() {
assertThat(actualRowCounts.length).isEqualTo(80);
assertThat(expectedRowCounts).isEqualTo(actualRowCounts);
}

boolean isUsingMultiplexedSessionsForRW() {
return env.getTestHelper().getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,9 @@ public static void setUpDatabase() throws Exception {
totalSize = 0;
}
}
dbClient.write(mutations);
if (mutations.size() > 0) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: do we really need this check? (and if so, could we change is to !mutations.isEmpty())

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noticed that at this point, there are no mutations to commit, making the db.write call unnecessary. Adding a check for pending mutations before committing will avoid an unnecessary backend call and save some time in our tests, though the improvement is minimal.

dbClient.write(mutations);
}
}
// Our read/queries are executed with some staleness.
Thread.sleep(2 * STALENESS_MILLISEC);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,10 @@ public void nestedSingleUseReadTxnThrows() {
@Test
public void nestedTxnSucceedsWhenAllowed() {
assumeFalse("Emulator does not support multiple parallel transactions", isUsingEmulator());

// TODO(sriharshach): Remove this skip once backend support empty transactions to commit.
assumeFalse(
"Skipping for multiplexed sessions since it does not allow empty transactions to commit",
isUsingMultiplexedSessionsForRW());
client
.readWriteTransaction()
.allowNestedTransaction()
Expand Down Expand Up @@ -588,4 +591,8 @@ public void testTransactionRunnerReturnsCommitStats() {
// MutationCount = 2 (2 columns).
assertEquals(2L, runner.getCommitResponse().getCommitStats().getMutationCount());
}

boolean isUsingMultiplexedSessionsForRW() {
return env.getTestHelper().getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW();
}
}
Loading