Skip to content

Commit 4bf141b

Browse files
committed
perf: use direct executor
Use a direct executor by default for JDBC connections, as these do not use the async API, and cancelling statements can be handled by directly in the JDBC driver. This reduces the overall number of threads that is created by the JDBC driver.
1 parent 0e5680b commit 4bf141b

File tree

5 files changed

+139
-3
lines changed

5 files changed

+139
-3
lines changed
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright 2024 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.spanner.connection;
18+
19+
import com.google.api.core.InternalApi;
20+
import com.google.cloud.spanner.connection.StatementExecutor.StatementExecutorType;
21+
22+
@InternalApi
23+
public class ConnectionOptionsHelper {
24+
25+
@InternalApi
26+
public static ConnectionOptions.Builder useDirectExecutorIfNotUseVirtualThreads(
27+
String uri, ConnectionOptions.Builder builder) {
28+
ConnectionState connectionState = new ConnectionState(ConnectionProperties.parseValues(uri));
29+
if (!connectionState.getValue(ConnectionProperties.USE_VIRTUAL_THREADS).getValue()) {
30+
return builder.setStatementExecutorType(StatementExecutorType.DIRECT_EXECUTOR);
31+
}
32+
return builder;
33+
}
34+
35+
@InternalApi
36+
public static boolean usesDirectExecutor(ConnectionOptions options) {
37+
return options.getStatementExecutorType() == StatementExecutorType.DIRECT_EXECUTOR;
38+
}
39+
40+
private ConnectionOptionsHelper() {}
41+
}

src/main/java/com/google/cloud/spanner/jdbc/AbstractJdbcConnection.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.google.cloud.spanner.SpannerException;
2222
import com.google.cloud.spanner.connection.AbstractStatementParser;
2323
import com.google.cloud.spanner.connection.ConnectionOptions;
24+
import com.google.cloud.spanner.connection.ConnectionOptionsHelper;
2425
import com.google.common.annotations.VisibleForTesting;
2526
import com.google.rpc.Code;
2627
import java.sql.CallableStatement;
@@ -53,6 +54,7 @@ abstract class AbstractJdbcConnection extends AbstractJdbcWrapper
5354
private final ConnectionOptions options;
5455
private final com.google.cloud.spanner.connection.Connection spanner;
5556
private final Properties clientInfo;
57+
private final boolean usesDirectExecutor;
5658
private AbstractStatementParser parser;
5759

5860
private SQLWarning firstWarning = null;
@@ -63,6 +65,7 @@ abstract class AbstractJdbcConnection extends AbstractJdbcWrapper
6365
this.options = options;
6466
this.spanner = options.getConnection();
6567
this.clientInfo = new Properties(JdbcDatabaseMetaData.getDefaultClientInfoProperties());
68+
this.usesDirectExecutor = ConnectionOptionsHelper.usesDirectExecutor(options);
6669
}
6770

6871
/** Return the corresponding {@link com.google.cloud.spanner.connection.Connection} */
@@ -83,6 +86,10 @@ Spanner getSpanner() {
8386
return this.spanner.getSpanner();
8487
}
8588

