Skip to content

Commit cee8982

Browse files
SqlConnectionPool does not invoke PoolMetrics begin/end methods (#1568)
* Add in_use metrics for sql connection pool Should fix vert-x3/vertx-micrometer-metrics#278 * Extend queue testing with checks for usage metrics * Fix MetricsTestBase Begin/End metrics size depends on the timeout parameter value. When a timeout is expected, a single begin/end metric should be listed, corresponding to the getConnection/close calls wrapping the tasks. When a timeout isn't expected, the number of begin/end metric is the number of tasks plus one, corresponding to the getConnection/close calls. Signed-off-by: Thomas Segismont <[email protected]> * Minor refactoring Signed-off-by: Thomas Segismont <[email protected]> * More robust MetricsTestBase implementation Since metricsEnd can be invoked asynchronously, relax the test of usage size. Signed-off-by: Thomas Segismont <[email protected]> --------- Signed-off-by: Thomas Segismont <[email protected]> Co-authored-by: Thomas Segismont <[email protected]>
1 parent 125df2f commit cee8982

File tree

2 files changed

+54
-8
lines changed

2 files changed

+54
-8
lines changed

vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,22 +13,22 @@
1313

1414
import io.netty.channel.EventLoop;
1515
import io.vertx.core.*;
16+
import io.vertx.core.internal.ContextInternal;
1617
import io.vertx.core.internal.PromiseInternal;
18+
import io.vertx.core.internal.VertxInternal;
1719
import io.vertx.core.internal.net.NetSocketInternal;
20+
import io.vertx.core.internal.pool.*;
1821
import io.vertx.core.net.SocketAddress;
1922
import io.vertx.core.spi.metrics.ClientMetrics;
2023
import io.vertx.core.spi.metrics.PoolMetrics;
2124
import io.vertx.core.spi.tracing.VertxTracer;
2225
import io.vertx.core.tracing.TracingPolicy;
23-
import io.vertx.core.internal.pool.*;
24-
import io.vertx.core.internal.ContextInternal;
25-
import io.vertx.core.internal.VertxInternal;
2626
import io.vertx.sqlclient.SqlConnection;
27+
import io.vertx.sqlclient.impl.tracing.QueryReporter;
2728
import io.vertx.sqlclient.internal.Connection;
2829
import io.vertx.sqlclient.internal.SqlConnectionBase;
2930
import io.vertx.sqlclient.internal.command.CommandBase;
3031
import io.vertx.sqlclient.internal.command.QueryCommandBase;
31-
import io.vertx.sqlclient.impl.tracing.QueryReporter;
3232
import io.vertx.sqlclient.spi.ConnectionFactory;
3333
import io.vertx.sqlclient.spi.DatabaseMetadata;
3434

@@ -181,6 +181,27 @@ private void dequeueMetric(Object metric) {
181181
}
182182
}
183183

