Skip to content

Commit c99189c

Browse files
authored
chore: add support for virtual threads to Connection API (#2789)
Adds support for using virtual threads in the Connection API. Virtual threads can be enabled for two things: 1. As the StatementExecutor thread for each connection. 2. As the gRPC transport thread pool. Both options can (for now) only be set in the Connection API. Setting any of these options only has any effect if the application is running on Java 21 or higher.
1 parent 340ba13 commit c99189c

File tree

15 files changed

+444
-43
lines changed

15 files changed

+444
-43
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ public class SpannerOptions extends ServiceOptions<Spanner, SpannerOptions> {
140140
private final boolean leaderAwareRoutingEnabled;
141141
private final boolean attemptDirectPath;
142142
private final DirectedReadOptions directedReadOptions;
143+
private final boolean useVirtualThreads;
143144

144145
/** Interface that can be used to provide {@link CallCredentials} to {@link SpannerOptions}. */
145146
public interface CallCredentialsProvider {
@@ -580,9 +581,9 @@ public static CloseableExecutorProvider createAsyncExecutorProvider(
580581
return FixedCloseableExecutorProvider.create(executor);
581582
}
582583

583-
private SpannerOptions(Builder builder) {
584+
protected SpannerOptions(Builder builder) {
584585
super(SpannerFactory.class, SpannerRpcFactory.class, builder, new SpannerDefaults());
585-
numChannels = builder.numChannels;
586+
numChannels = builder.numChannels == null ? DEFAULT_CHANNELS : builder.numChannels;
586587
Preconditions.checkArgument(
587588
numChannels >= 1 && numChannels <= MAX_CHANNELS,
588589
"Number of channels must fall in the range [1, %s], found: %s",
@@ -631,6 +632,7 @@ private SpannerOptions(Builder builder) {
631632
leaderAwareRoutingEnabled = builder.leaderAwareRoutingEnabled;
632633
attemptDirectPath = builder.attemptDirectPath;
633634
directedReadOptions = builder.directedReadOptions;
635+
useVirtualThreads = builder.useVirtualThreads;
634636
}
635637

636638
/**
@@ -734,12 +736,13 @@ public static class Builder
734736
private boolean leaderAwareRoutingEnabled = true;
735737
private boolean attemptDirectPath = true;
736738
private DirectedReadOptions directedReadOptions;
739+
private boolean useVirtualThreads = false;
737740

738741
private static String createCustomClientLibToken(String token) {
739742
return token + " " + ServiceOptions.getGoogApiClientLibName();
740743
}
741744

742-
private Builder() {
745+
protected Builder() {
743746
// Manually set retry and polling settings that work.
744747
OperationTimedPollAlgorithm longRunningPollingAlgorithm =
745748
OperationTimedPollAlgorithm.create(
@@ -795,6 +798,7 @@ private Builder() {
795798
this.interceptorProvider = options.interceptorProvider;
796799
this.attemptDirectPath = options.attemptDirectPath;
797800
this.directedReadOptions = options.directedReadOptions;
801+
this.useVirtualThreads = options.useVirtualThreads;
798802
}
799803

800804
@Override
@@ -1263,6 +1267,16 @@ public Builder disableDirectPath() {
12631267
return this;
12641268
}
12651269

1270+
/**
1271+
* Enables/disables the use of virtual threads for the gRPC executor. Setting this option only
1272+
* has any effect on Java 21 and higher. In all other cases, the option will be ignored.
1273+
*/
1274+
@BetaApi
1275+
protected Builder setUseVirtualThreads(boolean useVirtualThreads) {
1276+
this.useVirtualThreads = useVirtualThreads;
1277+
return this;
1278+
}
1279+
12661280
@SuppressWarnings("rawtypes")
12671281
@Override
12681282
public SpannerOptions build() {
@@ -1412,6 +1426,11 @@ public boolean isAttemptDirectPath() {
14121426
return attemptDirectPath;
14131427
}
14141428

1429+
@BetaApi
1430+
public boolean isUseVirtualThreads() {
1431+
return useVirtualThreads;
1432+
}
1433+
14151434
/** Returns the default query options to use for the specific database. */
14161435
public QueryOptions getDefaultQueryOptions(DatabaseId databaseId) {
14171436
// Use the specific query options for the database if any have been specified. These have
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/*
2+
* Copyright 2024 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.spanner;
18+
19+
import com.google.api.core.InternalApi;
20+
import com.google.common.util.concurrent.ThreadFactoryBuilder;
21+
import java.lang.reflect.InvocationTargetException;
22+
import java.lang.reflect.Method;
23+
import java.util.concurrent.ExecutorService;
24+
import java.util.concurrent.Executors;
25+
import java.util.concurrent.ThreadFactory;
26+
import javax.annotation.Nullable;
27+
28+
/** Utility class for creating a thread factory for daemon or virtual threads. */
29+
@InternalApi
30+
public class ThreadFactoryUtil {
31+
32+
/**
33+
* Tries to create a thread factory for virtual threads, and otherwise falls back to creating a
34+
* platform thread factory that creates daemon threads. Virtual threads are supported from JDK21.
35+
*
36+
* @param baseNameFormat the base name format for the threads, '-%d' will be appended to the
37+
* actual thread name format
38+
* @param tryVirtualThreads whether to try to use virtual threads if available or not
39+
* @return a {@link ThreadFactory} that produces virtual threads (Java 21 or higher) or platform
40+
* daemon threads
41+
*/
42+
@InternalApi
43+
public static ThreadFactory createVirtualOrPlatformDaemonThreadFactory(
44+
String baseNameFormat, boolean tryVirtualThreads) {
45+
ThreadFactory virtualThreadFactory =
46+
tryVirtualThreads ? tryCreateVirtualThreadFactory(baseNameFormat) : null;
47+
if (virtualThreadFactory != null) {
48+
return virtualThreadFactory;
49+
}
50+
51+
return new ThreadFactoryBuilder().setDaemon(true).setNameFormat(baseNameFormat + "-%d").build();
52+
}
53+
54+
/**
55+
* Tries to create a {@link ThreadFactory} that creates virtual threads. Returns null if virtual
56+
* threads are not supported on this JVM.
57+
*/
58+
@InternalApi
59+
@Nullable
60+
public static ThreadFactory tryCreateVirtualThreadFactory(String baseNameFormat) {
61+
try {
62+
Class<?> threadBuilderClass = Class.forName("java.lang.Thread$Builder");
63+
Method ofVirtualMethod = Thread.class.getDeclaredMethod("ofVirtual");
64+
Object virtualBuilder = ofVirtualMethod.invoke(null);
65+
Method nameMethod = threadBuilderClass.getDeclaredMethod("name", String.class, long.class);
66+
virtualBuilder = nameMethod.invoke(virtualBuilder, baseNameFormat + "-", 0);
67+
Method factoryMethod = threadBuilderClass.getDeclaredMethod("factory");
68+
return (ThreadFactory) factoryMethod.invoke(virtualBuilder);
69+
} catch (ClassNotFoundException | NoSuchMethodException ignore) {
70+
return null;
71+
} catch (InvocationTargetException | IllegalAccessException e) {
72+
throw new RuntimeException(e);
73+
}
74+
}
75+
76+
/**
77+
* Tries to create an {@link ExecutorService} that creates a new virtual thread for each task that
78+
* it runs. Creating a new virtual thread is the recommended way to create executors using virtual
79+
* threads, instead of creating a pool of virtual threads. Returns null if virtual threads are not
80+
* supported on this JVM.
81+
*/
82+
@InternalApi
83+
@Nullable
84+
public static ExecutorService tryCreateVirtualThreadPerTaskExecutor(String baseNameFormat) {
85+
ThreadFactory factory = tryCreateVirtualThreadFactory(baseNameFormat);
86+
if (factory != null) {
87+
try {
88+
Method newThreadPerTaskExecutorMethod =
89+
Executors.class.getDeclaredMethod("newThreadPerTaskExecutor", ThreadFactory.class);
90+
return (ExecutorService) newThreadPerTaskExecutorMethod.invoke(null, factory);
91+
} catch (NoSuchMethodException ignore) {
92+
return null;
93+
} catch (InvocationTargetException | IllegalAccessException e) {
94+
throw new RuntimeException(e);
95+
}
96+
}
97+
return null;
98+
}
99+
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ChecksumResultSet.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import java.math.BigDecimal;
4040
import java.util.Objects;
4141
import java.util.concurrent.Callable;
42+
import java.util.concurrent.atomic.AtomicLong;
4243

4344
/**
4445
* {@link ResultSet} implementation that keeps a running checksum that can be used to determine
@@ -66,7 +67,7 @@
6667
@VisibleForTesting
6768
class ChecksumResultSet extends ReplaceableForwardingResultSet implements RetriableStatement {
6869
private final ReadWriteTransaction transaction;
69-
private volatile long numberOfNextCalls;
70+
private final AtomicLong numberOfNextCalls = new AtomicLong();
7071
private final ParsedStatement statement;
7172
private final AnalyzeMode analyzeMode;
7273
private final QueryOption[] options;
@@ -103,7 +104,7 @@ public Boolean call() {
103104
if (res) {
104105
checksumCalculator.calculateNextChecksum(getCurrentRowAsStruct());
105106
}
106-
numberOfNextCalls++;
107+
numberOfNextCalls.incrementAndGet();
107108
return res;
108109
}
109110
}
@@ -142,7 +143,7 @@ public void retry(AbortedException aborted) throws AbortedException {
142143
DirectExecuteResultSet.ofResultSet(
143144
transaction.internalExecuteQuery(statement, analyzeMode, options));
144145
boolean next = true;
145-
while (counter < numberOfNextCalls && next) {
146+
while (counter < numberOfNextCalls.get() && next) {
146147
transaction
147148
.getStatementExecutor()
148149
.invokeInterceptors(
@@ -169,7 +170,7 @@ public void retry(AbortedException aborted) throws AbortedException {
169170
// Check that we have the same number of rows and the same checksum.
170171
HashCode newChecksum = newChecksumCalculator.getChecksum();
171172
HashCode currentChecksum = checksumCalculator.getChecksum();
172-
if (counter == numberOfNextCalls && Objects.equals(newChecksum, currentChecksum)) {
173+
if (counter == numberOfNextCalls.get() && Objects.equals(newChecksum, currentChecksum)) {
173174
// Checksum is ok, we only need to replace the delegate result set if it's still open.
174175
if (isClosed()) {
175176
resultSet.close();

google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionImpl.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,9 @@ static UnitOfWorkType of(TransactionMode transactionMode) {
248248
Preconditions.checkNotNull(options);
249249
this.leakedException =
250250
options.isTrackConnectionLeaks() ? new LeakedConnectionException() : null;
251-
this.statementExecutor = new StatementExecutor(options.getStatementExecutionInterceptors());
251+
this.statementExecutor =
252+
new StatementExecutor(
253+
options.isUseVirtualThreads(), options.getStatementExecutionInterceptors());
252254
this.spannerPool = SpannerPool.INSTANCE;
253255
this.options = options;
254256
this.spanner = spannerPool.getSpanner(options, this);
@@ -283,7 +285,8 @@ static UnitOfWorkType of(TransactionMode transactionMode) {
283285
BatchClient batchClient) {
284286
this.leakedException =
285287
options.isTrackConnectionLeaks() ? new LeakedConnectionException() : null;
286-
this.statementExecutor = new StatementExecutor(Collections.emptyList());
288+
this.statementExecutor =
289+
new StatementExecutor(options.isUseVirtualThreads(), Collections.emptyList());
287290
this.spannerPool = Preconditions.checkNotNull(spannerPool);
288291
this.options = Preconditions.checkNotNull(options);
289292
this.spanner = spannerPool.getSpanner(options, this);

google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionOptions.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,8 @@ public String[] getValidValues() {
167167
static final boolean DEFAULT_AUTOCOMMIT = true;
168168
static final boolean DEFAULT_READONLY = false;
169169
static final boolean DEFAULT_RETRY_ABORTS_INTERNALLY = true;
170+
static final boolean DEFAULT_USE_VIRTUAL_THREADS = false;
171+
static final boolean DEFAULT_USE_VIRTUAL_GRPC_TRANSPORT_THREADS = false;
170172
private static final String DEFAULT_CREDENTIALS = null;
171173
private static final String DEFAULT_OAUTH_TOKEN = null;
172174
private static final String DEFAULT_MIN_SESSIONS = null;
@@ -204,6 +206,11 @@ public String[] getValidValues() {
204206
public static final String ROUTE_TO_LEADER_PROPERTY_NAME = "routeToLeader";
205207
/** Name of the 'retry aborts internally' connection property. */
206208
public static final String RETRY_ABORTS_INTERNALLY_PROPERTY_NAME = "retryAbortsInternally";
209+
/** Name of the property to enable/disable virtual threads for the statement executor. */
210+
public static final String USE_VIRTUAL_THREADS_PROPERTY_NAME = "useVirtualThreads";
211+
/** Name of the property to enable/disable virtual threads for gRPC transport. */
212+
public static final String USE_VIRTUAL_GRPC_TRANSPORT_THREADS_PROPERTY_NAME =
213+
"useVirtualGrpcTransportThreads";
207214
/** Name of the 'credentials' connection property. */
208215
public static final String CREDENTIALS_PROPERTY_NAME = "credentials";
209216
/** Name of the 'encodedCredentials' connection property. */
@@ -293,6 +300,16 @@ private static String generateGuardedConnectionPropertyError(
293300
RETRY_ABORTS_INTERNALLY_PROPERTY_NAME,
294301
"Should the connection automatically retry Aborted errors (true/false)",
295302
DEFAULT_RETRY_ABORTS_INTERNALLY),
303+
ConnectionProperty.createBooleanProperty(
304+
USE_VIRTUAL_THREADS_PROPERTY_NAME,
305+
"Use a virtual thread instead of a platform thread for each connection (true/false). "
306+
+ "This option only has any effect if the application is running on Java 21 or higher. In all other cases, the option is ignored.",
307+
DEFAULT_USE_VIRTUAL_THREADS),
308+
ConnectionProperty.createBooleanProperty(
309+
USE_VIRTUAL_GRPC_TRANSPORT_THREADS_PROPERTY_NAME,
310+
"Use a virtual thread instead of a platform thread for the gRPC executor (true/false). "
311+
+ "This option only has any effect if the application is running on Java 21 or higher. In all other cases, the option is ignored.",
312+
DEFAULT_USE_VIRTUAL_GRPC_TRANSPORT_THREADS),
296313
ConnectionProperty.createStringProperty(
297314
CREDENTIALS_PROPERTY_NAME,
298315
"The location of the credentials file to use for this connection. If neither this property or encoded credentials are set, the connection will use the default Google Cloud credentials for the runtime environment."),
@@ -672,6 +689,8 @@ public static Builder newBuilder() {
672689
private final boolean readOnly;
673690
private final boolean routeToLeader;
674691
private final boolean retryAbortsInternally;
692+
private final boolean useVirtualThreads;
693+
private final boolean useVirtualGrpcTransportThreads;
675694
private final List<StatementExecutionInterceptor> statementExecutionInterceptors;
676695
private final SpannerOptionsConfigurator configurator;
677696

@@ -771,6 +790,8 @@ private ConnectionOptions(Builder builder) {
771790
this.readOnly = parseReadOnly(this.uri);
772791
this.routeToLeader = parseRouteToLeader(this.uri);
773792
this.retryAbortsInternally = parseRetryAbortsInternally(this.uri);
793+
this.useVirtualThreads = parseUseVirtualThreads(this.uri);
794+
this.useVirtualGrpcTransportThreads = parseUseVirtualGrpcTransportThreads(this.uri);
774795
this.statementExecutionInterceptors =
775796
Collections.unmodifiableList(builder.statementExecutionInterceptors);
776797
this.configurator = builder.configurator;
@@ -873,6 +894,18 @@ static boolean parseRetryAbortsInternally(String uri) {
873894
return value != null ? Boolean.parseBoolean(value) : DEFAULT_RETRY_ABORTS_INTERNALLY;
874895
}
875896

897+
@VisibleForTesting
898+
static boolean parseUseVirtualThreads(String uri) {
899+
String value = parseUriProperty(uri, USE_VIRTUAL_THREADS_PROPERTY_NAME);
900+
return value != null ? Boolean.parseBoolean(value) : DEFAULT_USE_VIRTUAL_THREADS;
901+
}
902+
903+
@VisibleForTesting
904+
static boolean parseUseVirtualGrpcTransportThreads(String uri) {
905+
String value = parseUriProperty(uri, USE_VIRTUAL_GRPC_TRANSPORT_THREADS_PROPERTY_NAME);
906+
return value != null ? Boolean.parseBoolean(value) : DEFAULT_USE_VIRTUAL_GRPC_TRANSPORT_THREADS;
907+
}
908+
876909
@VisibleForTesting
877910
static @Nullable String parseCredentials(String uri) {
878911
String value = parseUriProperty(uri, CREDENTIALS_PROPERTY_NAME);
@@ -1293,6 +1326,16 @@ public boolean isRetryAbortsInternally() {
12931326
return retryAbortsInternally;
12941327
}
12951328

1329+
/** Whether connections should use virtual threads for connection executors. */
1330+
public boolean isUseVirtualThreads() {
1331+
return useVirtualThreads;
1332+
}
1333+
1334+
/** Whether virtual threads should be used for gRPC transport. */
1335+
public boolean isUseVirtualGrpcTransportThreads() {
1336+
return useVirtualGrpcTransportThreads;
1337+
}
1338+
12961339
/** Any warnings that were generated while creating the {@link ConnectionOptions} instance. */
12971340
@Nullable
12981341
public String getWarnings() {
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Copyright 2024 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.spanner.connection;
18+
19+
import com.google.cloud.spanner.SpannerOptions;
20+
21+
/**
22+
* This class is used for building {@link SpannerOptions} for {@link Connection} instances. It gives
23+
* access to (experimental) properties that are not public in the standard {@link SpannerOptions}
24+
* implementation.
25+
*/
26+
class ConnectionSpannerOptions extends SpannerOptions {
27+
public static Builder newBuilder() {
28+
return new Builder();
29+
}
30+
31+
static class Builder extends SpannerOptions.Builder {
32+
Builder() {}
33+
34+
@Override
35+
protected SpannerOptions.Builder setUseVirtualThreads(boolean useVirtualThreads) {
36+
return super.setUseVirtualThreads(useVirtualThreads);
37+
}
38+
39+
@Override
40+
public ConnectionSpannerOptions build() {
41+
return new ConnectionSpannerOptions(this);
42+
}
43+
}
44+
45+
ConnectionSpannerOptions(Builder builder) {
46+
super(builder);
47+
}
48+
}

0 commit comments

Comments
 (0)