Skip to content

Commit 340781c

Browse files
authored
Pool queries should honor connection timeout (#1586)
See #1232 If a connection timeout is defined in connect options, the pool uses it when users acquire a connection with pool.getConnection, but not when they execute a query with pool.query Signed-off-by: Thomas Segismont <[email protected]>
1 parent 9c632cd commit 340781c

File tree

3 files changed

+50
-16
lines changed

3 files changed

+50
-16
lines changed

vertx-pg-client/src/test/java/io/vertx/tests/pgclient/PgPoolTest.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import java.util.concurrent.atomic.AtomicReference;
4343
import java.util.stream.Collector;
4444

45+
import static java.util.concurrent.TimeUnit.SECONDS;
4546
import static java.util.stream.Collectors.mapping;
4647
import static java.util.stream.Collectors.toList;
4748

@@ -615,4 +616,17 @@ public void testConnectionClosedInHook(TestContext ctx) {
615616
}));
616617
}));
617618
}
619+
620+
@Test
621+
public void testPooledQueryTimeout(TestContext ctx) {
622+
Async async = ctx.async();
623+
PoolOptions poolOptions = new PoolOptions().setMaxSize(1).setConnectionTimeout(1).setConnectionTimeoutUnit(SECONDS);
624+
Pool pool = createPool(options, poolOptions);
625+
pool.getConnection().onComplete(ctx.asyncAssertSuccess(conn -> {
626+
pool.query("SELECT 1").execute().onComplete(ctx.asyncAssertFailure(t -> {
627+
conn.close();
628+
async.complete();
629+
}));
630+
}));
631+
}
618632
}

vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/PoolImpl.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,13 @@
2727
import io.vertx.core.spi.metrics.VertxMetrics;
2828
import io.vertx.sqlclient.*;
2929
import io.vertx.sqlclient.impl.TransactionPropagationLocal;
30-
import io.vertx.sqlclient.spi.connection.Connection;
3130
import io.vertx.sqlclient.internal.SqlClientBase;
3231
import io.vertx.sqlclient.internal.SqlConnectionInternal;
32+
import io.vertx.sqlclient.spi.Driver;
33+
import io.vertx.sqlclient.spi.connection.Connection;
3334
import io.vertx.sqlclient.spi.connection.ConnectionContext;
34-
import io.vertx.sqlclient.spi.protocol.CommandBase;
3535
import io.vertx.sqlclient.spi.connection.ConnectionFactory;
36-
import io.vertx.sqlclient.spi.Driver;
36+
import io.vertx.sqlclient.spi.protocol.CommandBase;
3737

3838
import java.util.function.Function;
3939
import java.util.function.Supplier;
@@ -179,7 +179,7 @@ public Future<SqlConnection> getConnection() {
179179

180180
@Override
181181
public <R> void schedule(CommandBase<R> cmd, Completable<R> handler) {
182-
pool.execute(cmd, handler);
182+
pool.execute(cmd, handler, connectionTimeout);
183183
}
184184

185185
private void acquire(ContextInternal context, long timeout, Completable<SqlConnectionPool.PooledConnection> completionHandler) {

vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java

Lines changed: 32 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -205,34 +205,54 @@ private void endMetric(Object metric) {
205205
}
206206
}
207207

208+
private static final Exception POOL_QUERY_TIMEOUT_EXCEPTION = new VertxException("Timeout waiting for connection", true);
209+
208210
// TODO : try optimize without promise
209-
public <R> void execute(CommandBase<R> cmd, Completable<R> handler) {
211+
public <R> void execute(CommandBase<R> cmd, Completable<R> handler, long timeout) {
210212
ContextInternal context = vertx.getOrCreateContext();
211213
Promise<Lease<PooledConnection>> p = context.promise();
214+
long timerId;
215+
if (timeout > 0) {
216+
timerId = vertx.setTimer(timeout, t -> handler.fail(POOL_QUERY_TIMEOUT_EXCEPTION));
217+
} else {
218+
timerId = -1;
219+
}
212220
Object metric = enqueueMetric();
213221
pool.acquire(context, 0, p);
214222
p.future().compose(lease -> {
215223
dequeueMetric(metric);
216224
PooledConnection pooled = lease.get();
217-
pooled.timerMetric = beginMetric();
218-
Connection conn = pooled.conn;
219-
220225
Future<R> future;
221-
if (afterAcquire != null) {
222-
future = afterAcquire.apply(conn)
223-
.compose(v -> Future.<R>future(d -> pooled.schedule(cmd, d)))
224-
.eventually(() -> beforeRecycle.apply(conn));
226+
if (timerId != -1 && !vertx.cancelTimer(timerId)) {
227+
// We want to make sure the connection is released properly below
228+
// But we don't want to record begin/end pool metrics
229+
pooled.timerMetric = NO_METRICS;
230+
future = Future.failedFuture(POOL_QUERY_TIMEOUT_EXCEPTION);
225231
} else {
226-
PromiseInternal<R> pp = context.promise();
227-
pooled.schedule(cmd, pp);
228-
future = pp;
232+
pooled.timerMetric = beginMetric();
233+
if (afterAcquire != null) {
234+
Connection conn = pooled.conn;
235+
future = afterAcquire.apply(conn)
236+
.compose(v -> Future.<R>future(d -> pooled.schedule(cmd, d)))
237+
.eventually(() -> beforeRecycle.apply(conn));
238+
} else {
239+
PromiseInternal<R> pp = context.promise();
240+
pooled.schedule(cmd, pp);
241+
future = pp;
242+
}
229243
}
230244
return future.andThen(ar -> {
231245
endMetric(pooled.timerMetric);
232246
pooled.refresh();
233247
lease.recycle();
234248
});
235-
}).onComplete(handler);
249+
}).onComplete(ar -> {
250+
if (ar.succeeded()) {
251+
handler.succeed(ar.result());
252+
} else if (!POOL_QUERY_TIMEOUT_EXCEPTION.equals(ar.cause())) {
253+
handler.fail(ar.cause());
254+
}
255+
});
236256
}
237257

238258
public void acquire(ContextInternal context, long timeout, Completable<PooledConnection> handler) {

0 commit comments

Comments
 (0)