diff --git a/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudClientExecutor.java b/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudClientExecutor.java index 443a8faf238..d180f55d06a 100644 --- a/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudClientExecutor.java +++ b/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudClientExecutor.java @@ -803,10 +803,13 @@ private synchronized Spanner getClient(long timeoutSeconds, boolean useMultiplex .setTotalTimeout(rpcTimeout) .build(); - com.google.cloud.spanner.SessionPoolOptions sessionPoolOptions = - SessionPoolOptionsHelper.setUseMultiplexedSession( - com.google.cloud.spanner.SessionPoolOptions.newBuilder(), useMultiplexedSession) - .build(); + com.google.cloud.spanner.SessionPoolOptions.Builder poolOptionsBuilder = + com.google.cloud.spanner.SessionPoolOptions.newBuilder(); + SessionPoolOptionsHelper.setUseMultiplexedSession( + com.google.cloud.spanner.SessionPoolOptions.newBuilder(), useMultiplexedSession); + SessionPoolOptionsHelper.setUseMultiplexedSessionBlindWrite( + com.google.cloud.spanner.SessionPoolOptions.newBuilder(), useMultiplexedSession); + com.google.cloud.spanner.SessionPoolOptions sessionPoolOptions = poolOptionsBuilder.build(); // Cloud Spanner Client does not support global retry settings, // Thus, we need to add retry settings to each individual stub. SpannerOptions.Builder optionsBuilder = diff --git a/google-cloud-spanner-executor/src/main/java/com/google/cloud/spanner/SessionPoolOptionsHelper.java b/google-cloud-spanner-executor/src/main/java/com/google/cloud/spanner/SessionPoolOptionsHelper.java index 8f978a39a31..dafaa4a1f31 100644 --- a/google-cloud-spanner-executor/src/main/java/com/google/cloud/spanner/SessionPoolOptionsHelper.java +++ b/google-cloud-spanner-executor/src/main/java/com/google/cloud/spanner/SessionPoolOptionsHelper.java @@ -30,4 +30,12 @@ public static SessionPoolOptions.Builder setUseMultiplexedSession( SessionPoolOptions.Builder sessionPoolOptionsBuilder, boolean useMultiplexedSession) { return sessionPoolOptionsBuilder.setUseMultiplexedSession(useMultiplexedSession); } + + // TODO: Remove when multiplexed session for blind write is released. + public static SessionPoolOptions.Builder setUseMultiplexedSessionBlindWrite( + SessionPoolOptions.Builder sessionPoolOptionsBuilder, + boolean useMultiplexedSessionBlindWrite) { + return sessionPoolOptionsBuilder.setUseMultiplexedSessionBlindWrite( + useMultiplexedSessionBlindWrite); + } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractMultiplexedSessionDatabaseClient.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractMultiplexedSessionDatabaseClient.java index 92035a18418..27253bf1e13 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractMultiplexedSessionDatabaseClient.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractMultiplexedSessionDatabaseClient.java @@ -53,13 +53,7 @@ public CommitResponse writeWithOptions(Iterable mutations, Transaction @Override public Timestamp writeAtLeastOnce(Iterable mutations) throws SpannerException { - throw new UnsupportedOperationException(); - } - - @Override - public CommitResponse writeAtLeastOnceWithOptions( - Iterable mutations, TransactionOption... options) throws SpannerException { - throw new UnsupportedOperationException(); + return writeAtLeastOnceWithOptions(mutations).getCommitTimestamp(); } @Override diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java index a20bcd9e925..909d731818f 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java @@ -37,23 +37,37 @@ class DatabaseClientImpl implements DatabaseClient { @VisibleForTesting final SessionPool pool; @VisibleForTesting final MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient; + final boolean useMultiplexedSessionBlindWrite; + @VisibleForTesting DatabaseClientImpl(SessionPool pool, TraceWrapper tracer) { - this("", pool, /* multiplexedSessionDatabaseClient = */ null, tracer); + this( + "", + pool, + /* useMultiplexedSessionBlindWrite = */ false, + /* multiplexedSessionDatabaseClient = */ null, + tracer); } @VisibleForTesting DatabaseClientImpl(String clientId, SessionPool pool, TraceWrapper tracer) { - this(clientId, pool, /* multiplexedSessionDatabaseClient = */ null, tracer); + this( + clientId, + pool, + /* useMultiplexedSessionBlindWrite = */ false, + /* multiplexedSessionDatabaseClient = */ null, + tracer); } DatabaseClientImpl( String clientId, SessionPool pool, + boolean useMultiplexedSessionBlindWrite, @Nullable MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient, TraceWrapper tracer) { this.clientId = clientId; this.pool = pool; + this.useMultiplexedSessionBlindWrite = useMultiplexedSessionBlindWrite; this.multiplexedSessionDatabaseClient = multiplexedSessionDatabaseClient; this.tracer = tracer; } @@ -65,13 +79,21 @@ PooledSessionFuture getSession() { @VisibleForTesting DatabaseClient getMultiplexedSession() { - if (this.multiplexedSessionDatabaseClient != null - && this.multiplexedSessionDatabaseClient.isMultiplexedSessionsSupported()) { + if (canUseMultiplexedSessions()) { return this.multiplexedSessionDatabaseClient; } return pool.getMultiplexedSessionWithFallback(); } + private MultiplexedSessionDatabaseClient getMultiplexedSessionDatabaseClient() { + return canUseMultiplexedSessions() ? this.multiplexedSessionDatabaseClient : null; + } + + private boolean canUseMultiplexedSessions() { + return this.multiplexedSessionDatabaseClient != null + && this.multiplexedSessionDatabaseClient.isMultiplexedSessionsSupported(); + } + @Override public Dialect getDialect() { return pool.getDialect(); @@ -114,6 +136,10 @@ public CommitResponse writeAtLeastOnceWithOptions( throws SpannerException { ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, options); try (IScope s = tracer.withSpan(span)) { + if (useMultiplexedSessionBlindWrite && getMultiplexedSessionDatabaseClient() != null) { + return getMultiplexedSessionDatabaseClient() + .writeAtLeastOnceWithOptions(mutations, options); + } return runWithSessionRetry( session -> session.writeAtLeastOnceWithOptions(mutations, options)); } catch (RuntimeException e) { diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedMultiplexedSessionTransaction.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedMultiplexedSessionTransaction.java index 928927d49a0..36750eaccd1 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedMultiplexedSessionTransaction.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedMultiplexedSessionTransaction.java @@ -22,7 +22,9 @@ import com.google.api.core.ApiFutures; import com.google.cloud.spanner.DelayedReadContext.DelayedReadOnlyTransaction; import com.google.cloud.spanner.MultiplexedSessionDatabaseClient.MultiplexedSessionTransaction; +import com.google.cloud.spanner.Options.TransactionOption; import com.google.common.util.concurrent.MoreExecutors; +import java.util.concurrent.ExecutionException; /** * Represents a delayed execution of a transaction on a multiplexed session. The execution is @@ -119,4 +121,37 @@ public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) { .readOnlyTransaction(bound), MoreExecutors.directExecutor())); } + + /** + * This is a blocking method, as the interface that it implements is also defined as a blocking + * method. + */ + @Override + public CommitResponse writeAtLeastOnceWithOptions( + Iterable mutations, TransactionOption... options) throws SpannerException { + SessionReference sessionReference = getSessionReference(); + try (MultiplexedSessionTransaction transaction = + new MultiplexedSessionTransaction(client, span, sessionReference, NO_CHANNEL_HINT, true)) { + return transaction.writeAtLeastOnceWithOptions(mutations, options); + } + } + + /** + * Gets the session reference that this delayed transaction is waiting for. This method should + * only be called by methods that are allowed to be blocking. + */ + private SessionReference getSessionReference() { + try { + return this.sessionFuture.get(); + } catch (ExecutionException executionException) { + // Propagate the underlying exception as a RuntimeException (SpannerException is also a + // RuntimeException). + if (executionException.getCause() instanceof RuntimeException) { + throw (RuntimeException) executionException.getCause(); + } + throw SpannerExceptionFactory.asSpannerException(executionException.getCause()); + } catch (InterruptedException interruptedException) { + throw SpannerExceptionFactory.propagateInterrupt(interruptedException); + } + } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java index e15fdaf6393..81415e80d25 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java @@ -21,6 +21,7 @@ import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; import com.google.api.core.SettableApiFuture; +import com.google.cloud.spanner.Options.TransactionOption; import com.google.cloud.spanner.SessionClient.SessionConsumer; import com.google.cloud.spanner.SpannerException.ResourceNotFoundException; import com.google.common.annotations.VisibleForTesting; @@ -107,6 +108,14 @@ void onReadDone() { } } + @Override + public CommitResponse writeAtLeastOnceWithOptions( + Iterable mutations, TransactionOption... options) throws SpannerException { + CommitResponse response = super.writeAtLeastOnceWithOptions(mutations, options); + onTransactionDone(); + return response; + } + @Override void onTransactionDone() { boolean markedDone = false; @@ -358,6 +367,13 @@ private int getSingleUseChannelHint() { } } + @Override + public CommitResponse writeAtLeastOnceWithOptions( + Iterable mutations, TransactionOption... options) throws SpannerException { + return createMultiplexedSessionTransaction(true) + .writeAtLeastOnceWithOptions(mutations, options); + } + @Override public ReadContext singleUse() { return createMultiplexedSessionTransaction(true).singleUse(); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java index 389a91448bf..ba335cf8f9f 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java @@ -73,6 +73,12 @@ public class SessionPoolOptions { private final boolean useMultiplexedSession; + /** + * Controls whether multiplexed session is enabled for blind write or not. This is only used for + * systest soak. TODO: Remove when multiplexed session for blind write is released. + */ + private final boolean useMultiplexedSessionBlindWrite; + private final boolean useMultiplexedSessionForRW; // TODO: Change to use java.time.Duration. @@ -110,6 +116,7 @@ private SessionPoolOptions(Builder builder) { (useMultiplexedSessionFromEnvVariable != null) ? useMultiplexedSessionFromEnvVariable : builder.useMultiplexedSession; + this.useMultiplexedSessionBlindWrite = builder.useMultiplexedSessionBlindWrite; // useMultiplexedSessionForRW priority => Environment var > private setter > client default Boolean useMultiplexedSessionForRWFromEnvVariable = getUseMultiplexedSessionForRWFromEnvVariable(); @@ -184,6 +191,7 @@ public int hashCode() { this.inactiveTransactionRemovalOptions, this.poolMaintainerClock, this.useMultiplexedSession, + this.useMultiplexedSessionBlindWrite, this.useMultiplexedSessionForRW, this.multiplexedSessionMaintenanceDuration); } @@ -318,6 +326,12 @@ public boolean getUseMultiplexedSession() { return useMultiplexedSession; } + @VisibleForTesting + @InternalApi + protected boolean getUseMultiplexedSessionBlindWrite() { + return getUseMultiplexedSession() && useMultiplexedSessionBlindWrite; + } + @VisibleForTesting @InternalApi public boolean getUseMultiplexedSessionForRW() { @@ -554,6 +568,9 @@ public static class Builder { // Set useMultiplexedSession to true to make multiplexed session the default. private boolean useMultiplexedSession = false; + // TODO: Remove when multiplexed session for blind write is released. + private boolean useMultiplexedSessionBlindWrite = false; + // This field controls the default behavior of session management for RW operations in Java // client. // Set useMultiplexedSessionForRW to true to make multiplexed session for RW operations the @@ -601,6 +618,7 @@ private Builder(SessionPoolOptions options) { this.randomizePositionQPSThreshold = options.randomizePositionQPSThreshold; this.inactiveTransactionRemovalOptions = options.inactiveTransactionRemovalOptions; this.useMultiplexedSession = options.useMultiplexedSession; + this.useMultiplexedSessionBlindWrite = options.useMultiplexedSessionBlindWrite; this.useMultiplexedSessionForRW = options.useMultiplexedSessionForRW; this.multiplexedSessionMaintenanceDuration = options.multiplexedSessionMaintenanceDuration; this.poolMaintainerClock = options.poolMaintainerClock; @@ -789,6 +807,17 @@ Builder setUseMultiplexedSession(boolean useMultiplexedSession) { return this; } + /** + * This method enables multiplexed sessions for blind writes. This method will be removed in the + * future when multiplexed sessions has been made the default for all operations. + */ + @InternalApi + @VisibleForTesting + Builder setUseMultiplexedSessionBlindWrite(boolean useMultiplexedSessionBlindWrite) { + this.useMultiplexedSessionBlindWrite = useMultiplexedSessionBlindWrite; + return this; + } + /** * Sets whether the client should use multiplexed session for R/W operations or not. This method * is intentionally package-private and intended for internal use. diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java index bf31df1eb5d..e5982cba0c8 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java @@ -303,7 +303,11 @@ public DatabaseClient getDatabaseClient(DatabaseId db) { numMultiplexedSessionsReleased); pool.maybeWaitOnMinSessions(); DatabaseClientImpl dbClient = - createDatabaseClient(clientId, pool, multiplexedSessionDatabaseClient); + createDatabaseClient( + clientId, + pool, + getOptions().getSessionPoolOptions().getUseMultiplexedSessionBlindWrite(), + multiplexedSessionDatabaseClient); dbClients.put(db, dbClient); return dbClient; } @@ -314,8 +318,10 @@ public DatabaseClient getDatabaseClient(DatabaseId db) { DatabaseClientImpl createDatabaseClient( String clientId, SessionPool pool, + boolean useMultiplexedSessionBlindWrite, @Nullable MultiplexedSessionDatabaseClient multiplexedSessionClient) { - return new DatabaseClientImpl(clientId, pool, multiplexedSessionClient, tracer); + return new DatabaseClientImpl( + clientId, pool, useMultiplexedSessionBlindWrite, multiplexedSessionClient, tracer); } @Override diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/IntegrationTestWithClosedSessionsEnv.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/IntegrationTestWithClosedSessionsEnv.java index b71771ae2ca..7627ed54883 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/IntegrationTestWithClosedSessionsEnv.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/IntegrationTestWithClosedSessionsEnv.java @@ -47,7 +47,10 @@ private static class SpannerWithClosedSessionsImpl extends SpannerImpl { @Override DatabaseClientImpl createDatabaseClient( - String clientId, SessionPool pool, MultiplexedSessionDatabaseClient ignore) { + String clientId, + SessionPool pool, + boolean useMultiplexedSessionBlindWriteIgnore, + MultiplexedSessionDatabaseClient ignore) { return new DatabaseClientWithClosedSessionImpl(clientId, pool, tracer); } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java index bf4a02a10c5..b6dff424079 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java @@ -16,6 +16,7 @@ package com.google.cloud.spanner; +import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; @@ -24,15 +25,21 @@ import static org.junit.Assert.assertTrue; import com.google.cloud.NoCredentials; +import com.google.cloud.Timestamp; import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime; import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult; +import com.google.cloud.spanner.Options.RpcPriority; import com.google.cloud.spanner.connection.RandomResultSetGenerator; import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableList; +import com.google.protobuf.ByteString; +import com.google.spanner.v1.CommitRequest; import com.google.spanner.v1.ExecuteSqlRequest; +import com.google.spanner.v1.RequestOptions.Priority; import com.google.spanner.v1.Session; import io.grpc.Status; import java.time.Duration; +import java.util.Collections; import java.util.List; import java.util.Set; import java.util.UUID; @@ -63,6 +70,7 @@ public void createSpannerInstance() { .setSessionPoolOption( SessionPoolOptions.newBuilder() .setUseMultiplexedSession(true) + .setUseMultiplexedSessionBlindWrite(true) // Set the maintainer to loop once every 1ms .setMultiplexedSessionMaintenanceLoopFrequency(Duration.ofMillis(1L)) // Set multiplexed sessions to be replaced once every 1ms @@ -309,6 +317,156 @@ public void testMaintainerInvalidatesMultiplexedSessionClientIfUnimplemented() { assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get()); } + @Test + public void testWriteAtLeastOnceAborted() { + DatabaseClientImpl client = + (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + // Force the Commit RPC to return Aborted the first time it is called. The exception is cleared + // after the first call, so the retry should succeed. + mockSpanner.setCommitExecutionTime( + SimulatedExecutionTime.ofException( + mockSpanner.createAbortedException(ByteString.copyFromUtf8("test")))); + Timestamp timestamp = + client.writeAtLeastOnce( + Collections.singletonList( + Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build())); + assertNotNull(timestamp); + + List commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class); + assertEquals(2, commitRequests.size()); + for (CommitRequest request : commitRequests) { + assertTrue(mockSpanner.getSession(request.getSession()).getMultiplexed()); + } + + assertNotNull(client.multiplexedSessionDatabaseClient); + assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsAcquired().get()); + assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get()); + } + + @Test + public void testWriteAtLeastOnce() { + DatabaseClientImpl client = + (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + Timestamp timestamp = + client.writeAtLeastOnce( + Collections.singletonList( + Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build())); + assertNotNull(timestamp); + + List commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class); + assertThat(commitRequests).hasSize(1); + CommitRequest commit = commitRequests.get(0); + assertNotNull(commit.getSingleUseTransaction()); + assertTrue(commit.getSingleUseTransaction().hasReadWrite()); + assertFalse(commit.getSingleUseTransaction().getExcludeTxnFromChangeStreams()); + assertNotNull(commit.getRequestOptions()); + assertEquals(Priority.PRIORITY_UNSPECIFIED, commit.getRequestOptions().getPriority()); + assertTrue(mockSpanner.getSession(commit.getSession()).getMultiplexed()); + + assertNotNull(client.multiplexedSessionDatabaseClient); + assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsAcquired().get()); + assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get()); + } + + @Test + public void testWriteAtLeastOnceWithCommitStats() { + DatabaseClientImpl client = + (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + CommitResponse response = + client.writeAtLeastOnceWithOptions( + Collections.singletonList( + Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build()), + Options.commitStats()); + assertNotNull(response); + assertNotNull(response.getCommitTimestamp()); + assertNotNull(response.getCommitStats()); + + List commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class); + assertThat(commitRequests).hasSize(1); + CommitRequest commit = commitRequests.get(0); + assertNotNull(commit.getSingleUseTransaction()); + assertTrue(commit.getSingleUseTransaction().hasReadWrite()); + assertFalse(commit.getSingleUseTransaction().getExcludeTxnFromChangeStreams()); + assertNotNull(commit.getRequestOptions()); + assertEquals(Priority.PRIORITY_UNSPECIFIED, commit.getRequestOptions().getPriority()); + assertTrue(mockSpanner.getSession(commit.getSession()).getMultiplexed()); + + assertNotNull(client.multiplexedSessionDatabaseClient); + assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsAcquired().get()); + assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get()); + } + + @Test + public void testWriteAtLeastOnceWithOptions() { + DatabaseClientImpl client = + (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + client.writeAtLeastOnceWithOptions( + Collections.singletonList( + Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build()), + Options.priority(RpcPriority.LOW)); + + List commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class); + assertThat(commitRequests).hasSize(1); + CommitRequest commit = commitRequests.get(0); + assertNotNull(commit.getSingleUseTransaction()); + assertTrue(commit.getSingleUseTransaction().hasReadWrite()); + assertFalse(commit.getSingleUseTransaction().getExcludeTxnFromChangeStreams()); + assertNotNull(commit.getRequestOptions()); + assertEquals(Priority.PRIORITY_LOW, commit.getRequestOptions().getPriority()); + assertTrue(mockSpanner.getSession(commit.getSession()).getMultiplexed()); + + assertNotNull(client.multiplexedSessionDatabaseClient); + assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsAcquired().get()); + assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get()); + } + + @Test + public void testWriteAtLeastOnceWithTagOptions() { + DatabaseClientImpl client = + (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + client.writeAtLeastOnceWithOptions( + Collections.singletonList( + Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build()), + Options.tag("app=spanner,env=test")); + + List commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class); + assertThat(commitRequests).hasSize(1); + CommitRequest commit = commitRequests.get(0); + assertNotNull(commit.getSingleUseTransaction()); + assertTrue(commit.getSingleUseTransaction().hasReadWrite()); + assertFalse(commit.getSingleUseTransaction().getExcludeTxnFromChangeStreams()); + assertNotNull(commit.getRequestOptions()); + assertThat(commit.getRequestOptions().getTransactionTag()).isEqualTo("app=spanner,env=test"); + assertThat(commit.getRequestOptions().getRequestTag()).isEmpty(); + assertTrue(mockSpanner.getSession(commit.getSession()).getMultiplexed()); + + assertNotNull(client.multiplexedSessionDatabaseClient); + assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsAcquired().get()); + assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get()); + } + + @Test + public void testWriteAtLeastOnceWithExcludeTxnFromChangeStreams() { + DatabaseClientImpl client = + (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + client.writeAtLeastOnceWithOptions( + Collections.singletonList( + Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build()), + Options.excludeTxnFromChangeStreams()); + + List commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class); + assertThat(commitRequests).hasSize(1); + CommitRequest commit = commitRequests.get(0); + assertNotNull(commit.getSingleUseTransaction()); + assertTrue(commit.getSingleUseTransaction().hasReadWrite()); + assertTrue(commit.getSingleUseTransaction().getExcludeTxnFromChangeStreams()); + assertTrue(mockSpanner.getSession(commit.getSession()).getMultiplexed()); + + assertNotNull(client.multiplexedSessionDatabaseClient); + assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsAcquired().get()); + assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get()); + } + private void waitForSessionToBeReplaced(DatabaseClientImpl client) { assertNotNull(client.multiplexedSessionDatabaseClient); SessionReference sessionReference =