Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ If you are using Maven without the BOM, add this to your dependencies:
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-spanner</artifactId>
<version>6.78.0</version>
<version>6.79.0</version>
</dependency>

```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
import static com.google.common.base.Preconditions.checkNotNull;

import com.google.api.pathtemplate.PathTemplate;
import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
Expand All @@ -30,6 +30,8 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.GuardedBy;

/** Client for creating single sessions and batches of sessions. */
Expand Down Expand Up @@ -167,26 +169,37 @@ interface SessionConsumer {
}

private final SpannerImpl spanner;
private final ExecutorFactory<ScheduledExecutorService> executorFactory;
private final ScheduledExecutorService executor;
private final DatabaseId db;

@GuardedBy("this")
private volatile long sessionChannelCounter;

SessionClient(
SpannerImpl spanner,
DatabaseId db,
ExecutorFactory<ScheduledExecutorService> executorFactory) {
SessionClient(SpannerImpl spanner, DatabaseId db) {
this.spanner = spanner;
this.db = db;
this.executorFactory = executorFactory;
this.executor = executorFactory.get();
this.executor = createExecutor(spanner);
}

private static ScheduledThreadPoolExecutor createExecutor(Spanner spanner) {
ScheduledThreadPoolExecutor executor =
new ScheduledThreadPoolExecutor(
spanner.getOptions().getNumChannels(),
ThreadFactoryUtil.createVirtualOrPlatformDaemonThreadFactory("session-client", true));
executor.setKeepAliveTime(5L, TimeUnit.SECONDS);
executor.allowCoreThreadTimeOut(true);

return executor;
}

@Override
public void close() {
executorFactory.release(executor);
this.executor.shutdown();
}

@VisibleForTesting
ScheduledExecutorService getExecutor() {
return this.executor;
}

SpannerImpl getSpanner() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,6 @@
import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.Timestamp;
import com.google.cloud.Tuple;
import com.google.cloud.grpc.GrpcTransportOptions;
import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory;
import com.google.cloud.spanner.Options.QueryOption;
import com.google.cloud.spanner.Options.ReadOption;
import com.google.cloud.spanner.Options.TransactionOption;
Expand Down Expand Up @@ -102,6 +100,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -2263,7 +2262,6 @@ enum Position {
private final SessionClient sessionClient;
private final int numChannels;
private final ScheduledExecutorService executor;
private final ExecutorFactory<ScheduledExecutorService> executorFactory;

final PoolMaintainer poolMaintainer;
private final Clock clock;
Expand Down Expand Up @@ -2369,7 +2367,6 @@ static SessionPool createPool(
return createPool(
sessionPoolOptions,
spannerOptions.getDatabaseRole(),
((GrpcTransportOptions) spannerOptions.getTransportOptions()).getExecutorFactory(),
sessionClient,
poolMaintainerClock == null ? new Clock() : poolMaintainerClock,
Position.RANDOM,
Expand All @@ -2384,23 +2381,15 @@ static SessionPool createPool(

static SessionPool createPool(
SessionPoolOptions poolOptions,
ExecutorFactory<ScheduledExecutorService> executorFactory,
SessionClient sessionClient,
TraceWrapper tracer,
OpenTelemetry openTelemetry) {
return createPool(
poolOptions,
executorFactory,
sessionClient,
new Clock(),
Position.RANDOM,
tracer,
openTelemetry);
poolOptions, sessionClient, new Clock(), Position.RANDOM, tracer, openTelemetry);
}

static SessionPool createPool(
SessionPoolOptions poolOptions,
ExecutorFactory<ScheduledExecutorService> executorFactory,
SessionClient sessionClient,
Clock clock,
Position initialReleasePosition,
Expand All @@ -2409,7 +2398,6 @@ static SessionPool createPool(
return createPool(
poolOptions,
null,
executorFactory,
sessionClient,
clock,
initialReleasePosition,
Expand All @@ -2425,7 +2413,6 @@ static SessionPool createPool(
static SessionPool createPool(
SessionPoolOptions poolOptions,
String databaseRole,
ExecutorFactory<ScheduledExecutorService> executorFactory,
SessionClient sessionClient,
Clock clock,
Position initialReleasePosition,
Expand All @@ -2440,8 +2427,6 @@ static SessionPool createPool(
new SessionPool(
poolOptions,
databaseRole,
executorFactory,
executorFactory.get(),
sessionClient,
clock,
initialReleasePosition,
Expand All @@ -2459,8 +2444,6 @@ static SessionPool createPool(
private SessionPool(
SessionPoolOptions options,
String databaseRole,
ExecutorFactory<ScheduledExecutorService> executorFactory,
ScheduledExecutorService executor,
SessionClient sessionClient,
Clock clock,
Position initialReleasePosition,
Expand All @@ -2473,8 +2456,11 @@ private SessionPool(
AtomicLong numMultiplexedSessionsReleased) {
this.options = options;
this.databaseRole = databaseRole;
this.executorFactory = executorFactory;
this.executor = executor;
this.executor =
Executors.newScheduledThreadPool(
1,
ThreadFactoryUtil.createVirtualOrPlatformDaemonThreadFactory(
"session-pool-maintainer", true));
this.sessionClient = sessionClient;
this.numChannels = sessionClient.getSpanner().getOptions().getNumChannels();
this.clock = clock;
Expand Down Expand Up @@ -3064,6 +3050,7 @@ ListenableFuture<Void> closeAsync(ClosedException closedException) {
pendingClosure += 1; // For pool maintenance thread
poolMaintainer.close();
}
executor.shutdown();

sessions.clear();
for (PooledSessionFuture session : checkedOutSessions) {
Expand Down Expand Up @@ -3097,7 +3084,6 @@ ListenableFuture<Void> closeAsync(ClosedException closedException) {
}
}

retFuture.addListener(() -> executorFactory.release(executor), MoreExecutors.directExecutor());
return retFuture;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.google.cloud.BaseService;
import com.google.cloud.PageImpl;
import com.google.cloud.PageImpl.NextPageFetcher;
import com.google.cloud.grpc.GrpcTransportOptions;
import com.google.cloud.spanner.SessionClient.SessionId;
import com.google.cloud.spanner.SpannerOptions.CloseableExecutorProvider;
import com.google.cloud.spanner.admin.database.v1.stub.DatabaseAdminStubSettings;
Expand Down Expand Up @@ -199,11 +198,7 @@ SessionClient getSessionClient(DatabaseId db) {
if (sessionClients.containsKey(db)) {
return sessionClients.get(db);
} else {
SessionClient client =
new SessionClient(
this,
db,
((GrpcTransportOptions) getOptions().getTransportOptions()).getExecutorFactory());
SessionClient client = new SessionClient(this, db);
sessionClients.put(db, client);
return client;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,12 @@

import static com.google.common.truth.Truth.assertThat;

import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory;
import com.google.cloud.spanner.SessionPool.PooledSessionFuture;
import io.opencensus.trace.Tracing;
import io.opentelemetry.api.OpenTelemetry;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.BeforeClass;
Expand Down Expand Up @@ -83,18 +80,6 @@ public void setUp() {
pool =
SessionPool.createPool(
options,
new ExecutorFactory<ScheduledExecutorService>() {

@Override
public void release(ScheduledExecutorService executor) {
executor.shutdown();
}

@Override
public ScheduledExecutorService get() {
return new ScheduledThreadPoolExecutor(2);
}
},
((SpannerImpl) env.getTestHelper().getClient()).getSessionClient(db.getId()),
new TraceWrapper(Tracing.getTracer(), OpenTelemetry.noop().getTracer(""), false),
OpenTelemetry.noop());
Expand Down
Loading
Loading