Skip to content

Commit 6d6d500

Browse files
authored
perf: use direct executor (googleapis#1864)
Use a direct executor by default for JDBC connections, as these do not use the async API, and cancelling statements can be handled by directly in the JDBC driver. This reduces the overall number of threads that is created by the JDBC driver.
1 parent b35259f commit 6d6d500

File tree

5 files changed

+133
-3
lines changed

5 files changed

+133
-3
lines changed
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright 2024 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.spanner.connection;
18+
19+
import com.google.api.core.InternalApi;
20+
import com.google.cloud.spanner.connection.StatementExecutor.StatementExecutorType;
21+
22+
@InternalApi
23+
public class ConnectionOptionsHelper {
24+
25+
@InternalApi
26+
public static ConnectionOptions.Builder useDirectExecutorIfNotUseVirtualThreads(
27+
String uri, ConnectionOptions.Builder builder) {
28+
ConnectionState connectionState = new ConnectionState(ConnectionProperties.parseValues(uri));
29+
if (!connectionState.getValue(ConnectionProperties.USE_VIRTUAL_THREADS).getValue()) {
30+
return builder.setStatementExecutorType(StatementExecutorType.DIRECT_EXECUTOR);
31+
}
32+
return builder;
33+
}
34+
35+
@InternalApi
36+
public static boolean usesDirectExecutor(ConnectionOptions options) {
37+
return options.getStatementExecutorType() == StatementExecutorType.DIRECT_EXECUTOR;
38+
}
39+
40+
private ConnectionOptionsHelper() {}
41+
}

src/main/java/com/google/cloud/spanner/jdbc/AbstractJdbcConnection.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.google.cloud.spanner.SpannerException;
2222
import com.google.cloud.spanner.connection.AbstractStatementParser;
2323
import com.google.cloud.spanner.connection.ConnectionOptions;
24+
import com.google.cloud.spanner.connection.ConnectionOptionsHelper;
2425
import com.google.common.annotations.VisibleForTesting;
2526
import com.google.rpc.Code;
2627
import java.sql.CallableStatement;
@@ -53,6 +54,7 @@ abstract class AbstractJdbcConnection extends AbstractJdbcWrapper
5354
private final ConnectionOptions options;
5455
private final com.google.cloud.spanner.connection.Connection spanner;
5556
private final Properties clientInfo;
57+
private final boolean usesDirectExecutor;
5658
private AbstractStatementParser parser;
5759

5860
private SQLWarning firstWarning = null;
@@ -63,6 +65,7 @@ abstract class AbstractJdbcConnection extends AbstractJdbcWrapper
6365
this.options = options;
6466
this.spanner = options.getConnection();
6567
this.clientInfo = new Properties(JdbcDatabaseMetaData.getDefaultClientInfoProperties());
68+
this.usesDirectExecutor = ConnectionOptionsHelper.usesDirectExecutor(options);
6669
}
6770

6871
/** Return the corresponding {@link com.google.cloud.spanner.connection.Connection} */
@@ -83,6 +86,10 @@ Spanner getSpanner() {
8386
return this.spanner.getSpanner();
8487
}
8588

