Skip to content

Commit 1db063e

Browse files
authored
Add in_use metrics for sql connection pool (#1570)
See #278 Signed-off-by: Thomas Segismont <[email protected]>
1 parent 2388fee commit 1db063e

File tree

2 files changed

+58
-10
lines changed

2 files changed

+58
-10
lines changed

vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,24 +13,24 @@
1313

1414
import io.netty.channel.EventLoop;
1515
import io.vertx.core.*;
16+
import io.vertx.core.internal.ContextInternal;
1617
import io.vertx.core.internal.PromiseInternal;
18+
import io.vertx.core.internal.VertxInternal;
1719
import io.vertx.core.internal.net.NetSocketInternal;
20+
import io.vertx.core.internal.pool.*;
1821
import io.vertx.core.net.SocketAddress;
1922
import io.vertx.core.spi.metrics.ClientMetrics;
2023
import io.vertx.core.spi.metrics.PoolMetrics;
2124
import io.vertx.core.spi.tracing.VertxTracer;
2225
import io.vertx.core.tracing.TracingPolicy;
23-
import io.vertx.core.internal.pool.*;
24-
import io.vertx.core.internal.ContextInternal;
25-
import io.vertx.core.internal.VertxInternal;
2626
import io.vertx.sqlclient.SqlConnectOptions;
27+
import io.vertx.sqlclient.impl.tracing.QueryReporter;
28+
import io.vertx.sqlclient.spi.DatabaseMetadata;
2729
import io.vertx.sqlclient.spi.connection.Connection;
2830
import io.vertx.sqlclient.spi.connection.ConnectionContext;
31+
import io.vertx.sqlclient.spi.connection.ConnectionFactory;
2932
import io.vertx.sqlclient.spi.protocol.CommandBase;
3033
import io.vertx.sqlclient.spi.protocol.QueryCommandBase;
31-
import io.vertx.sqlclient.impl.tracing.QueryReporter;
32-
import io.vertx.sqlclient.spi.connection.ConnectionFactory;
33-
import io.vertx.sqlclient.spi.DatabaseMetadata;
3434

3535
import java.util.List;
3636
import java.util.function.Function;
@@ -184,6 +184,27 @@ private void dequeueMetric(Object metric) {
184184
}
185185
}
186186

