Skip to content

Commit 8b60ae3

Browse files
committed
Metrics vertx_pool_queue_pending doesn't decrease after connection loss
Backported from eclipse-vertx#1529 See eclipse-vertx#1494 SqlConnectionPool must invoke dequeueMetric when a connection fails to be acquired Signed-off-by: Thomas Segismont <[email protected]>
1 parent 740a4b7 commit 8b60ae3

File tree

5 files changed

+136
-13
lines changed

5 files changed

+136
-13
lines changed

vertx-mssql-client/src/test/java/io/vertx/mssqlclient/tck/MSSQLMetricsTest.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,11 @@
1212
package io.vertx.mssqlclient.tck;
1313

1414
import io.vertx.mssqlclient.MSSQLBuilder;
15+
import io.vertx.mssqlclient.MSSQLConnectOptions;
1516
import io.vertx.mssqlclient.junit.MSSQLRule;
1617
import io.vertx.sqlclient.ClientBuilder;
1718
import io.vertx.sqlclient.Pool;
19+
import io.vertx.sqlclient.SqlConnectOptions;
1820
import io.vertx.sqlclient.tck.MetricsTestBase;
1921
import org.junit.ClassRule;
2022

@@ -23,9 +25,14 @@ public class MSSQLMetricsTest extends MetricsTestBase {
2325
@ClassRule
2426
public static MSSQLRule rule = MSSQLRule.SHARED_INSTANCE;
2527

28+
@Override
29+
protected SqlConnectOptions connectOptions() {
30+
return new MSSQLConnectOptions(rule.options());
31+
}
32+
2633
@Override
2734
protected ClientBuilder<Pool> poolBuilder() {
28-
return MSSQLBuilder.pool().connectingTo(rule.options());
35+
return MSSQLBuilder.pool().connectingTo(connectOptions());
2936
}
3037

3138
@Override

vertx-oracle-client/src/test/java/io/vertx/oracleclient/test/tck/OracleMetricsTest.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,11 @@
1313

1414
import io.vertx.ext.unit.TestContext;
1515
import io.vertx.oracleclient.OracleBuilder;
16+
import io.vertx.oracleclient.OracleConnectOptions;
1617
import io.vertx.oracleclient.test.junit.OracleRule;
1718
import io.vertx.sqlclient.ClientBuilder;
1819
import io.vertx.sqlclient.Pool;
20+
import io.vertx.sqlclient.SqlConnectOptions;
1921
import io.vertx.sqlclient.tck.MetricsTestBase;
2022
import org.junit.ClassRule;
2123
import org.junit.Ignore;
@@ -26,9 +28,14 @@ public class OracleMetricsTest extends MetricsTestBase {
2628
@ClassRule
2729
public static OracleRule rule = OracleRule.SHARED_INSTANCE;
2830

31+
@Override
32+
protected SqlConnectOptions connectOptions() {
33+
return new OracleConnectOptions(rule.options());
34+
}
35+
2936
@Override
3037
protected ClientBuilder<Pool> poolBuilder() {
31-
return OracleBuilder.pool().connectingTo(rule.options());
38+
return OracleBuilder.pool().connectingTo(connectOptions());
3239
}
3340

3441
@Override

vertx-pg-client/src/test/java/io/vertx/pgclient/PgMetricsTest.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import io.vertx.pgclient.junit.ContainerPgRule;
1515
import io.vertx.sqlclient.ClientBuilder;
1616
import io.vertx.sqlclient.Pool;
17+
import io.vertx.sqlclient.SqlConnectOptions;
1718
import io.vertx.sqlclient.tck.MetricsTestBase;
1819
import org.junit.ClassRule;
1920

@@ -22,9 +23,14 @@ public class PgMetricsTest extends MetricsTestBase {
2223
@ClassRule
2324
public static ContainerPgRule rule = new ContainerPgRule();
2425

26+
@Override
27+
protected SqlConnectOptions connectOptions() {
28+
return new PgConnectOptions(rule.options());
29+
}
30+
2531
@Override
2632
protected ClientBuilder<Pool> poolBuilder() {
27-
return PgBuilder.pool().connectingTo(rule.options());
33+
return PgBuilder.pool().connectingTo(connectOptions());
2834
}
2935

3036
@Override

vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,16 @@ private Object dequeueAndBeginUse(Object metric) {
186186
return NO_METRICS;
187187
}
188188

189+
private void dequeueAndReject(Object metric) {
190+
if (metrics != null && metric != NO_METRICS) {
191+
try {
192+
metrics.rejected(metric);
193+
} catch (Exception e) {
194+
// Log
195+
}
196+
}
197+
}
198+
189199
private Object endUse(Object metric) {
190200
if (metrics != null && metric != NO_METRICS) {
191201
try {
@@ -271,6 +281,7 @@ public void onEnqueue(PoolWaiter<PooledConnection> waiter) {
271281
pool.cancel(waiter, ar -> {
272282
if (ar.succeeded()) {
273283
if (ar.result()) {
284+
dequeueAndReject(queueMetric);
274285
handler.handle(Future.failedFuture("Timeout"));
275286
}
276287
} else {

vertx-sql-client/src/test/java/io/vertx/sqlclient/tck/MetricsTestBase.java

Lines changed: 102 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@
3636
import java.util.concurrent.atomic.AtomicReference;
3737
import java.util.function.Function;
3838

39+
import static java.util.concurrent.TimeUnit.MILLISECONDS;
40+
import static java.util.concurrent.TimeUnit.SECONDS;
41+
3942
@RunWith(VertxUnitRunner.class)
4043
public abstract class MetricsTestBase {
4144

@@ -81,6 +84,8 @@ protected Pool getPool() {
8184
return pool;
8285
}
8386

87+
protected abstract SqlConnectOptions connectOptions();
88+
8489
protected abstract ClientBuilder<Pool> poolBuilder();
8590

8691
protected Pool createPool(Vertx vertx) {
@@ -122,6 +127,15 @@ public void close() {
122127

123128
@Test
124129
public void testQueuing(TestContext ctx) throws Exception {
130+
testQueuing(ctx, false);
131+
}
132+
133+
@Test
134+
public void testQueuingTimeout(TestContext ctx) throws Exception {
135+
testQueuing(ctx, true);
136+
}
137+
138+
private void testQueuing(TestContext ctx, boolean timeout) throws Exception {
125139
AtomicInteger queueSize = new AtomicInteger();
126140
AtomicInteger inUse = new AtomicInteger();
127141
List<Object> enqueueMetrics = Collections.synchronizedList(new ArrayList<>());
@@ -156,28 +170,106 @@ public void end(Object inUseMetric, boolean succeeded) {
156170
endMetrics.add(inUseMetric);
157171
}
158172
};
159-
Pool pool = createPool(vertx, new PoolOptions().setMaxSize(1).setName("the-pool"));
160-
SqlConnection conn = pool.getConnection().toCompletionStage().toCompletableFuture().get(20, TimeUnit.SECONDS);
173+
PoolOptions poolOptions = new PoolOptions().setMaxSize(1).setName("the-pool");
174+
if (timeout) {
175+
poolOptions.setConnectionTimeout(2).setConnectionTimeoutUnit(SECONDS);
176+
}
177+
Pool pool = createPool(vertx, poolOptions);
178+
SqlConnection conn = Future.await(pool.getConnection(), 20, SECONDS);
161179
int num = 16;
162180
List<Future<?>> futures = new ArrayList<>();
163181
for (int i = 0;i < num;i++) {
164-
futures.add(pool.query("SELECT * FROM immutable WHERE id=1").execute());
165-
}
166-
long now = System.currentTimeMillis();
167-
while (queueSize.get() != num) {
168-
ctx.assertTrue(System.currentTimeMillis() - now < 20_000);
169-
Thread.sleep(100);
182+
futures.add(pool.withConnection(sqlConn -> sqlConn.query("SELECT * FROM immutable WHERE id=1").execute()));
170183
}
184+
awaitQueueSize(ctx, queueSize, timeout ? 0 : num);
171185
conn.close();
172-
Future.join(futures).toCompletionStage().toCompletableFuture().get(20, TimeUnit.SECONDS);
186+
Future.await(Future.join(futures).otherwiseEmpty(), 20, SECONDS);
173187
ctx.assertEquals(0, queueSize.get());
174188
ctx.assertEquals(0, inUse.get());
175-
ctx.assertEquals(enqueueMetrics, dequeueMetrics);
189+
if (timeout) {
190+
ctx.assertTrue(enqueueMetrics.containsAll(dequeueMetrics) && dequeueMetrics.containsAll(enqueueMetrics));
191+
} else {
192+
ctx.assertEquals(enqueueMetrics, dequeueMetrics);
193+
}
176194
ctx.assertEquals(beginMetrics, endMetrics);
177195
ctx.assertEquals("sql", poolType);
178196
ctx.assertEquals("the-pool", poolName);
179197
}
180198

199+
private void awaitQueueSize(TestContext ctx, AtomicInteger queueSize, int num) throws InterruptedException {
200+
long now = System.currentTimeMillis();
201+
for (; ; ) {
202+
if (queueSize.get() != num) {
203+
if (System.currentTimeMillis() - now >= 20_000) {
204+
ctx.fail("Timeout waiting for queue size " + queueSize.get() + " to be equal to " + num);
205+
} else {
206+
MILLISECONDS.sleep(500);
207+
}
208+
} else {
209+
break;
210+
}
211+
}
212+
}
213+
214+
@Test
215+
public void testConnectionLost(TestContext ctx) throws Exception {
216+
SqlConnectOptions connectOptions = connectOptions();
217+
ProxyServer proxy = ProxyServer.create(vertx, connectOptions.getPort(), connectOptions.getHost());
218+
AtomicReference<ProxyServer.Connection> firstConnection = new AtomicReference<>();
219+
proxy.proxyHandler(proxiedConn -> {
220+
if (firstConnection.compareAndSet(null, proxiedConn)) {
221+
proxiedConn.connect();
222+
}
223+
});
224+
// Start proxy
225+
Async listenLatch = ctx.async();
226+
proxy.listen(8080, "localhost", ctx.asyncAssertSuccess(res -> listenLatch.complete()));
227+
listenLatch.awaitSuccess(20_000);
228+
229+
230+
AtomicInteger queueSize = new AtomicInteger();
231+
poolMetrics = new PoolMetrics() {
232+
@Override
233+
public Object submitted() {
234+
queueSize.incrementAndGet();
235+
return null;
236+
}
237+
238+
@Override
239+
public Object begin(Object o) {
240+
queueSize.decrementAndGet();
241+
return null;
242+
}
243+
244+
@Override
245+
public void rejected(Object o) {
246+
queueSize.decrementAndGet();
247+
}
248+
};
249+
PoolOptions poolOptions = new PoolOptions()
250+
.setConnectionTimeout(500)
251+
.setConnectionTimeoutUnit(MILLISECONDS)
252+
.setMaxSize(1)
253+
.setName("the-pool");
254+
Pool pool = poolBuilder()
255+
.with(poolOptions)
256+
.using(vertx)
257+
.connectingTo(connectOptions.setHost("localhost").setPort(8080))
258+
.build();
259+
SqlConnection conn = Future.await(pool.getConnection(), 20, SECONDS);
260+
int num = 16;
261+
Async async = ctx.async(num + 1);
262+
for (int i = 0; i < num; i++) {
263+
pool.withConnection(sqlConn -> sqlConn.query("SELECT * FROM immutable WHERE id=1").execute())
264+
.onComplete(ctx.asyncAssertFailure(t -> async.countDown()));
265+
}
266+
conn.closeHandler(v -> async.countDown());
267+
awaitQueueSize(ctx, queueSize, 16);
268+
firstConnection.get().clientSocket().close();
269+
async.await(20_000);
270+
ctx.assertEquals(0, queueSize.get());
271+
}
272+
181273
@Test
182274
public void testSimpleQuery(TestContext ctx) {
183275
Function<SqlConnection, Future<?>> fn = conn -> conn.query("SELECT * FROM immutable WHERE id=1").execute();

0 commit comments

Comments
 (0)