89+
boolean usesDirectExecutor() {
90+
return this.usesDirectExecutor;
91+
}
92+
8693
@Override
8794
public Dialect getDialect() {
8895
return spanner.getDialect();

src/main/java/com/google/cloud/spanner/jdbc/AbstractJdbcStatement.java

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@
3434
import java.time.Duration;
3535
import java.util.Arrays;
3636
import java.util.concurrent.TimeUnit;
37+
import java.util.concurrent.locks.Lock;
38+
import java.util.concurrent.locks.ReentrantLock;
3739
import java.util.function.Function;
3840
import java.util.function.Supplier;
3941
import javax.annotation.Nonnull;
@@ -43,6 +45,8 @@ abstract class AbstractJdbcStatement extends AbstractJdbcWrapper implements Stat
4345
private static final String CURSORS_NOT_SUPPORTED = "Cursors are not supported";
4446
private static final String ONLY_FETCH_FORWARD_SUPPORTED = "Only fetch_forward is supported";
4547
final AbstractStatementParser parser;
48+
private final Lock executingLock;
49+
private volatile Thread executingThread;
4650
private boolean closed;
4751
private boolean closeOnCompletion;
4852
private boolean poolable;
@@ -52,6 +56,11 @@ abstract class AbstractJdbcStatement extends AbstractJdbcWrapper implements Stat
5256
AbstractJdbcStatement(JdbcConnection connection) throws SQLException {
5357
this.connection = connection;
5458
this.parser = connection.getParser();
59+
if (connection.usesDirectExecutor()) {
60+
this.executingLock = new ReentrantLock();
61+
} else {
62+
this.executingLock = null;
63+
}
5564
}
5665

5766
@Override
@@ -239,6 +248,10 @@ private <T> T doWithStatementTimeout(
239248
Supplier<T> runnable, Function<T, Boolean> shouldResetTimeout) throws SQLException {
240249
StatementTimeout originalTimeout = setTemporaryStatementTimeout();
241250
T result = null;
251+
if (this.executingLock != null) {
252+
this.executingLock.lock();
253+
this.executingThread = Thread.currentThread();
254+
}
242255
try {
243256
Stopwatch stopwatch = Stopwatch.createStarted();
244257
result = runnable.get();
@@ -248,6 +261,10 @@ private <T> T doWithStatementTimeout(
248261
} catch (SpannerException spannerException) {
249262
throw JdbcSqlExceptionFactory.of(spannerException);
250263
} finally {
264+
if (this.executingLock != null) {
265+
this.executingThread = null;
266+
this.executingLock.unlock();
267+
}
251268
if (shouldResetTimeout.apply(result)) {
252269
resetStatementTimeout(originalTimeout);
253270
}
@@ -353,7 +370,16 @@ void setQueryTimeout(@Nonnull Duration duration) throws SQLException {
353370
@Override
354371
public void cancel() throws SQLException {
355372
checkClosed();
356-
connection.getSpannerConnection().cancel();
373+
if (this.executingThread != null) {
374+
// This is a best-effort operation. It could be that the executing thread is set to null
375+
// between the if-check and the actual execution. Just ignore if that happens.
376+
try {
377+
this.executingThread.interrupt();
378+
} catch (NullPointerException ignore) {
379+
}
380+
} else {
381+
connection.getSpannerConnection().cancel();
382+
}
357383
}
358384

359385
@Override

src/main/java/com/google/cloud/spanner/jdbc/JdbcDriver.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.google.cloud.spanner.SessionPoolOptionsHelper;
2323
import com.google.cloud.spanner.SpannerException;
2424
import com.google.cloud.spanner.connection.ConnectionOptions;
25+
import com.google.cloud.spanner.connection.ConnectionOptionsHelper;
2526
import com.google.cloud.spanner.connection.ConnectionPropertiesHelper;
2627
import com.google.cloud.spanner.connection.ConnectionProperty;
2728
import com.google.rpc.Code;
@@ -245,6 +246,9 @@ private ConnectionOptions buildConnectionOptions(String connectionUrl, Propertie
245246
// Enable multiplexed sessions by default for the JDBC driver.
246247
builder.setSessionPoolOptions(
247248
SessionPoolOptionsHelper.useMultiplexedSessions(SessionPoolOptions.newBuilder()).build());
249+
// Enable direct executor for JDBC, as we don't use the async API.
250+
builder =
251+
ConnectionOptionsHelper.useDirectExecutorIfNotUseVirtualThreads(connectionUrl, builder);
248252
return builder.build();
249253
}
250254

src/test/java/com/google/cloud/spanner/jdbc/JdbcStatementTimeoutTest.java

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,24 +19,45 @@
1919
import static org.junit.Assert.assertArrayEquals;
2020
import static org.junit.Assert.assertEquals;
2121
import static org.junit.Assert.assertFalse;
22+
import static org.junit.Assert.assertNull;
2223
import static org.junit.Assert.assertThrows;
2324

2425
import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime;
2526
import com.google.cloud.spanner.connection.AbstractMockServerTest;
27+
import com.google.cloud.spanner.jdbc.JdbcSqlExceptionFactory.JdbcSqlExceptionImpl;
2628
import com.google.cloud.spanner.jdbc.JdbcSqlExceptionFactory.JdbcSqlTimeoutException;
29+
import com.google.rpc.Code;
30+
import com.google.spanner.v1.ExecuteSqlRequest;
2731
import java.sql.ResultSet;
2832
import java.sql.SQLException;
2933
import java.sql.Statement;
3034
import java.time.Duration;
35+
import java.util.concurrent.ExecutorService;
36+
import java.util.concurrent.Executors;
37+
import java.util.concurrent.Future;
3138
import org.junit.After;
3239
import org.junit.Test;
3340
import org.junit.runner.RunWith;
34-
import org.junit.runners.JUnit4;
41+
import org.junit.runners.Parameterized;
42+
import org.junit.runners.Parameterized.Parameter;
43+
import org.junit.runners.Parameterized.Parameters;
3544

3645
/** Tests setting a statement timeout. */
37-
@RunWith(JUnit4.class)
46+
@RunWith(Parameterized.class)
3847
public class JdbcStatementTimeoutTest extends AbstractMockServerTest {
3948

49+
@Parameter public boolean useVirtualThreads;
50+
51+
@Parameters(name = "useVirtualThreads = {0}")
52+
public static Object[] data() {
53+
return new Boolean[] {false, true};
54+
}
55+
56+
@Override
57+
protected String getBaseUrl() {
58+
return super.getBaseUrl() + ";useVirtualThreads=" + this.useVirtualThreads;
59+
}
60+
4061
@After
4162
public void resetExecutionTimes() {
4263
mockSpanner.removeAllExecutionTimes();
@@ -122,4 +143,35 @@ public void testExecuteBatchTimeout() throws SQLException {
122143
}
123144
}
124145
}
146+
147+
@Test
148+
public void testCancel() throws Exception {
149+
ExecutorService service = Executors.newSingleThreadExecutor();
150+
String sql = INSERT_STATEMENT.getSql();
151+
152+
try (java.sql.Connection connection = createJdbcConnection();
153+
Statement statement = connection.createStatement()) {
154+
mockSpanner.freeze();
155+
Future<Void> future =
156+
service.submit(
157+
() -> {
158+
// Wait until the request has landed on the server and then cancel the statement.
159+
mockSpanner.waitForRequestsToContain(
160+
message ->
161+
message instanceof ExecuteSqlRequest
162+
&& ((ExecuteSqlRequest) message).getSql().equals(sql),
163+
5000L);
164+
System.out.println("Cancelling statement");
165+
statement.cancel();
166+
return null;
167+
});
168+
JdbcSqlExceptionImpl exception =
169+
assertThrows(JdbcSqlExceptionImpl.class, () -> statement.execute(sql));
170+
assertEquals(Code.CANCELLED, exception.getCode());
171+
assertNull(future.get());
172+
} finally {
173+
mockSpanner.unfreeze();
174+
service.shutdown();
175+
}
176+
}
125177
}

0 commit comments

Comments
 (0)