Skip to content

Commit 76687f8

Browse files
committed
chore(spanner): add mock spanner tests for R/W txn with multiplexed sessions
1 parent dea3c08 commit 76687f8

File tree

5 files changed

+165
-21
lines changed

5 files changed

+165
-21
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractMultiplexedSessionDatabaseClient.java

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -40,17 +40,6 @@ public String getDatabaseRole() {
4040
throw new UnsupportedOperationException();
4141
}
4242

43-
@Override
44-
public Timestamp write(Iterable<Mutation> mutations) throws SpannerException {
45-
throw new UnsupportedOperationException();
46-
}
47-
48-
@Override
49-
public CommitResponse writeWithOptions(Iterable<Mutation> mutations, TransactionOption... options)
50-
throws SpannerException {
51-
throw new UnsupportedOperationException();
52-
}
53-
5443
@Override
5544
public Timestamp writeAtLeastOnce(Iterable<Mutation> mutations) throws SpannerException {
5645
return writeAtLeastOnceWithOptions(mutations).getCommitTimestamp();
@@ -63,16 +52,6 @@ public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
6352
throw new UnsupportedOperationException();
6453
}
6554

66-
@Override
67-
public TransactionRunner readWriteTransaction(TransactionOption... options) {
68-
throw new UnsupportedOperationException();
69-
}
70-
71-
@Override
72-
public TransactionManager transactionManager(TransactionOption... options) {
73-
throw new UnsupportedOperationException();
74-
}
75-
7655
@Override
7756
public AsyncRunner runAsync(TransactionOption... options) {
7857
throw new UnsupportedOperationException();

google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedMultiplexedSessionTransaction.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import com.google.api.core.ApiFuture;
2222
import com.google.api.core.ApiFutures;
23+
import com.google.cloud.Timestamp;
2324
import com.google.cloud.spanner.DelayedReadContext.DelayedReadOnlyTransaction;
2425
import com.google.cloud.spanner.MultiplexedSessionDatabaseClient.MultiplexedSessionTransaction;
2526
import com.google.cloud.spanner.Options.TransactionOption;
@@ -136,6 +137,19 @@ public CommitResponse writeAtLeastOnceWithOptions(
136137
}
137138
}
138139

