From e7392cb4cc93cd2f00faa41f9cab2babdd4f9448 Mon Sep 17 00:00:00 2001 From: Thomas Segismont Date: Thu, 13 Nov 2025 12:21:36 +0100 Subject: [PATCH] Add in_use metrics for sql connection pool See #278 Signed-off-by: Thomas Segismont --- .../impl/pool/SqlConnectionPool.java | 39 ++++++++++++++++--- .../tests/sqlclient/tck/MetricsTestBase.java | 29 ++++++++++++-- 2 files changed, 58 insertions(+), 10 deletions(-) diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java index 1f46c3c33..9a0577786 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java @@ -13,24 +13,24 @@ import io.netty.channel.EventLoop; import io.vertx.core.*; +import io.vertx.core.internal.ContextInternal; import io.vertx.core.internal.PromiseInternal; +import io.vertx.core.internal.VertxInternal; import io.vertx.core.internal.net.NetSocketInternal; +import io.vertx.core.internal.pool.*; import io.vertx.core.net.SocketAddress; import io.vertx.core.spi.metrics.ClientMetrics; import io.vertx.core.spi.metrics.PoolMetrics; import io.vertx.core.spi.tracing.VertxTracer; import io.vertx.core.tracing.TracingPolicy; -import io.vertx.core.internal.pool.*; -import io.vertx.core.internal.ContextInternal; -import io.vertx.core.internal.VertxInternal; import io.vertx.sqlclient.SqlConnectOptions; +import io.vertx.sqlclient.impl.tracing.QueryReporter; +import io.vertx.sqlclient.spi.DatabaseMetadata; import io.vertx.sqlclient.spi.connection.Connection; import io.vertx.sqlclient.spi.connection.ConnectionContext; +import io.vertx.sqlclient.spi.connection.ConnectionFactory; import io.vertx.sqlclient.spi.protocol.CommandBase; import io.vertx.sqlclient.spi.protocol.QueryCommandBase; -import io.vertx.sqlclient.impl.tracing.QueryReporter; -import io.vertx.sqlclient.spi.connection.ConnectionFactory; -import io.vertx.sqlclient.spi.DatabaseMetadata; import java.util.List; import java.util.function.Function; @@ -184,6 +184,27 @@ private void dequeueMetric(Object metric) { } } + private Object beginMetric() { + if (metrics != null) { + try { + return metrics.begin(); + } catch (Exception e) { + // + } + } + return NO_METRICS; + } + + private void endMetric(Object metric) { + if (metrics != null && metric != NO_METRICS) { + try { + metrics.end(metric); + } catch (Exception e) { + // + } + } + } + // TODO : try optimize without promise public void execute(CommandBase cmd, Completable handler) { ContextInternal context = vertx.getOrCreateContext(); @@ -193,7 +214,9 @@ public void execute(CommandBase cmd, Completable handler) { p.future().compose(lease -> { dequeueMetric(metric); PooledConnection pooled = lease.get(); + pooled.timerMetric = beginMetric(); Connection conn = pooled.conn; + Future future; if (afterAcquire != null) { future = afterAcquire.apply(conn) @@ -205,6 +228,7 @@ public void execute(CommandBase cmd, Completable handler) { future = pp; } return future.andThen(ar -> { + endMetric(pooled.timerMetric); pooled.refresh(); lease.recycle(); }); @@ -247,6 +271,7 @@ public void complete(Lease lease, Throwable failure) { private void handle(Lease lease) { dequeueMetric(metric); PooledConnection pooled = lease.get(); + pooled.timerMetric = beginMetric(); pooled.lease = lease; handler.succeed(pooled); } @@ -306,6 +331,7 @@ public class PooledConnection implements Connection, ConnectionContext { private ConnectionContext holder; private Promise> poolCallback; private Lease lease; + private Object timerMetric; public long idleEvictionTimestamp; public long lifetimeEvictionTimestamp; @@ -446,6 +472,7 @@ private void doClose(ConnectionContext holder, Completable promise) { } private void cleanup(Completable promise) { + endMetric(timerMetric); Lease l = this.lease; this.lease = null; refresh(); diff --git a/vertx-sql-client/src/test/java/io/vertx/tests/sqlclient/tck/MetricsTestBase.java b/vertx-sql-client/src/test/java/io/vertx/tests/sqlclient/tck/MetricsTestBase.java index 4e2f5944f..7ebf2fda9 100644 --- a/vertx-sql-client/src/test/java/io/vertx/tests/sqlclient/tck/MetricsTestBase.java +++ b/vertx-sql-client/src/test/java/io/vertx/tests/sqlclient/tck/MetricsTestBase.java @@ -114,7 +114,7 @@ public void close() { pool.query("SELECT * FROM immutable WHERE id=1").execute().toCompletionStage().toCompletableFuture().get(20, TimeUnit.SECONDS); ctx.assertEquals(0, closeCount.get()); pool.close(); - awaitQueueSize(ctx, closeCount, 1); + awaitUntilSizeEquals(ctx, closeCount, 1); } @Test @@ -131,6 +131,9 @@ private void testQueuing(TestContext ctx, boolean timeout) throws Exception { AtomicInteger queueSize = new AtomicInteger(); List enqueueMetrics = Collections.synchronizedList(new ArrayList<>()); List dequeueMetrics = Collections.synchronizedList(new ArrayList<>()); + AtomicInteger usageSize = new AtomicInteger(); + List beginMetrics = Collections.synchronizedList(new ArrayList<>()); + List endMetrics = Collections.synchronizedList(new ArrayList<>()); poolMetrics = new PoolMetrics() { @Override public Object enqueue() { @@ -144,6 +147,20 @@ public void dequeue(Object taskMetric) { dequeueMetrics.add(taskMetric); queueSize.decrementAndGet(); } + + @Override + public Object begin() { + Object metric = new Object(); + beginMetrics.add(metric); + usageSize.incrementAndGet(); + return metric; + } + + @Override + public void end(Object usageMetric) { + endMetrics.add(usageMetric); + usageSize.decrementAndGet(); + } }; PoolOptions poolOptions = new PoolOptions().setMaxSize(1).setName("the-pool"); if (timeout) { @@ -156,16 +173,20 @@ public void dequeue(Object taskMetric) { for (int i = 0;i < num;i++) { futures.add(pool.withConnection(sqlConn -> sqlConn.query("SELECT * FROM immutable WHERE id=1").execute())); } - awaitQueueSize(ctx, queueSize, timeout ? 0 : num); + awaitUntilSizeEquals(ctx, queueSize, timeout ? 0 : num); conn.close(); Future.join(futures).otherwiseEmpty().await(20, SECONDS); ctx.assertEquals(0, queueSize.get()); ctx.assertEquals(enqueueMetrics, dequeueMetrics); + awaitUntilSizeEquals(ctx, usageSize, 0); + ctx.assertEquals(timeout ? 1 : num + 1, beginMetrics.size()); + ctx.assertEquals(timeout ? 1 : num + 1, endMetrics.size()); + ctx.assertEquals(beginMetrics, endMetrics); ctx.assertEquals("sql", poolType); ctx.assertEquals("the-pool", poolName); } - private void awaitQueueSize(TestContext ctx, AtomicInteger queueSize, int num) throws InterruptedException { + private void awaitUntilSizeEquals(TestContext ctx, AtomicInteger queueSize, int num) throws InterruptedException { long now = System.currentTimeMillis(); for (; ; ) { if (queueSize.get() != num) { @@ -227,7 +248,7 @@ public void dequeue(Object taskMetric) { .onComplete(ctx.asyncAssertFailure(t -> async.countDown())); } conn.closeHandler(v -> async.countDown()); - awaitQueueSize(ctx, queueSize, 16); + awaitUntilSizeEquals(ctx, queueSize, 16); firstConnection.get().clientSocket().close(); async.await(20_000); ctx.assertEquals(0, queueSize.get());