Skip to content

Commit 046299b

Browse files
committed
chore(Spanner): fix tests for mux rw
1 parent 1af8e46 commit 046299b

12 files changed

+355
-29
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -253,10 +253,15 @@ public void onSessionReady(SessionImpl session) {
253253
// initiate a begin transaction request to verify if read-write transactions are
254254
// supported using multiplexed sessions.
255255
if (sessionClient
256-
.getSpanner()
257-
.getOptions()
258-
.getSessionPoolOptions()
259-
.getUseMultiplexedSessionForRW()) {
256+
.getSpanner()
257+
.getOptions()
258+
.getSessionPoolOptions()
259+
.getUseMultiplexedSessionForRW()
260+
&& !sessionClient
261+
.getSpanner()
262+
.getOptions()
263+
.getSessionPoolOptions()
264+
.getSkipVerifyBeginTransactionForMuxRW()) {
260265
verifyBeginTransactionWithRWOnMultiplexedSessionAsync(session.getName());
261266
}
262267
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ public class SessionPoolOptions {
8383

8484
// TODO: Change to use java.time.Duration.
8585
private final Duration multiplexedSessionMaintenanceDuration;
86+
private final boolean skipVerifyingBeginTransactionForMuxRW;
8687

8788
private SessionPoolOptions(Builder builder) {
8889
// minSessions > maxSessions is only possible if the user has only set a value for maxSessions.
@@ -132,6 +133,7 @@ private SessionPoolOptions(Builder builder) {
132133
? useMultiplexedSessionFromEnvVariablePartitionedOps
133134
: builder.useMultiplexedSessionPartitionedOps;
134135
this.multiplexedSessionMaintenanceDuration = builder.multiplexedSessionMaintenanceDuration;
136+
this.skipVerifyingBeginTransactionForMuxRW = builder.skipVerifyingBeginTransactionForMuxRW;
135137
}
136138

137139
@Override
@@ -169,8 +171,10 @@ public boolean equals(Object o) {
169171
&& Objects.equals(this.useMultiplexedSession, other.useMultiplexedSession)
170172
&& Objects.equals(this.useMultiplexedSessionForRW, other.useMultiplexedSessionForRW)
171173
&& Objects.equals(
172-
this.multiplexedSessionMaintenanceDuration,
173-
other.multiplexedSessionMaintenanceDuration);
174+
this.multiplexedSessionMaintenanceDuration, other.multiplexedSessionMaintenanceDuration)
175+
&& Objects.equals(
176+
this.skipVerifyingBeginTransactionForMuxRW,
177+
other.skipVerifyingBeginTransactionForMuxRW);
174178
}
175179

176180
@Override
@@ -199,7 +203,8 @@ public int hashCode() {
199203
this.poolMaintainerClock,
200204
this.useMultiplexedSession,
201205
this.useMultiplexedSessionForRW,
202-
this.multiplexedSessionMaintenanceDuration);
206+
this.multiplexedSessionMaintenanceDuration,
207+
this.skipVerifyingBeginTransactionForMuxRW);
203208
}
204209

