Skip to content

Commit 9fbb130

Browse files
committed
Metrics vertx_pool_queue_pending doesn't decrease after connection loss
See #1494 SqlConnectionPool must invoke dequeueMetric when a connection fails to be acquired Signed-off-by: Thomas Segismont <[email protected]>
1 parent a196cba commit 9fbb130

File tree

2 files changed

+96
-14
lines changed

2 files changed

+96
-14
lines changed

vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,7 @@ public void onEnqueue(PoolWaiter<PooledConnection> waiter) {
255255
pool.cancel(waiter, (res, err) -> {
256256
if (err == null) {
257257
if (res) {
258+
dequeueMetric(metric);
258259
handler.fail("Timeout");
259260
}
260261
} else {

vertx-sql-client/src/test/java/io/vertx/tests/sqlclient/tck/MetricsTestBase.java

Lines changed: 95 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.vertx.ext.unit.TestContext;
2424
import io.vertx.ext.unit.junit.VertxUnitRunner;
2525
import io.vertx.sqlclient.*;
26+
import io.vertx.tests.sqlclient.ProxyServer;
2627
import org.junit.After;
2728
import org.junit.Before;
2829
import org.junit.Test;
@@ -32,10 +33,14 @@
3233
import java.util.Collections;
3334
import java.util.List;
3435
import java.util.concurrent.TimeUnit;
36+
import java.util.concurrent.atomic.AtomicBoolean;
3537
import java.util.concurrent.atomic.AtomicInteger;
3638
import java.util.concurrent.atomic.AtomicReference;
3739
import java.util.function.Function;
3840

41+
import static java.util.concurrent.TimeUnit.MILLISECONDS;
42+
import static java.util.concurrent.TimeUnit.SECONDS;
43+
3944
@RunWith(VertxUnitRunner.class)
4045
public abstract class MetricsTestBase {
4146

@@ -110,15 +115,20 @@ public void close() {
110115
pool.query("SELECT * FROM immutable WHERE id=1").execute().toCompletionStage().toCompletableFuture().get(20, TimeUnit.SECONDS);
111116
ctx.assertEquals(0, closeCount.get());
112117
pool.close();
113-
long now = System.currentTimeMillis();
114-
while (closeCount.get() != 1) {
115-
ctx.assertTrue(System.currentTimeMillis() - now < 20_000);
116-
Thread.sleep(100);
117-
}
118+
awaitQueueSize(ctx, closeCount, 1);
118119
}
119120

120121
@Test
121122
public void testQueuing(TestContext ctx) throws Exception {
123+
testQueuing(ctx, false);
124+
}
125+
126+
@Test
127+
public void testQueuingTimeout(TestContext ctx) throws Exception {
128+
testQueuing(ctx, true);
129+
}
130+
131+
private void testQueuing(TestContext ctx, boolean timeout) throws Exception {
122132
AtomicInteger queueSize = new AtomicInteger();
123133
List<Object> enqueueMetrics = Collections.synchronizedList(new ArrayList<>());
124134
List<Object> dequeueMetrics = Collections.synchronizedList(new ArrayList<>());
@@ -136,26 +146,97 @@ public void dequeue(Object taskMetric) {
136146
queueSize.decrementAndGet();
137147
}
138148
};
139-
Pool pool = createPool(vertx, new PoolOptions().setMaxSize(1).setName("the-pool"));
140-
SqlConnection conn = pool.getConnection().toCompletionStage().toCompletableFuture().get(20, TimeUnit.SECONDS);
149+
PoolOptions poolOptions = new PoolOptions().setMaxSize(1).setName("the-pool");
150+
if (timeout) {
151+
poolOptions.setConnectionTimeout(2).setConnectionTimeoutUnit(SECONDS);
152+
}
153+
Pool pool = createPool(vertx, poolOptions);
154+
SqlConnection conn = pool.getConnection().await(20, SECONDS);
141155
int num = 16;
142156
List<Future<?>> futures = new ArrayList<>();
143157
for (int i = 0;i < num;i++) {
144-
futures.add(pool.query("SELECT * FROM immutable WHERE id=1").execute());
145-
}
146-
long now = System.currentTimeMillis();
147-
while (queueSize.get() != num) {
148-
ctx.assertTrue(System.currentTimeMillis() - now < 20_000);
149-
Thread.sleep(100);
158+
futures.add(pool.withConnection(sqlConn -> sqlConn.query("SELECT * FROM immutable WHERE id=1").execute()));
150159
}
160+
awaitQueueSize(ctx, queueSize, timeout ? 0 : num);
151161
conn.close();
152-
Future.join(futures).toCompletionStage().toCompletableFuture().get(20, TimeUnit.SECONDS);
162+
Future.join(futures).otherwiseEmpty().await(20, SECONDS);
153163
ctx.assertEquals(0, queueSize.get());
154164
ctx.assertEquals(enqueueMetrics, dequeueMetrics);
155165
ctx.assertEquals("sql", poolType);
156166
ctx.assertEquals("the-pool", poolName);
157167
}
158168

169+
private void awaitQueueSize(TestContext ctx, AtomicInteger queueSize, int num) throws InterruptedException {
170+
long now = System.currentTimeMillis();
171+
for (; ; ) {
172+
if (queueSize.get() != num) {
173+
if (System.currentTimeMillis() - now >= 20_000) {
174+
ctx.fail("Timeout waiting for queue size " + queueSize.get() + " to be equal to " + num);
175+
} else {
176+
MILLISECONDS.sleep(500);
177+
}
178+
} else {
179+
break;
180+
}
181+
}
182+
}
183+
184+
@Test
185+
public void testConnectionLost(TestContext ctx) throws Exception {
186+
SqlConnectOptions connectOptions = connectOptions();
187+
ProxyServer proxy = ProxyServer.create(vertx, connectOptions.getPort(), connectOptions.getHost());
188+
AtomicBoolean firstConnection = new AtomicBoolean(true);
189+
proxy.proxyHandler(proxiedConn -> {
190+
if (firstConnection.compareAndSet(true, false)) {
191+
proxiedConn.connect();
192+
vertx.eventBus().consumer("disconnect", msg -> {
193+
proxiedConn.clientSocket().close();
194+
});
195+
}
196+
});
197+
// Start proxy
198+
Async listenLatch = ctx.async();
199+
proxy.listen(8080, "localhost", ctx.asyncAssertSuccess(res -> listenLatch.complete()));
200+
listenLatch.awaitSuccess(20_000);
201+
202+
203+
AtomicInteger queueSize = new AtomicInteger();
204+
poolMetrics = new PoolMetrics() {
205+
@Override
206+
public Object enqueue() {
207+
queueSize.incrementAndGet();
208+
return null;
209+
}
210+
211+
@Override
212+
public void dequeue(Object taskMetric) {
213+
queueSize.decrementAndGet();
214+
}
215+
};
216+
PoolOptions poolOptions = new PoolOptions()
217+
.setConnectionTimeout(500)
218+
.setConnectionTimeoutUnit(MILLISECONDS)
219+
.setMaxSize(1)
220+
.setName("the-pool");
221+
Pool pool = poolBuilder()
222+
.with(poolOptions)
223+
.using(vertx)
224+
.connectingTo(connectOptions.setHost("localhost").setPort(8080))
225+
.build();
226+
SqlConnection conn = pool.getConnection().await(20, SECONDS);
227+
int num = 16;
228+
Async async = ctx.async(num + 1);
229+
for (int i = 0; i < num; i++) {
230+
pool.withConnection(sqlConn -> sqlConn.query("SELECT * FROM immutable WHERE id=1").execute())
231+
.onComplete(ctx.asyncAssertFailure(t -> async.countDown()));
232+
}
233+
conn.closeHandler(v -> async.countDown());
234+
awaitQueueSize(ctx, queueSize, 16);
235+
vertx.eventBus().send("disconnect", "boom");
236+
async.await(20_000);
237+
ctx.assertEquals(0, queueSize.get());
238+
}
239+
159240
@Test
160241
public void testSimpleQuery(TestContext ctx) {
161242
Function<SqlConnection, Future<?>> fn = conn -> conn.query("SELECT * FROM immutable WHERE id=1").execute();

0 commit comments

Comments
 (0)