187+
private Object beginMetric() {
188+
if (metrics != null) {
189+
try {
190+
return metrics.begin();
191+
} catch (Exception e) {
192+
//
193+
}
194+
}
195+
return NO_METRICS;
196+
}
197+
198+
private void endMetric(Object metric) {
199+
if (metrics != null && metric != NO_METRICS) {
200+
try {
201+
metrics.end(metric);
202+
} catch (Exception e) {
203+
//
204+
}
205+
}
206+
}
207+
187208
// TODO : try optimize without promise
188209
public <R> void execute(CommandBase<R> cmd, Completable<R> handler) {
189210
ContextInternal context = vertx.getOrCreateContext();
@@ -193,7 +214,9 @@ public <R> void execute(CommandBase<R> cmd, Completable<R> handler) {
193214
p.future().compose(lease -> {
194215
dequeueMetric(metric);
195216
PooledConnection pooled = lease.get();
217+
pooled.timerMetric = beginMetric();
196218
Connection conn = pooled.conn;
219+
197220
Future<R> future;
198221
if (afterAcquire != null) {
199222
future = afterAcquire.apply(conn)
@@ -205,6 +228,7 @@ public <R> void execute(CommandBase<R> cmd, Completable<R> handler) {
205228
future = pp;
206229
}
207230
return future.andThen(ar -> {
231+
endMetric(pooled.timerMetric);
208232
pooled.refresh();
209233
lease.recycle();
210234
});
@@ -247,6 +271,7 @@ public void complete(Lease<PooledConnection> lease, Throwable failure) {
247271
private void handle(Lease<PooledConnection> lease) {
248272
dequeueMetric(metric);
249273
PooledConnection pooled = lease.get();
274+
pooled.timerMetric = beginMetric();
250275
pooled.lease = lease;
251276
handler.succeed(pooled);
252277
}
@@ -306,6 +331,7 @@ public class PooledConnection implements Connection, ConnectionContext {
306331
private ConnectionContext holder;
307332
private Promise<ConnectResult<PooledConnection>> poolCallback;
308333
private Lease<PooledConnection> lease;
334+
private Object timerMetric;
309335
public long idleEvictionTimestamp;
310336
public long lifetimeEvictionTimestamp;
311337

@@ -446,6 +472,7 @@ private void doClose(ConnectionContext holder, Completable<Void> promise) {
446472
}
447473

448474
private void cleanup(Completable<Void> promise) {
475+
endMetric(timerMetric);
449476
Lease<PooledConnection> l = this.lease;
450477
this.lease = null;
451478
refresh();

vertx-sql-client/src/test/java/io/vertx/tests/sqlclient/tck/MetricsTestBase.java

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ public void close() {
114114
pool.query("SELECT * FROM immutable WHERE id=1").execute().toCompletionStage().toCompletableFuture().get(20, TimeUnit.SECONDS);
115115
ctx.assertEquals(0, closeCount.get());
116116
pool.close();
117-
awaitQueueSize(ctx, closeCount, 1);
117+
awaitUntilSizeEquals(ctx, closeCount, 1);
118118
}
119119

120120
@Test
@@ -131,6 +131,9 @@ private void testQueuing(TestContext ctx, boolean timeout) throws Exception {
131131
AtomicInteger queueSize = new AtomicInteger();
132132
List<Object> enqueueMetrics = Collections.synchronizedList(new ArrayList<>());
133133
List<Object> dequeueMetrics = Collections.synchronizedList(new ArrayList<>());
134+
AtomicInteger usageSize = new AtomicInteger();
135+
List<Object> beginMetrics = Collections.synchronizedList(new ArrayList<>());
136+
List<Object> endMetrics = Collections.synchronizedList(new ArrayList<>());
134137
poolMetrics = new PoolMetrics() {
135138
@Override
136139
public Object enqueue() {
@@ -144,6 +147,20 @@ public void dequeue(Object taskMetric) {
144147
dequeueMetrics.add(taskMetric);
145148
queueSize.decrementAndGet();
146149
}
150+
151+
@Override
152+
public Object begin() {
153+
Object metric = new Object();
154+
beginMetrics.add(metric);
155+
usageSize.incrementAndGet();
156+
return metric;
157+
}
158+
159+
@Override
160+
public void end(Object usageMetric) {
161+
endMetrics.add(usageMetric);
162+
usageSize.decrementAndGet();
163+
}
147164
};
148165
PoolOptions poolOptions = new PoolOptions().setMaxSize(1).setName("the-pool");
149166
if (timeout) {
@@ -156,16 +173,20 @@ public void dequeue(Object taskMetric) {
156173
for (int i = 0;i < num;i++) {
157174
futures.add(pool.withConnection(sqlConn -> sqlConn.query("SELECT * FROM immutable WHERE id=1").execute()));
158175
}
159-
awaitQueueSize(ctx, queueSize, timeout ? 0 : num);
176+
awaitUntilSizeEquals(ctx, queueSize, timeout ? 0 : num);
160177
conn.close();
161178
Future.join(futures).otherwiseEmpty().await(20, SECONDS);
162179
ctx.assertEquals(0, queueSize.get());
163180
ctx.assertEquals(enqueueMetrics, dequeueMetrics);
181+
awaitUntilSizeEquals(ctx, usageSize, 0);
182+
ctx.assertEquals(timeout ? 1 : num + 1, beginMetrics.size());
183+
ctx.assertEquals(timeout ? 1 : num + 1, endMetrics.size());
184+
ctx.assertEquals(beginMetrics, endMetrics);
164185
ctx.assertEquals("sql", poolType);
165186
ctx.assertEquals("the-pool", poolName);
166187
}
167188

168-
private void awaitQueueSize(TestContext ctx, AtomicInteger queueSize, int num) throws InterruptedException {
189+
private void awaitUntilSizeEquals(TestContext ctx, AtomicInteger queueSize, int num) throws InterruptedException {
169190
long now = System.currentTimeMillis();
170191
for (; ; ) {
171192
if (queueSize.get() != num) {
@@ -227,7 +248,7 @@ public void dequeue(Object taskMetric) {
227248
.onComplete(ctx.asyncAssertFailure(t -> async.countDown()));
228249
}
229250
conn.closeHandler(v -> async.countDown());
230-
awaitQueueSize(ctx, queueSize, 16);
251+
awaitUntilSizeEquals(ctx, queueSize, 16);
231252
firstConnection.get().clientSocket().close();
232253
async.await(20_000);
233254
ctx.assertEquals(0, queueSize.get());

0 commit comments

Comments
 (0)