Skip to content

Commit 341d959

Browse files
feat(spanner): added flag to control use of multiplexed session for blind write. This flag will be used by systest.
1 parent 60e45cc commit 341d959

File tree

7 files changed

+81
-10
lines changed

7 files changed

+81
-10
lines changed

google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudClientExecutor.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -803,10 +803,13 @@ private synchronized Spanner getClient(long timeoutSeconds, boolean useMultiplex
803803
.setTotalTimeout(rpcTimeout)
804804
.build();
805805

806-
com.google.cloud.spanner.SessionPoolOptions sessionPoolOptions =
807-
SessionPoolOptionsHelper.setUseMultiplexedSession(
808-
com.google.cloud.spanner.SessionPoolOptions.newBuilder(), useMultiplexedSession)
809-
.build();
806+
com.google.cloud.spanner.SessionPoolOptions.Builder poolOptionsBuilder =
807+
com.google.cloud.spanner.SessionPoolOptions.newBuilder();
808+
SessionPoolOptionsHelper.setUseMultiplexedSession(
809+
com.google.cloud.spanner.SessionPoolOptions.newBuilder(), useMultiplexedSession);
810+
SessionPoolOptionsHelper.setUseMultiplexedSessionBlindWrite(
811+
com.google.cloud.spanner.SessionPoolOptions.newBuilder(), useMultiplexedSession);
812+
com.google.cloud.spanner.SessionPoolOptions sessionPoolOptions = poolOptionsBuilder.build();
810813
// Cloud Spanner Client does not support global retry settings,
811814
// Thus, we need to add retry settings to each individual stub.
812815
SpannerOptions.Builder optionsBuilder =

google-cloud-spanner-executor/src/main/java/com/google/cloud/spanner/SessionPoolOptionsHelper.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,4 +30,12 @@ public static SessionPoolOptions.Builder setUseMultiplexedSession(
3030
SessionPoolOptions.Builder sessionPoolOptionsBuilder, boolean useMultiplexedSession) {
3131
return sessionPoolOptionsBuilder.setUseMultiplexedSession(useMultiplexedSession);
3232
}
33+
34+
// TODO: Remove when Builder.setUseMultiplexedSession(..) has been made public.
35+
public static SessionPoolOptions.Builder setUseMultiplexedSessionBlindWrite(
36+
SessionPoolOptions.Builder sessionPoolOptionsBuilder,
37+
boolean useMultiplexedSessionBlindWrite) {
38+
return sessionPoolOptionsBuilder.setUseMultiplexedSessionBlindWrite(
39+
useMultiplexedSessionBlindWrite);
40+
}
3341
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,23 +37,37 @@ class DatabaseClientImpl implements DatabaseClient {
3737
@VisibleForTesting final SessionPool pool;
3838
@VisibleForTesting final MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient;
3939

40+
final boolean useMultiplexedSessionBlindWrite;
41+
4042
@VisibleForTesting
4143
DatabaseClientImpl(SessionPool pool, TraceWrapper tracer) {
42-
this("", pool, /* multiplexedSessionDatabaseClient = */ null, tracer);
44+
this(
45+
"",
46+
pool,
47+
/* useMultiplexedSessionBlindWrite = */ false,
48+
/* multiplexedSessionDatabaseClient = */ null,
49+
tracer);
4350
}
4451

4552
@VisibleForTesting
4653
DatabaseClientImpl(String clientId, SessionPool pool, TraceWrapper tracer) {
47-
this(clientId, pool, /* multiplexedSessionDatabaseClient = */ null, tracer);
54+
this(
55+
clientId,
56+
pool,
57+
/* useMultiplexedSessionBlindWrite = */ false,
58+
/* multiplexedSessionDatabaseClient = */ null,
59+
tracer);
4860
}
4961

5062
DatabaseClientImpl(
5163
String clientId,
5264
SessionPool pool,
65+
boolean useMultiplexedSessionBlindWrite,
5366
@Nullable MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient,
5467
TraceWrapper tracer) {
5568
this.clientId = clientId;
5669
this.pool = pool;
70+
this.useMultiplexedSessionBlindWrite = useMultiplexedSessionBlindWrite;
5771
this.multiplexedSessionDatabaseClient = multiplexedSessionDatabaseClient;
5872
this.tracer = tracer;
5973
}
@@ -122,7 +136,7 @@ public CommitResponse writeAtLeastOnceWithOptions(
122136
throws SpannerException {
123137
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, options);
124138
try (IScope s = tracer.withSpan(span)) {
125-
if (getMultiplexedSessionDatabaseClient() != null) {
139+
if (useMultiplexedSessionBlindWrite && getMultiplexedSessionDatabaseClient() != null) {
126140
return getMultiplexedSessionDatabaseClient()
127141
.writeAtLeastOnceWithOptions(mutations, options);
128142
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,12 @@ public class SessionPoolOptions {
7373

7474
private final boolean useMultiplexedSession;
7575

76+
/**
77+
* Controls whether multiplexed session is enabled for blind write or not. This is only used for
78+
* systest soak. Should be removed once released.
79+
*/
80+
private final boolean useMultiplexedSessionBlindWrite;
81+
7682
private final boolean useMultiplexedSessionForRW;
7783

7884
// TODO: Change to use java.time.Duration.
@@ -110,6 +116,7 @@ private SessionPoolOptions(Builder builder) {
110116
(useMultiplexedSessionFromEnvVariable != null)
111117
? useMultiplexedSessionFromEnvVariable
112118
: builder.useMultiplexedSession;
119+
this.useMultiplexedSessionBlindWrite = builder.useMultiplexedSessionBlindWrite;
113120
// useMultiplexedSessionForRW priority => Environment var > private setter > client default
114121
Boolean useMultiplexedSessionForRWFromEnvVariable =
115122
getUseMultiplexedSessionForRWFromEnvVariable();
@@ -184,6 +191,7 @@ public int hashCode() {
184191
this.inactiveTransactionRemovalOptions,
185192
this.poolMaintainerClock,
186193
this.useMultiplexedSession,
194+
this.useMultiplexedSessionBlindWrite,
187195
this.useMultiplexedSessionForRW,
188196
this.multiplexedSessionMaintenanceDuration);
189197
}
@@ -318,6 +326,12 @@ public boolean getUseMultiplexedSession() {
318326
return useMultiplexedSession;
319327
}
320328

329+
@VisibleForTesting
330+
@InternalApi
331+
public boolean getUseMultiplexedSessionBlindWrite() {
332+
return getUseMultiplexedSession() && useMultiplexedSessionBlindWrite;
333+
}
334+
321335
@VisibleForTesting
322336
@InternalApi
323337
public boolean getUseMultiplexedSessionForRW() {
@@ -554,6 +568,11 @@ public static class Builder {
554568
// Set useMultiplexedSession to true to make multiplexed session the default.
555569
private boolean useMultiplexedSession = false;
556570

571+
// This field controls the default behavior of session management in Java client.
572+
// Set useMultiplexedSessionBlindWrite to true to make multiplexed session the default for blind
573+
// write.
574+
private boolean useMultiplexedSessionBlindWrite = false;
575+
557576
// This field controls the default behavior of session management for RW operations in Java
558577
// client.
559578
// Set useMultiplexedSessionForRW to true to make multiplexed session for RW operations the
@@ -601,6 +620,7 @@ private Builder(SessionPoolOptions options) {
601620
this.randomizePositionQPSThreshold = options.randomizePositionQPSThreshold;
602621
this.inactiveTransactionRemovalOptions = options.inactiveTransactionRemovalOptions;
603622
this.useMultiplexedSession = options.useMultiplexedSession;
623+
this.useMultiplexedSessionBlindWrite = options.useMultiplexedSessionBlindWrite;
604624
this.useMultiplexedSessionForRW = options.useMultiplexedSessionForRW;
605625
this.multiplexedSessionMaintenanceDuration = options.multiplexedSessionMaintenanceDuration;
606626
this.poolMaintainerClock = options.poolMaintainerClock;
@@ -789,6 +809,22 @@ Builder setUseMultiplexedSession(boolean useMultiplexedSession) {
789809
return this;
790810
}
791811

812+
/**
813+
* Sets whether the client should use multiplexed session for blind write or not. If set to
814+
* true, the client optimises and runs multiple applicable requests concurrently on a single
815+
* session. A single multiplexed session is sufficient to handle all concurrent traffic.
816+
*
817+
* <p>When set to false, the client uses the regular session cached in the session pool for
818+
* running 1 concurrent transaction per session. We require to provision sufficient sessions by
819+
* making use of {@link SessionPoolOptions#minSessions} and {@link
820+
* SessionPoolOptions#maxSessions} based on the traffic load. Failing to do so will result in
821+
* higher latencies.
822+
*/
823+
Builder setUseMultiplexedSessionBlindWrite(boolean useMultiplexedSessionBlindWrite) {
824+
this.useMultiplexedSessionBlindWrite = useMultiplexedSessionBlindWrite;
825+
return this;
826+
}
827+
792828
/**
793829
* Sets whether the client should use multiplexed session for R/W operations or not. This method
794830
* is intentionally package-private and intended for internal use.

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -303,7 +303,11 @@ public DatabaseClient getDatabaseClient(DatabaseId db) {
303303
numMultiplexedSessionsReleased);
304304
pool.maybeWaitOnMinSessions();
305305
DatabaseClientImpl dbClient =
306-
createDatabaseClient(clientId, pool, multiplexedSessionDatabaseClient);
306+
createDatabaseClient(
307+
clientId,
308+
pool,
309+
getOptions().getSessionPoolOptions().getUseMultiplexedSessionBlindWrite(),
310+
multiplexedSessionDatabaseClient);
307311
dbClients.put(db, dbClient);
308312
return dbClient;
309313
}
@@ -314,8 +318,10 @@ public DatabaseClient getDatabaseClient(DatabaseId db) {
314318
DatabaseClientImpl createDatabaseClient(
315319
String clientId,
316320
SessionPool pool,
321+
boolean useMultiplexedSessionBlindWrite,
317322
@Nullable MultiplexedSessionDatabaseClient multiplexedSessionClient) {
318-
return new DatabaseClientImpl(clientId, pool, multiplexedSessionClient, tracer);
323+
return new DatabaseClientImpl(
324+
clientId, pool, useMultiplexedSessionBlindWrite, multiplexedSessionClient, tracer);
319325
}
320326

321327
@Override

google-cloud-spanner/src/test/java/com/google/cloud/spanner/IntegrationTestWithClosedSessionsEnv.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,10 @@ private static class SpannerWithClosedSessionsImpl extends SpannerImpl {
4747

4848
@Override
4949
DatabaseClientImpl createDatabaseClient(
50-
String clientId, SessionPool pool, MultiplexedSessionDatabaseClient ignore) {
50+
String clientId,
51+
SessionPool pool,
52+
boolean useMultiplexedSessionBlindWriteIgnore,
53+
MultiplexedSessionDatabaseClient ignore) {
5154
return new DatabaseClientWithClosedSessionImpl(clientId, pool, tracer);
5255
}
5356
}

google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ public void createSpannerInstance() {
7070
.setSessionPoolOption(
7171
SessionPoolOptions.newBuilder()
7272
.setUseMultiplexedSession(true)
73+
.setUseMultiplexedSessionBlindWrite(true)
7374
// Set the maintainer to loop once every 1ms
7475
.setMultiplexedSessionMaintenanceLoopFrequency(Duration.ofMillis(1L))
7576
// Set multiplexed sessions to be replaced once every 1ms

0 commit comments

Comments
 (0)