205210
public Builder toBuilder() {
@@ -392,6 +397,12 @@ Duration getMultiplexedSessionMaintenanceDuration() {
392397
return multiplexedSessionMaintenanceDuration;
393398
}
394399

400+
@VisibleForTesting
401+
@InternalApi
402+
boolean getSkipVerifyBeginTransactionForMuxRW() {
403+
return skipVerifyingBeginTransactionForMuxRW;
404+
}
405+
395406
public static Builder newBuilder() {
396407
return new Builder();
397408
}
@@ -607,6 +618,7 @@ public static class Builder {
607618

608619
private Duration multiplexedSessionMaintenanceDuration = Duration.ofDays(7);
609620
private Clock poolMaintainerClock = Clock.INSTANCE;
621+
private boolean skipVerifyingBeginTransactionForMuxRW = false;
610622

611623
private static Position getReleaseToPositionFromSystemProperty() {
612624
// NOTE: This System property is a beta feature. Support for it can be removed in the future.
@@ -650,6 +662,7 @@ private Builder(SessionPoolOptions options) {
650662
this.useMultiplexedSessionPartitionedOps = options.useMultiplexedSessionForPartitionedOps;
651663
this.multiplexedSessionMaintenanceDuration = options.multiplexedSessionMaintenanceDuration;
652664
this.poolMaintainerClock = options.poolMaintainerClock;
665+
this.skipVerifyingBeginTransactionForMuxRW = options.skipVerifyingBeginTransactionForMuxRW;
653666
}
654667

655668
/**
@@ -872,6 +885,18 @@ Builder setMultiplexedSessionMaintenanceDuration(
872885
return this;
873886
}
874887

888+
// The additional BeginTransaction RPC for multiplexed session read-write is causing
889+
// unexpected behavior in mock Spanner tests that rely on mocking the BeginTransaction RPC.
890+
// Invoking this method with `true` skips sending the BeginTransaction RPC when the multiplexed
891+
// session is created for the first time during client initialization.
892+
// This is only used for tests.
893+
@VisibleForTesting
894+
Builder setSkipVerifyingBeginTransactionForMuxRW(
895+
boolean skipVerifyingBeginTransactionForMuxRW) {
896+
this.skipVerifyingBeginTransactionForMuxRW = skipVerifyingBeginTransactionForMuxRW;
897+
return this;
898+
}
899+
875900
/**
876901
* Sets whether the client should automatically execute a background query to detect the dialect
877902
* that is used by the database or not. Set this option to true if you do not know what the

google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java

Lines changed: 49 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,9 @@ public void tearDown() {
270270
@Test
271271
public void
272272
testPoolMaintainer_whenInactiveTransactionAndSessionIsNotFoundOnBackend_removeSessionsFromPool() {
273+
assumeFalse(
274+
"Session pool maintainer test skipped for multiplexed sessions",
275+
isMultiplexedSessionsEnabledForRW());
273276
FakeClock poolMaintainerClock = new FakeClock();
274277
InactiveTransactionRemovalOptions inactiveTransactionRemovalOptions =
275278
InactiveTransactionRemovalOptions.newBuilder()
@@ -347,6 +350,9 @@ public void tearDown() {
347350
@Test
348351
public void
349352
testPoolMaintainer_whenInactiveTransactionAndSessionExistsOnBackend_removeSessionsFromPool() {
353+
assumeFalse(
354+
"Session leaks tests are skipped for multiplexed sessions",
355+
isMultiplexedSessionsEnabledForRW());
350356
FakeClock poolMaintainerClock = new FakeClock();
351357
InactiveTransactionRemovalOptions inactiveTransactionRemovalOptions =
352358
InactiveTransactionRemovalOptions.newBuilder()
@@ -482,6 +488,9 @@ public void testPoolMaintainer_whenLongRunningPartitionedUpdateRequest_takeNoAct
482488
*/
483489
@Test
484490
public void testPoolMaintainer_whenPDMLFollowedByInactiveTransaction_removeSessionsFromPool() {
491+
assumeFalse(
492+
"Session leaks tests are skipped for multiplexed sessions",
493+
isMultiplexedSessionsEnabledForRW());
485494
FakeClock poolMaintainerClock = new FakeClock();
486495
InactiveTransactionRemovalOptions inactiveTransactionRemovalOptions =
487496
InactiveTransactionRemovalOptions.newBuilder()
@@ -3084,14 +3093,14 @@ public void testDatabaseOrInstanceDoesNotExistOnCreate() {
30843093
.readWriteTransaction()
30853094
.run(transaction -> transaction.executeUpdate(UPDATE_STATEMENT)));
30863095
// No additional requests should have been sent by the client.
3087-
// Note that in case of the use of multiplexed sessions, then we have 2 requests:
3096+
// Note that in case of the use of regular sessions, then we have 1 request:
30883097
// 1. BatchCreateSessions for the session pool.
3089-
// 2. CreateSession for the multiplexed session.
3090-
assertThat(mockSpanner.getRequests())
3091-
.hasSize(
3092-
spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSession()
3093-
? 2
3094-
: 1);
3098+
// Note that in case of the use of multiplexed sessions for read-only and read-write,
3099+
// then we have 1 request:
3100+
// 1. CreateSession for the multiplexed session.
3101+
// There will be no BatchCreateSessions request in case of multiplexed sessions, because
3102+
// the session pool options has min size of 0.
3103+
assertThat(mockSpanner.getRequests()).hasSize(1);
30953104
}
30963105
}
30973106
mockSpanner.reset();
@@ -3211,9 +3220,16 @@ public void testDatabaseOrInstanceIsDeletedAndThenRecreated() throws Exception {
32113220
ResourceNotFoundException.class, () -> dbClient.singleUse().executeQuery(SELECT1));
32123221
}
32133222

3214-
assertThrows(
3215-
ResourceNotFoundException.class,
3216-
() -> dbClient.readWriteTransaction().run(transaction -> null));
3223+
if (!spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()) {
3224+
// We only verify this for read-write transactions if we are not using multiplexed
3225+
// sessions. For multiplexed sessions, we don't need any special handling, as deleting the
3226+
// database will also invalidate the multiplexed session, and trying to continue to use it
3227+
// will continue to return an error.
3228+
assertThrows(
3229+
ResourceNotFoundException.class,
3230+
() -> dbClient.readWriteTransaction().run(transaction -> null));
3231+
}
3232+
32173233
assertThat(mockSpanner.getRequests()).isEmpty();
32183234
// Now get a new database client. Normally multiple calls to Spanner#getDatabaseClient will
32193235
// return the same instance, but not when the instance has been invalidated by a
@@ -3300,13 +3316,18 @@ public void testAllowNestedTransactions() throws InterruptedException {
33003316
Thread.sleep(1L);
33013317
}
33023318
assertThat(client.pool.getNumberOfSessionsInPool()).isEqualTo(minSessions);
3319+
int expectedMinSessions =
3320+
spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()
3321+
? minSessions
3322+
: minSessions - 1;
33033323
Long res =
33043324
client
33053325
.readWriteTransaction()
33063326
.allowNestedTransaction()
33073327
.run(
33083328
transaction -> {
3309-
assertThat(client.pool.getNumberOfSessionsInPool()).isEqualTo(minSessions - 1);
3329+
assertThat(client.pool.getNumberOfSessionsInPool())
3330+
.isEqualTo(expectedMinSessions);
33103331
return transaction.executeUpdate(UPDATE_STATEMENT);
33113332
});
33123333
assertThat(res).isEqualTo(UPDATE_COUNT);
@@ -3333,6 +3354,9 @@ public void testNestedTransactionsUsingTwoDatabases() throws InterruptedExceptio
33333354
}
33343355
assertThat(client1.pool.getNumberOfSessionsInPool()).isEqualTo(minSessions);
33353356
assertThat(client2.pool.getNumberOfSessionsInPool()).isEqualTo(minSessions);
3357+
// When read-write transaction uses multiplexed sessions, then sessions are not checked out from
3358+
// the session pool.
3359+
int expectedMinSessions = isMultiplexedSessionsEnabledForRW() ? minSessions : minSessions - 1;
33363360
Long res =
33373361
client1
33383362
.readWriteTransaction()
@@ -3341,7 +3365,8 @@ public void testNestedTransactionsUsingTwoDatabases() throws InterruptedExceptio
33413365
transaction -> {
33423366
// Client1 should have 1 session checked out.
33433367
// Client2 should have 0 sessions checked out.
3344-
assertThat(client1.pool.getNumberOfSessionsInPool()).isEqualTo(minSessions - 1);
3368+
assertThat(client1.pool.getNumberOfSessionsInPool())
3369+
.isEqualTo(expectedMinSessions);
33453370
assertThat(client2.pool.getNumberOfSessionsInPool()).isEqualTo(minSessions);
33463371
Long add =
33473372
client2
@@ -3350,9 +3375,9 @@ public void testNestedTransactionsUsingTwoDatabases() throws InterruptedExceptio
33503375
transaction1 -> {
33513376
// Both clients should now have 1 session checked out.
33523377
assertThat(client1.pool.getNumberOfSessionsInPool())
3353-
.isEqualTo(minSessions - 1);
3378+
.isEqualTo(expectedMinSessions);
33543379
assertThat(client2.pool.getNumberOfSessionsInPool())
3355-
.isEqualTo(minSessions - 1);
3380+
.isEqualTo(expectedMinSessions);
33563381
try (ResultSet rs = transaction1.executeQuery(SELECT1)) {
33573382
if (rs.next()) {
33583383
return rs.getLong(0);
@@ -5090,6 +5115,9 @@ public void testRetryOnResourceExhausted() {
50905115

50915116
@Test
50925117
public void testSessionPoolExhaustedError_containsStackTraces() {
5118+
assumeFalse(
5119+
"Session pool tests are skipped for multiplexed sessions",
5120+
isMultiplexedSessionsEnabledForRW());
50935121
try (Spanner spanner =
50945122
SpannerOptions.newBuilder()
50955123
.setProjectId(TEST_PROJECT)
@@ -5450,4 +5478,11 @@ private boolean isMultiplexedSessionsEnabled() {
54505478
}
54515479
return spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSession();
54525480
}
5481+
5482+
private boolean isMultiplexedSessionsEnabledForRW() {
5483+
if (spanner.getOptions() == null || spanner.getOptions().getSessionPoolOptions() == null) {
5484+
return false;
5485+
}
5486+
return spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW();
5487+
}
54535488
}

google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginTransactionTest.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,13 @@ public void setUp() {
191191
.setChannelProvider(channelProvider)
192192
.setCredentials(NoCredentials.getInstance())
193193
.setTrackTransactionStarter()
194+
// The extra BeginTransaction RPC for multiplexed session read-write is causing
195+
// unexpected behavior in tests having a mock on the BeginTransaction RPC. Therefore,
196+
// this is being skipped.
197+
.setSessionPoolOption(
198+
SessionPoolOptions.newBuilder()
199+
.setSkipVerifyingBeginTransactionForMuxRW(true)
200+
.build())
194201
.build()
195202
.getService();
196203
}

google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
import com.google.spanner.v1.PartitionReadRequest;
6363
import com.google.spanner.v1.PartitionResponse;
6464
import com.google.spanner.v1.ReadRequest;
65+
import com.google.spanner.v1.RequestOptions;
6566
import com.google.spanner.v1.ResultSet;
6667
import com.google.spanner.v1.ResultSetMetadata;
6768
import com.google.spanner.v1.ResultSetStats;
@@ -1829,7 +1830,7 @@ private ByteString getTransactionId(Session session, TransactionSelector tx) {
18291830
transactionId = null;
18301831
break;
18311832
case BEGIN:
1832-
transactionId = beginTransaction(session, tx.getBegin(), null).getId();
1833+
transactionId = beginTransaction(session, tx.getBegin(), null, null).getId();
18331834
break;
18341835
case ID:
18351836
Transaction transaction = transactions.get(tx.getId());
@@ -1895,7 +1896,8 @@ public void beginTransaction(
18951896
beginTransactionExecutionTime.simulateExecutionTime(
18961897
exceptions, stickyGlobalExceptions, freezeLock);
18971898
Transaction transaction =
1898-
beginTransaction(session, request.getOptions(), request.getMutationKey());
1899+
beginTransaction(
1900+
session, request.getOptions(), request.getMutationKey(), request.getRequestOptions());
18991901
responseObserver.onNext(transaction);
19001902
responseObserver.onCompleted();
19011903
} catch (StatusRuntimeException t) {
@@ -1906,7 +1908,10 @@ public void beginTransaction(
19061908
}
19071909

19081910
private Transaction beginTransaction(
1909-
Session session, TransactionOptions options, com.google.spanner.v1.Mutation mutationKey) {
1911+
Session session,
1912+
TransactionOptions options,
1913+
com.google.spanner.v1.Mutation mutationKey,
1914+
RequestOptions requestOptions) {
19101915
ByteString transactionId = generateTransactionName(session.getName());
19111916
Transaction.Builder builder = Transaction.newBuilder().setId(transactionId);
19121917
if (options != null && options.getModeCase() == ModeCase.READ_ONLY) {
@@ -1920,12 +1925,17 @@ private Transaction beginTransaction(
19201925
}
19211926
Transaction transaction = builder.build();
19221927
transactions.put(transaction.getId(), transaction);
1923-
transactionsStarted.add(transaction.getId());
1928+
// TODO: remove once UNIMPLEMENTED error is not thrown for read-write mux
1929+
// Do not consider the transaction if this request was from background thread
1930+
if (requestOptions == null
1931+
|| !requestOptions.getTransactionTag().equals("multiplexed-rw-background-begin-txn")) {
1932+
transactionsStarted.add(transaction.getId());
1933+
if (abortNextTransaction.getAndSet(false)) {
1934+
markAbortedTransaction(transaction.getId());
1935+
}
1936+
}
19241937
isPartitionedDmlTransaction.put(
19251938
transaction.getId(), options.getModeCase() == ModeCase.PARTITIONED_DML);
1926-
if (abortNextTransaction.getAndSet(false)) {
1927-
markAbortedTransaction(transaction.getId());
1928-
}
19291939
return transaction;
19301940
}
19311941

@@ -2025,7 +2035,8 @@ public void commit(CommitRequest request, StreamObserver<CommitResponse> respons
20252035
TransactionOptions.newBuilder()
20262036
.setReadWrite(ReadWrite.getDefaultInstance())
20272037
.build(),
2028-
null);
2038+
null,
2039+
request.getRequestOptions());
20292040
} else if (request.getTransactionId() != null) {
20302041
transaction = transactions.get(request.getTransactionId());
20312042
Optional<Boolean> aborted =

google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetryApiTracerTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,7 @@ public void createSpannerInstance() {
135135
SessionPoolOptions.newBuilder()
136136
.setWaitForMinSessionsDuration(Duration.ofSeconds(5L))
137137
.setFailOnSessionLeak()
138+
.setSkipVerifyingBeginTransactionForMuxRW(true)
138139
.build())
139140
.setEnableApiTracing(true)
140141
.build()
@@ -428,6 +429,7 @@ public boolean isEnableApiTracing() {
428429
SessionPoolOptions.newBuilder()
429430
.setWaitForMinSessionsDuration(Duration.ofSeconds(5L))
430431
.setFailOnSessionLeak()
432+
.setSkipVerifyingBeginTransactionForMuxRW(true)
431433
.build())
432434
.build()
433435
.getService();

google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetryBuiltInMetricsTracerTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ public void createSpannerInstance() {
144144
SessionPoolOptions.newBuilder()
145145
.setWaitForMinSessionsDuration(Duration.ofSeconds(5L))
146146
.setFailOnSessionLeak()
147+
.setSkipVerifyingBeginTransactionForMuxRW(true)
147148
.build())
148149
// Setting this to false so that Spanner Options does not register Metrics Tracer
149150
// factory again.

0 commit comments

Comments
 (0)