Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@
package io.vertx.mssqlclient.tck;

import io.vertx.mssqlclient.MSSQLBuilder;
import io.vertx.mssqlclient.MSSQLConnectOptions;
import io.vertx.mssqlclient.junit.MSSQLRule;
import io.vertx.sqlclient.ClientBuilder;
import io.vertx.sqlclient.Pool;
import io.vertx.sqlclient.SqlConnectOptions;
import io.vertx.sqlclient.tck.MetricsTestBase;
import org.junit.ClassRule;

Expand All @@ -23,9 +25,14 @@ public class MSSQLMetricsTest extends MetricsTestBase {
@ClassRule
public static MSSQLRule rule = MSSQLRule.SHARED_INSTANCE;

@Override
protected SqlConnectOptions connectOptions() {
return new MSSQLConnectOptions(rule.options());
}

@Override
protected ClientBuilder<Pool> poolBuilder() {
return MSSQLBuilder.pool().connectingTo(rule.options());
return MSSQLBuilder.pool().connectingTo(connectOptions());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@

import io.vertx.ext.unit.TestContext;
import io.vertx.oracleclient.OracleBuilder;
import io.vertx.oracleclient.OracleConnectOptions;
import io.vertx.oracleclient.test.junit.OracleRule;
import io.vertx.sqlclient.ClientBuilder;
import io.vertx.sqlclient.Pool;
import io.vertx.sqlclient.SqlConnectOptions;
import io.vertx.sqlclient.tck.MetricsTestBase;
import org.junit.ClassRule;
import org.junit.Ignore;
Expand All @@ -26,9 +28,14 @@ public class OracleMetricsTest extends MetricsTestBase {
@ClassRule
public static OracleRule rule = OracleRule.SHARED_INSTANCE;

@Override
protected SqlConnectOptions connectOptions() {
return new OracleConnectOptions(rule.options());
}

@Override
protected ClientBuilder<Pool> poolBuilder() {
return OracleBuilder.pool().connectingTo(rule.options());
return OracleBuilder.pool().connectingTo(connectOptions());
}

@Override
Expand All @@ -49,4 +56,25 @@ public void testPreparedBatchQuery(TestContext ctx) {
public void testPrepareAndBatchQuery(TestContext ctx) {
super.testPrepareAndBatchQuery(ctx);
}

@Test
@Ignore("Implementation of the test does not work with Oracle")
@Override
public void testQueuing(TestContext ctx) throws Exception {
super.testQueuing(ctx);
}

@Test
@Ignore("Implementation of the test does not work with Oracle")
@Override
public void testQueuingTimeout(TestContext ctx) throws Exception {
super.testQueuingTimeout(ctx);
}

@Test
@Ignore("Implementation of the test does not work with Oracle")
@Override
public void testConnectionLost(TestContext ctx) throws Exception {
super.testConnectionLost(ctx);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.vertx.pgclient.junit.ContainerPgRule;
import io.vertx.sqlclient.ClientBuilder;
import io.vertx.sqlclient.Pool;
import io.vertx.sqlclient.SqlConnectOptions;
import io.vertx.sqlclient.tck.MetricsTestBase;
import org.junit.ClassRule;

Expand All @@ -22,9 +23,14 @@ public class PgMetricsTest extends MetricsTestBase {
@ClassRule
public static ContainerPgRule rule = new ContainerPgRule();

@Override
protected SqlConnectOptions connectOptions() {
return new PgConnectOptions(rule.options());
}

@Override
protected ClientBuilder<Pool> poolBuilder() {
return PgBuilder.pool().connectingTo(rule.options());
return PgBuilder.pool().connectingTo(connectOptions());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,16 @@ private Object dequeueAndBeginUse(Object metric) {
return NO_METRICS;
}

private void dequeueAndReject(Object metric) {
if (metrics != null && metric != NO_METRICS) {
try {
metrics.rejected(metric);
} catch (Exception e) {
// Log
}
}
}

private Object endUse(Object metric) {
if (metrics != null && metric != NO_METRICS) {
try {
Expand Down Expand Up @@ -271,6 +281,7 @@ public void onEnqueue(PoolWaiter<PooledConnection> waiter) {
pool.cancel(waiter, ar -> {
if (ar.succeeded()) {
if (ar.result()) {
dequeueAndReject(queueMetric);
handler.handle(Future.failedFuture("Timeout"));
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;

@RunWith(VertxUnitRunner.class)
public abstract class MetricsTestBase {

Expand Down Expand Up @@ -81,6 +84,8 @@ protected Pool getPool() {
return pool;
}

protected abstract SqlConnectOptions connectOptions();

protected abstract ClientBuilder<Pool> poolBuilder();

protected Pool createPool(Vertx vertx) {
Expand Down Expand Up @@ -122,6 +127,15 @@ public void close() {

@Test
public void testQueuing(TestContext ctx) throws Exception {
testQueuing(ctx, false);
}

@Test
public void testQueuingTimeout(TestContext ctx) throws Exception {
testQueuing(ctx, true);
}

private void testQueuing(TestContext ctx, boolean timeout) throws Exception {
AtomicInteger queueSize = new AtomicInteger();
AtomicInteger inUse = new AtomicInteger();
List<Object> enqueueMetrics = Collections.synchronizedList(new ArrayList<>());
Expand Down Expand Up @@ -156,28 +170,106 @@ public void end(Object inUseMetric, boolean succeeded) {
endMetrics.add(inUseMetric);
}
};
Pool pool = createPool(vertx, new PoolOptions().setMaxSize(1).setName("the-pool"));
SqlConnection conn = pool.getConnection().toCompletionStage().toCompletableFuture().get(20, TimeUnit.SECONDS);
PoolOptions poolOptions = new PoolOptions().setMaxSize(1).setName("the-pool");
if (timeout) {
poolOptions.setConnectionTimeout(2).setConnectionTimeoutUnit(SECONDS);
}
Pool pool = createPool(vertx, poolOptions);
SqlConnection conn = Future.await(pool.getConnection(), 20, SECONDS);
int num = 16;
List<Future<?>> futures = new ArrayList<>();
for (int i = 0;i < num;i++) {
futures.add(pool.query("SELECT * FROM immutable WHERE id=1").execute());
}
long now = System.currentTimeMillis();
while (queueSize.get() != num) {
ctx.assertTrue(System.currentTimeMillis() - now < 20_000);
Thread.sleep(100);
futures.add(pool.withConnection(sqlConn -> sqlConn.query("SELECT * FROM immutable WHERE id=1").execute()));
}
conn.close();
Future.join(futures).toCompletionStage().toCompletableFuture().get(20, TimeUnit.SECONDS);
awaitQueueSize(ctx, queueSize, timeout ? 0 : num);
Future.await(conn.close(), 20, SECONDS);
Future.await(Future.join(futures).otherwiseEmpty(), 20, SECONDS);
ctx.assertEquals(0, queueSize.get());
ctx.assertEquals(0, inUse.get());
ctx.assertEquals(enqueueMetrics, dequeueMetrics);
if (timeout) {
ctx.assertTrue(enqueueMetrics.containsAll(dequeueMetrics) && dequeueMetrics.containsAll(enqueueMetrics));
} else {
ctx.assertEquals(enqueueMetrics, dequeueMetrics);
}
ctx.assertEquals(beginMetrics, endMetrics);
ctx.assertEquals("sql", poolType);
ctx.assertEquals("the-pool", poolName);
}

private void awaitQueueSize(TestContext ctx, AtomicInteger queueSize, int num) throws InterruptedException {
long now = System.currentTimeMillis();
for (; ; ) {
if (queueSize.get() != num) {
if (System.currentTimeMillis() - now >= 20_000) {
ctx.fail("Timeout waiting for queue size " + queueSize.get() + " to be equal to " + num);
} else {
MILLISECONDS.sleep(500);
}
} else {
break;
}
}
}

@Test
public void testConnectionLost(TestContext ctx) throws Exception {
SqlConnectOptions connectOptions = connectOptions();
ProxyServer proxy = ProxyServer.create(vertx, connectOptions.getPort(), connectOptions.getHost());
AtomicReference<ProxyServer.Connection> firstConnection = new AtomicReference<>();
proxy.proxyHandler(proxiedConn -> {
if (firstConnection.compareAndSet(null, proxiedConn)) {
proxiedConn.connect();
}
});
// Start proxy
Async listenLatch = ctx.async();
proxy.listen(8080, "localhost", ctx.asyncAssertSuccess(res -> listenLatch.complete()));
listenLatch.awaitSuccess(20_000);


AtomicInteger queueSize = new AtomicInteger();
poolMetrics = new PoolMetrics() {
@Override
public Object submitted() {
queueSize.incrementAndGet();
return null;
}

@Override
public Object begin(Object o) {
queueSize.decrementAndGet();
return null;
}

@Override
public void rejected(Object o) {
queueSize.decrementAndGet();
}
};
PoolOptions poolOptions = new PoolOptions()
.setConnectionTimeout(500)
.setConnectionTimeoutUnit(MILLISECONDS)
.setMaxSize(1)
.setName("the-pool");
Pool pool = poolBuilder()
.with(poolOptions)
.using(vertx)
.connectingTo(connectOptions.setHost("localhost").setPort(8080))
.build();
SqlConnection conn = Future.await(pool.getConnection(), 20, SECONDS);
int num = 16;
Async async = ctx.async(num + 1);
for (int i = 0; i < num; i++) {
pool.withConnection(sqlConn -> sqlConn.query("SELECT * FROM immutable WHERE id=1").execute())
.onComplete(ctx.asyncAssertFailure(t -> async.countDown()));
}
conn.closeHandler(v -> async.countDown());
awaitQueueSize(ctx, queueSize, 16);
firstConnection.get().clientSocket().close();
async.await(20_000);
ctx.assertEquals(0, queueSize.get());
}

@Test
public void testSimpleQuery(TestContext ctx) {
Function<SqlConnection, Future<?>> fn = conn -> conn.query("SELECT * FROM immutable WHERE id=1").execute();
Expand Down