89+
boolean usesDirectExecutor() {
90+
return this.usesDirectExecutor;
91+
}
92+
8693
@Override
8794
public Dialect getDialect() {
8895
return spanner.getDialect();

src/main/java/com/google/cloud/spanner/jdbc/AbstractJdbcStatement.java

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
import java.time.Duration;
3434
import java.util.Arrays;
3535
import java.util.concurrent.TimeUnit;
36+
import java.util.concurrent.locks.Lock;
37+
import java.util.concurrent.locks.ReentrantLock;
3638
import java.util.function.Function;
3739
import java.util.function.Supplier;
3840

@@ -41,6 +43,8 @@ abstract class AbstractJdbcStatement extends AbstractJdbcWrapper implements Stat
4143
private static final String CURSORS_NOT_SUPPORTED = "Cursors are not supported";
4244
private static final String ONLY_FETCH_FORWARD_SUPPORTED = "Only fetch_forward is supported";
4345
final AbstractStatementParser parser;
46+
private final Lock executingLock;
47+
private volatile Thread executingThread;
4448
private boolean closed;
4549
private boolean closeOnCompletion;
4650
private boolean poolable;
@@ -50,6 +54,11 @@ abstract class AbstractJdbcStatement extends AbstractJdbcWrapper implements Stat
5054
AbstractJdbcStatement(JdbcConnection connection) throws SQLException {
5155
this.connection = connection;
5256
this.parser = connection.getParser();
57+
if (connection.usesDirectExecutor()) {
58+
this.executingLock = new ReentrantLock();
59+
} else {
60+
this.executingLock = null;
61+
}
5362
}
5463

5564
@Override
@@ -228,6 +237,10 @@ private <T> T doWithStatementTimeout(
228237
Supplier<T> runnable, Function<T, Boolean> shouldResetTimeout) throws SQLException {
229238
StatementTimeout originalTimeout = setTemporaryStatementTimeout();
230239
T result = null;
240+
if (this.executingLock != null) {
241+
this.executingLock.lock();
242+
this.executingThread = Thread.currentThread();
243+
}
231244
try {
232245
Stopwatch stopwatch = Stopwatch.createStarted();
233246
result = runnable.get();
@@ -237,6 +250,10 @@ private <T> T doWithStatementTimeout(
237250
} catch (SpannerException spannerException) {
238251
throw JdbcSqlExceptionFactory.of(spannerException);
239252
} finally {
253+
if (this.executingLock != null) {
254+
this.executingThread = null;
255+
this.executingLock.unlock();
256+
}
240257
if (shouldResetTimeout.apply(result)) {
241258
resetStatementTimeout(originalTimeout);
242259
}
@@ -330,7 +347,16 @@ public void setQueryTimeout(int seconds) throws SQLException {
330347
@Override
331348
public void cancel() throws SQLException {
332349
checkClosed();
333-
connection.getSpannerConnection().cancel();
350+
if (this.executingThread != null) {
351+
// This is a best-effort operation. It could be that the executing thread is set to null
352+
// between the if-check and the actual execution. Just ignore if that happens.
353+
try {
354+
this.executingThread.interrupt();
355+
} catch (NullPointerException ignore) {
356+
}
357+
} else {
358+
connection.getSpannerConnection().cancel();
359+
}
334360
}
335361

336362
@Override

src/main/java/com/google/cloud/spanner/jdbc/JdbcDriver.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.google.cloud.spanner.SpannerException;
2424
import com.google.cloud.spanner.connection.ConnectionOptions;
2525
import com.google.cloud.spanner.connection.ConnectionOptions.ConnectionProperty;
26+
import com.google.cloud.spanner.connection.ConnectionOptionsHelper;
2627
import com.google.rpc.Code;
2728
import io.opentelemetry.api.OpenTelemetry;
2829
import java.sql.Connection;
@@ -244,6 +245,9 @@ private ConnectionOptions buildConnectionOptions(String connectionUrl, Propertie
244245
// Enable multiplexed sessions by default for the JDBC driver.
245246
builder.setSessionPoolOptions(
246247
SessionPoolOptionsHelper.useMultiplexedSessions(SessionPoolOptions.newBuilder()).build());
248+
// Enable direct executor for JDBC, as we don't use the async API.
249+
builder =
250+
ConnectionOptionsHelper.useDirectExecutorIfNotUseVirtualThreads(connectionUrl, builder);
247251
return builder.build();
248252
}
249253

src/test/java/com/google/cloud/spanner/jdbc/JdbcStatementTimeoutTest.java

Lines changed: 60 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,25 +19,52 @@
1919
import static org.junit.Assert.assertArrayEquals;
2020
import static org.junit.Assert.assertEquals;
2121
import static org.junit.Assert.assertFalse;
22+
import static org.junit.Assert.assertNull;
2223
import static org.junit.Assert.assertThrows;
2324

2425
import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime;
2526
import com.google.cloud.spanner.connection.AbstractMockServerTest;
27+
import com.google.cloud.spanner.jdbc.JdbcSqlExceptionFactory.JdbcSqlExceptionImpl;
2628
import com.google.cloud.spanner.jdbc.JdbcSqlExceptionFactory.JdbcSqlTimeoutException;
29+
import com.google.rpc.Code;
30+
import com.google.spanner.v1.ExecuteSqlRequest;
2731
import java.sql.ResultSet;
2832
import java.sql.SQLException;
2933
import java.sql.Statement;
34+
import java.util.concurrent.ExecutorService;
35+
import java.util.concurrent.Executors;
36+
import java.util.concurrent.Future;
37+
import org.junit.After;
3038
import org.junit.Test;
3139
import org.junit.runner.RunWith;
32-
import org.junit.runners.JUnit4;
40+
import org.junit.runners.Parameterized;
41+
import org.junit.runners.Parameterized.Parameter;
42+
import org.junit.runners.Parameterized.Parameters;
3343

3444
/**
3545
* Tests setting a statement timeout. This test is by default not included in unit test runs, as the
3646
* minimum timeout value in JDBC is 1 second, which again makes this test relatively slow.
3747
*/
38-
@RunWith(JUnit4.class)
48+
@RunWith(Parameterized.class)
3949
public class JdbcStatementTimeoutTest extends AbstractMockServerTest {
4050

51+
@Parameter public boolean useVirtualThreads;
52+
53+
@Parameters(name = "useVirtualThreads = {0}")
54+
public static Object[] data() {
55+
return new Boolean[] {false, true};
56+
}
57+
58+
@Override
59+
protected String getBaseUrl() {
60+
return super.getBaseUrl() + ";useVirtualThreads=" + this.useVirtualThreads;
61+
}
62+
63+
@After
64+
public void resetExecutionTimes() {
65+
mockSpanner.removeAllExecutionTimes();
66+
}
67+
4168
@Test
4269
public void testExecuteTimeout() throws SQLException {
4370
try (java.sql.Connection connection = createJdbcConnection()) {
@@ -118,4 +145,35 @@ public void testExecuteBatchTimeout() throws SQLException {
118145
}
119146
}
120147
}
148+
149+
@Test
150+
public void testCancel() throws Exception {
151+
ExecutorService service = Executors.newSingleThreadExecutor();
152+
String sql = INSERT_STATEMENT.getSql();
153+
154+
try (java.sql.Connection connection = createJdbcConnection();
155+
Statement statement = connection.createStatement()) {
156+
mockSpanner.freeze();
157+
Future<Void> future =
158+
service.submit(
159+
() -> {
160+
// Wait until the request has landed on the server and then cancel the statement.
161+
mockSpanner.waitForRequestsToContain(
162+
message ->
163+
message instanceof ExecuteSqlRequest
164+
&& ((ExecuteSqlRequest) message).getSql().equals(sql),
165+
5000L);
166+
System.out.println("Cancelling statement");
167+
statement.cancel();
168+
return null;
169+
});
170+
JdbcSqlExceptionImpl exception =
171+
assertThrows(JdbcSqlExceptionImpl.class, () -> statement.execute(sql));
172+
assertEquals(Code.CANCELLED, exception.getCode());
173+
assertNull(future.get());
174+
} finally {
175+
mockSpanner.unfreeze();
176+
service.shutdown();
177+
}
178+
}
121179
}

0 commit comments

Comments
 (0)