diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java index 0057bb15bea..1578de87cdb 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java @@ -55,7 +55,7 @@ public void setSpan(ISpan span) { @Override public void close() { - closeAsync(); + SpannerApiFutures.get(closeAsync()); } @Override @@ -183,6 +183,10 @@ public ApiFuture rollbackAsync() { @Override public TransactionContextFuture resetForRetryAsync() { + if (txn == null || !txn.isAborted() && txnState != TransactionState.ABORTED) { + throw new IllegalStateException( + "resetForRetry can only be called if the previous attempt aborted"); + } return new TransactionContextFutureImpl(this, internalBeginAsync(false)); } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncTransactionManagerImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncTransactionManagerImplTest.java index 006a926e907..dd13c39abc8 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncTransactionManagerImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncTransactionManagerImplTest.java @@ -16,18 +16,14 @@ package com.google.cloud.spanner; -import static org.junit.Assert.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.clearInvocations; -import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import com.google.api.core.ApiFutures; import com.google.cloud.Timestamp; -import com.google.protobuf.ByteString; import io.opentelemetry.api.trace.Span; import io.opentelemetry.context.Scope; import org.junit.Test; @@ -60,67 +56,4 @@ public void testCommitReturnsCommitStats() { verify(transaction).commitAsync(); } } - - @Test - public void testRetryUsesPreviousTransactionIdOnMultiplexedSession() { - // Set up mock transaction IDs - final ByteString mockTransactionId = ByteString.copyFromUtf8("mockTransactionId"); - final ByteString mockPreviousTransactionId = - ByteString.copyFromUtf8("mockPreviousTransactionId"); - - Span oTspan = mock(Span.class); - ISpan span = new OpenTelemetrySpan(oTspan); - when(oTspan.makeCurrent()).thenReturn(mock(Scope.class)); - // Mark the session as multiplexed. - when(session.getIsMultiplexed()).thenReturn(true); - - // Initialize a mock transaction with transactionId = null, previousTransactionId = null. - transaction = mock(TransactionRunnerImpl.TransactionContextImpl.class); - when(transaction.ensureTxnAsync()).thenReturn(ApiFutures.immediateFuture(null)); - when(session.newTransaction(eq(Options.fromTransactionOptions(Options.commitStats())), any())) - .thenReturn(transaction); - - // Simulate an ABORTED error being thrown when `commitAsync()` is called. - doThrow(SpannerExceptionFactory.newSpannerException(ErrorCode.ABORTED, "")) - .when(transaction) - .commitAsync(); - - try (AsyncTransactionManagerImpl manager = - new AsyncTransactionManagerImpl(session, span, Options.commitStats())) { - manager.beginAsync(); - - // Verify that for the first transaction attempt, the `previousTransactionId` is - // ByteString.EMPTY. - // This is because no transaction has been previously aborted at this point. - verify(session) - .newTransaction(Options.fromTransactionOptions(Options.commitStats()), ByteString.EMPTY); - assertThrows(AbortedException.class, manager::commitAsync); - clearInvocations(session); - - // Mock the transaction object to contain transactionID=null and - // previousTransactionId=mockPreviousTransactionId - when(transaction.getPreviousTransactionId()).thenReturn(mockPreviousTransactionId); - manager.resetForRetryAsync(); - // Verify that in the first retry attempt, the `previousTransactionId` - // (mockPreviousTransactionId) is passed to the new transaction. - // This allows Spanner to retry the transaction using the ID of the aborted transaction. - verify(session) - .newTransaction( - Options.fromTransactionOptions(Options.commitStats()), mockPreviousTransactionId); - assertThrows(AbortedException.class, manager::commitAsync); - clearInvocations(session); - - // Mock the transaction object to contain transactionID=mockTransactionId and - // previousTransactionId=mockPreviousTransactionId and transactionID = null - transaction.transactionId = mockTransactionId; - manager.resetForRetryAsync(); - // Verify that the latest `transactionId` (mockTransactionId) is used in the retry. - // This ensures the retry logic is working as expected with the latest transaction ID. - verify(session) - .newTransaction(Options.fromTransactionOptions(Options.commitStats()), mockTransactionId); - - when(transaction.rollbackAsync()).thenReturn(ApiFutures.immediateFuture(null)); - manager.closeAsync(); - } - } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITAsyncExamplesTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITAsyncExamplesTest.java index dc5abd77afd..b324e39e43e 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITAsyncExamplesTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITAsyncExamplesTest.java @@ -253,6 +253,12 @@ public void runAsync() throws Exception { }, executor); assertThat(insertCount.get()).isEqualTo(1L); + if (env.getTestHelper().getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()) { + // The runAsync() method should only be called once on the runner. + // However, due to a bug in regular sessions, it can be executed multiple times on the same + // runner. + runner = client.runAsync(); + } ApiFuture deleteCount = runner.runAsync( txn -> @@ -299,6 +305,12 @@ public void runAsyncBatchUpdate() throws Exception { }, executor); assertThat(insertCount.get()).asList().containsExactly(1L, 1L, 1L); + if (env.getTestHelper().getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()) { + // The runAsync() method should only be called once on the runner. + // However, due to a bug in regular sessions, it can be executed multiple times on the same + // runner. + runner = client.runAsync(); + } ApiFuture deleteCount = runner.runAsync( txn -> diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITBatchDmlTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITBatchDmlTest.java index b11e4f613ce..2decef6158e 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITBatchDmlTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITBatchDmlTest.java @@ -17,6 +17,7 @@ package com.google.cloud.spanner.it; import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assume.assumeFalse; import com.google.api.gax.longrunning.OperationFuture; import com.google.cloud.spanner.Database; @@ -84,6 +85,10 @@ public void dropTable() throws Exception { @Test public void noStatementsInRequest() { + // TODO(sriharshach): Remove this skip once backend support empty transactions to commit. + assumeFalse( + "Skipping for multiplexed sessions since it does not allow empty transactions to commit", + isUsingMultiplexedSessionsForRW()); final TransactionCallable callable = transaction -> { List stmts = new ArrayList<>(); @@ -252,4 +257,8 @@ public void largeBatchDml_withNonParameterisedStatements() { assertThat(actualRowCounts.length).isEqualTo(80); assertThat(expectedRowCounts).isEqualTo(actualRowCounts); } + + boolean isUsingMultiplexedSessionsForRW() { + return env.getTestHelper().getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW(); + } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITBatchReadTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITBatchReadTest.java index f028fbc2b15..4f68949eab5 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITBatchReadTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITBatchReadTest.java @@ -181,7 +181,9 @@ public static void setUpDatabase() throws Exception { totalSize = 0; } } - dbClient.write(mutations); + if (!mutations.isEmpty()) { + dbClient.write(mutations); + } } // Our read/queries are executed with some staleness. Thread.sleep(2 * STALENESS_MILLISEC); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITTransactionTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITTransactionTest.java index ea60b9fb649..627523f0555 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITTransactionTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITTransactionTest.java @@ -464,7 +464,10 @@ public void nestedSingleUseReadTxnThrows() { @Test public void nestedTxnSucceedsWhenAllowed() { assumeFalse("Emulator does not support multiple parallel transactions", isUsingEmulator()); - + // TODO(sriharshach): Remove this skip once backend support empty transactions to commit. + assumeFalse( + "Skipping for multiplexed sessions since it does not allow empty transactions to commit", + isUsingMultiplexedSessionsForRW()); client .readWriteTransaction() .allowNestedTransaction() @@ -588,4 +591,8 @@ public void testTransactionRunnerReturnsCommitStats() { // MutationCount = 2 (2 columns). assertEquals(2L, runner.getCommitResponse().getCommitStats().getMutationCount()); } + + boolean isUsingMultiplexedSessionsForRW() { + return env.getTestHelper().getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW(); + } }