Skip to content

Commit a8ee36f

Browse files
committed
chore: use an own executor for session maintenance
The session pool uses the gRPC transport executor for session maintenance. This executor should not be used by any other tasks than internal gRPC tasks. This change therefore creates simple executors for session maintenance and session creation. Updates GoogleCloudPlatform/pgadapter#2422
1 parent 1f53d38 commit a8ee36f

File tree

8 files changed

+60
-106
lines changed

8 files changed

+60
-106
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@
2020
import static com.google.common.base.Preconditions.checkNotNull;
2121

2222
import com.google.api.pathtemplate.PathTemplate;
23-
import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory;
2423
import com.google.cloud.spanner.spi.v1.SpannerRpc;
24+
import com.google.common.annotations.VisibleForTesting;
2525
import com.google.common.base.Preconditions;
2626
import com.google.common.collect.ImmutableMap;
2727
import com.google.common.collect.Maps;
@@ -30,6 +30,8 @@
3030
import java.util.List;
3131
import java.util.Map;
3232
import java.util.concurrent.ScheduledExecutorService;
33+
import java.util.concurrent.ScheduledThreadPoolExecutor;
34+
import java.util.concurrent.TimeUnit;
3335
import javax.annotation.concurrent.GuardedBy;
3436

3537
/** Client for creating single sessions and batches of sessions. */
@@ -167,26 +169,37 @@ interface SessionConsumer {
167169
}
168170

169171
private final SpannerImpl spanner;
170-
private final ExecutorFactory<ScheduledExecutorService> executorFactory;
171172
private final ScheduledExecutorService executor;
172173
private final DatabaseId db;
173174

174175
@GuardedBy("this")
175176
private volatile long sessionChannelCounter;
176177