184+
private Object beginMetric() {
185+
if (metrics != null) {
186+
try {
187+
return metrics.begin();
188+
} catch (Exception e) {
189+
//
190+
}
191+
}
192+
return NO_METRICS;
193+
}
194+
195+
private void endMetric(Object metric) {
196+
if (metrics != null && metric != NO_METRICS) {
197+
try {
198+
metrics.end(metric);
199+
} catch (Exception e) {
200+
//
201+
}
202+
}
203+
}
204+
184205
// TODO : try optimize without promise
185206
public <R> void execute(CommandBase<R> cmd, Completable<R> handler) {
186207
ContextInternal context = vertx.getOrCreateContext();
@@ -190,7 +211,9 @@ public <R> void execute(CommandBase<R> cmd, Completable<R> handler) {
190211
p.future().compose(lease -> {
191212
dequeueMetric(metric);
192213
PooledConnection pooled = lease.get();
214+
pooled.timerMetric = beginMetric();
193215
Connection conn = pooled.conn;
216+
194217
Future<R> future;
195218
if (afterAcquire != null) {
196219
future = afterAcquire.apply(conn)
@@ -202,6 +225,7 @@ public <R> void execute(CommandBase<R> cmd, Completable<R> handler) {
202225
future = pp;
203226
}
204227
return future.andThen(ar -> {
228+
endMetric(pooled.timerMetric);
205229
pooled.refresh();
206230
lease.recycle();
207231
});
@@ -244,6 +268,7 @@ public void complete(Lease<PooledConnection> lease, Throwable failure) {
244268
private void handle(Lease<PooledConnection> lease) {
245269
dequeueMetric(metric);
246270
PooledConnection pooled = lease.get();
271+
pooled.timerMetric = beginMetric();
247272
pooled.lease = lease;
248273
handler.succeed(pooled);
249274
}
@@ -303,6 +328,7 @@ public class PooledConnection implements Connection, Connection.Holder {
303328
private Holder holder;
304329
private Promise<ConnectResult<PooledConnection>> poolCallback;
305330
private Lease<PooledConnection> lease;
331+
private Object timerMetric;
306332
public long idleEvictionTimestamp;
307333
public long lifetimeEvictionTimestamp;
308334

@@ -443,6 +469,7 @@ private void doClose(Holder holder, Completable<Void> promise) {
443469
}
444470

445471
private void cleanup(Completable<Void> promise) {
472+
endMetric(timerMetric);
446473
Lease<PooledConnection> l = this.lease;
447474
this.lease = null;
448475
refresh();

vertx-sql-client/src/test/java/io/vertx/tests/sqlclient/tck/MetricsTestBase.java

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ public void close() {
114114
pool.query("SELECT * FROM immutable WHERE id=1").execute().toCompletionStage().toCompletableFuture().get(20, TimeUnit.SECONDS);
115115
ctx.assertEquals(0, closeCount.get());
116116
pool.close();
117-
awaitQueueSize(ctx, closeCount, 1);
117+
awaitUntilSizeEquals(ctx, closeCount, 1);
118118
}
119119

120120
@Test
@@ -131,6 +131,9 @@ private void testQueuing(TestContext ctx, boolean timeout) throws Exception {
131131
AtomicInteger queueSize = new AtomicInteger();
132132
List<Object> enqueueMetrics = Collections.synchronizedList(new ArrayList<>());
133133
List<Object> dequeueMetrics = Collections.synchronizedList(new ArrayList<>());
134+
AtomicInteger usageSize = new AtomicInteger();
135+
List<Object> beginMetrics = Collections.synchronizedList(new ArrayList<>());
136+
List<Object> endMetrics = Collections.synchronizedList(new ArrayList<>());
134137
poolMetrics = new PoolMetrics() {
135138
@Override
136139
public Object enqueue() {
@@ -144,6 +147,18 @@ public void dequeue(Object taskMetric) {
144147
dequeueMetrics.add(taskMetric);
145148
queueSize.decrementAndGet();
146149
}
150+
@Override
151+
public Object begin() {
152+
Object metric = new Object();
153+
beginMetrics.add(metric);
154+
usageSize.incrementAndGet();
155+
return metric;
156+
}
157+
@Override
158+
public void end(Object usageMetric) {
159+
endMetrics.add(usageMetric);
160+
usageSize.decrementAndGet();
161+
}
147162
};
148163
PoolOptions poolOptions = new PoolOptions().setMaxSize(1).setName("the-pool");
149164
if (timeout) {
@@ -156,16 +171,20 @@ public void dequeue(Object taskMetric) {
156171
for (int i = 0;i < num;i++) {
157172
futures.add(pool.withConnection(sqlConn -> sqlConn.query("SELECT * FROM immutable WHERE id=1").execute()));
158173
}
159-
awaitQueueSize(ctx, queueSize, timeout ? 0 : num);
174+
awaitUntilSizeEquals(ctx, queueSize, timeout ? 0 : num);
160175
conn.close();
161176
Future.join(futures).otherwiseEmpty().await(20, SECONDS);
162177
ctx.assertEquals(0, queueSize.get());
163178
ctx.assertEquals(enqueueMetrics, dequeueMetrics);
179+
awaitUntilSizeEquals(ctx, usageSize, 0);
180+
ctx.assertEquals(timeout ? 1 : num + 1, beginMetrics.size());
181+
ctx.assertEquals(timeout ? 1 : num + 1, endMetrics.size());
182+
ctx.assertEquals(beginMetrics, endMetrics);
164183
ctx.assertEquals("sql", poolType);
165184
ctx.assertEquals("the-pool", poolName);
166185
}
167186

168-
private void awaitQueueSize(TestContext ctx, AtomicInteger queueSize, int num) throws InterruptedException {
187+
private void awaitUntilSizeEquals(TestContext ctx, AtomicInteger queueSize, int num) throws InterruptedException {
169188
long now = System.currentTimeMillis();
170189
for (; ; ) {
171190
if (queueSize.get() != num) {
@@ -227,7 +246,7 @@ public void dequeue(Object taskMetric) {
227246
.onComplete(ctx.asyncAssertFailure(t -> async.countDown()));
228247
}
229248
conn.closeHandler(v -> async.countDown());
230-
awaitQueueSize(ctx, queueSize, 16);
249+
awaitUntilSizeEquals(ctx, queueSize, 16);
231250
firstConnection.get().clientSocket().close();
232251
async.await(20_000);
233252
ctx.assertEquals(0, queueSize.get());

0 commit comments

Comments
 (0)