diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java index 9e9fe62304a..fad4ce564ab 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java @@ -326,6 +326,19 @@ private void createTxnAsync( } res.set(null); } catch (ExecutionException e) { + SpannerException spannerException = SpannerExceptionFactory.asSpannerException(e); + if (spannerException.getErrorCode() == ErrorCode.ABORTED + && session.getIsMultiplexed() + && mutation != null) { + // Begin transaction can return ABORTED errors. This can only happen if it included + // a mutation key, which again means that this is a mutation-only transaction on a + // multiplexed session. + span.addAnnotation( + "Transaction Creation Failed with ABORT. Retrying", + e.getCause() == null ? e : e.getCause()); + createTxnAsync(res, mutation); + return; + } span.addAnnotation( "Transaction Creation Failed", e.getCause() == null ? e : e.getCause()); res.setException(e.getCause() == null ? e : e.getCause()); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java index 9f27b28d323..35c2d553b08 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java @@ -1919,7 +1919,8 @@ private Transaction beginTransaction( } if (session.getMultiplexed() && options.getModeCase() == ModeCase.READ_WRITE - && mutationKey != null) { + && mutationKey != null + && mutationKey != com.google.spanner.v1.Mutation.getDefaultInstance()) { // Mutation only case in a read-write transaction. builder.setPrecommitToken(getTransactionPrecommitToken(transactionId)); } @@ -2023,6 +2024,14 @@ public void commit(CommitRequest request, StreamObserver respons return; } sessionLastUsed.put(session.getName(), Instant.now()); + if (session.getMultiplexed() + && !request.hasPrecommitToken() + && !request.hasSingleUseTransaction()) { + throw Status.INVALID_ARGUMENT + .withDescription( + "A Commit request for a read-write transaction on a multiplexed session must specify a precommit token.") + .asRuntimeException(); + } try { commitExecutionTime.simulateExecutionTime(exceptions, stickyGlobalExceptions, freezeLock); // Find or start a transaction diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java index edafa15f00d..d5fa4dd5c37 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java @@ -1294,6 +1294,204 @@ public void testMutationOnlyUsingAsyncTransactionManager() { request.getPrecommitToken().getPrecommitToken()); } + private Spanner setupSpannerForAbortedBeginTransactionTests() { + // Force the BeginTransaction RPC to return Aborted the first time it is called. The exception + // is cleared after the first call, so the retry should succeed. + mockSpanner.setBeginTransactionExecutionTime( + SimulatedExecutionTime.ofException( + mockSpanner.createAbortedException(ByteString.copyFromUtf8("test")))); + + return SpannerOptions.newBuilder() + .setProjectId("test-project") + .setChannelProvider(channelProvider) + .setCredentials(NoCredentials.getInstance()) + .setSessionPoolOption( + SessionPoolOptions.newBuilder() + .setUseMultiplexedSession(true) + .setUseMultiplexedSessionForRW(true) + .setSkipVerifyingBeginTransactionForMuxRW(true) + .build()) + .build() + .getService(); + } + + private void verifyMutationKeySetInBeginTransactionRequests( + List beginTransactionRequests) { + assertEquals(2, beginTransactionRequests.size()); + // Verify the requests are executed using multiplexed sessions + for (BeginTransactionRequest request : beginTransactionRequests) { + assertTrue(mockSpanner.getSession(request.getSession()).getMultiplexed()); + assertTrue(request.hasMutationKey()); + assertTrue(request.getMutationKey().hasInsert()); + } + } + + private void verifyPreCommitTokenSetInCommitRequest(List commitRequests) { + assertEquals(1L, commitRequests.size()); + for (CommitRequest request : commitRequests) { + assertTrue(mockSpanner.getSession(request.getSession()).getMultiplexed()); + assertNotNull(request.getPrecommitToken()); + assertEquals( + ByteString.copyFromUtf8("TransactionPrecommitToken"), + request.getPrecommitToken().getPrecommitToken()); + } + } + + // The following 4 tests validate mutation-only cases where the BeginTransaction RPC fails with an + // ABORTED or retryable error + @Test + public void testMutationOnlyCaseAbortedDuringBeginTransaction() { + // This test ensures that when a transaction containing only mutations is retried after an + // ABORT error in the BeginTransaction RPC: + // 1. The mutation key is correctly included in the BeginTransaction request. + // 2. The precommit token is properly set in the Commit request. + Spanner spanner = setupSpannerForAbortedBeginTransactionTests(); + DatabaseClientImpl client = + (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + + client + .readWriteTransaction() + .run( + transaction -> { + Mutation mutation = + Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build(); + transaction.buffer(mutation); + return null; + }); + + // Verify that for mutation only case, a mutation key is set in BeginTransactionRequest. + List beginTransactionRequests = + mockSpanner.getRequestsOfType(BeginTransactionRequest.class); + verifyMutationKeySetInBeginTransactionRequests(beginTransactionRequests); + + // Verify that the latest precommit token is set in the CommitRequest + List commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class); + verifyPreCommitTokenSetInCommitRequest(commitRequests); + + spanner.close(); + } + + @Test + public void testMutationOnlyUsingTransactionManagerAbortedDuringBeginTransaction() { + // This test ensures that when a transaction containing only mutations is retried after an + // ABORT error in the BeginTransaction RPC: + // 1. The mutation key is correctly included in the BeginTransaction request. + // 2. The precommit token is properly set in the Commit request. + Spanner spanner = setupSpannerForAbortedBeginTransactionTests(); + DatabaseClientImpl client = + (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + + try (TransactionManager manager = client.transactionManager()) { + TransactionContext transaction = manager.begin(); + while (true) { + try { + Mutation mutation = + Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build(); + transaction.buffer(mutation); + manager.commit(); + assertNotNull(manager.getCommitTimestamp()); + break; + } catch (AbortedException e) { + transaction = manager.resetForRetry(); + } + } + } + + // Verify that for mutation only case, a mutation key is set in BeginTransactionRequest. + List beginTransactionRequests = + mockSpanner.getRequestsOfType(BeginTransactionRequest.class); + verifyMutationKeySetInBeginTransactionRequests(beginTransactionRequests); + + // Verify that the latest precommit token is set in the CommitRequest + List commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class); + verifyPreCommitTokenSetInCommitRequest(commitRequests); + + spanner.close(); + } + + @Test + public void testMutationOnlyUsingAsyncRunnerAbortedDuringBeginTransaction() { + // This test ensures that when a transaction containing only mutations is retried after an + // ABORT error in the BeginTransaction RPC: + // 1. The mutation key is correctly included in the BeginTransaction request. + // 2. The precommit token is properly set in the Commit request. + + Spanner spanner = setupSpannerForAbortedBeginTransactionTests(); + DatabaseClientImpl client = + (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + + AsyncRunner runner = client.runAsync(); + get( + runner.runAsync( + txn -> { + txn.buffer( + Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build()); + return ApiFutures.immediateFuture(null); + }, + MoreExecutors.directExecutor())); + + // Verify that for mutation only case, a mutation key is set in BeginTransactionRequest. + List beginTransactionRequests = + mockSpanner.getRequestsOfType(BeginTransactionRequest.class); + verifyMutationKeySetInBeginTransactionRequests(beginTransactionRequests); + + // Verify that the latest precommit token is set in the CommitRequest + List commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class); + verifyPreCommitTokenSetInCommitRequest(commitRequests); + + spanner.close(); + } + + @Test + public void testMutationOnlyUsingTransactionManagerAsyncAbortedDuringBeginTransaction() + throws Exception { + // This test verifies that in the case of mutations-only, when a transaction is retried after an + // ABORT in BeginTransaction RPC, the mutation key is correctly set in the BeginTransaction + // request + // and precommit token is set in Commit request. + Spanner spanner = setupSpannerForAbortedBeginTransactionTests(); + DatabaseClientImpl client = + (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + + try (AsyncTransactionManager manager = client.transactionManagerAsync()) { + TransactionContextFuture transaction = manager.beginAsync(); + while (true) { + CommitTimestampFuture commitTimestamp = + transaction + .then( + (txn, input) -> { + txn.buffer( + Mutation.newInsertBuilder("FOO") + .set("ID") + .to(1L) + .set("NAME") + .to("Bar") + .build()); + return ApiFutures.immediateFuture(null); + }, + MoreExecutors.directExecutor()) + .commitAsync(); + try { + assertThat(commitTimestamp.get()).isNotNull(); + break; + } catch (AbortedException e) { + transaction = manager.resetForRetryAsync(); + } + } + } + + // Verify that for mutation only case, a mutation key is set in BeginTransactionRequest. + List beginTransactionRequests = + mockSpanner.getRequestsOfType(BeginTransactionRequest.class); + verifyMutationKeySetInBeginTransactionRequests(beginTransactionRequests); + + // Verify that the latest precommit token is set in the CommitRequest + List commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class); + verifyPreCommitTokenSetInCommitRequest(commitRequests); + + spanner.close(); + } + // Tests the behavior of the server-side kill switch for read-write multiplexed sessions.. @Test public void testInitialBeginTransactionWithRW_receivesUnimplemented_fallsBackToRegularSession() {