Skip to content

Commit 58ec57a

Browse files
committed
Metrics vertx_pool_queue_pending doesn't decrease after connection loss
See #1494 SqlConnectionPool must invoke dequeueMetric when a connection fails to be acquired Signed-off-by: Thomas Segismont <[email protected]>
1 parent a196cba commit 58ec57a

File tree

2 files changed

+92
-14
lines changed

2 files changed

+92
-14
lines changed

vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,7 @@ public void onEnqueue(PoolWaiter<PooledConnection> waiter) {
255255
pool.cancel(waiter, (res, err) -> {
256256
if (err == null) {
257257
if (res) {
258+
dequeueMetric(metric);
258259
handler.fail("Timeout");
259260
}
260261
} else {

vertx-sql-client/src/test/java/io/vertx/tests/sqlclient/tck/MetricsTestBase.java

Lines changed: 91 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.vertx.ext.unit.TestContext;
2424
import io.vertx.ext.unit.junit.VertxUnitRunner;
2525
import io.vertx.sqlclient.*;
26+
import io.vertx.tests.sqlclient.ProxyServer;
2627
import org.junit.After;
2728
import org.junit.Before;
2829
import org.junit.Test;
@@ -36,6 +37,9 @@
3637
import java.util.concurrent.atomic.AtomicReference;
3738
import java.util.function.Function;
3839

40+
import static java.util.concurrent.TimeUnit.MILLISECONDS;
41+
import static java.util.concurrent.TimeUnit.SECONDS;
42+
3943
@RunWith(VertxUnitRunner.class)
4044
public abstract class MetricsTestBase {
4145

@@ -110,15 +114,20 @@ public void close() {
110114
pool.query("SELECT * FROM immutable WHERE id=1").execute().toCompletionStage().toCompletableFuture().get(20, TimeUnit.SECONDS);
111115
ctx.assertEquals(0, closeCount.get());
112116
pool.close();
113-
long now = System.currentTimeMillis();
114-
while (closeCount.get() != 1) {
115-
ctx.assertTrue(System.currentTimeMillis() - now < 20_000);
116-
Thread.sleep(100);
117-
}
117+
awaitQueueSize(ctx, closeCount, 1);
118118
}
119119

120120
@Test
121121
public void testQueuing(TestContext ctx) throws Exception {
122+
testQueuing(ctx, false);
123+
}
124+
125+
@Test
126+
public void testQueuingTimeout(TestContext ctx) throws Exception {
127+
testQueuing(ctx, true);
128+
}
129+
130+
private void testQueuing(TestContext ctx, boolean timeout) throws Exception {
122131
AtomicInteger queueSize = new AtomicInteger();
123132
List<Object> enqueueMetrics = Collections.synchronizedList(new ArrayList<>());
124133
List<Object> dequeueMetrics = Collections.synchronizedList(new ArrayList<>());
@@ -136,26 +145,94 @@ public void dequeue(Object taskMetric) {
136145
queueSize.decrementAndGet();
137146
}
138147
};
139-
Pool pool = createPool(vertx, new PoolOptions().setMaxSize(1).setName("the-pool"));
140-
SqlConnection conn = pool.getConnection().toCompletionStage().toCompletableFuture().get(20, TimeUnit.SECONDS);
148+
PoolOptions poolOptions = new PoolOptions().setMaxSize(1).setName("the-pool");
149+
if (timeout) {
150+
poolOptions.setConnectionTimeout(2).setConnectionTimeoutUnit(SECONDS);
151+
}
152+
Pool pool = createPool(vertx, poolOptions);
153+
SqlConnection conn = pool.getConnection().await(20, SECONDS);
141154
int num = 16;
142155
List<Future<?>> futures = new ArrayList<>();
143156
for (int i = 0;i < num;i++) {
144-
futures.add(pool.query("SELECT * FROM immutable WHERE id=1").execute());
145-
}
146-
long now = System.currentTimeMillis();
147-
while (queueSize.get() != num) {
148-
ctx.assertTrue(System.currentTimeMillis() - now < 20_000);
149-
Thread.sleep(100);
157+
futures.add(pool.withConnection(sqlConn -> sqlConn.query("SELECT * FROM immutable WHERE id=1").execute()));
150158
}
159+
awaitQueueSize(ctx, queueSize, timeout ? 0 : num);
151160
conn.close();
152-
Future.join(futures).toCompletionStage().toCompletableFuture().get(20, TimeUnit.SECONDS);
161+
Future.join(futures).otherwiseEmpty().await(20, SECONDS);
153162
ctx.assertEquals(0, queueSize.get());
154163
ctx.assertEquals(enqueueMetrics, dequeueMetrics);
155164
ctx.assertEquals("sql", poolType);
156165
ctx.assertEquals("the-pool", poolName);
157166
}
158167

168+
private void awaitQueueSize(TestContext ctx, AtomicInteger queueSize, int num) throws InterruptedException {
169+
long now = System.currentTimeMillis();
170+
for (; ; ) {
171+
if (queueSize.get() != num) {
172+
if (System.currentTimeMillis() - now >= 20_000) {
173+
ctx.fail("Timeout waiting for queue size " + queueSize.get() + " to be equal to " + num);
174+
} else {
175+
MILLISECONDS.sleep(500);
176+
}
177+
} else {
178+
break;
179+
}
180+
}
181+
}
182+
183+
@Test
184+
public void testConnectionLost(TestContext ctx) throws Exception {
185+
SqlConnectOptions connectOptions = connectOptions();
186+
ProxyServer proxy = ProxyServer.create(vertx, connectOptions.getPort(), connectOptions.getHost());
187+
AtomicReference<ProxyServer.Connection> firstConnection = new AtomicReference<>();
188+
proxy.proxyHandler(proxiedConn -> {
189+
if (firstConnection.compareAndSet(null, proxiedConn)) {
190+
proxiedConn.connect();
191+
}
192+
});
193+
// Start proxy
194+
Async listenLatch = ctx.async();
195+
proxy.listen(8080, "localhost", ctx.asyncAssertSuccess(res -> listenLatch.complete()));
196+
listenLatch.awaitSuccess(20_000);
197+
198+
199+
AtomicInteger queueSize = new AtomicInteger();
200+
poolMetrics = new PoolMetrics() {
201+
@Override
202+
public Object enqueue() {
203+
queueSize.incrementAndGet();
204+
return null;
205+
}
206+
207+
@Override
208+
public void dequeue(Object taskMetric) {
209+
queueSize.decrementAndGet();
210+
}
211+
};
212+
PoolOptions poolOptions = new PoolOptions()
213+
.setConnectionTimeout(500)
214+
.setConnectionTimeoutUnit(MILLISECONDS)
215+
.setMaxSize(1)
216+
.setName("the-pool");
217+
Pool pool = poolBuilder()
218+
.with(poolOptions)
219+
.using(vertx)
220+
.connectingTo(connectOptions.setHost("localhost").setPort(8080))
221+
.build();
222+
SqlConnection conn = pool.getConnection().await(20, SECONDS);
223+
int num = 16;
224+
Async async = ctx.async(num + 1);
225+
for (int i = 0; i < num; i++) {
226+
pool.withConnection(sqlConn -> sqlConn.query("SELECT * FROM immutable WHERE id=1").execute())
227+
.onComplete(ctx.asyncAssertFailure(t -> async.countDown()));
228+
}
229+
conn.closeHandler(v -> async.countDown());
230+
awaitQueueSize(ctx, queueSize, 16);
231+
firstConnection.get().clientSocket().close();
232+
async.await(20_000);
233+
ctx.assertEquals(0, queueSize.get());
234+
}
235+
159236
@Test
160237
public void testSimpleQuery(TestContext ctx) {
161238
Function<SqlConnection, Future<?>> fn = conn -> conn.query("SELECT * FROM immutable WHERE id=1").execute();

0 commit comments

Comments
 (0)