diff --git a/README.md b/README.md index 80a9dbb252b..6d1ca42d1fc 100644 --- a/README.md +++ b/README.md @@ -41,7 +41,7 @@ If you are using Maven without the BOM, add this to your dependencies: com.google.cloud google-cloud-spanner - 6.78.0 + 6.79.0 ``` diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java index 0eed13b018c..e8f29c929c8 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java @@ -20,8 +20,8 @@ import static com.google.common.base.Preconditions.checkNotNull; import com.google.api.pathtemplate.PathTemplate; -import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory; import com.google.cloud.spanner.spi.v1.SpannerRpc; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; @@ -30,6 +30,8 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import javax.annotation.concurrent.GuardedBy; /** Client for creating single sessions and batches of sessions. */ @@ -167,26 +169,37 @@ interface SessionConsumer { } private final SpannerImpl spanner; - private final ExecutorFactory executorFactory; private final ScheduledExecutorService executor; private final DatabaseId db; @GuardedBy("this") private volatile long sessionChannelCounter; - SessionClient( - SpannerImpl spanner, - DatabaseId db, - ExecutorFactory executorFactory) { + SessionClient(SpannerImpl spanner, DatabaseId db) { this.spanner = spanner; this.db = db; - this.executorFactory = executorFactory; - this.executor = executorFactory.get(); + this.executor = createExecutor(spanner); + } + + private static ScheduledThreadPoolExecutor createExecutor(Spanner spanner) { + ScheduledThreadPoolExecutor executor = + new ScheduledThreadPoolExecutor( + spanner.getOptions().getNumChannels(), + ThreadFactoryUtil.createVirtualOrPlatformDaemonThreadFactory("session-client", true)); + executor.setKeepAliveTime(5L, TimeUnit.SECONDS); + executor.allowCoreThreadTimeOut(true); + + return executor; } @Override public void close() { - executorFactory.release(executor); + this.executor.shutdown(); + } + + @VisibleForTesting + ScheduledExecutorService getExecutor() { + return this.executor; } SpannerImpl getSpanner() { diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java index cf50fa44c77..ba28ab256e1 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java @@ -53,8 +53,6 @@ import com.google.api.gax.rpc.ServerStream; import com.google.cloud.Timestamp; import com.google.cloud.Tuple; -import com.google.cloud.grpc.GrpcTransportOptions; -import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory; import com.google.cloud.spanner.Options.QueryOption; import com.google.cloud.spanner.Options.ReadOption; import com.google.cloud.spanner.Options.TransactionOption; @@ -102,6 +100,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; +import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -2263,7 +2262,6 @@ enum Position { private final SessionClient sessionClient; private final int numChannels; private final ScheduledExecutorService executor; - private final ExecutorFactory executorFactory; final PoolMaintainer poolMaintainer; private final Clock clock; @@ -2369,7 +2367,6 @@ static SessionPool createPool( return createPool( sessionPoolOptions, spannerOptions.getDatabaseRole(), - ((GrpcTransportOptions) spannerOptions.getTransportOptions()).getExecutorFactory(), sessionClient, poolMaintainerClock == null ? new Clock() : poolMaintainerClock, Position.RANDOM, @@ -2384,23 +2381,15 @@ static SessionPool createPool( static SessionPool createPool( SessionPoolOptions poolOptions, - ExecutorFactory executorFactory, SessionClient sessionClient, TraceWrapper tracer, OpenTelemetry openTelemetry) { return createPool( - poolOptions, - executorFactory, - sessionClient, - new Clock(), - Position.RANDOM, - tracer, - openTelemetry); + poolOptions, sessionClient, new Clock(), Position.RANDOM, tracer, openTelemetry); } static SessionPool createPool( SessionPoolOptions poolOptions, - ExecutorFactory executorFactory, SessionClient sessionClient, Clock clock, Position initialReleasePosition, @@ -2409,7 +2398,6 @@ static SessionPool createPool( return createPool( poolOptions, null, - executorFactory, sessionClient, clock, initialReleasePosition, @@ -2425,7 +2413,6 @@ static SessionPool createPool( static SessionPool createPool( SessionPoolOptions poolOptions, String databaseRole, - ExecutorFactory executorFactory, SessionClient sessionClient, Clock clock, Position initialReleasePosition, @@ -2440,8 +2427,6 @@ static SessionPool createPool( new SessionPool( poolOptions, databaseRole, - executorFactory, - executorFactory.get(), sessionClient, clock, initialReleasePosition, @@ -2459,8 +2444,6 @@ static SessionPool createPool( private SessionPool( SessionPoolOptions options, String databaseRole, - ExecutorFactory executorFactory, - ScheduledExecutorService executor, SessionClient sessionClient, Clock clock, Position initialReleasePosition, @@ -2473,8 +2456,11 @@ private SessionPool( AtomicLong numMultiplexedSessionsReleased) { this.options = options; this.databaseRole = databaseRole; - this.executorFactory = executorFactory; - this.executor = executor; + this.executor = + Executors.newScheduledThreadPool( + 1, + ThreadFactoryUtil.createVirtualOrPlatformDaemonThreadFactory( + "session-pool-maintainer", true)); this.sessionClient = sessionClient; this.numChannels = sessionClient.getSpanner().getOptions().getNumChannels(); this.clock = clock; @@ -3064,6 +3050,7 @@ ListenableFuture closeAsync(ClosedException closedException) { pendingClosure += 1; // For pool maintenance thread poolMaintainer.close(); } + executor.shutdown(); sessions.clear(); for (PooledSessionFuture session : checkedOutSessions) { @@ -3097,7 +3084,6 @@ ListenableFuture closeAsync(ClosedException closedException) { } } - retFuture.addListener(() -> executorFactory.release(executor), MoreExecutors.directExecutor()); return retFuture; } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java index dac1fc2c82b..4f4c5be6fe0 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java @@ -22,7 +22,6 @@ import com.google.cloud.BaseService; import com.google.cloud.PageImpl; import com.google.cloud.PageImpl.NextPageFetcher; -import com.google.cloud.grpc.GrpcTransportOptions; import com.google.cloud.spanner.SessionClient.SessionId; import com.google.cloud.spanner.SpannerOptions.CloseableExecutorProvider; import com.google.cloud.spanner.admin.database.v1.stub.DatabaseAdminStubSettings; @@ -199,11 +198,7 @@ SessionClient getSessionClient(DatabaseId db) { if (sessionClients.containsKey(db)) { return sessionClients.get(db); } else { - SessionClient client = - new SessionClient( - this, - db, - ((GrpcTransportOptions) getOptions().getTransportOptions()).getExecutorFactory()); + SessionClient client = new SessionClient(this, db); sessionClients.put(db, client); return client; } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ITSessionPoolIntegrationTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ITSessionPoolIntegrationTest.java index df29aac9170..5d407bdafca 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ITSessionPoolIntegrationTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ITSessionPoolIntegrationTest.java @@ -18,15 +18,12 @@ import static com.google.common.truth.Truth.assertThat; -import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory; import com.google.cloud.spanner.SessionPool.PooledSessionFuture; import io.opencensus.trace.Tracing; import io.opentelemetry.api.OpenTelemetry; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.junit.Before; import org.junit.BeforeClass; @@ -83,18 +80,6 @@ public void setUp() { pool = SessionPool.createPool( options, - new ExecutorFactory() { - - @Override - public void release(ScheduledExecutorService executor) { - executor.shutdown(); - } - - @Override - public ScheduledExecutorService get() { - return new ScheduledThreadPoolExecutor(2); - } - }, ((SpannerImpl) env.getTestHelper().getClient()).getSessionClient(db.getId()), new TraceWrapper(Tracing.getTracer(), OpenTelemetry.noop().getTracer(""), false), OpenTelemetry.noop()); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionClientTests.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionClientTests.java index bcba430c521..2d01fb35e72 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionClientTests.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionClientTests.java @@ -44,7 +44,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -63,23 +62,6 @@ @RunWith(Parameterized.class) public class SessionClientTests { - private final class TestExecutorFactory implements ExecutorFactory { - @Override - public ScheduledExecutorService get() { - return Executors.newScheduledThreadPool(spanner.getOptions().getNumChannels()); - } - - @Override - public void release(ScheduledExecutorService executor) { - executor.shutdown(); - try { - executor.awaitTermination(10_000L, TimeUnit.SECONDS); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - } - @Parameters(name = "NumChannels = {0}") public static Collection data() { return Arrays.asList(new Object[][] {{1}, {2}, {4}, {8}}); @@ -148,7 +130,7 @@ public void createAndCloseSession() { Mockito.eq(dbName), Mockito.eq(databaseRole), Mockito.eq(labels), options.capture())) .thenReturn(sessionProto); - try (SessionClient client = new SessionClient(spanner, db, new TestExecutorFactory())) { + try (SessionClient client = new SessionClient(spanner, db)) { Session session = client.createSession(); assertThat(session.getName()).isEqualTo(sessionName); @@ -194,7 +176,7 @@ public void onSessionReady(SessionImpl session) { @Override public void onSessionCreateFailure(Throwable t, int createFailureForSessionCount) {} }; - try (SessionClient client = new SessionClient(spanner, db, new TestExecutorFactory())) { + try (SessionClient client = new SessionClient(spanner, db)) { client.createMultiplexedSession(consumer); } // for multiplexed session there is no channel hint pass in the RPC options @@ -226,7 +208,7 @@ public void onSessionCreateFailure(Throwable t, int createFailureForSessionCount assertTrue(t instanceof RuntimeException); } }; - try (SessionClient client = new SessionClient(spanner, db, new TestExecutorFactory())) { + try (SessionClient client = new SessionClient(spanner, db)) { client.createMultiplexedSession(consumer); } // for multiplexed session there is no channel hint pass in the RPC options @@ -235,13 +217,13 @@ public void onSessionCreateFailure(Throwable t, int createFailureForSessionCount @SuppressWarnings("unchecked") @Test - public void batchCreateAndCloseSessions() { + public void batchCreateAndCloseSessions() throws Exception { DatabaseId db = DatabaseId.of(dbName); final String sessionName = dbName + "/sessions/s%d"; final Map labels = new HashMap<>(); labels.put("env", "dev"); when(spannerOptions.getSessionLabels()).thenReturn(labels); - String databaseRole = new String("role"); + String databaseRole = "role"; when(spannerOptions.getDatabaseRole()).thenReturn(databaseRole); final List usedChannels = Collections.synchronizedList(new ArrayList<>()); when(rpc.batchCreateSessions( @@ -281,9 +263,12 @@ public void onSessionReady(SessionImpl session) { public void onSessionCreateFailure(Throwable t, int createFailureForSessionCount) {} }; final int numSessions = 10; - try (SessionClient client = new SessionClient(spanner, db, new TestExecutorFactory())) { + ScheduledExecutorService executor; + try (SessionClient client = new SessionClient(spanner, db)) { client.asyncBatchCreateSessions(numSessions, true, consumer); + executor = client.getExecutor(); } + assertTrue(executor.awaitTermination(5L, TimeUnit.SECONDS)); assertThat(returnedSessionCount.get()).isEqualTo(numSessions); assertThat(usedChannels.size()).isEqualTo(spannerOptions.getNumChannels()); List expectedChannels = new ArrayList<>(); @@ -302,7 +287,7 @@ public void onSessionCreateFailure(Throwable t, int createFailureForSessionCount */ @SuppressWarnings("unchecked") @Test - public void batchCreateSessionsDistributesMultipleRequestsOverChannels() { + public void batchCreateSessionsDistributesMultipleRequestsOverChannels() throws Exception { DatabaseId db = DatabaseId.of(dbName); final String sessionName = dbName + "/sessions/s%d"; final Map labels = Collections.emptyMap(); @@ -348,15 +333,18 @@ public void onSessionCreateFailure(Throwable t, int createFailureForSessionCount }; final int numSessions = 10; final int numBatches = spannerOptions.getNumChannels() * 2; - try (SessionClient client = new SessionClient(spanner, db, new TestExecutorFactory())) { + ScheduledExecutorService executor; + try (SessionClient client = new SessionClient(spanner, db)) { for (int batch = 0; batch < numBatches; batch++) { client.asyncBatchCreateSessions(numSessions, false, consumer); } + executor = client.getExecutor(); } + assertTrue(executor.awaitTermination(5L, TimeUnit.SECONDS)); assertThat(returnedSessionCount.get()).isEqualTo(numSessions * numBatches); assertThat(usedChannelHints.size()).isEqualTo(spannerOptions.getNumChannels() * 2); List expectedChannels = new ArrayList<>(); - for (long l = 0; l < spannerOptions.getNumChannels() * 2; l++) { + for (long l = 0; l < spannerOptions.getNumChannels() * 2L; l++) { expectedChannels.add(l); } assertThat(usedChannelHints).containsExactlyElementsIn(expectedChannels); @@ -370,7 +358,7 @@ private enum AddRemoveSetException { @SuppressWarnings("unchecked") @Test - public void batchCreateSessionsWithExceptions() { + public void batchCreateSessionsWithExceptions() throws Exception { for (AddRemoveSetException behavior : AddRemoveSetException.values()) { final List errorOnChannels = new ArrayList<>(); if (behavior == AddRemoveSetException.REMOVE) { @@ -443,9 +431,12 @@ public void onSessionCreateFailure(Throwable t, int createFailureForSessionCount } }; final int numSessions = 10; - try (SessionClient client = new SessionClient(spanner, db, new TestExecutorFactory())) { + ScheduledExecutorService executor; + try (SessionClient client = new SessionClient(spanner, db)) { client.asyncBatchCreateSessions(numSessions, true, consumer); + executor = client.getExecutor(); } + assertTrue(executor.awaitTermination(5L, TimeUnit.SECONDS)); assertThat(errorCount.get()).isEqualTo(errorOnChannels.size()); assertThat(returnedSessionCount.get()) .isAtLeast( @@ -457,9 +448,8 @@ public void onSessionCreateFailure(Throwable t, int createFailureForSessionCount } } - @SuppressWarnings("unchecked") @Test - public void batchCreateSessionsServerReturnsLessSessionsPerBatch() { + public void batchCreateSessionsServerReturnsLessSessionsPerBatch() throws Exception { final int MAX_SESSIONS_PER_BATCH = 5; DatabaseId db = DatabaseId.of(dbName); final String sessionName = dbName + "/sessions/s%d"; @@ -498,9 +488,12 @@ public void onSessionCreateFailure(Throwable t, int createFailureForSessionCount // We want 100 sessions, but each rpc will only return 5. The consumer should still get 100 // sessions. final int numSessions = 100; - try (SessionClient client = new SessionClient(spanner, db, new TestExecutorFactory())) { + ScheduledExecutorService executor; + try (SessionClient client = new SessionClient(spanner, db)) { client.asyncBatchCreateSessions(numSessions, true, consumer); + executor = client.getExecutor(); } + assertTrue(executor.awaitTermination(5L, TimeUnit.SECONDS)); assertThat(returnedSessionCount.get()).isEqualTo(numSessions); } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolMaintainerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolMaintainerTest.java index db4e79113fc..829c292879d 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolMaintainerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolMaintainerTest.java @@ -50,7 +50,7 @@ @RunWith(JUnit4.class) public class SessionPoolMaintainerTest extends BaseSessionPoolTest { - private ExecutorService executor = Executors.newSingleThreadExecutor(); + private final ExecutorService executor = Executors.newSingleThreadExecutor(); private @Mock SpannerImpl client; private @Mock SessionClient sessionClient; private @Mock SpannerOptions spannerOptions; @@ -130,7 +130,6 @@ private SessionPool createPool(SessionPoolOptions options) throws Exception { SessionPool pool = SessionPool.createPool( options, - new TestExecutorFactory(), client.getSessionClient(db), clock, Position.FIRST, diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolStressTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolStressTest.java index 33771962828..6086957136e 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolStressTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolStressTest.java @@ -224,7 +224,6 @@ public void stressTest() throws Exception { pool = SessionPool.createPool( sessionPoolOptions, - new TestExecutorFactory(), mockSpanner.getSessionClient(db), clock, Position.RANDOM, diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java index 998678e4296..d793604dfb6 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java @@ -163,22 +163,12 @@ public static Collection data() { private SessionPool createPool() { return SessionPool.createPool( - options, - new TestExecutorFactory(), - client.getSessionClient(db), - tracer, - OpenTelemetry.noop()); + options, client.getSessionClient(db), tracer, OpenTelemetry.noop()); } private SessionPool createPool(Clock clock) { return SessionPool.createPool( - options, - new TestExecutorFactory(), - client.getSessionClient(db), - clock, - Position.RANDOM, - tracer, - OpenTelemetry.noop()); + options, client.getSessionClient(db), clock, Position.RANDOM, tracer, OpenTelemetry.noop()); } private SessionPool createPool( @@ -186,7 +176,6 @@ private SessionPool createPool( return SessionPool.createPool( options, TEST_DATABASE_ROLE, - new TestExecutorFactory(), client.getSessionClient(db), clock, Position.RANDOM, @@ -208,7 +197,6 @@ private SessionPool createPool( return SessionPool.createPool( options, TEST_DATABASE_ROLE, - new TestExecutorFactory(), client.getSessionClient(db), clock, Position.RANDOM, @@ -1571,11 +1559,7 @@ public void testSessionNotFoundReadWriteTransaction() { when(spanner.getOptions()).thenReturn(spannerOptions); SessionPool pool = SessionPool.createPool( - options, - new TestExecutorFactory(), - spanner.getSessionClient(db), - tracer, - OpenTelemetry.noop()); + options, spanner.getSessionClient(db), tracer, OpenTelemetry.noop()); try (PooledSessionFuture readWriteSession = pool.getSession()) { TransactionRunner runner = readWriteSession.readWriteTransaction(); try {