From 0e2cd4a1c2ed81d79e398fcdbdfed34e89f23b33 Mon Sep 17 00:00:00 2001 From: Sri Harsha CH Date: Mon, 24 Mar 2025 11:11:09 +0000 Subject: [PATCH 1/9] chore(spanner): fallback for TransactionRunner --- .../MultiplexedSessionDatabaseClient.java | 79 ++++++++++++++++++- .../com/google/cloud/spanner/SessionPool.java | 4 +- .../com/google/cloud/spanner/SpannerImpl.java | 3 + 3 files changed, 83 insertions(+), 3 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java index eb00ff8f8ca..49527aaa01c 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java @@ -27,6 +27,8 @@ import com.google.cloud.spanner.Options.TransactionOption; import com.google.cloud.spanner.Options.UpdateOption; import com.google.cloud.spanner.SessionClient.SessionConsumer; +import com.google.cloud.spanner.SessionPool.PooledSessionReplacementHandler; +import com.google.cloud.spanner.SessionPool.SessionPoolTransactionRunner; import com.google.cloud.spanner.SpannerException.ResourceNotFoundException; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -52,6 +54,56 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +class MultiplexedSessionTransactionRunner implements TransactionRunner { + private final SessionPool sessionPool; + private final TransactionRunnerImpl transactionRunner; + private final TransactionOption[] options; + + public MultiplexedSessionTransactionRunner(SessionImpl session, SessionPool sessionPool, TransactionOption... options) { + this.sessionPool = sessionPool; + this.transactionRunner = new TransactionRunnerImpl(session, options); // Uses multiplexed session initially + session.setActive(this.transactionRunner); + this.options = options; + } + + @Override + public T run(TransactionCallable callable) { + boolean useRegularSession = false; + while (true) { + try { + if (useRegularSession) { + TransactionRunner runner = new SessionPoolTransactionRunner(sessionPool.getSession(), null, options); + return runner.run(callable); + } else { + return transactionRunner.run(callable); // Run using multiplexed session + } + } catch (SpannerException e) { + if (e.getErrorCode() == ErrorCode.UNIMPLEMENTED) { + useRegularSession = true; // Switch to regular session + } else { + throw e; // Other errors propagate + } + } + } + } + + @Override + public Timestamp getCommitTimestamp() { + return null; + } + + @Override + public CommitResponse getCommitResponse() { + return null; + } + + @Override + public TransactionRunner allowNestedTransaction() { + return null; + } +} + + /** * {@link DatabaseClient} implementation that uses a single multiplexed session to execute * transactions. @@ -75,6 +127,7 @@ static class MultiplexedSessionTransaction extends SessionImpl { private final int singleUseChannelHint; private boolean done; + private SessionPool pool; MultiplexedSessionTransaction( MultiplexedSessionDatabaseClient client, @@ -82,11 +135,22 @@ static class MultiplexedSessionTransaction extends SessionImpl { SessionReference sessionReference, int singleUseChannelHint, boolean singleUse) { + this(client, span, sessionReference, singleUseChannelHint, singleUse, null); + } + + MultiplexedSessionTransaction( + MultiplexedSessionDatabaseClient client, + ISpan span, + SessionReference sessionReference, + int singleUseChannelHint, + boolean singleUse, + SessionPool pool) { super(client.sessionClient.getSpanner(), sessionReference, singleUseChannelHint); this.client = client; this.singleUse = singleUse; this.singleUseChannelHint = singleUseChannelHint; this.client.numSessionsAcquired.incrementAndGet(); + this.pool = pool; setCurrentSpan(span); } @@ -134,6 +198,12 @@ public CommitResponse writeAtLeastOnceWithOptions( return response; } + @Override + public TransactionRunner readWriteTransaction(TransactionOption... options) { + return new MultiplexedSessionTransactionRunner(this, pool, options); + } + + @Override void onTransactionDone() { boolean markedDone = false; @@ -225,6 +295,8 @@ public void close() { */ @VisibleForTesting final AtomicBoolean unimplementedForPartitionedOps = new AtomicBoolean(false); + private SessionPool pool; + MultiplexedSessionDatabaseClient(SessionClient sessionClient) { this(sessionClient, Clock.systemUTC()); } @@ -299,6 +371,10 @@ public void onSessionCreateFailure(Throwable t, int createFailureForSessionCount initialSessionReferenceFuture); } + void setPool(SessionPool pool) { + this.pool = pool; + } + private static void maybeWaitForSessionCreation( SessionPoolOptions sessionPoolOptions, ApiFuture future) { Duration waitDuration = sessionPoolOptions.getWaitForMinSessions(); @@ -489,7 +565,8 @@ private MultiplexedSessionTransaction createDirectMultiplexedSessionTransaction( // any special handling of such errors. multiplexedSessionReference.get().get(), singleUse ? getSingleUseChannelHint() : NO_CHANNEL_HINT, - singleUse); + singleUse, + this.pool); } catch (ExecutionException executionException) { throw SpannerExceptionFactory.asSpannerException(executionException.getCause()); } catch (InterruptedException interruptedException) { diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java index a50bdf16847..c0a7fd9fa0e 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java @@ -1004,7 +1004,7 @@ public TransactionState getState() { * {@link TransactionRunner} that automatically handles {@link SessionNotFoundException}s by * replacing the underlying session and then restarts the transaction. */ - private static final class SessionPoolTransactionRunner + static final class SessionPoolTransactionRunner implements TransactionRunner { private I session; @@ -1012,7 +1012,7 @@ private static final class SessionPoolTransactionRunner private final TransactionOption[] options; private TransactionRunner runner; - private SessionPoolTransactionRunner( + SessionPoolTransactionRunner( I session, SessionReplacementHandler sessionReplacementHandler, TransactionOption... options) { diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java index b3eec55f73d..8f50f8e5096 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java @@ -332,6 +332,9 @@ DatabaseClientImpl createDatabaseClient( boolean useMultiplexedSessionPartitionedOps, boolean useMultiplexedSessionForRW, Attributes commonAttributes) { + if (multiplexedSessionClient != null) { + multiplexedSessionClient.setPool(pool); + } return new DatabaseClientImpl( clientId, pool, From ef453419d0c217fe1e428259f8b85a84d57b30e2 Mon Sep 17 00:00:00 2001 From: Sri Harsha CH Date: Mon, 24 Mar 2025 11:15:30 +0000 Subject: [PATCH 2/9] chore(spanner): fallback for TransactionRunner --- .../cloud/spanner/MultiplexedSessionDatabaseClient.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java index 49527aaa01c..42cf02ba240 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java @@ -89,17 +89,17 @@ public T run(TransactionCallable callable) { @Override public Timestamp getCommitTimestamp() { - return null; + return this.transactionRunner.getCommitTimestamp(); } @Override public CommitResponse getCommitResponse() { - return null; + return this.transactionRunner.getCommitResponse(); } @Override public TransactionRunner allowNestedTransaction() { - return null; + return this.transactionRunner.allowNestedTransaction(); } } From 76dfcc1164f10bda10359c5aad3059cc82abf05f Mon Sep 17 00:00:00 2001 From: Sri Harsha CH Date: Wed, 26 Mar 2025 07:17:34 +0000 Subject: [PATCH 3/9] chore(spanner): update implementation logic for fallback --- .../DelayedMultiplexedSessionTransaction.java | 12 ++++- .../MultiplexedSessionDatabaseClient.java | 52 ++++++++++++------- 2 files changed, 42 insertions(+), 22 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedMultiplexedSessionTransaction.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedMultiplexedSessionTransaction.java index 90de3d7de31..57ffca6bce6 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedMultiplexedSessionTransaction.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedMultiplexedSessionTransaction.java @@ -44,14 +44,17 @@ class DelayedMultiplexedSessionTransaction extends AbstractMultiplexedSessionDat private final ISpan span; private final ApiFuture sessionFuture; + private final SessionPool sessionPool; DelayedMultiplexedSessionTransaction( MultiplexedSessionDatabaseClient client, ISpan span, - ApiFuture sessionFuture) { + ApiFuture sessionFuture, + SessionPool sessionPool) { this.client = client; this.span = span; this.sessionFuture = sessionFuture; + this.sessionPool = sessionPool; } @Override @@ -189,7 +192,12 @@ public TransactionRunner readWriteTransaction(TransactionOption... options) { this.sessionFuture, sessionReference -> new MultiplexedSessionTransaction( - client, span, sessionReference, NO_CHANNEL_HINT, /* singleUse = */ false) + client, + span, + sessionReference, + NO_CHANNEL_HINT, + /* singleUse = */ false, + this.sessionPool) .readWriteTransaction(options), MoreExecutors.directExecutor())); } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java index 42cf02ba240..0cfa8604fa9 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java @@ -27,7 +27,6 @@ import com.google.cloud.spanner.Options.TransactionOption; import com.google.cloud.spanner.Options.UpdateOption; import com.google.cloud.spanner.SessionClient.SessionConsumer; -import com.google.cloud.spanner.SessionPool.PooledSessionReplacementHandler; import com.google.cloud.spanner.SessionPool.SessionPoolTransactionRunner; import com.google.cloud.spanner.SpannerException.ResourceNotFoundException; import com.google.common.annotations.VisibleForTesting; @@ -56,30 +55,44 @@ class MultiplexedSessionTransactionRunner implements TransactionRunner { private final SessionPool sessionPool; - private final TransactionRunnerImpl transactionRunner; + private final TransactionRunnerImpl transactionRunnerForMultiplexedSession; + private SessionPoolTransactionRunner transactionRunnerForRegularSession; private final TransactionOption[] options; + private boolean isUsingMultiplexedSession = true; - public MultiplexedSessionTransactionRunner(SessionImpl session, SessionPool sessionPool, TransactionOption... options) { + public MultiplexedSessionTransactionRunner( + SessionImpl multiplexedSession, SessionPool sessionPool, TransactionOption... options) { this.sessionPool = sessionPool; - this.transactionRunner = new TransactionRunnerImpl(session, options); // Uses multiplexed session initially - session.setActive(this.transactionRunner); + this.transactionRunnerForMultiplexedSession = + new TransactionRunnerImpl( + multiplexedSession, options); // Uses multiplexed session initially + multiplexedSession.setActive(this.transactionRunnerForMultiplexedSession); this.options = options; } + private TransactionRunner getRunner() { + if (this.isUsingMultiplexedSession) { + return this.transactionRunnerForMultiplexedSession; + } else { + if (this.transactionRunnerForRegularSession == null) { + this.transactionRunnerForRegularSession = + new SessionPoolTransactionRunner<>( + sessionPool.getSession(), + sessionPool.getPooledSessionReplacementHandler(), + options); + } + return this.transactionRunnerForRegularSession; + } + } + @Override public T run(TransactionCallable callable) { - boolean useRegularSession = false; while (true) { try { - if (useRegularSession) { - TransactionRunner runner = new SessionPoolTransactionRunner(sessionPool.getSession(), null, options); - return runner.run(callable); - } else { - return transactionRunner.run(callable); // Run using multiplexed session - } + return getRunner().run(callable); } catch (SpannerException e) { if (e.getErrorCode() == ErrorCode.UNIMPLEMENTED) { - useRegularSession = true; // Switch to regular session + this.isUsingMultiplexedSession = false; } else { throw e; // Other errors propagate } @@ -89,21 +102,21 @@ public T run(TransactionCallable callable) { @Override public Timestamp getCommitTimestamp() { - return this.transactionRunner.getCommitTimestamp(); + return getRunner().getCommitTimestamp(); } @Override public CommitResponse getCommitResponse() { - return this.transactionRunner.getCommitResponse(); + return getRunner().getCommitResponse(); } @Override public TransactionRunner allowNestedTransaction() { - return this.transactionRunner.allowNestedTransaction(); + getRunner().allowNestedTransaction(); + return this; } } - /** * {@link DatabaseClient} implementation that uses a single multiplexed session to execute * transactions. @@ -127,7 +140,7 @@ static class MultiplexedSessionTransaction extends SessionImpl { private final int singleUseChannelHint; private boolean done; - private SessionPool pool; + private final SessionPool pool; MultiplexedSessionTransaction( MultiplexedSessionDatabaseClient client, @@ -203,7 +216,6 @@ public TransactionRunner readWriteTransaction(TransactionOption... options) { return new MultiplexedSessionTransactionRunner(this, pool, options); } - @Override void onTransactionDone() { boolean markedDone = false; @@ -576,7 +588,7 @@ private MultiplexedSessionTransaction createDirectMultiplexedSessionTransaction( private DelayedMultiplexedSessionTransaction createDelayedMultiplexSessionTransaction() { return new DelayedMultiplexedSessionTransaction( - this, tracer.getCurrentSpan(), multiplexedSessionReference.get()); + this, tracer.getCurrentSpan(), multiplexedSessionReference.get(), this.pool); } private int getSingleUseChannelHint() { From e56c714f5cb55787d388522f016aa168bf847e1d Mon Sep 17 00:00:00 2001 From: Sri Harsha CH Date: Wed, 26 Mar 2025 07:18:34 +0000 Subject: [PATCH 4/9] chore(spanner): update mockspanner tests for mux --- ...edSessionDatabaseClientMockServerTest.java | 72 ++++++++++--------- 1 file changed, 40 insertions(+), 32 deletions(-) diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java index 388539524fd..5905ab34ed9 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java @@ -1627,20 +1627,17 @@ public void testPartitionedQuery_receivesUnimplemented_fallsBackToRegularSession // will then fail. mockSpanner.unfreeze(); - SpannerException e = - assertThrows( - SpannerException.class, - () -> - runner.run( - transaction -> { - ResultSet resultSet = transaction.executeQuery(STATEMENT); - //noinspection StatementWithEmptyBody - while (resultSet.next()) { - // ignore - } - return null; - })); - assertEquals(ErrorCode.UNIMPLEMENTED, e.getErrorCode()); + // The ExecuteStreamingSql call fails with UNIMPLEMENTED error, but the retry should happen + // internally with regular session. + runner.run( + transaction -> { + ResultSet resultSet = transaction.executeQuery(STATEMENT); + //noinspection StatementWithEmptyBody + while (resultSet.next()) { + // ignore + } + return null; + }); // Wait until the client sees that MultiplexedSessions are not supported for read-write. assertNotNull(client.multiplexedSessionDatabaseClient); @@ -1667,17 +1664,25 @@ public void testPartitionedQuery_receivesUnimplemented_fallsBackToRegularSession // Verify that two ExecuteSqlRequests were received: the first using a multiplexed session and // the second using a regular session. - assertEquals(2, mockSpanner.countRequestsOfType(ExecuteSqlRequest.class)); + assertEquals(3, mockSpanner.countRequestsOfType(ExecuteSqlRequest.class)); List requests = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class); + // The ExecuteSqlRequest of the first read-write transaction should use multiplexed session. Session session1 = mockSpanner.getSession(requests.get(0).getSession()); assertNotNull(session1); assertTrue(session1.getMultiplexed()); + // Retry of the ExecuteSqlRequest of the first read-write transaction should use regular + // session. Session session2 = mockSpanner.getSession(requests.get(1).getSession()); assertNotNull(session2); assertFalse(session2.getMultiplexed()); + // The ExecuteSqlRequest of the second read-write transaction should use regular session. + Session session3 = mockSpanner.getSession(requests.get(2).getSession()); + assertNotNull(session3); + assertFalse(session3.getMultiplexed()); + assertNotNull(client.multiplexedSessionDatabaseClient); assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsAcquired().get()); assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get()); @@ -1711,22 +1716,17 @@ public void testReadWriteUnimplemented_firstReceivesError_secondFallsBackToRegul assertNotNull(txn.getId()); assertFalse(client.multiplexedSessionDatabaseClient.unimplementedForRW.get()); - SpannerException e = - assertThrows( - SpannerException.class, - () -> - client - .readWriteTransaction() - .run( - transaction -> { - ResultSet resultSet = transaction.executeQuery(STATEMENT); - //noinspection StatementWithEmptyBody - while (resultSet.next()) { - // ignore - } - return null; - })); - assertEquals(ErrorCode.UNIMPLEMENTED, e.getErrorCode()); + client + .readWriteTransaction() + .run( + transaction -> { + ResultSet resultSet = transaction.executeQuery(STATEMENT); + //noinspection StatementWithEmptyBody + while (resultSet.next()) { + // ignore + } + return null; + }); // Verify that the previous failed transaction has marked multiplexed session client to be // unimplemented for read-write. @@ -1748,17 +1748,25 @@ public void testReadWriteUnimplemented_firstReceivesError_secondFallsBackToRegul // Verify that two ExecuteSqlRequests were received: the first using a multiplexed session and // the second using a regular session. - assertEquals(2, mockSpanner.countRequestsOfType(ExecuteSqlRequest.class)); + assertEquals(3, mockSpanner.countRequestsOfType(ExecuteSqlRequest.class)); List requests = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class); + // The ExecuteSqlRequest of the first read-write transaction should use multiplexed session. Session session1 = mockSpanner.getSession(requests.get(0).getSession()); assertNotNull(session1); assertTrue(session1.getMultiplexed()); + // Retry of the ExecuteSqlRequest of the first read-write transaction should use regular + // session. Session session2 = mockSpanner.getSession(requests.get(1).getSession()); assertNotNull(session2); assertFalse(session2.getMultiplexed()); + // The ExecuteSqlRequest of the second read-write transaction should use regular session. + Session session3 = mockSpanner.getSession(requests.get(1).getSession()); + assertNotNull(session3); + assertFalse(session3.getMultiplexed()); + assertNotNull(client.multiplexedSessionDatabaseClient); assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsAcquired().get()); assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get()); From ba76d77f3052c14824bde7f543a21547c18f98d4 Mon Sep 17 00:00:00 2001 From: Sri Harsha CH Date: Wed, 26 Mar 2025 09:49:58 +0000 Subject: [PATCH 5/9] chore(spanner): add comments --- .../MultiplexedSessionDatabaseClient.java | 23 +++++++++++++++++-- .../com/google/cloud/spanner/SpannerImpl.java | 3 +++ 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java index 0cfa8604fa9..798b976c721 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java @@ -53,6 +53,11 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +/** + * {@link TransactionRunner} that automatically handles "UNIMPLEMENTED" errors with the message + * "Transaction type read_write not supported with multiplexed sessions" by switching from a + * multiplexed session to a regular session and then restarts the transaction. + */ class MultiplexedSessionTransactionRunner implements TransactionRunner { private final SessionPool sessionPool; private final TransactionRunnerImpl transactionRunnerForMultiplexedSession; @@ -91,8 +96,9 @@ public T run(TransactionCallable callable) { try { return getRunner().run(callable); } catch (SpannerException e) { - if (e.getErrorCode() == ErrorCode.UNIMPLEMENTED) { - this.isUsingMultiplexedSession = false; + if (e.getErrorCode() == ErrorCode.UNIMPLEMENTED + && verifyUnimplementedErrorMessageForRWMux(e)) { + this.isUsingMultiplexedSession = false; // Fallback to regular session } else { throw e; // Other errors propagate } @@ -115,6 +121,19 @@ public TransactionRunner allowNestedTransaction() { getRunner().allowNestedTransaction(); return this; } + + private boolean verifyUnimplementedErrorMessageForRWMux(SpannerException spannerException) { + if (spannerException.getCause() == null) { + return false; + } + if (spannerException.getCause().getMessage() == null) { + return false; + } + return spannerException + .getCause() + .getMessage() + .contains("Transaction type read_write not supported with multiplexed sessions"); + } } /** diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java index 8f50f8e5096..63d501fbe63 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java @@ -333,6 +333,9 @@ DatabaseClientImpl createDatabaseClient( boolean useMultiplexedSessionForRW, Attributes commonAttributes) { if (multiplexedSessionClient != null) { + // Set the session pool in the multiplexed session client. + // This is required to handle fallback to regular sessions for in-progress transactions that + // use multiplexed sessions but fail with UNIMPLEMENTED errors. multiplexedSessionClient.setPool(pool); } return new DatabaseClientImpl( From 37530a682f97bbdb82bc3c0ae4251a30952854e2 Mon Sep 17 00:00:00 2001 From: Sri Harsha CH Date: Wed, 26 Mar 2025 09:51:43 +0000 Subject: [PATCH 6/9] chore(spanner): update test cases --- ...edSessionDatabaseClientMockServerTest.java | 101 ++++++++++++++++-- 1 file changed, 91 insertions(+), 10 deletions(-) diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java index 5905ab34ed9..76c36dcafa9 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java @@ -1253,12 +1253,6 @@ public void testMutationOnlyUsingAsyncTransactionManager() { } private Spanner setupSpannerForAbortedBeginTransactionTests() { - // Force the BeginTransaction RPC to return Aborted the first time it is called. The exception - // is cleared after the first call, so the retry should succeed. - mockSpanner.setBeginTransactionExecutionTime( - SimulatedExecutionTime.ofException( - mockSpanner.createAbortedException(ByteString.copyFromUtf8("test")))); - return SpannerOptions.newBuilder() .setProjectId("test-project") .setChannelProvider(channelProvider) @@ -1304,6 +1298,13 @@ public void testMutationOnlyCaseAbortedDuringBeginTransaction() { // 1. The mutation key is correctly included in the BeginTransaction request. // 2. The precommit token is properly set in the Commit request. Spanner spanner = setupSpannerForAbortedBeginTransactionTests(); + + // Force the BeginTransaction RPC to return Aborted the first time it is called. The exception + // is cleared after the first call, so the retry should succeed. + mockSpanner.setBeginTransactionExecutionTime( + SimulatedExecutionTime.ofException( + mockSpanner.createAbortedException(ByteString.copyFromUtf8("test")))); + DatabaseClientImpl client = (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); @@ -1336,6 +1337,13 @@ public void testMutationOnlyUsingTransactionManagerAbortedDuringBeginTransaction // 1. The mutation key is correctly included in the BeginTransaction request. // 2. The precommit token is properly set in the Commit request. Spanner spanner = setupSpannerForAbortedBeginTransactionTests(); + + // Force the BeginTransaction RPC to return Aborted the first time it is called. The exception + // is cleared after the first call, so the retry should succeed. + mockSpanner.setBeginTransactionExecutionTime( + SimulatedExecutionTime.ofException( + mockSpanner.createAbortedException(ByteString.copyFromUtf8("test")))); + DatabaseClientImpl client = (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); @@ -1375,6 +1383,13 @@ public void testMutationOnlyUsingAsyncRunnerAbortedDuringBeginTransaction() { // 2. The precommit token is properly set in the Commit request. Spanner spanner = setupSpannerForAbortedBeginTransactionTests(); + + // Force the BeginTransaction RPC to return Aborted the first time it is called. The exception + // is cleared after the first call, so the retry should succeed. + mockSpanner.setBeginTransactionExecutionTime( + SimulatedExecutionTime.ofException( + mockSpanner.createAbortedException(ByteString.copyFromUtf8("test")))); + DatabaseClientImpl client = (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); @@ -1408,6 +1423,13 @@ public void testMutationOnlyUsingTransactionManagerAsyncAbortedDuringBeginTransa // request // and precommit token is set in Commit request. Spanner spanner = setupSpannerForAbortedBeginTransactionTests(); + + // Force the BeginTransaction RPC to return Aborted the first time it is called. The exception + // is cleared after the first call, so the retry should succeed. + mockSpanner.setBeginTransactionExecutionTime( + SimulatedExecutionTime.ofException( + mockSpanner.createAbortedException(ByteString.copyFromUtf8("test")))); + DatabaseClientImpl client = (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); @@ -1589,7 +1611,7 @@ public void testPartitionedQuery_receivesUnimplemented_fallsBackToRegularSession @Test public void - testReadWriteUnimplementedErrorDuringInitialBeginTransactionRPC_firstReceivesError_secondFallsBackToRegularSessions() { + testReadWriteUnimplementedErrorDuringInitialBeginTransactionRPC_firstRetriedWithRegularSession_secondFallsBackToRegularSessions() { // This test simulates the following scenario, // 1. The server-side flag for RW multiplexed sessions is disabled. // 2. Application starts. The initial BeginTransaction RPC during client initialization will @@ -1638,6 +1660,8 @@ public void testPartitionedQuery_receivesUnimplemented_fallsBackToRegularSession } return null; }); + assertNotNull(runner.getCommitTimestamp()); + assertNotNull(runner.getCommitResponse()); // Wait until the client sees that MultiplexedSessions are not supported for read-write. assertNotNull(client.multiplexedSessionDatabaseClient); @@ -1689,7 +1713,8 @@ public void testPartitionedQuery_receivesUnimplemented_fallsBackToRegularSession } @Test - public void testReadWriteUnimplemented_firstReceivesError_secondFallsBackToRegularSessions() { + public void + testReadWriteUnimplemented_firstRetriedWithRegularSession_secondFallsBackToRegularSessions() { // This test simulates the following scenario, // 1. The server side flag for read-write multiplexed session is not disabled. When an // application starts, the initial BeginTransaction RPC with read-write will succeed. @@ -1716,6 +1741,10 @@ public void testReadWriteUnimplemented_firstReceivesError_secondFallsBackToRegul assertNotNull(txn.getId()); assertFalse(client.multiplexedSessionDatabaseClient.unimplementedForRW.get()); + // Initially, the first attempt executes an ExecuteSqlRequest using multiplexed sessions, but it + // fails with UNIMPLEMENTED. + // On retry, the request should automatically switch to regular sessions, ensuring the + // transaction completes successfully. client .readWriteTransaction() .run( @@ -1728,11 +1757,12 @@ public void testReadWriteUnimplemented_firstReceivesError_secondFallsBackToRegul return null; }); - // Verify that the previous failed transaction has marked multiplexed session client to be + // Verify that the previous failed transaction during first attempt has marked multiplexed + // session client to be // unimplemented for read-write. assertTrue(client.multiplexedSessionDatabaseClient.unimplementedForRW.get()); - // The next read-write transaction will fall back to regular sessions and succeed. + // The next read-write transaction will automatically fall back to regular sessions and succeed. client .readWriteTransaction() .run( @@ -1929,6 +1959,57 @@ public void testBatchWriteAtLeastOnce() { assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get()); } + @Test + public void + testReadWriteUnimplementedError_DuringExplicitBegin_RetriedWithRegularSessionForInFlightTransaction() { + // Test scenario: + // 1. The first attempt does an inline begin using a multiplexed session with an invalid + // statement, resulting in failure due to invalid syntax. + // 2. A retry occurs with an explicit begin using a multiplexed session, but we assume the + // backend flag is turned OFF, leading to UNIMPLEMENTED errors. + // 3. Upon encountering the UNIMPLEMENTED error, the entire transaction callable is retried + // using regular sessions, but the inline begin fails again. + // 4. A final retry executes the explicit BeginTransaction on a regular session. + Spanner spanner = setupSpannerForAbortedBeginTransactionTests(); + mockSpanner.setBeginTransactionExecutionTime( + SimulatedExecutionTime.ofException( + Status.UNIMPLEMENTED + .withDescription( + "Transaction type read_write not supported with multiplexed sessions") + .asRuntimeException())); + + DatabaseClientImpl client = + (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + TransactionRunner runner = client.readWriteTransaction(); + Long updateCount = + runner.run( + transaction -> { + // This update statement carries the BeginTransaction, but fails. This will + // cause the entire transaction to be retried with an explicit + // BeginTransaction RPC to ensure all statements in the transaction are + // actually executed against the same transaction. + SpannerException e = + assertThrows( + SpannerException.class, + () -> transaction.executeUpdate(INVALID_UPDATE_STATEMENT)); + assertEquals(ErrorCode.INVALID_ARGUMENT, e.getErrorCode()); + return transaction.executeUpdate(UPDATE_STATEMENT); + }); + + assertThat(updateCount).isEqualTo(1L); + List beginTransactionRequests = + mockSpanner.getRequestsOfType(BeginTransactionRequest.class); + assertEquals(2, beginTransactionRequests.size()); + + // Verify the first BeginTransaction request is executed using multiplexed sessions. + assertTrue( + mockSpanner.getSession(beginTransactionRequests.get(0).getSession()).getMultiplexed()); + + // Verify the second BeginTransaction request is executed using regular sessions. + assertFalse( + mockSpanner.getSession(beginTransactionRequests.get(1).getSession()).getMultiplexed()); + } + private void waitForSessionToBeReplaced(DatabaseClientImpl client) { assertNotNull(client.multiplexedSessionDatabaseClient); SessionReference sessionReference = From 1864f774b65c6060bcd0bded16b67de5024df007 Mon Sep 17 00:00:00 2001 From: Sri Harsha CH Date: Wed, 26 Mar 2025 10:30:42 +0000 Subject: [PATCH 7/9] chore(spanner): add SessionNotFound test case --- ...edSessionDatabaseClientMockServerTest.java | 56 +++++++++++++++++++ 1 file changed, 56 insertions(+) diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java index 76c36dcafa9..9e1b7cf6675 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java @@ -2010,6 +2010,62 @@ public void testBatchWriteAtLeastOnce() { mockSpanner.getSession(beginTransactionRequests.get(1).getSession()).getMultiplexed()); } + @Test + public void + testReadWriteUnimplementedError_RetriedWithRegularSessionForInFlightTransaction_FailsWithSessionNotFound() { + // Test scenario: + // 1. The initial attempt performs an inline begin using a multiplexed session, but with the + // backend flag assumed to be OFF, resulting in an UNIMPLEMENTED error. + // 2. Upon encountering the UNIMPLEMENTED error, the entire transaction callable is retried + // using regular sessions. However, the Commit request fails due to a SessionNotFound error. + // 3. A final retry is triggered to handle the SessionNotFound error by selecting a new session + // from the pool, leading to a successful transaction. + Spanner spanner = setupSpannerForAbortedBeginTransactionTests(); + + // The first ExecuteSql request that does an inline begin with multiplexed sessions fail with + // UNIMPLEMENTED error. + mockSpanner.setExecuteSqlExecutionTime( + SimulatedExecutionTime.ofException( + Status.UNIMPLEMENTED + .withDescription( + "Transaction type read_write not supported with multiplexed sessions") + .asRuntimeException())); + + // The first Commit request fails with SessionNotFound exception. The first time this commit is + // called with be using regular sessions. + // This is done to verify if SessionNotFound errors on regular sessions are handled. + mockSpanner.setCommitExecutionTime( + SimulatedExecutionTime.ofException( + mockSpanner.createSessionNotFoundException("TEST_SESSION_NAME"))); + + DatabaseClientImpl client = + (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + TransactionRunner runner = client.readWriteTransaction(); + Long updateCount = + runner.run( + transaction -> { + return transaction.executeUpdate(UPDATE_STATEMENT); + }); + + assertThat(updateCount).isEqualTo(1L); + List executeSqlRequests = + mockSpanner.getRequestsOfType(ExecuteSqlRequest.class); + assertEquals(3, executeSqlRequests.size()); + + // Verify the first BeginTransaction request is executed using multiplexed sessions. + assertTrue(mockSpanner.getSession(executeSqlRequests.get(0).getSession()).getMultiplexed()); + + // Verify the second BeginTransaction request is executed using regular sessions. + assertFalse(mockSpanner.getSession(executeSqlRequests.get(1).getSession()).getMultiplexed()); + + // Verify the second BeginTransaction request is executed using regular sessions. + assertFalse(mockSpanner.getSession(executeSqlRequests.get(2).getSession()).getMultiplexed()); + + // Verify that after the first regular session failed with SessionNotFoundException, a new + // regular session is picked up to re-run the transaction. + assertNotEquals(executeSqlRequests.get(1).getSession(), executeSqlRequests.get(2).getSession()); + } + private void waitForSessionToBeReplaced(DatabaseClientImpl client) { assertNotNull(client.multiplexedSessionDatabaseClient); SessionReference sessionReference = From accc8f2c5012ee84feb841eba32c6d708918495f Mon Sep 17 00:00:00 2001 From: Sri Harsha CH Date: Wed, 26 Mar 2025 15:18:10 +0000 Subject: [PATCH 8/9] chore(spanner): add base test case --- ...edSessionDatabaseClientMockServerTest.java | 61 ++++++++++++++++++- 1 file changed, 60 insertions(+), 1 deletion(-) diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java index 9e1b7cf6675..4a6c06247f0 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java @@ -2012,7 +2012,7 @@ public void testBatchWriteAtLeastOnce() { @Test public void - testReadWriteUnimplementedError_RetriedWithRegularSessionForInFlightTransaction_FailsWithSessionNotFound() { + testReadWriteUnimplementedError_RetriedWithRegularSessionForInFlightTransaction_RetriedWithSessionNotFound() { // Test scenario: // 1. The initial attempt performs an inline begin using a multiplexed session, but with the // backend flag assumed to be OFF, resulting in an UNIMPLEMENTED error. @@ -2066,6 +2066,65 @@ public void testBatchWriteAtLeastOnce() { assertNotEquals(executeSqlRequests.get(1).getSession(), executeSqlRequests.get(2).getSession()); } + @Test + public void + testReadWriteUnimplementedError_FirstSucceedsWithMux_SecondRetriedWithRegularSessionDueToUnimplementedError() { + // Test scenario: + // 1. The first read-write transaction successfully performs an inline begin using a multiplexed + // session. + // 2. The second read-write transaction attempts to execute with a multiplexed session, but + // since the backend flag is assumed to be OFF, it encounters an UNIMPLEMENTED error. + // 3. Upon encountering the UNIMPLEMENTED error, the entire transaction callable for the second + // read-write transaction is retried using a regular session. + + Spanner spanner = setupSpannerForAbortedBeginTransactionTests(); + + DatabaseClientImpl client = + (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + + // First read-write transaction attempt succeeds. + TransactionRunner runner = client.readWriteTransaction(); + Long updateCount = + runner.run( + transaction -> { + return transaction.executeUpdate(UPDATE_STATEMENT); + }); + + assertThat(updateCount).isEqualTo(1L); + + // The ExecuteSql request is forced to fail with UNIMPLEMENTED error. + mockSpanner.setExecuteSqlExecutionTime( + SimulatedExecutionTime.ofException( + Status.UNIMPLEMENTED + .withDescription( + "Transaction type read_write not supported with multiplexed sessions") + .asRuntimeException())); + + // Second read-write transaction on mux fails with UNIMPLEMENTED error, and then retried using + // regular session. + TransactionRunner runner1 = client.readWriteTransaction(); + Long updateCount1 = + runner1.run( + transaction -> { + return transaction.executeUpdate(UPDATE_STATEMENT); + }); + + assertThat(updateCount1).isEqualTo(1L); + + List executeSqlRequests = + mockSpanner.getRequestsOfType(ExecuteSqlRequest.class); + assertEquals(3, executeSqlRequests.size()); + + // Verify the first BeginTransaction request is executed using multiplexed sessions. + assertTrue(mockSpanner.getSession(executeSqlRequests.get(0).getSession()).getMultiplexed()); + + // Verify the second BeginTransaction request is executed using multiplexed sessions. + assertTrue(mockSpanner.getSession(executeSqlRequests.get(1).getSession()).getMultiplexed()); + + // Verify the second BeginTransaction request is executed using regular sessions. + assertFalse(mockSpanner.getSession(executeSqlRequests.get(2).getSession()).getMultiplexed()); + } + private void waitForSessionToBeReplaced(DatabaseClientImpl client) { assertNotNull(client.multiplexedSessionDatabaseClient); SessionReference sessionReference = From b223f704fcfdc22972f1818768063694b9161848 Mon Sep 17 00:00:00 2001 From: Sri Harsha CH Date: Wed, 26 Mar 2025 15:20:02 +0000 Subject: [PATCH 9/9] chore(spanner): add base test case --- ...lexedSessionDatabaseClientMockServerTest.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java index 4a6c06247f0..eec3f7679a6 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java @@ -1252,7 +1252,7 @@ public void testMutationOnlyUsingAsyncTransactionManager() { request.getPrecommitToken().getPrecommitToken()); } - private Spanner setupSpannerForAbortedBeginTransactionTests() { + private Spanner setupSpannerBySkippingBeginTransactionVerificationForMux() { return SpannerOptions.newBuilder() .setProjectId("test-project") .setChannelProvider(channelProvider) @@ -1297,7 +1297,7 @@ public void testMutationOnlyCaseAbortedDuringBeginTransaction() { // ABORT error in the BeginTransaction RPC: // 1. The mutation key is correctly included in the BeginTransaction request. // 2. The precommit token is properly set in the Commit request. - Spanner spanner = setupSpannerForAbortedBeginTransactionTests(); + Spanner spanner = setupSpannerBySkippingBeginTransactionVerificationForMux(); // Force the BeginTransaction RPC to return Aborted the first time it is called. The exception // is cleared after the first call, so the retry should succeed. @@ -1336,7 +1336,7 @@ public void testMutationOnlyUsingTransactionManagerAbortedDuringBeginTransaction // ABORT error in the BeginTransaction RPC: // 1. The mutation key is correctly included in the BeginTransaction request. // 2. The precommit token is properly set in the Commit request. - Spanner spanner = setupSpannerForAbortedBeginTransactionTests(); + Spanner spanner = setupSpannerBySkippingBeginTransactionVerificationForMux(); // Force the BeginTransaction RPC to return Aborted the first time it is called. The exception // is cleared after the first call, so the retry should succeed. @@ -1382,7 +1382,7 @@ public void testMutationOnlyUsingAsyncRunnerAbortedDuringBeginTransaction() { // 1. The mutation key is correctly included in the BeginTransaction request. // 2. The precommit token is properly set in the Commit request. - Spanner spanner = setupSpannerForAbortedBeginTransactionTests(); + Spanner spanner = setupSpannerBySkippingBeginTransactionVerificationForMux(); // Force the BeginTransaction RPC to return Aborted the first time it is called. The exception // is cleared after the first call, so the retry should succeed. @@ -1422,7 +1422,7 @@ public void testMutationOnlyUsingTransactionManagerAsyncAbortedDuringBeginTransa // ABORT in BeginTransaction RPC, the mutation key is correctly set in the BeginTransaction // request // and precommit token is set in Commit request. - Spanner spanner = setupSpannerForAbortedBeginTransactionTests(); + Spanner spanner = setupSpannerBySkippingBeginTransactionVerificationForMux(); // Force the BeginTransaction RPC to return Aborted the first time it is called. The exception // is cleared after the first call, so the retry should succeed. @@ -1970,7 +1970,7 @@ public void testBatchWriteAtLeastOnce() { // 3. Upon encountering the UNIMPLEMENTED error, the entire transaction callable is retried // using regular sessions, but the inline begin fails again. // 4. A final retry executes the explicit BeginTransaction on a regular session. - Spanner spanner = setupSpannerForAbortedBeginTransactionTests(); + Spanner spanner = setupSpannerBySkippingBeginTransactionVerificationForMux(); mockSpanner.setBeginTransactionExecutionTime( SimulatedExecutionTime.ofException( Status.UNIMPLEMENTED @@ -2020,7 +2020,7 @@ public void testBatchWriteAtLeastOnce() { // using regular sessions. However, the Commit request fails due to a SessionNotFound error. // 3. A final retry is triggered to handle the SessionNotFound error by selecting a new session // from the pool, leading to a successful transaction. - Spanner spanner = setupSpannerForAbortedBeginTransactionTests(); + Spanner spanner = setupSpannerBySkippingBeginTransactionVerificationForMux(); // The first ExecuteSql request that does an inline begin with multiplexed sessions fail with // UNIMPLEMENTED error. @@ -2077,7 +2077,7 @@ public void testBatchWriteAtLeastOnce() { // 3. Upon encountering the UNIMPLEMENTED error, the entire transaction callable for the second // read-write transaction is retried using a regular session. - Spanner spanner = setupSpannerForAbortedBeginTransactionTests(); + Spanner spanner = setupSpannerBySkippingBeginTransactionVerificationForMux(); DatabaseClientImpl client = (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));