140+
/**
141+
* This is a blocking method, as the interface that it implements is also defined as a blocking
142+
* method.
143+
*/
144+
@Override
145+
public Timestamp write(Iterable<Mutation> mutations) throws SpannerException {
146+
SessionReference sessionReference = getSessionReference();
147+
try (MultiplexedSessionTransaction transaction =
148+
new MultiplexedSessionTransaction(client, span, sessionReference, NO_CHANNEL_HINT, false)) {
149+
return transaction.write(mutations);
150+
}
151+
}
152+
139153
/**
140154
* This is a blocking method, as the interface that it implements is also defined as a blocking
141155
* method.

google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.google.api.core.ApiFuture;
2222
import com.google.api.core.ApiFutures;
2323
import com.google.api.core.SettableApiFuture;
24+
import com.google.cloud.Timestamp;
2425
import com.google.cloud.spanner.Options.TransactionOption;
2526
import com.google.cloud.spanner.SessionClient.SessionConsumer;
2627
import com.google.cloud.spanner.SpannerException.ResourceNotFoundException;
@@ -367,6 +368,11 @@ private int getSingleUseChannelHint() {
367368
}
368369
}
369370

371+
@Override
372+
public Timestamp write(Iterable<Mutation> mutations) throws SpannerException {
373+
return createMultiplexedSessionTransaction(false).write(mutations);
374+
}
375+
370376
@Override
371377
public CommitResponse writeWithOptions(
372378
final Iterable<Mutation> mutations, final TransactionOption... options)

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -822,6 +822,8 @@ Builder setUseMultiplexedSessionBlindWrite(boolean useMultiplexedSessionBlindWri
822822
* Sets whether the client should use multiplexed session for R/W operations or not. This method
823823
* is intentionally package-private and intended for internal use.
824824
*/
825+
@InternalApi
826+
@VisibleForTesting
825827
Builder setUseMultiplexedSessionForRW(boolean useMultiplexedSessionForRW) {
826828
this.useMultiplexedSessionForRW = useMultiplexedSessionForRW;
827829
return this;

google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ public void createSpannerInstance() {
7171
SessionPoolOptions.newBuilder()
7272
.setUseMultiplexedSession(true)
7373
.setUseMultiplexedSessionBlindWrite(true)
74+
.setUseMultiplexedSessionForRW(true)
7475
// Set the maintainer to loop once every 1ms
7576
.setMultiplexedSessionMaintenanceLoopFrequency(Duration.ofMillis(1L))
7677
// Set multiplexed sessions to be replaced once every 1ms
@@ -467,6 +468,148 @@ public void testWriteAtLeastOnceWithExcludeTxnFromChangeStreams() {
467468
assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get());
468469
}
469470

471+
@Test
472+
public void testReadWriteTransactionUsingTransactionRunner() {
473+
// Queries executed within a R/W transaction via TransactionRunner should use a multiplexed
474+
// session.
475+
// During a retry (due to an ABORTED error), the transaction should use the same multiplexed
476+
// session as before, assuming the maintainer hasn't run in the meantime.
477+
DatabaseClientImpl client =
478+
(DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
479+
// Force the Commit RPC to return Aborted the first time it is called. The exception is cleared
480+
// after the first call, so the retry should succeed.
481+
mockSpanner.setCommitExecutionTime(
482+
SimulatedExecutionTime.ofException(
483+
mockSpanner.createAbortedException(ByteString.copyFromUtf8("test"))));
484+
485+
client
486+
.readWriteTransaction()
487+
.run(
488+
transaction -> {
489+
try (ResultSet resultSet = transaction.executeQuery(STATEMENT)) {
490+
//noinspection StatementWithEmptyBody
491+
while (resultSet.next()) {
492+
// ignore
493+
}
494+
}
495+
return null;
496+
});
497+
498+
List<ExecuteSqlRequest> executeSqlRequests =
499+
mockSpanner.getRequestsOfType(ExecuteSqlRequest.class);
500+
assertEquals(2, executeSqlRequests.size());
501+
assertEquals(executeSqlRequests.get(0).getSession(), executeSqlRequests.get(1).getSession());
502+
503+
// Verify the requests are executed using multiplexed sessions
504+
for (ExecuteSqlRequest request : executeSqlRequests) {
505+
assertTrue(mockSpanner.getSession(request.getSession()).getMultiplexed());
506+
}
507+
508+
assertNotNull(client.multiplexedSessionDatabaseClient);
509+
assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsAcquired().get());
510+
// TODO: fix this
511+
// assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get());
512+
}
513+
514+
@Test
515+
public void testReadWriteTransactionUsingTransactionManager() {
516+
// Queries executed within a R/W transaction via TransactionManager should use a multiplexed
517+
// session.
518+
// During a retry (due to an ABORTED error), the transaction should use the same multiplexed
519+
// session as before, assuming the maintainer hasn't run in the meantime.
520+
DatabaseClientImpl client =
521+
(DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
522+
// Force the Commit RPC to return Aborted the first time it is called. The exception is cleared
523+
// after the first call, so the retry should succeed.
524+
mockSpanner.setCommitExecutionTime(
525+
SimulatedExecutionTime.ofException(
526+
mockSpanner.createAbortedException(ByteString.copyFromUtf8("test"))));
527+
528+
try (TransactionManager manager = client.transactionManager()) {
529+
TransactionContext transaction = manager.begin();
530+
while (true) {
531+
try {
532+
try (ResultSet resultSet = transaction.executeQuery(STATEMENT)) {
533+
//noinspection StatementWithEmptyBody
534+
while (resultSet.next()) {
535+
// ignore
536+
}
537+
}
538+
manager.commit();
539+
assertNotNull(manager.getCommitTimestamp());
540+
break;
541+
} catch (AbortedException e) {
542+
transaction = manager.resetForRetry();
543+
}
544+
}
545+
}
546+
547+
List<ExecuteSqlRequest> executeSqlRequests =
548+
mockSpanner.getRequestsOfType(ExecuteSqlRequest.class);
549+
assertEquals(2, executeSqlRequests.size());
550+
assertEquals(executeSqlRequests.get(0).getSession(), executeSqlRequests.get(1).getSession());
551+
552+
// Verify the requests are executed using multiplexed sessions
553+
for (ExecuteSqlRequest request : executeSqlRequests) {
554+
assertTrue(mockSpanner.getSession(request.getSession()).getMultiplexed());
555+
}
556+
557+
assertNotNull(client.multiplexedSessionDatabaseClient);
558+
assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsAcquired().get());
559+
// TODO: fix this
560+
// assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get());
561+
}
562+
563+
@Test
564+
public void testMutationUsingWrite() {
565+
DatabaseClientImpl client =
566+
(DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
567+
// Force the Commit RPC to return Aborted the first time it is called. The exception is cleared
568+
// after the first call, so the retry should succeed.
569+
mockSpanner.setCommitExecutionTime(
570+
SimulatedExecutionTime.ofException(
571+
mockSpanner.createAbortedException(ByteString.copyFromUtf8("test"))));
572+
Timestamp timestamp =
573+
client.write(
574+
Collections.singletonList(
575+
Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build()));
576+
assertNotNull(timestamp);
577+
578+
List<CommitRequest> commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class);
579+
assertEquals(2, commitRequests.size());
580+
for (CommitRequest request : commitRequests) {
581+
assertTrue(mockSpanner.getSession(request.getSession()).getMultiplexed());
582+
}
583+
584+
assertNotNull(client.multiplexedSessionDatabaseClient);
585+
assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsAcquired().get());
586+
// assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get());
587+
}
588+
589+
@Test
590+
public void testMutationUsingWriteWithOptions() {
591+
DatabaseClientImpl client =
592+
(DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
593+
CommitResponse response =
594+
client.writeWithOptions(
595+
Collections.singletonList(
596+
Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build()),
597+
Options.tag("app=spanner,env=test"));
598+
assertNotNull(response);
599+
assertNotNull(response.getCommitTimestamp());
600+
601+
List<CommitRequest> commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class);
602+
assertThat(commitRequests).hasSize(1);
603+
CommitRequest commit = commitRequests.get(0);
604+
assertNotNull(commit.getRequestOptions());
605+
assertThat(commit.getRequestOptions().getTransactionTag()).isEqualTo("app=spanner,env=test");
606+
assertTrue(mockSpanner.getSession(commit.getSession()).getMultiplexed());
607+
608+
assertNotNull(client.multiplexedSessionDatabaseClient);
609+
assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsAcquired().get());
610+
// assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get());
611+
}
612+
470613
private void waitForSessionToBeReplaced(DatabaseClientImpl client) {
471614
assertNotNull(client.multiplexedSessionDatabaseClient);
472615
SessionReference sessionReference =

0 commit comments

Comments
 (0)