177-
SessionClient(
178-
SpannerImpl spanner,
179-
DatabaseId db,
180-
ExecutorFactory<ScheduledExecutorService> executorFactory) {
178+
SessionClient(SpannerImpl spanner, DatabaseId db) {
181179
this.spanner = spanner;
182180
this.db = db;
183-
this.executorFactory = executorFactory;
184-
this.executor = executorFactory.get();
181+
this.executor = createExecutor(spanner);
182+
}
183+
184+
private static ScheduledThreadPoolExecutor createExecutor(Spanner spanner) {
185+
ScheduledThreadPoolExecutor executor =
186+
new ScheduledThreadPoolExecutor(
187+
spanner.getOptions().getNumChannels(),
188+
ThreadFactoryUtil.createVirtualOrPlatformDaemonThreadFactory("session-client", true));
189+
executor.setKeepAliveTime(5L, TimeUnit.SECONDS);
190+
executor.allowCoreThreadTimeOut(true);
191+
192+
return executor;
185193
}
186194

187195
@Override
188196
public void close() {
189-
executorFactory.release(executor);
197+
this.executor.shutdown();
198+
}
199+
200+
@VisibleForTesting
201+
ScheduledExecutorService getExecutor() {
202+
return this.executor;
190203
}
191204

192205
SpannerImpl getSpanner() {

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java

Lines changed: 8 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,6 @@
5353
import com.google.api.gax.rpc.ServerStream;
5454
import com.google.cloud.Timestamp;
5555
import com.google.cloud.Tuple;
56-
import com.google.cloud.grpc.GrpcTransportOptions;
57-
import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory;
5856
import com.google.cloud.spanner.Options.QueryOption;
5957
import com.google.cloud.spanner.Options.ReadOption;
6058
import com.google.cloud.spanner.Options.TransactionOption;
@@ -102,6 +100,7 @@
102100
import java.util.concurrent.CountDownLatch;
103101
import java.util.concurrent.ExecutionException;
104102
import java.util.concurrent.Executor;
103+
import java.util.concurrent.Executors;
105104
import java.util.concurrent.ScheduledExecutorService;
106105
import java.util.concurrent.ScheduledFuture;
107106
import java.util.concurrent.TimeUnit;
@@ -2263,7 +2262,6 @@ enum Position {
22632262
private final SessionClient sessionClient;
22642263
private final int numChannels;
22652264
private final ScheduledExecutorService executor;
2266-
private final ExecutorFactory<ScheduledExecutorService> executorFactory;
22672265

22682266
final PoolMaintainer poolMaintainer;
22692267
private final Clock clock;
@@ -2369,7 +2367,6 @@ static SessionPool createPool(
23692367
return createPool(
23702368
sessionPoolOptions,
23712369
spannerOptions.getDatabaseRole(),
2372-
((GrpcTransportOptions) spannerOptions.getTransportOptions()).getExecutorFactory(),
23732370
sessionClient,
23742371
poolMaintainerClock == null ? new Clock() : poolMaintainerClock,
23752372
Position.RANDOM,
@@ -2384,23 +2381,15 @@ static SessionPool createPool(
23842381

23852382
static SessionPool createPool(
23862383
SessionPoolOptions poolOptions,
2387-
ExecutorFactory<ScheduledExecutorService> executorFactory,
23882384
SessionClient sessionClient,
23892385
TraceWrapper tracer,
23902386
OpenTelemetry openTelemetry) {
23912387
return createPool(
2392-
poolOptions,
2393-
executorFactory,
2394-
sessionClient,
2395-
new Clock(),
2396-
Position.RANDOM,
2397-
tracer,
2398-
openTelemetry);
2388+
poolOptions, sessionClient, new Clock(), Position.RANDOM, tracer, openTelemetry);
23992389
}
24002390

24012391
static SessionPool createPool(
24022392
SessionPoolOptions poolOptions,
2403-
ExecutorFactory<ScheduledExecutorService> executorFactory,
24042393
SessionClient sessionClient,
24052394
Clock clock,
24062395
Position initialReleasePosition,
@@ -2409,7 +2398,6 @@ static SessionPool createPool(
24092398
return createPool(
24102399
poolOptions,
24112400
null,
2412-
executorFactory,
24132401
sessionClient,
24142402
clock,
24152403
initialReleasePosition,
@@ -2425,7 +2413,6 @@ static SessionPool createPool(
24252413
static SessionPool createPool(
24262414
SessionPoolOptions poolOptions,
24272415
String databaseRole,
2428-
ExecutorFactory<ScheduledExecutorService> executorFactory,
24292416
SessionClient sessionClient,
24302417
Clock clock,
24312418
Position initialReleasePosition,
@@ -2440,8 +2427,6 @@ static SessionPool createPool(
24402427
new SessionPool(
24412428
poolOptions,
24422429
databaseRole,
2443-
executorFactory,
2444-
executorFactory.get(),
24452430
sessionClient,
24462431
clock,
24472432
initialReleasePosition,
@@ -2459,8 +2444,6 @@ static SessionPool createPool(
24592444
private SessionPool(
24602445
SessionPoolOptions options,
24612446
String databaseRole,
2462-
ExecutorFactory<ScheduledExecutorService> executorFactory,
2463-
ScheduledExecutorService executor,
24642447
SessionClient sessionClient,
24652448
Clock clock,
24662449
Position initialReleasePosition,
@@ -2473,8 +2456,11 @@ private SessionPool(
24732456
AtomicLong numMultiplexedSessionsReleased) {
24742457
this.options = options;
24752458
this.databaseRole = databaseRole;
2476-
this.executorFactory = executorFactory;
2477-
this.executor = executor;
2459+
this.executor =
2460+
Executors.newScheduledThreadPool(
2461+
1,
2462+
ThreadFactoryUtil.createVirtualOrPlatformDaemonThreadFactory(
2463+
"session-pool-maintainer", true));
24782464
this.sessionClient = sessionClient;
24792465
this.numChannels = sessionClient.getSpanner().getOptions().getNumChannels();
24802466
this.clock = clock;
@@ -3064,6 +3050,7 @@ ListenableFuture<Void> closeAsync(ClosedException closedException) {
30643050
pendingClosure += 1; // For pool maintenance thread
30653051
poolMaintainer.close();
30663052
}
3053+
executor.shutdown();
30673054

30683055
sessions.clear();
30693056
for (PooledSessionFuture session : checkedOutSessions) {
@@ -3097,7 +3084,6 @@ ListenableFuture<Void> closeAsync(ClosedException closedException) {
30973084
}
30983085
}
30993086

3100-
retFuture.addListener(() -> executorFactory.release(executor), MoreExecutors.directExecutor());
31013087
return retFuture;
31023088
}
31033089

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import com.google.cloud.BaseService;
2323
import com.google.cloud.PageImpl;
2424
import com.google.cloud.PageImpl.NextPageFetcher;
25-
import com.google.cloud.grpc.GrpcTransportOptions;
2625
import com.google.cloud.spanner.SessionClient.SessionId;
2726
import com.google.cloud.spanner.SpannerOptions.CloseableExecutorProvider;
2827
import com.google.cloud.spanner.admin.database.v1.stub.DatabaseAdminStubSettings;
@@ -199,11 +198,7 @@ SessionClient getSessionClient(DatabaseId db) {
199198
if (sessionClients.containsKey(db)) {
200199
return sessionClients.get(db);
201200
} else {
202-
SessionClient client =
203-
new SessionClient(
204-
this,
205-
db,
206-
((GrpcTransportOptions) getOptions().getTransportOptions()).getExecutorFactory());
201+
SessionClient client = new SessionClient(this, db);
207202
sessionClients.put(db, client);
208203
return client;
209204
}

google-cloud-spanner/src/test/java/com/google/cloud/spanner/ITSessionPoolIntegrationTest.java

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,12 @@
1818

1919
import static com.google.common.truth.Truth.assertThat;
2020

21-
import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory;
2221
import com.google.cloud.spanner.SessionPool.PooledSessionFuture;
2322
import io.opencensus.trace.Tracing;
2423
import io.opentelemetry.api.OpenTelemetry;
2524
import java.util.ArrayList;
2625
import java.util.List;
2726
import java.util.concurrent.CountDownLatch;
28-
import java.util.concurrent.ScheduledExecutorService;
29-
import java.util.concurrent.ScheduledThreadPoolExecutor;
3027
import java.util.concurrent.TimeUnit;
3128
import org.junit.Before;
3229
import org.junit.BeforeClass;
@@ -83,18 +80,6 @@ public void setUp() {
8380
pool =
8481
SessionPool.createPool(
8582
options,
86-
new ExecutorFactory<ScheduledExecutorService>() {
87-
88-
@Override
89-
public void release(ScheduledExecutorService executor) {
90-
executor.shutdown();
91-
}
92-
93-
@Override
94-
public ScheduledExecutorService get() {
95-
return new ScheduledThreadPoolExecutor(2);
96-
}
97-
},
9883
((SpannerImpl) env.getTestHelper().getClient()).getSessionClient(db.getId()),
9984
new TraceWrapper(Tracing.getTracer(), OpenTelemetry.noop().getTracer(""), false),
10085
OpenTelemetry.noop());

0 commit comments

Comments
 (0)