From 3cf0cd404a7f54ad67708391ebc7985f393722a6 Mon Sep 17 00:00:00 2001 From: Pratick Chokhani Date: Tue, 30 Jul 2024 13:58:30 +0530 Subject: [PATCH 1/7] feat(spanner): support multiplexed session for blind write with single use transaction. --- ...tractMultiplexedSessionDatabaseClient.java | 8 +-- .../cloud/spanner/DatabaseClientImpl.java | 18 ++++++- .../DelayedMultiplexedSessionTransaction.java | 25 +++++++++ .../MultiplexedSessionDatabaseClient.java | 8 +++ .../com/google/cloud/spanner/SessionPool.java | 6 +-- .../cloud/spanner/SessionPoolOptions.java | 52 +++++++++++++++++++ .../com/google/cloud/spanner/SpannerImpl.java | 11 ++-- .../IntegrationTestWithClosedSessionsEnv.java | 3 +- ...edSessionDatabaseClientMockServerTest.java | 1 + 9 files changed, 115 insertions(+), 17 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractMultiplexedSessionDatabaseClient.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractMultiplexedSessionDatabaseClient.java index 92035a18418..27253bf1e13 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractMultiplexedSessionDatabaseClient.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractMultiplexedSessionDatabaseClient.java @@ -53,13 +53,7 @@ public CommitResponse writeWithOptions(Iterable mutations, Transaction @Override public Timestamp writeAtLeastOnce(Iterable mutations) throws SpannerException { - throw new UnsupportedOperationException(); - } - - @Override - public CommitResponse writeAtLeastOnceWithOptions( - Iterable mutations, TransactionOption... options) throws SpannerException { - throw new UnsupportedOperationException(); + return writeAtLeastOnceWithOptions(mutations).getCommitTimestamp(); } @Override diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java index a20bcd9e925..d7b78200c26 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java @@ -37,24 +37,28 @@ class DatabaseClientImpl implements DatabaseClient { @VisibleForTesting final SessionPool pool; @VisibleForTesting final MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient; + final boolean useMultiplexedSessionForBlindWrite; + @VisibleForTesting DatabaseClientImpl(SessionPool pool, TraceWrapper tracer) { - this("", pool, /* multiplexedSessionDatabaseClient = */ null, tracer); + this("", pool, /* multiplexedSessionDatabaseClient = */ null, false, tracer); } @VisibleForTesting DatabaseClientImpl(String clientId, SessionPool pool, TraceWrapper tracer) { - this(clientId, pool, /* multiplexedSessionDatabaseClient = */ null, tracer); + this(clientId, pool, /* multiplexedSessionDatabaseClient = */ null, false, tracer); } DatabaseClientImpl( String clientId, SessionPool pool, @Nullable MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient, + boolean useMultiplexedSessionForBlindWrite, TraceWrapper tracer) { this.clientId = clientId; this.pool = pool; this.multiplexedSessionDatabaseClient = multiplexedSessionDatabaseClient; + this.useMultiplexedSessionForBlindWrite = useMultiplexedSessionForBlindWrite; this.tracer = tracer; } @@ -112,6 +116,16 @@ public Timestamp writeAtLeastOnce(final Iterable mutations) throws Spa public CommitResponse writeAtLeastOnceWithOptions( final Iterable mutations, final TransactionOption... options) throws SpannerException { + if (useMultiplexedSessionForBlindWrite) { + return getMultiplexedSession().writeAtLeastOnceWithOptions(mutations, options); + } else { + return writeAtLeastOnceWithSession(mutations, options); + } + } + + public CommitResponse writeAtLeastOnceWithSession( + final Iterable mutations, final TransactionOption... options) + throws SpannerException { ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, options); try (IScope s = tracer.withSpan(span)) { return runWithSessionRetry( diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedMultiplexedSessionTransaction.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedMultiplexedSessionTransaction.java index 928927d49a0..49bc770e868 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedMultiplexedSessionTransaction.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedMultiplexedSessionTransaction.java @@ -22,7 +22,9 @@ import com.google.api.core.ApiFutures; import com.google.cloud.spanner.DelayedReadContext.DelayedReadOnlyTransaction; import com.google.cloud.spanner.MultiplexedSessionDatabaseClient.MultiplexedSessionTransaction; +import com.google.cloud.spanner.Options.TransactionOption; import com.google.common.util.concurrent.MoreExecutors; +import java.util.concurrent.ExecutionException; /** * Represents a delayed execution of a transaction on a multiplexed session. The execution is @@ -119,4 +121,27 @@ public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) { .readOnlyTransaction(bound), MoreExecutors.directExecutor())); } + + @Override + public CommitResponse writeAtLeastOnceWithOptions( + Iterable mutations, TransactionOption... options) throws SpannerException { + try { + return ApiFutures.transform( + this.sessionFuture, + sessionReference -> + new MultiplexedSessionTransaction( + client, span, sessionReference, NO_CHANNEL_HINT, false) + .writeAtLeastOnceWithOptions(mutations, options), + MoreExecutors.directExecutor()).get(); + } catch (ExecutionException executionException) { + // Propagate the underlying exception as a RuntimeException (SpannerException is also a + // RuntimeException). + if (executionException.getCause() instanceof RuntimeException) { + throw (RuntimeException) executionException.getCause(); + } + throw SpannerExceptionFactory.asSpannerException(executionException.getCause()); + } catch (InterruptedException interruptedException) { + throw SpannerExceptionFactory.propagateInterrupt(interruptedException); + } + } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java index e742481be2c..acdebf2587a 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java @@ -21,10 +21,12 @@ import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; import com.google.api.core.SettableApiFuture; +import com.google.cloud.spanner.Options.TransactionOption; import com.google.cloud.spanner.SessionClient.SessionConsumer; import com.google.cloud.spanner.SpannerException.ResourceNotFoundException; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.MoreExecutors; import java.time.Clock; import java.time.Duration; import java.time.Instant; @@ -358,6 +360,12 @@ private int getSingleUseChannelHint() { } } + @Override + public CommitResponse writeAtLeastOnceWithOptions( + Iterable mutations, TransactionOption... options) throws SpannerException { + return createMultiplexedSessionTransaction(true) + .writeAtLeastOnceWithOptions(mutations, options); + } @Override public ReadContext singleUse() { return createMultiplexedSessionTransaction(true).singleUse(); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java index f36da57a816..a68b298748c 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java @@ -2185,15 +2185,13 @@ public CommitResponse writeWithOptions( @Override public Timestamp writeAtLeastOnce(Iterable mutations) throws SpannerException { - throw SpannerExceptionFactory.newSpannerException( - ErrorCode.UNIMPLEMENTED, "Unimplemented with Multiplexed Session"); + return this.delegate.writeAtLeastOnce(mutations); } @Override public CommitResponse writeAtLeastOnceWithOptions( Iterable mutations, TransactionOption... options) throws SpannerException { - throw SpannerExceptionFactory.newSpannerException( - ErrorCode.UNIMPLEMENTED, "Unimplemented with Multiplexed Session"); + return this.delegate.writeAtLeastOnceWithOptions(mutations, options); } @Override diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java index ba2eedbccb8..8ed4a3e5760 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java @@ -73,6 +73,8 @@ public class SessionPoolOptions { private final boolean useMultiplexedSession; + private final boolean useMultiplexedSessionForBlindWrite; + // TODO: Change to use java.time.Duration. private final Duration multiplexedSessionMaintenanceDuration; @@ -108,6 +110,12 @@ private SessionPoolOptions(Builder builder) { (useMultiplexedSessionFromEnvVariable != null) ? useMultiplexedSessionFromEnvVariable : builder.useMultiplexedSession; + // useMultiplexedSessionForBlindWrite priority => Environment var > private setter > client default + Boolean useMultiplexedSessionBlindWriteFromEnvVariable = getUseMultiplexedSessionBlindWriteFromEnvVariable(); + this.useMultiplexedSessionForBlindWrite = + (useMultiplexedSessionBlindWriteFromEnvVariable != null) + ? useMultiplexedSessionBlindWriteFromEnvVariable + : builder.useMultiplexedSessionForBlindWrite; this.multiplexedSessionMaintenanceDuration = builder.multiplexedSessionMaintenanceDuration; } @@ -307,6 +315,28 @@ public boolean getUseMultiplexedSession() { return useMultiplexedSession; } + @VisibleForTesting + @InternalApi + public boolean getUseMultiplexedSessionForBlindWrite() { + return useMultiplexedSessionForBlindWrite; + } + + private static Boolean getUseMultiplexedSessionBlindWriteFromEnvVariable() { + String useMultiplexedSessionFromEnvVariable = + System.getenv("GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_BLIND_WRITES"); + if (useMultiplexedSessionFromEnvVariable != null + && useMultiplexedSessionFromEnvVariable.length() > 0) { + if ("true".equalsIgnoreCase(useMultiplexedSessionFromEnvVariable) + || "false".equalsIgnoreCase(useMultiplexedSessionFromEnvVariable)) { + return Boolean.parseBoolean(useMultiplexedSessionFromEnvVariable); + } else { + throw new IllegalArgumentException( + "GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_BLIND_WRITES should be either true or false."); + } + } + return null; + } + private static Boolean getUseMultiplexedSessionFromEnvVariable() { String useMultiplexedSessionFromEnvVariable = System.getenv("GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS"); @@ -529,6 +559,11 @@ public static class Builder { // Set useMultiplexedSession to true to make multiplexed session the default. private boolean useMultiplexedSession = false; + // This field controls the default behavior of session management in Java client. + // Set useMultiplexedSessionForBlindWrite to true to make multiplexed session the default for + // blind writes. + private boolean useMultiplexedSessionForBlindWrite = false; + private Duration multiplexedSessionMaintenanceDuration = Duration.ofDays(7); private Clock poolMaintainerClock = Clock.INSTANCE; @@ -570,6 +605,7 @@ private Builder(SessionPoolOptions options) { this.randomizePositionQPSThreshold = options.randomizePositionQPSThreshold; this.inactiveTransactionRemovalOptions = options.inactiveTransactionRemovalOptions; this.useMultiplexedSession = options.useMultiplexedSession; + this.useMultiplexedSessionForBlindWrite = options.useMultiplexedSessionForBlindWrite; this.multiplexedSessionMaintenanceDuration = options.multiplexedSessionMaintenanceDuration; this.poolMaintainerClock = options.poolMaintainerClock; } @@ -757,6 +793,22 @@ Builder setUseMultiplexedSession(boolean useMultiplexedSession) { return this; } + /** + * Sets whether the client should use multiplexed session or not for writeAtLeastOnce. If set to + * true, the client optimises and runs multiple applicable requests concurrently on a single + * session. A single multiplexed session is sufficient to handle all concurrent traffic. + * + *

When set to false, the client uses the regular session cached in the session pool for + * running 1 concurrent transaction per session. We require to provision sufficient sessions by + * making use of {@link SessionPoolOptions#minSessions} and {@link + * SessionPoolOptions#maxSessions} based on the traffic load. Failing to do so will result in + * higher latencies. + */ + Builder setUseMultiplexedSessionForBlindWrite(boolean useMultiplexedSessionForBlindWrite) { + this.useMultiplexedSessionForBlindWrite = useMultiplexedSessionForBlindWrite; + return this; + } + @VisibleForTesting Builder setMultiplexedSessionMaintenanceDuration( Duration multiplexedSessionMaintenanceDuration) { diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java index 6aa0d646a84..1b9e3b3f3be 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java @@ -277,6 +277,8 @@ public DatabaseClient getDatabaseClient(DatabaseId db) { boolean useMultiplexedSession = getOptions().getSessionPoolOptions().getUseMultiplexedSession(); + boolean useMultiplexedSessionForBlindWrite = + getOptions().getSessionPoolOptions().getUseMultiplexedSessionForBlindWrite(); MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient = useMultiplexedSession ? new MultiplexedSessionDatabaseClient(SpannerImpl.this.getSessionClient(db)) @@ -300,7 +302,8 @@ public DatabaseClient getDatabaseClient(DatabaseId db) { numMultiplexedSessionsReleased); pool.maybeWaitOnMinSessions(); DatabaseClientImpl dbClient = - createDatabaseClient(clientId, pool, multiplexedSessionDatabaseClient); + createDatabaseClient(clientId, pool, multiplexedSessionDatabaseClient, + useMultiplexedSessionForBlindWrite); dbClients.put(db, dbClient); return dbClient; } @@ -311,8 +314,10 @@ public DatabaseClient getDatabaseClient(DatabaseId db) { DatabaseClientImpl createDatabaseClient( String clientId, SessionPool pool, - @Nullable MultiplexedSessionDatabaseClient multiplexedSessionClient) { - return new DatabaseClientImpl(clientId, pool, multiplexedSessionClient, tracer); + @Nullable MultiplexedSessionDatabaseClient multiplexedSessionClient, + boolean multiplexedSessionForBlindWrite) { + return new DatabaseClientImpl(clientId, pool, multiplexedSessionClient, + multiplexedSessionForBlindWrite, tracer); } @Override diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/IntegrationTestWithClosedSessionsEnv.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/IntegrationTestWithClosedSessionsEnv.java index b71771ae2ca..636cc45b636 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/IntegrationTestWithClosedSessionsEnv.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/IntegrationTestWithClosedSessionsEnv.java @@ -47,7 +47,8 @@ private static class SpannerWithClosedSessionsImpl extends SpannerImpl { @Override DatabaseClientImpl createDatabaseClient( - String clientId, SessionPool pool, MultiplexedSessionDatabaseClient ignore) { + String clientId, SessionPool pool, MultiplexedSessionDatabaseClient ignore, + boolean multiplexedSessionForBlindWrite) { return new DatabaseClientWithClosedSessionImpl(clientId, pool, tracer); } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java index bf4a02a10c5..f80983d7499 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java @@ -63,6 +63,7 @@ public void createSpannerInstance() { .setSessionPoolOption( SessionPoolOptions.newBuilder() .setUseMultiplexedSession(true) + .setUseMultiplexedSessionForBlindWrite(true) // Set the maintainer to loop once every 1ms .setMultiplexedSessionMaintenanceLoopFrequency(Duration.ofMillis(1L)) // Set multiplexed sessions to be replaced once every 1ms From 4863532473e99efe0fae77aad39594cd5ec45c34 Mon Sep 17 00:00:00 2001 From: Pratick Chokhani Date: Tue, 30 Jul 2024 14:57:29 +0530 Subject: [PATCH 2/7] test(spanner): added test for the support of multiplexed session for blind writes (writeAtLeastOnce) --- ...edSessionDatabaseClientMockServerTest.java | 125 ++++++++++++++++++ 1 file changed, 125 insertions(+) diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java index f80983d7499..5c014862daa 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java @@ -16,6 +16,7 @@ package com.google.cloud.spanner; +import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; @@ -24,15 +25,21 @@ import static org.junit.Assert.assertTrue; import com.google.cloud.NoCredentials; +import com.google.cloud.Timestamp; import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime; import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult; +import com.google.cloud.spanner.Options.RpcPriority; import com.google.cloud.spanner.connection.RandomResultSetGenerator; import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableList; +import com.google.protobuf.ByteString; +import com.google.spanner.v1.CommitRequest; import com.google.spanner.v1.ExecuteSqlRequest; +import com.google.spanner.v1.RequestOptions.Priority; import com.google.spanner.v1.Session; import io.grpc.Status; import java.time.Duration; +import java.util.Collections; import java.util.List; import java.util.Set; import java.util.UUID; @@ -310,6 +317,124 @@ public void testMaintainerInvalidatesMultiplexedSessionClientIfUnimplemented() { assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get()); } + @Test + public void testWriteAtLeastOnceAborted() { + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + // Force the Commit RPC to return Aborted the first time it is called. The exception is cleared + // after the first call, so the retry should succeed. + mockSpanner.setCommitExecutionTime( + SimulatedExecutionTime.ofException( + mockSpanner.createAbortedException(ByteString.copyFromUtf8("test")))); + Timestamp timestamp = + client.writeAtLeastOnce( + Collections.singletonList( + Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build())); + assertNotNull(timestamp); + + List commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class); + assertEquals(2, commitRequests.size()); + } + + @Test + public void testWriteAtLeastOnce() { + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + Timestamp timestamp = + client.writeAtLeastOnce( + Collections.singletonList( + Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build())); + assertNotNull(timestamp); + + List commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class); + assertThat(commitRequests).hasSize(1); + CommitRequest commit = commitRequests.get(0); + assertNotNull(commit.getSingleUseTransaction()); + assertTrue(commit.getSingleUseTransaction().hasReadWrite()); + assertFalse(commit.getSingleUseTransaction().getExcludeTxnFromChangeStreams()); + assertNotNull(commit.getRequestOptions()); + assertEquals(Priority.PRIORITY_UNSPECIFIED, commit.getRequestOptions().getPriority()); + } + + @Test + public void testWriteAtLeastOnceWithCommitStats() { + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + CommitResponse response = + client.writeAtLeastOnceWithOptions( + Collections.singletonList( + Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build()), + Options.commitStats()); + assertNotNull(response); + assertNotNull(response.getCommitTimestamp()); + assertNotNull(response.getCommitStats()); + + List commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class); + assertThat(commitRequests).hasSize(1); + CommitRequest commit = commitRequests.get(0); + assertNotNull(commit.getSingleUseTransaction()); + assertTrue(commit.getSingleUseTransaction().hasReadWrite()); + assertFalse(commit.getSingleUseTransaction().getExcludeTxnFromChangeStreams()); + assertNotNull(commit.getRequestOptions()); + assertEquals(Priority.PRIORITY_UNSPECIFIED, commit.getRequestOptions().getPriority()); + } + + @Test + public void testWriteAtLeastOnceWithOptions() { + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + client.writeAtLeastOnceWithOptions( + Collections.singletonList( + Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build()), + Options.priority(RpcPriority.LOW)); + + List commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class); + assertThat(commitRequests).hasSize(1); + CommitRequest commit = commitRequests.get(0); + assertNotNull(commit.getSingleUseTransaction()); + assertTrue(commit.getSingleUseTransaction().hasReadWrite()); + assertFalse(commit.getSingleUseTransaction().getExcludeTxnFromChangeStreams()); + assertNotNull(commit.getRequestOptions()); + assertEquals(Priority.PRIORITY_LOW, commit.getRequestOptions().getPriority()); + } + + @Test + public void testWriteAtLeastOnceWithTagOptions() { + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + client.writeAtLeastOnceWithOptions( + Collections.singletonList( + Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build()), + Options.tag("app=spanner,env=test")); + + List commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class); + assertThat(commitRequests).hasSize(1); + CommitRequest commit = commitRequests.get(0); + assertNotNull(commit.getSingleUseTransaction()); + assertTrue(commit.getSingleUseTransaction().hasReadWrite()); + assertFalse(commit.getSingleUseTransaction().getExcludeTxnFromChangeStreams()); + assertNotNull(commit.getRequestOptions()); + assertThat(commit.getRequestOptions().getTransactionTag()).isEqualTo("app=spanner,env=test"); + assertThat(commit.getRequestOptions().getRequestTag()).isEmpty(); + } + + @Test + public void testWriteAtLeastOnceWithExcludeTxnFromChangeStreams() { + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + client.writeAtLeastOnceWithOptions( + Collections.singletonList( + Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build()), + Options.excludeTxnFromChangeStreams()); + + List commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class); + assertThat(commitRequests).hasSize(1); + CommitRequest commit = commitRequests.get(0); + assertNotNull(commit.getSingleUseTransaction()); + assertTrue(commit.getSingleUseTransaction().hasReadWrite()); + assertTrue(commit.getSingleUseTransaction().getExcludeTxnFromChangeStreams()); + } + private void waitForSessionToBeReplaced(DatabaseClientImpl client) { assertNotNull(client.multiplexedSessionDatabaseClient); SessionReference sessionReference = From 8ebd5219e74fe58fb92f0cb7e6ebd637a7780599 Mon Sep 17 00:00:00 2001 From: Pratick Chokhani Date: Tue, 30 Jul 2024 15:11:21 +0530 Subject: [PATCH 3/7] chore(spanner): lint --- .../cloud/spanner/DatabaseClientImpl.java | 14 ++++++++++++-- .../DelayedMultiplexedSessionTransaction.java | 13 +++++++------ .../MultiplexedSessionDatabaseClient.java | 2 +- .../cloud/spanner/SessionPoolOptions.java | 6 ++++-- .../com/google/cloud/spanner/SpannerImpl.java | 9 ++++++--- .../IntegrationTestWithClosedSessionsEnv.java | 4 +++- ...xedSessionDatabaseClientMockServerTest.java | 18 ++++++------------ 7 files changed, 39 insertions(+), 27 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java index d7b78200c26..986ce242bbd 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java @@ -41,12 +41,22 @@ class DatabaseClientImpl implements DatabaseClient { @VisibleForTesting DatabaseClientImpl(SessionPool pool, TraceWrapper tracer) { - this("", pool, /* multiplexedSessionDatabaseClient = */ null, false, tracer); + this( + "", + pool, + /* multiplexedSessionDatabaseClient = */ null, + /*useMultiplexedSessionForBlindWrite = */ false, + tracer); } @VisibleForTesting DatabaseClientImpl(String clientId, SessionPool pool, TraceWrapper tracer) { - this(clientId, pool, /* multiplexedSessionDatabaseClient = */ null, false, tracer); + this( + clientId, + pool, + /* multiplexedSessionDatabaseClient = */ null, + /*useMultiplexedSessionForBlindWrite = */ false, + tracer); } DatabaseClientImpl( diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedMultiplexedSessionTransaction.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedMultiplexedSessionTransaction.java index 49bc770e868..1a90715ca78 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedMultiplexedSessionTransaction.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedMultiplexedSessionTransaction.java @@ -127,12 +127,13 @@ public CommitResponse writeAtLeastOnceWithOptions( Iterable mutations, TransactionOption... options) throws SpannerException { try { return ApiFutures.transform( - this.sessionFuture, - sessionReference -> - new MultiplexedSessionTransaction( - client, span, sessionReference, NO_CHANNEL_HINT, false) - .writeAtLeastOnceWithOptions(mutations, options), - MoreExecutors.directExecutor()).get(); + this.sessionFuture, + sessionReference -> + new MultiplexedSessionTransaction( + client, span, sessionReference, NO_CHANNEL_HINT, false) + .writeAtLeastOnceWithOptions(mutations, options), + MoreExecutors.directExecutor()) + .get(); } catch (ExecutionException executionException) { // Propagate the underlying exception as a RuntimeException (SpannerException is also a // RuntimeException). diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java index acdebf2587a..495c3a1dc84 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java @@ -26,7 +26,6 @@ import com.google.cloud.spanner.SpannerException.ResourceNotFoundException; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.MoreExecutors; import java.time.Clock; import java.time.Duration; import java.time.Instant; @@ -366,6 +365,7 @@ public CommitResponse writeAtLeastOnceWithOptions( return createMultiplexedSessionTransaction(true) .writeAtLeastOnceWithOptions(mutations, options); } + @Override public ReadContext singleUse() { return createMultiplexedSessionTransaction(true).singleUse(); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java index 8ed4a3e5760..590ebe01db0 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java @@ -110,8 +110,10 @@ private SessionPoolOptions(Builder builder) { (useMultiplexedSessionFromEnvVariable != null) ? useMultiplexedSessionFromEnvVariable : builder.useMultiplexedSession; - // useMultiplexedSessionForBlindWrite priority => Environment var > private setter > client default - Boolean useMultiplexedSessionBlindWriteFromEnvVariable = getUseMultiplexedSessionBlindWriteFromEnvVariable(); + // useMultiplexedSessionForBlindWrite priority => Environment var > private setter > client + // default + Boolean useMultiplexedSessionBlindWriteFromEnvVariable = + getUseMultiplexedSessionBlindWriteFromEnvVariable(); this.useMultiplexedSessionForBlindWrite = (useMultiplexedSessionBlindWriteFromEnvVariable != null) ? useMultiplexedSessionBlindWriteFromEnvVariable diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java index 1b9e3b3f3be..23fd767e4da 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java @@ -302,7 +302,10 @@ public DatabaseClient getDatabaseClient(DatabaseId db) { numMultiplexedSessionsReleased); pool.maybeWaitOnMinSessions(); DatabaseClientImpl dbClient = - createDatabaseClient(clientId, pool, multiplexedSessionDatabaseClient, + createDatabaseClient( + clientId, + pool, + multiplexedSessionDatabaseClient, useMultiplexedSessionForBlindWrite); dbClients.put(db, dbClient); return dbClient; @@ -316,8 +319,8 @@ DatabaseClientImpl createDatabaseClient( SessionPool pool, @Nullable MultiplexedSessionDatabaseClient multiplexedSessionClient, boolean multiplexedSessionForBlindWrite) { - return new DatabaseClientImpl(clientId, pool, multiplexedSessionClient, - multiplexedSessionForBlindWrite, tracer); + return new DatabaseClientImpl( + clientId, pool, multiplexedSessionClient, multiplexedSessionForBlindWrite, tracer); } @Override diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/IntegrationTestWithClosedSessionsEnv.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/IntegrationTestWithClosedSessionsEnv.java index 636cc45b636..22dd72af542 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/IntegrationTestWithClosedSessionsEnv.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/IntegrationTestWithClosedSessionsEnv.java @@ -47,7 +47,9 @@ private static class SpannerWithClosedSessionsImpl extends SpannerImpl { @Override DatabaseClientImpl createDatabaseClient( - String clientId, SessionPool pool, MultiplexedSessionDatabaseClient ignore, + String clientId, + SessionPool pool, + MultiplexedSessionDatabaseClient ignore, boolean multiplexedSessionForBlindWrite) { return new DatabaseClientWithClosedSessionImpl(clientId, pool, tracer); } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java index 5c014862daa..370a70d1a97 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java @@ -319,8 +319,7 @@ public void testMaintainerInvalidatesMultiplexedSessionClientIfUnimplemented() { @Test public void testWriteAtLeastOnceAborted() { - DatabaseClient client = - spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); // Force the Commit RPC to return Aborted the first time it is called. The exception is cleared // after the first call, so the retry should succeed. mockSpanner.setCommitExecutionTime( @@ -338,8 +337,7 @@ public void testWriteAtLeastOnceAborted() { @Test public void testWriteAtLeastOnce() { - DatabaseClient client = - spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); Timestamp timestamp = client.writeAtLeastOnce( Collections.singletonList( @@ -358,8 +356,7 @@ public void testWriteAtLeastOnce() { @Test public void testWriteAtLeastOnceWithCommitStats() { - DatabaseClient client = - spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); CommitResponse response = client.writeAtLeastOnceWithOptions( Collections.singletonList( @@ -381,8 +378,7 @@ public void testWriteAtLeastOnceWithCommitStats() { @Test public void testWriteAtLeastOnceWithOptions() { - DatabaseClient client = - spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); client.writeAtLeastOnceWithOptions( Collections.singletonList( Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build()), @@ -400,8 +396,7 @@ public void testWriteAtLeastOnceWithOptions() { @Test public void testWriteAtLeastOnceWithTagOptions() { - DatabaseClient client = - spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); client.writeAtLeastOnceWithOptions( Collections.singletonList( Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build()), @@ -420,8 +415,7 @@ public void testWriteAtLeastOnceWithTagOptions() { @Test public void testWriteAtLeastOnceWithExcludeTxnFromChangeStreams() { - DatabaseClient client = - spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); client.writeAtLeastOnceWithOptions( Collections.singletonList( Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build()), From 5b3c6735cc8bb1f312ad47b4daf35ed5d729a758 Mon Sep 17 00:00:00 2001 From: Pratick Chokhani Date: Thu, 8 Aug 2024 18:25:38 +0530 Subject: [PATCH 4/7] fix(spanner): updated the adoption for blind write into GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS. --- .../cloud/spanner/DatabaseClientImpl.java | 25 +++------ .../DelayedMultiplexedSessionTransaction.java | 13 ++--- .../MultiplexedSessionDatabaseClient.java | 8 +++ .../cloud/spanner/SessionPoolOptions.java | 54 ------------------- .../com/google/cloud/spanner/SpannerImpl.java | 14 ++--- .../IntegrationTestWithClosedSessionsEnv.java | 5 +- ...edSessionDatabaseClientMockServerTest.java | 43 ++++++++++++--- 7 files changed, 59 insertions(+), 103 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java index 986ce242bbd..0f048596f29 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java @@ -37,38 +37,24 @@ class DatabaseClientImpl implements DatabaseClient { @VisibleForTesting final SessionPool pool; @VisibleForTesting final MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient; - final boolean useMultiplexedSessionForBlindWrite; - @VisibleForTesting DatabaseClientImpl(SessionPool pool, TraceWrapper tracer) { - this( - "", - pool, - /* multiplexedSessionDatabaseClient = */ null, - /*useMultiplexedSessionForBlindWrite = */ false, - tracer); + this("", pool, /* multiplexedSessionDatabaseClient = */ null, tracer); } @VisibleForTesting DatabaseClientImpl(String clientId, SessionPool pool, TraceWrapper tracer) { - this( - clientId, - pool, - /* multiplexedSessionDatabaseClient = */ null, - /*useMultiplexedSessionForBlindWrite = */ false, - tracer); + this(clientId, pool, /* multiplexedSessionDatabaseClient = */ null, tracer); } DatabaseClientImpl( String clientId, SessionPool pool, @Nullable MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient, - boolean useMultiplexedSessionForBlindWrite, TraceWrapper tracer) { this.clientId = clientId; this.pool = pool; this.multiplexedSessionDatabaseClient = multiplexedSessionDatabaseClient; - this.useMultiplexedSessionForBlindWrite = useMultiplexedSessionForBlindWrite; this.tracer = tracer; } @@ -126,14 +112,15 @@ public Timestamp writeAtLeastOnce(final Iterable mutations) throws Spa public CommitResponse writeAtLeastOnceWithOptions( final Iterable mutations, final TransactionOption... options) throws SpannerException { - if (useMultiplexedSessionForBlindWrite) { - return getMultiplexedSession().writeAtLeastOnceWithOptions(mutations, options); + if (this.multiplexedSessionDatabaseClient != null + && this.multiplexedSessionDatabaseClient.isMultiplexedSessionsSupported()) { + return this.multiplexedSessionDatabaseClient.writeAtLeastOnceWithOptions(mutations, options); } else { return writeAtLeastOnceWithSession(mutations, options); } } - public CommitResponse writeAtLeastOnceWithSession( + private CommitResponse writeAtLeastOnceWithSession( final Iterable mutations, final TransactionOption... options) throws SpannerException { ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, options); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedMultiplexedSessionTransaction.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedMultiplexedSessionTransaction.java index 1a90715ca78..9d95324c5ba 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedMultiplexedSessionTransaction.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedMultiplexedSessionTransaction.java @@ -122,18 +122,15 @@ public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) { MoreExecutors.directExecutor())); } + /** This is a blocking call as the expected by the parent interface. */ @Override public CommitResponse writeAtLeastOnceWithOptions( Iterable mutations, TransactionOption... options) throws SpannerException { try { - return ApiFutures.transform( - this.sessionFuture, - sessionReference -> - new MultiplexedSessionTransaction( - client, span, sessionReference, NO_CHANNEL_HINT, false) - .writeAtLeastOnceWithOptions(mutations, options), - MoreExecutors.directExecutor()) - .get(); + SessionReference sessionReference = this.sessionFuture.get(); + return new MultiplexedSessionTransaction( + client, span, sessionReference, NO_CHANNEL_HINT, false) + .writeAtLeastOnceWithOptions(mutations, options); } catch (ExecutionException executionException) { // Propagate the underlying exception as a RuntimeException (SpannerException is also a // RuntimeException). diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java index 495c3a1dc84..da1c082d21e 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java @@ -108,6 +108,14 @@ void onReadDone() { } } + @Override + public CommitResponse writeAtLeastOnceWithOptions( + Iterable mutations, TransactionOption... options) throws SpannerException { + CommitResponse response = super.writeAtLeastOnceWithOptions(mutations, options); + onTransactionDone(); + return response; + } + @Override void onTransactionDone() { boolean markedDone = false; diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java index 590ebe01db0..ba2eedbccb8 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java @@ -73,8 +73,6 @@ public class SessionPoolOptions { private final boolean useMultiplexedSession; - private final boolean useMultiplexedSessionForBlindWrite; - // TODO: Change to use java.time.Duration. private final Duration multiplexedSessionMaintenanceDuration; @@ -110,14 +108,6 @@ private SessionPoolOptions(Builder builder) { (useMultiplexedSessionFromEnvVariable != null) ? useMultiplexedSessionFromEnvVariable : builder.useMultiplexedSession; - // useMultiplexedSessionForBlindWrite priority => Environment var > private setter > client - // default - Boolean useMultiplexedSessionBlindWriteFromEnvVariable = - getUseMultiplexedSessionBlindWriteFromEnvVariable(); - this.useMultiplexedSessionForBlindWrite = - (useMultiplexedSessionBlindWriteFromEnvVariable != null) - ? useMultiplexedSessionBlindWriteFromEnvVariable - : builder.useMultiplexedSessionForBlindWrite; this.multiplexedSessionMaintenanceDuration = builder.multiplexedSessionMaintenanceDuration; } @@ -317,28 +307,6 @@ public boolean getUseMultiplexedSession() { return useMultiplexedSession; } - @VisibleForTesting - @InternalApi - public boolean getUseMultiplexedSessionForBlindWrite() { - return useMultiplexedSessionForBlindWrite; - } - - private static Boolean getUseMultiplexedSessionBlindWriteFromEnvVariable() { - String useMultiplexedSessionFromEnvVariable = - System.getenv("GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_BLIND_WRITES"); - if (useMultiplexedSessionFromEnvVariable != null - && useMultiplexedSessionFromEnvVariable.length() > 0) { - if ("true".equalsIgnoreCase(useMultiplexedSessionFromEnvVariable) - || "false".equalsIgnoreCase(useMultiplexedSessionFromEnvVariable)) { - return Boolean.parseBoolean(useMultiplexedSessionFromEnvVariable); - } else { - throw new IllegalArgumentException( - "GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_BLIND_WRITES should be either true or false."); - } - } - return null; - } - private static Boolean getUseMultiplexedSessionFromEnvVariable() { String useMultiplexedSessionFromEnvVariable = System.getenv("GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS"); @@ -561,11 +529,6 @@ public static class Builder { // Set useMultiplexedSession to true to make multiplexed session the default. private boolean useMultiplexedSession = false; - // This field controls the default behavior of session management in Java client. - // Set useMultiplexedSessionForBlindWrite to true to make multiplexed session the default for - // blind writes. - private boolean useMultiplexedSessionForBlindWrite = false; - private Duration multiplexedSessionMaintenanceDuration = Duration.ofDays(7); private Clock poolMaintainerClock = Clock.INSTANCE; @@ -607,7 +570,6 @@ private Builder(SessionPoolOptions options) { this.randomizePositionQPSThreshold = options.randomizePositionQPSThreshold; this.inactiveTransactionRemovalOptions = options.inactiveTransactionRemovalOptions; this.useMultiplexedSession = options.useMultiplexedSession; - this.useMultiplexedSessionForBlindWrite = options.useMultiplexedSessionForBlindWrite; this.multiplexedSessionMaintenanceDuration = options.multiplexedSessionMaintenanceDuration; this.poolMaintainerClock = options.poolMaintainerClock; } @@ -795,22 +757,6 @@ Builder setUseMultiplexedSession(boolean useMultiplexedSession) { return this; } - /** - * Sets whether the client should use multiplexed session or not for writeAtLeastOnce. If set to - * true, the client optimises and runs multiple applicable requests concurrently on a single - * session. A single multiplexed session is sufficient to handle all concurrent traffic. - * - *

When set to false, the client uses the regular session cached in the session pool for - * running 1 concurrent transaction per session. We require to provision sufficient sessions by - * making use of {@link SessionPoolOptions#minSessions} and {@link - * SessionPoolOptions#maxSessions} based on the traffic load. Failing to do so will result in - * higher latencies. - */ - Builder setUseMultiplexedSessionForBlindWrite(boolean useMultiplexedSessionForBlindWrite) { - this.useMultiplexedSessionForBlindWrite = useMultiplexedSessionForBlindWrite; - return this; - } - @VisibleForTesting Builder setMultiplexedSessionMaintenanceDuration( Duration multiplexedSessionMaintenanceDuration) { diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java index 23fd767e4da..6aa0d646a84 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java @@ -277,8 +277,6 @@ public DatabaseClient getDatabaseClient(DatabaseId db) { boolean useMultiplexedSession = getOptions().getSessionPoolOptions().getUseMultiplexedSession(); - boolean useMultiplexedSessionForBlindWrite = - getOptions().getSessionPoolOptions().getUseMultiplexedSessionForBlindWrite(); MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient = useMultiplexedSession ? new MultiplexedSessionDatabaseClient(SpannerImpl.this.getSessionClient(db)) @@ -302,11 +300,7 @@ public DatabaseClient getDatabaseClient(DatabaseId db) { numMultiplexedSessionsReleased); pool.maybeWaitOnMinSessions(); DatabaseClientImpl dbClient = - createDatabaseClient( - clientId, - pool, - multiplexedSessionDatabaseClient, - useMultiplexedSessionForBlindWrite); + createDatabaseClient(clientId, pool, multiplexedSessionDatabaseClient); dbClients.put(db, dbClient); return dbClient; } @@ -317,10 +311,8 @@ public DatabaseClient getDatabaseClient(DatabaseId db) { DatabaseClientImpl createDatabaseClient( String clientId, SessionPool pool, - @Nullable MultiplexedSessionDatabaseClient multiplexedSessionClient, - boolean multiplexedSessionForBlindWrite) { - return new DatabaseClientImpl( - clientId, pool, multiplexedSessionClient, multiplexedSessionForBlindWrite, tracer); + @Nullable MultiplexedSessionDatabaseClient multiplexedSessionClient) { + return new DatabaseClientImpl(clientId, pool, multiplexedSessionClient, tracer); } @Override diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/IntegrationTestWithClosedSessionsEnv.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/IntegrationTestWithClosedSessionsEnv.java index 22dd72af542..b71771ae2ca 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/IntegrationTestWithClosedSessionsEnv.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/IntegrationTestWithClosedSessionsEnv.java @@ -47,10 +47,7 @@ private static class SpannerWithClosedSessionsImpl extends SpannerImpl { @Override DatabaseClientImpl createDatabaseClient( - String clientId, - SessionPool pool, - MultiplexedSessionDatabaseClient ignore, - boolean multiplexedSessionForBlindWrite) { + String clientId, SessionPool pool, MultiplexedSessionDatabaseClient ignore) { return new DatabaseClientWithClosedSessionImpl(clientId, pool, tracer); } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java index 370a70d1a97..990ed09b43a 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java @@ -70,7 +70,6 @@ public void createSpannerInstance() { .setSessionPoolOption( SessionPoolOptions.newBuilder() .setUseMultiplexedSession(true) - .setUseMultiplexedSessionForBlindWrite(true) // Set the maintainer to loop once every 1ms .setMultiplexedSessionMaintenanceLoopFrequency(Duration.ofMillis(1L)) // Set multiplexed sessions to be replaced once every 1ms @@ -319,7 +318,8 @@ public void testMaintainerInvalidatesMultiplexedSessionClientIfUnimplemented() { @Test public void testWriteAtLeastOnceAborted() { - DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + DatabaseClientImpl client = + (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); // Force the Commit RPC to return Aborted the first time it is called. The exception is cleared // after the first call, so the retry should succeed. mockSpanner.setCommitExecutionTime( @@ -333,11 +333,16 @@ public void testWriteAtLeastOnceAborted() { List commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class); assertEquals(2, commitRequests.size()); + + assertNotNull(client.multiplexedSessionDatabaseClient); + assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsAcquired().get()); + assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get()); } @Test public void testWriteAtLeastOnce() { - DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + DatabaseClientImpl client = + (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); Timestamp timestamp = client.writeAtLeastOnce( Collections.singletonList( @@ -352,11 +357,16 @@ public void testWriteAtLeastOnce() { assertFalse(commit.getSingleUseTransaction().getExcludeTxnFromChangeStreams()); assertNotNull(commit.getRequestOptions()); assertEquals(Priority.PRIORITY_UNSPECIFIED, commit.getRequestOptions().getPriority()); + + assertNotNull(client.multiplexedSessionDatabaseClient); + assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsAcquired().get()); + assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get()); } @Test public void testWriteAtLeastOnceWithCommitStats() { - DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + DatabaseClientImpl client = + (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); CommitResponse response = client.writeAtLeastOnceWithOptions( Collections.singletonList( @@ -374,11 +384,16 @@ public void testWriteAtLeastOnceWithCommitStats() { assertFalse(commit.getSingleUseTransaction().getExcludeTxnFromChangeStreams()); assertNotNull(commit.getRequestOptions()); assertEquals(Priority.PRIORITY_UNSPECIFIED, commit.getRequestOptions().getPriority()); + + assertNotNull(client.multiplexedSessionDatabaseClient); + assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsAcquired().get()); + assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get()); } @Test public void testWriteAtLeastOnceWithOptions() { - DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + DatabaseClientImpl client = + (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); client.writeAtLeastOnceWithOptions( Collections.singletonList( Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build()), @@ -392,11 +407,16 @@ public void testWriteAtLeastOnceWithOptions() { assertFalse(commit.getSingleUseTransaction().getExcludeTxnFromChangeStreams()); assertNotNull(commit.getRequestOptions()); assertEquals(Priority.PRIORITY_LOW, commit.getRequestOptions().getPriority()); + + assertNotNull(client.multiplexedSessionDatabaseClient); + assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsAcquired().get()); + assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get()); } @Test public void testWriteAtLeastOnceWithTagOptions() { - DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + DatabaseClientImpl client = + (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); client.writeAtLeastOnceWithOptions( Collections.singletonList( Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build()), @@ -411,11 +431,16 @@ public void testWriteAtLeastOnceWithTagOptions() { assertNotNull(commit.getRequestOptions()); assertThat(commit.getRequestOptions().getTransactionTag()).isEqualTo("app=spanner,env=test"); assertThat(commit.getRequestOptions().getRequestTag()).isEmpty(); + + assertNotNull(client.multiplexedSessionDatabaseClient); + assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsAcquired().get()); + assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get()); } @Test public void testWriteAtLeastOnceWithExcludeTxnFromChangeStreams() { - DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + DatabaseClientImpl client = + (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); client.writeAtLeastOnceWithOptions( Collections.singletonList( Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build()), @@ -427,6 +452,10 @@ public void testWriteAtLeastOnceWithExcludeTxnFromChangeStreams() { assertNotNull(commit.getSingleUseTransaction()); assertTrue(commit.getSingleUseTransaction().hasReadWrite()); assertTrue(commit.getSingleUseTransaction().getExcludeTxnFromChangeStreams()); + + assertNotNull(client.multiplexedSessionDatabaseClient); + assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsAcquired().get()); + assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get()); } private void waitForSessionToBeReplaced(DatabaseClientImpl client) { From 0efb264bae7ef993de609271da46871e95282aa1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Knut=20Olav=20L=C3=B8ite?= Date: Mon, 19 Aug 2024 11:55:18 +0200 Subject: [PATCH 5/7] chore: simplify code to make it easier to reuse for later additions --- .../cloud/spanner/DatabaseClientImpl.java | 27 ++++++++++--------- .../DelayedMultiplexedSessionTransaction.java | 22 +++++++++++---- ...edSessionDatabaseClientMockServerTest.java | 8 ++++++ 3 files changed, 39 insertions(+), 18 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java index 0f048596f29..41a15b69923 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java @@ -65,13 +65,21 @@ PooledSessionFuture getSession() { @VisibleForTesting DatabaseClient getMultiplexedSession() { - if (this.multiplexedSessionDatabaseClient != null - && this.multiplexedSessionDatabaseClient.isMultiplexedSessionsSupported()) { + if (canUseMultiplexedSessions()) { return this.multiplexedSessionDatabaseClient; } return pool.getMultiplexedSessionWithFallback(); } + private MultiplexedSessionDatabaseClient getMultiplexedSessionDatabaseClient() { + return canUseMultiplexedSessions() ? this.multiplexedSessionDatabaseClient : null; + } + + private boolean canUseMultiplexedSessions() { + return this.multiplexedSessionDatabaseClient != null + && this.multiplexedSessionDatabaseClient.isMultiplexedSessionsSupported(); + } + @Override public Dialect getDialect() { return pool.getDialect(); @@ -112,19 +120,12 @@ public Timestamp writeAtLeastOnce(final Iterable mutations) throws Spa public CommitResponse writeAtLeastOnceWithOptions( final Iterable mutations, final TransactionOption... options) throws SpannerException { - if (this.multiplexedSessionDatabaseClient != null - && this.multiplexedSessionDatabaseClient.isMultiplexedSessionsSupported()) { - return this.multiplexedSessionDatabaseClient.writeAtLeastOnceWithOptions(mutations, options); - } else { - return writeAtLeastOnceWithSession(mutations, options); - } - } - - private CommitResponse writeAtLeastOnceWithSession( - final Iterable mutations, final TransactionOption... options) - throws SpannerException { ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, options); try (IScope s = tracer.withSpan(span)) { + if (getMultiplexedSessionDatabaseClient() != null) { + return getMultiplexedSessionDatabaseClient() + .writeAtLeastOnceWithOptions(mutations, options); + } return runWithSessionRetry( session -> session.writeAtLeastOnceWithOptions(mutations, options)); } catch (RuntimeException e) { diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedMultiplexedSessionTransaction.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedMultiplexedSessionTransaction.java index 9d95324c5ba..36750eaccd1 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedMultiplexedSessionTransaction.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedMultiplexedSessionTransaction.java @@ -122,15 +122,27 @@ public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) { MoreExecutors.directExecutor())); } - /** This is a blocking call as the expected by the parent interface. */ + /** + * This is a blocking method, as the interface that it implements is also defined as a blocking + * method. + */ @Override public CommitResponse writeAtLeastOnceWithOptions( Iterable mutations, TransactionOption... options) throws SpannerException { + SessionReference sessionReference = getSessionReference(); + try (MultiplexedSessionTransaction transaction = + new MultiplexedSessionTransaction(client, span, sessionReference, NO_CHANNEL_HINT, true)) { + return transaction.writeAtLeastOnceWithOptions(mutations, options); + } + } + + /** + * Gets the session reference that this delayed transaction is waiting for. This method should + * only be called by methods that are allowed to be blocking. + */ + private SessionReference getSessionReference() { try { - SessionReference sessionReference = this.sessionFuture.get(); - return new MultiplexedSessionTransaction( - client, span, sessionReference, NO_CHANNEL_HINT, false) - .writeAtLeastOnceWithOptions(mutations, options); + return this.sessionFuture.get(); } catch (ExecutionException executionException) { // Propagate the underlying exception as a RuntimeException (SpannerException is also a // RuntimeException). diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java index 990ed09b43a..0382b73712e 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java @@ -333,6 +333,9 @@ public void testWriteAtLeastOnceAborted() { List commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class); assertEquals(2, commitRequests.size()); + for (CommitRequest request : commitRequests) { + assertTrue(mockSpanner.getSession(request.getSession()).getMultiplexed()); + } assertNotNull(client.multiplexedSessionDatabaseClient); assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsAcquired().get()); @@ -357,6 +360,7 @@ public void testWriteAtLeastOnce() { assertFalse(commit.getSingleUseTransaction().getExcludeTxnFromChangeStreams()); assertNotNull(commit.getRequestOptions()); assertEquals(Priority.PRIORITY_UNSPECIFIED, commit.getRequestOptions().getPriority()); + assertTrue(mockSpanner.getSession(commit.getSession()).getMultiplexed()); assertNotNull(client.multiplexedSessionDatabaseClient); assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsAcquired().get()); @@ -384,6 +388,7 @@ public void testWriteAtLeastOnceWithCommitStats() { assertFalse(commit.getSingleUseTransaction().getExcludeTxnFromChangeStreams()); assertNotNull(commit.getRequestOptions()); assertEquals(Priority.PRIORITY_UNSPECIFIED, commit.getRequestOptions().getPriority()); + assertTrue(mockSpanner.getSession(commit.getSession()).getMultiplexed()); assertNotNull(client.multiplexedSessionDatabaseClient); assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsAcquired().get()); @@ -407,6 +412,7 @@ public void testWriteAtLeastOnceWithOptions() { assertFalse(commit.getSingleUseTransaction().getExcludeTxnFromChangeStreams()); assertNotNull(commit.getRequestOptions()); assertEquals(Priority.PRIORITY_LOW, commit.getRequestOptions().getPriority()); + assertTrue(mockSpanner.getSession(commit.getSession()).getMultiplexed()); assertNotNull(client.multiplexedSessionDatabaseClient); assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsAcquired().get()); @@ -431,6 +437,7 @@ public void testWriteAtLeastOnceWithTagOptions() { assertNotNull(commit.getRequestOptions()); assertThat(commit.getRequestOptions().getTransactionTag()).isEqualTo("app=spanner,env=test"); assertThat(commit.getRequestOptions().getRequestTag()).isEmpty(); + assertTrue(mockSpanner.getSession(commit.getSession()).getMultiplexed()); assertNotNull(client.multiplexedSessionDatabaseClient); assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsAcquired().get()); @@ -452,6 +459,7 @@ public void testWriteAtLeastOnceWithExcludeTxnFromChangeStreams() { assertNotNull(commit.getSingleUseTransaction()); assertTrue(commit.getSingleUseTransaction().hasReadWrite()); assertTrue(commit.getSingleUseTransaction().getExcludeTxnFromChangeStreams()); + assertTrue(mockSpanner.getSession(commit.getSession()).getMultiplexed()); assertNotNull(client.multiplexedSessionDatabaseClient); assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsAcquired().get()); From 341d959a8e1741dccc97547f60efe0a078878502 Mon Sep 17 00:00:00 2001 From: Pratick Chokhani Date: Mon, 16 Sep 2024 21:59:16 +0530 Subject: [PATCH 6/7] feat(spanner): added flag to control use of multiplexed session for blind write. This flag will be used by systest. --- .../executor/spanner/CloudClientExecutor.java | 11 +++--- .../spanner/SessionPoolOptionsHelper.java | 8 +++++ .../cloud/spanner/DatabaseClientImpl.java | 20 +++++++++-- .../cloud/spanner/SessionPoolOptions.java | 36 +++++++++++++++++++ .../com/google/cloud/spanner/SpannerImpl.java | 10 ++++-- .../IntegrationTestWithClosedSessionsEnv.java | 5 ++- ...edSessionDatabaseClientMockServerTest.java | 1 + 7 files changed, 81 insertions(+), 10 deletions(-) diff --git a/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudClientExecutor.java b/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudClientExecutor.java index 443a8faf238..d180f55d06a 100644 --- a/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudClientExecutor.java +++ b/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudClientExecutor.java @@ -803,10 +803,13 @@ private synchronized Spanner getClient(long timeoutSeconds, boolean useMultiplex .setTotalTimeout(rpcTimeout) .build(); - com.google.cloud.spanner.SessionPoolOptions sessionPoolOptions = - SessionPoolOptionsHelper.setUseMultiplexedSession( - com.google.cloud.spanner.SessionPoolOptions.newBuilder(), useMultiplexedSession) - .build(); + com.google.cloud.spanner.SessionPoolOptions.Builder poolOptionsBuilder = + com.google.cloud.spanner.SessionPoolOptions.newBuilder(); + SessionPoolOptionsHelper.setUseMultiplexedSession( + com.google.cloud.spanner.SessionPoolOptions.newBuilder(), useMultiplexedSession); + SessionPoolOptionsHelper.setUseMultiplexedSessionBlindWrite( + com.google.cloud.spanner.SessionPoolOptions.newBuilder(), useMultiplexedSession); + com.google.cloud.spanner.SessionPoolOptions sessionPoolOptions = poolOptionsBuilder.build(); // Cloud Spanner Client does not support global retry settings, // Thus, we need to add retry settings to each individual stub. SpannerOptions.Builder optionsBuilder = diff --git a/google-cloud-spanner-executor/src/main/java/com/google/cloud/spanner/SessionPoolOptionsHelper.java b/google-cloud-spanner-executor/src/main/java/com/google/cloud/spanner/SessionPoolOptionsHelper.java index 8f978a39a31..8fc89a1eb50 100644 --- a/google-cloud-spanner-executor/src/main/java/com/google/cloud/spanner/SessionPoolOptionsHelper.java +++ b/google-cloud-spanner-executor/src/main/java/com/google/cloud/spanner/SessionPoolOptionsHelper.java @@ -30,4 +30,12 @@ public static SessionPoolOptions.Builder setUseMultiplexedSession( SessionPoolOptions.Builder sessionPoolOptionsBuilder, boolean useMultiplexedSession) { return sessionPoolOptionsBuilder.setUseMultiplexedSession(useMultiplexedSession); } + + // TODO: Remove when Builder.setUseMultiplexedSession(..) has been made public. + public static SessionPoolOptions.Builder setUseMultiplexedSessionBlindWrite( + SessionPoolOptions.Builder sessionPoolOptionsBuilder, + boolean useMultiplexedSessionBlindWrite) { + return sessionPoolOptionsBuilder.setUseMultiplexedSessionBlindWrite( + useMultiplexedSessionBlindWrite); + } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java index 41a15b69923..909d731818f 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java @@ -37,23 +37,37 @@ class DatabaseClientImpl implements DatabaseClient { @VisibleForTesting final SessionPool pool; @VisibleForTesting final MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient; + final boolean useMultiplexedSessionBlindWrite; + @VisibleForTesting DatabaseClientImpl(SessionPool pool, TraceWrapper tracer) { - this("", pool, /* multiplexedSessionDatabaseClient = */ null, tracer); + this( + "", + pool, + /* useMultiplexedSessionBlindWrite = */ false, + /* multiplexedSessionDatabaseClient = */ null, + tracer); } @VisibleForTesting DatabaseClientImpl(String clientId, SessionPool pool, TraceWrapper tracer) { - this(clientId, pool, /* multiplexedSessionDatabaseClient = */ null, tracer); + this( + clientId, + pool, + /* useMultiplexedSessionBlindWrite = */ false, + /* multiplexedSessionDatabaseClient = */ null, + tracer); } DatabaseClientImpl( String clientId, SessionPool pool, + boolean useMultiplexedSessionBlindWrite, @Nullable MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient, TraceWrapper tracer) { this.clientId = clientId; this.pool = pool; + this.useMultiplexedSessionBlindWrite = useMultiplexedSessionBlindWrite; this.multiplexedSessionDatabaseClient = multiplexedSessionDatabaseClient; this.tracer = tracer; } @@ -122,7 +136,7 @@ public CommitResponse writeAtLeastOnceWithOptions( throws SpannerException { ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, options); try (IScope s = tracer.withSpan(span)) { - if (getMultiplexedSessionDatabaseClient() != null) { + if (useMultiplexedSessionBlindWrite && getMultiplexedSessionDatabaseClient() != null) { return getMultiplexedSessionDatabaseClient() .writeAtLeastOnceWithOptions(mutations, options); } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java index 389a91448bf..d124ee372db 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java @@ -73,6 +73,12 @@ public class SessionPoolOptions { private final boolean useMultiplexedSession; + /** + * Controls whether multiplexed session is enabled for blind write or not. This is only used for + * systest soak. Should be removed once released. + */ + private final boolean useMultiplexedSessionBlindWrite; + private final boolean useMultiplexedSessionForRW; // TODO: Change to use java.time.Duration. @@ -110,6 +116,7 @@ private SessionPoolOptions(Builder builder) { (useMultiplexedSessionFromEnvVariable != null) ? useMultiplexedSessionFromEnvVariable : builder.useMultiplexedSession; + this.useMultiplexedSessionBlindWrite = builder.useMultiplexedSessionBlindWrite; // useMultiplexedSessionForRW priority => Environment var > private setter > client default Boolean useMultiplexedSessionForRWFromEnvVariable = getUseMultiplexedSessionForRWFromEnvVariable(); @@ -184,6 +191,7 @@ public int hashCode() { this.inactiveTransactionRemovalOptions, this.poolMaintainerClock, this.useMultiplexedSession, + this.useMultiplexedSessionBlindWrite, this.useMultiplexedSessionForRW, this.multiplexedSessionMaintenanceDuration); } @@ -318,6 +326,12 @@ public boolean getUseMultiplexedSession() { return useMultiplexedSession; } + @VisibleForTesting + @InternalApi + public boolean getUseMultiplexedSessionBlindWrite() { + return getUseMultiplexedSession() && useMultiplexedSessionBlindWrite; + } + @VisibleForTesting @InternalApi public boolean getUseMultiplexedSessionForRW() { @@ -554,6 +568,11 @@ public static class Builder { // Set useMultiplexedSession to true to make multiplexed session the default. private boolean useMultiplexedSession = false; + // This field controls the default behavior of session management in Java client. + // Set useMultiplexedSessionBlindWrite to true to make multiplexed session the default for blind + // write. + private boolean useMultiplexedSessionBlindWrite = false; + // This field controls the default behavior of session management for RW operations in Java // client. // Set useMultiplexedSessionForRW to true to make multiplexed session for RW operations the @@ -601,6 +620,7 @@ private Builder(SessionPoolOptions options) { this.randomizePositionQPSThreshold = options.randomizePositionQPSThreshold; this.inactiveTransactionRemovalOptions = options.inactiveTransactionRemovalOptions; this.useMultiplexedSession = options.useMultiplexedSession; + this.useMultiplexedSessionBlindWrite = options.useMultiplexedSessionBlindWrite; this.useMultiplexedSessionForRW = options.useMultiplexedSessionForRW; this.multiplexedSessionMaintenanceDuration = options.multiplexedSessionMaintenanceDuration; this.poolMaintainerClock = options.poolMaintainerClock; @@ -789,6 +809,22 @@ Builder setUseMultiplexedSession(boolean useMultiplexedSession) { return this; } + /** + * Sets whether the client should use multiplexed session for blind write or not. If set to + * true, the client optimises and runs multiple applicable requests concurrently on a single + * session. A single multiplexed session is sufficient to handle all concurrent traffic. + * + *

When set to false, the client uses the regular session cached in the session pool for + * running 1 concurrent transaction per session. We require to provision sufficient sessions by + * making use of {@link SessionPoolOptions#minSessions} and {@link + * SessionPoolOptions#maxSessions} based on the traffic load. Failing to do so will result in + * higher latencies. + */ + Builder setUseMultiplexedSessionBlindWrite(boolean useMultiplexedSessionBlindWrite) { + this.useMultiplexedSessionBlindWrite = useMultiplexedSessionBlindWrite; + return this; + } + /** * Sets whether the client should use multiplexed session for R/W operations or not. This method * is intentionally package-private and intended for internal use. diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java index bf31df1eb5d..e5982cba0c8 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java @@ -303,7 +303,11 @@ public DatabaseClient getDatabaseClient(DatabaseId db) { numMultiplexedSessionsReleased); pool.maybeWaitOnMinSessions(); DatabaseClientImpl dbClient = - createDatabaseClient(clientId, pool, multiplexedSessionDatabaseClient); + createDatabaseClient( + clientId, + pool, + getOptions().getSessionPoolOptions().getUseMultiplexedSessionBlindWrite(), + multiplexedSessionDatabaseClient); dbClients.put(db, dbClient); return dbClient; } @@ -314,8 +318,10 @@ public DatabaseClient getDatabaseClient(DatabaseId db) { DatabaseClientImpl createDatabaseClient( String clientId, SessionPool pool, + boolean useMultiplexedSessionBlindWrite, @Nullable MultiplexedSessionDatabaseClient multiplexedSessionClient) { - return new DatabaseClientImpl(clientId, pool, multiplexedSessionClient, tracer); + return new DatabaseClientImpl( + clientId, pool, useMultiplexedSessionBlindWrite, multiplexedSessionClient, tracer); } @Override diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/IntegrationTestWithClosedSessionsEnv.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/IntegrationTestWithClosedSessionsEnv.java index b71771ae2ca..7627ed54883 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/IntegrationTestWithClosedSessionsEnv.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/IntegrationTestWithClosedSessionsEnv.java @@ -47,7 +47,10 @@ private static class SpannerWithClosedSessionsImpl extends SpannerImpl { @Override DatabaseClientImpl createDatabaseClient( - String clientId, SessionPool pool, MultiplexedSessionDatabaseClient ignore) { + String clientId, + SessionPool pool, + boolean useMultiplexedSessionBlindWriteIgnore, + MultiplexedSessionDatabaseClient ignore) { return new DatabaseClientWithClosedSessionImpl(clientId, pool, tracer); } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java index 0382b73712e..b6dff424079 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java @@ -70,6 +70,7 @@ public void createSpannerInstance() { .setSessionPoolOption( SessionPoolOptions.newBuilder() .setUseMultiplexedSession(true) + .setUseMultiplexedSessionBlindWrite(true) // Set the maintainer to loop once every 1ms .setMultiplexedSessionMaintenanceLoopFrequency(Duration.ofMillis(1L)) // Set multiplexed sessions to be replaced once every 1ms From ad4370d75c29c4567a250cd8838050ecb3c82565 Mon Sep 17 00:00:00 2001 From: Pratick Chokhani Date: Tue, 17 Sep 2024 16:39:08 +0530 Subject: [PATCH 7/7] lint(spanner): javadoc fixes. --- .../spanner/SessionPoolOptionsHelper.java | 2 +- .../cloud/spanner/SessionPoolOptions.java | 21 +++++++------------ 2 files changed, 8 insertions(+), 15 deletions(-) diff --git a/google-cloud-spanner-executor/src/main/java/com/google/cloud/spanner/SessionPoolOptionsHelper.java b/google-cloud-spanner-executor/src/main/java/com/google/cloud/spanner/SessionPoolOptionsHelper.java index 8fc89a1eb50..dafaa4a1f31 100644 --- a/google-cloud-spanner-executor/src/main/java/com/google/cloud/spanner/SessionPoolOptionsHelper.java +++ b/google-cloud-spanner-executor/src/main/java/com/google/cloud/spanner/SessionPoolOptionsHelper.java @@ -31,7 +31,7 @@ public static SessionPoolOptions.Builder setUseMultiplexedSession( return sessionPoolOptionsBuilder.setUseMultiplexedSession(useMultiplexedSession); } - // TODO: Remove when Builder.setUseMultiplexedSession(..) has been made public. + // TODO: Remove when multiplexed session for blind write is released. public static SessionPoolOptions.Builder setUseMultiplexedSessionBlindWrite( SessionPoolOptions.Builder sessionPoolOptionsBuilder, boolean useMultiplexedSessionBlindWrite) { diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java index d124ee372db..ba335cf8f9f 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java @@ -75,7 +75,7 @@ public class SessionPoolOptions { /** * Controls whether multiplexed session is enabled for blind write or not. This is only used for - * systest soak. Should be removed once released. + * systest soak. TODO: Remove when multiplexed session for blind write is released. */ private final boolean useMultiplexedSessionBlindWrite; @@ -328,7 +328,7 @@ public boolean getUseMultiplexedSession() { @VisibleForTesting @InternalApi - public boolean getUseMultiplexedSessionBlindWrite() { + protected boolean getUseMultiplexedSessionBlindWrite() { return getUseMultiplexedSession() && useMultiplexedSessionBlindWrite; } @@ -568,9 +568,7 @@ public static class Builder { // Set useMultiplexedSession to true to make multiplexed session the default. private boolean useMultiplexedSession = false; - // This field controls the default behavior of session management in Java client. - // Set useMultiplexedSessionBlindWrite to true to make multiplexed session the default for blind - // write. + // TODO: Remove when multiplexed session for blind write is released. private boolean useMultiplexedSessionBlindWrite = false; // This field controls the default behavior of session management for RW operations in Java @@ -810,16 +808,11 @@ Builder setUseMultiplexedSession(boolean useMultiplexedSession) { } /** - * Sets whether the client should use multiplexed session for blind write or not. If set to - * true, the client optimises and runs multiple applicable requests concurrently on a single - * session. A single multiplexed session is sufficient to handle all concurrent traffic. - * - *

When set to false, the client uses the regular session cached in the session pool for - * running 1 concurrent transaction per session. We require to provision sufficient sessions by - * making use of {@link SessionPoolOptions#minSessions} and {@link - * SessionPoolOptions#maxSessions} based on the traffic load. Failing to do so will result in - * higher latencies. + * This method enables multiplexed sessions for blind writes. This method will be removed in the + * future when multiplexed sessions has been made the default for all operations. */ + @InternalApi + @VisibleForTesting Builder setUseMultiplexedSessionBlindWrite(boolean useMultiplexedSessionBlindWrite) { this.useMultiplexedSessionBlindWrite = useMultiplexedSessionBlindWrite; return this;