diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java index b74c78732..ac5b68f17 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java @@ -13,22 +13,22 @@ import io.netty.channel.EventLoop; import io.vertx.core.*; +import io.vertx.core.internal.ContextInternal; import io.vertx.core.internal.PromiseInternal; +import io.vertx.core.internal.VertxInternal; import io.vertx.core.internal.net.NetSocketInternal; +import io.vertx.core.internal.pool.*; import io.vertx.core.net.SocketAddress; import io.vertx.core.spi.metrics.ClientMetrics; import io.vertx.core.spi.metrics.PoolMetrics; import io.vertx.core.spi.tracing.VertxTracer; import io.vertx.core.tracing.TracingPolicy; -import io.vertx.core.internal.pool.*; -import io.vertx.core.internal.ContextInternal; -import io.vertx.core.internal.VertxInternal; import io.vertx.sqlclient.SqlConnection; +import io.vertx.sqlclient.impl.tracing.QueryReporter; import io.vertx.sqlclient.internal.Connection; import io.vertx.sqlclient.internal.SqlConnectionBase; import io.vertx.sqlclient.internal.command.CommandBase; import io.vertx.sqlclient.internal.command.QueryCommandBase; -import io.vertx.sqlclient.impl.tracing.QueryReporter; import io.vertx.sqlclient.spi.ConnectionFactory; import io.vertx.sqlclient.spi.DatabaseMetadata; @@ -181,6 +181,27 @@ private void dequeueMetric(Object metric) { } } + private Object beginMetric() { + if (metrics != null) { + try { + return metrics.begin(); + } catch (Exception e) { + // + } + } + return NO_METRICS; + } + + private void endMetric(Object metric) { + if (metrics != null && metric != NO_METRICS) { + try { + metrics.end(metric); + } catch (Exception e) { + // + } + } + } + // TODO : try optimize without promise public void execute(CommandBase cmd, Completable handler) { ContextInternal context = vertx.getOrCreateContext(); @@ -190,7 +211,9 @@ public void execute(CommandBase cmd, Completable handler) { p.future().compose(lease -> { dequeueMetric(metric); PooledConnection pooled = lease.get(); + pooled.timerMetric = beginMetric(); Connection conn = pooled.conn; + Future future; if (afterAcquire != null) { future = afterAcquire.apply(conn) @@ -202,6 +225,7 @@ public void execute(CommandBase cmd, Completable handler) { future = pp; } return future.andThen(ar -> { + endMetric(pooled.timerMetric); pooled.refresh(); lease.recycle(); }); @@ -244,6 +268,7 @@ public void complete(Lease lease, Throwable failure) { private void handle(Lease lease) { dequeueMetric(metric); PooledConnection pooled = lease.get(); + pooled.timerMetric = beginMetric(); pooled.lease = lease; handler.succeed(pooled); } @@ -303,6 +328,7 @@ public class PooledConnection implements Connection, Connection.Holder { private Holder holder; private Promise> poolCallback; private Lease lease; + private Object timerMetric; public long idleEvictionTimestamp; public long lifetimeEvictionTimestamp; @@ -443,6 +469,7 @@ private void doClose(Holder holder, Completable promise) { } private void cleanup(Completable promise) { + endMetric(timerMetric); Lease l = this.lease; this.lease = null; refresh(); diff --git a/vertx-sql-client/src/test/java/io/vertx/tests/sqlclient/tck/MetricsTestBase.java b/vertx-sql-client/src/test/java/io/vertx/tests/sqlclient/tck/MetricsTestBase.java index 4e2f5944f..31431be0a 100644 --- a/vertx-sql-client/src/test/java/io/vertx/tests/sqlclient/tck/MetricsTestBase.java +++ b/vertx-sql-client/src/test/java/io/vertx/tests/sqlclient/tck/MetricsTestBase.java @@ -114,7 +114,7 @@ public void close() { pool.query("SELECT * FROM immutable WHERE id=1").execute().toCompletionStage().toCompletableFuture().get(20, TimeUnit.SECONDS); ctx.assertEquals(0, closeCount.get()); pool.close(); - awaitQueueSize(ctx, closeCount, 1); + awaitUntilSizeEquals(ctx, closeCount, 1); } @Test @@ -131,6 +131,9 @@ private void testQueuing(TestContext ctx, boolean timeout) throws Exception { AtomicInteger queueSize = new AtomicInteger(); List enqueueMetrics = Collections.synchronizedList(new ArrayList<>()); List dequeueMetrics = Collections.synchronizedList(new ArrayList<>()); + AtomicInteger usageSize = new AtomicInteger(); + List beginMetrics = Collections.synchronizedList(new ArrayList<>()); + List endMetrics = Collections.synchronizedList(new ArrayList<>()); poolMetrics = new PoolMetrics() { @Override public Object enqueue() { @@ -144,6 +147,18 @@ public void dequeue(Object taskMetric) { dequeueMetrics.add(taskMetric); queueSize.decrementAndGet(); } + @Override + public Object begin() { + Object metric = new Object(); + beginMetrics.add(metric); + usageSize.incrementAndGet(); + return metric; + } + @Override + public void end(Object usageMetric) { + endMetrics.add(usageMetric); + usageSize.decrementAndGet(); + } }; PoolOptions poolOptions = new PoolOptions().setMaxSize(1).setName("the-pool"); if (timeout) { @@ -156,16 +171,20 @@ public void dequeue(Object taskMetric) { for (int i = 0;i < num;i++) { futures.add(pool.withConnection(sqlConn -> sqlConn.query("SELECT * FROM immutable WHERE id=1").execute())); } - awaitQueueSize(ctx, queueSize, timeout ? 0 : num); + awaitUntilSizeEquals(ctx, queueSize, timeout ? 0 : num); conn.close(); Future.join(futures).otherwiseEmpty().await(20, SECONDS); ctx.assertEquals(0, queueSize.get()); ctx.assertEquals(enqueueMetrics, dequeueMetrics); + awaitUntilSizeEquals(ctx, usageSize, 0); + ctx.assertEquals(timeout ? 1 : num + 1, beginMetrics.size()); + ctx.assertEquals(timeout ? 1 : num + 1, endMetrics.size()); + ctx.assertEquals(beginMetrics, endMetrics); ctx.assertEquals("sql", poolType); ctx.assertEquals("the-pool", poolName); } - private void awaitQueueSize(TestContext ctx, AtomicInteger queueSize, int num) throws InterruptedException { + private void awaitUntilSizeEquals(TestContext ctx, AtomicInteger queueSize, int num) throws InterruptedException { long now = System.currentTimeMillis(); for (; ; ) { if (queueSize.get() != num) { @@ -227,7 +246,7 @@ public void dequeue(Object taskMetric) { .onComplete(ctx.asyncAssertFailure(t -> async.countDown())); } conn.closeHandler(v -> async.countDown()); - awaitQueueSize(ctx, queueSize, 16); + awaitUntilSizeEquals(ctx, queueSize, 16); firstConnection.get().clientSocket().close(); async.await(20_000); ctx.assertEquals(0, queueSize.get());