Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@

import io.vertx.ext.unit.TestContext;
import io.vertx.oracleclient.OracleBuilder;
import tests.oracleclient.junit.OracleRule;
import io.vertx.sqlclient.ClientBuilder;
import io.vertx.sqlclient.Pool;
import io.vertx.sqlclient.SqlConnectOptions;
import io.vertx.tests.sqlclient.tck.MetricsTestBase;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import tests.oracleclient.junit.OracleRule;

public class OracleMetricsTest extends MetricsTestBase {

Expand Down Expand Up @@ -55,4 +55,11 @@ public void testPreparedBatchQuery(TestContext ctx) {
public void testPrepareAndBatchQuery(TestContext ctx) {
super.testPrepareAndBatchQuery(ctx);
}

@Test
@Ignore("Implementation of the test does not work with Oracle")
@Override
public void testConnectionLost(TestContext ctx) throws Exception {
super.testConnectionLost(ctx);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ public void onEnqueue(PoolWaiter<PooledConnection> waiter) {
pool.cancel(waiter, (res, err) -> {
if (err == null) {
if (res) {
dequeueMetric(metric);
handler.fail("Timeout");
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import io.vertx.sqlclient.*;
import io.vertx.tests.sqlclient.ProxyServer;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -36,6 +37,9 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;

@RunWith(VertxUnitRunner.class)
public abstract class MetricsTestBase {

Expand Down Expand Up @@ -110,15 +114,20 @@ public void close() {
pool.query("SELECT * FROM immutable WHERE id=1").execute().toCompletionStage().toCompletableFuture().get(20, TimeUnit.SECONDS);
ctx.assertEquals(0, closeCount.get());
pool.close();
long now = System.currentTimeMillis();
while (closeCount.get() != 1) {
ctx.assertTrue(System.currentTimeMillis() - now < 20_000);
Thread.sleep(100);
}
awaitQueueSize(ctx, closeCount, 1);
}

@Test
public void testQueuing(TestContext ctx) throws Exception {
testQueuing(ctx, false);
}

@Test
public void testQueuingTimeout(TestContext ctx) throws Exception {
testQueuing(ctx, true);
}

private void testQueuing(TestContext ctx, boolean timeout) throws Exception {
AtomicInteger queueSize = new AtomicInteger();
List<Object> enqueueMetrics = Collections.synchronizedList(new ArrayList<>());
List<Object> dequeueMetrics = Collections.synchronizedList(new ArrayList<>());
Expand All @@ -136,26 +145,94 @@ public void dequeue(Object taskMetric) {
queueSize.decrementAndGet();
}
};
Pool pool = createPool(vertx, new PoolOptions().setMaxSize(1).setName("the-pool"));
SqlConnection conn = pool.getConnection().toCompletionStage().toCompletableFuture().get(20, TimeUnit.SECONDS);
PoolOptions poolOptions = new PoolOptions().setMaxSize(1).setName("the-pool");
if (timeout) {
poolOptions.setConnectionTimeout(2).setConnectionTimeoutUnit(SECONDS);
}
Pool pool = createPool(vertx, poolOptions);
SqlConnection conn = pool.getConnection().await(20, SECONDS);
int num = 16;
List<Future<?>> futures = new ArrayList<>();
for (int i = 0;i < num;i++) {
futures.add(pool.query("SELECT * FROM immutable WHERE id=1").execute());
}
long now = System.currentTimeMillis();
while (queueSize.get() != num) {
ctx.assertTrue(System.currentTimeMillis() - now < 20_000);
Thread.sleep(100);
futures.add(pool.withConnection(sqlConn -> sqlConn.query("SELECT * FROM immutable WHERE id=1").execute()));
}
awaitQueueSize(ctx, queueSize, timeout ? 0 : num);
conn.close();
Future.join(futures).toCompletionStage().toCompletableFuture().get(20, TimeUnit.SECONDS);
Future.join(futures).otherwiseEmpty().await(20, SECONDS);
ctx.assertEquals(0, queueSize.get());
ctx.assertEquals(enqueueMetrics, dequeueMetrics);
ctx.assertEquals("sql", poolType);
ctx.assertEquals("the-pool", poolName);
}

private void awaitQueueSize(TestContext ctx, AtomicInteger queueSize, int num) throws InterruptedException {
long now = System.currentTimeMillis();
for (; ; ) {
if (queueSize.get() != num) {
if (System.currentTimeMillis() - now >= 20_000) {
ctx.fail("Timeout waiting for queue size " + queueSize.get() + " to be equal to " + num);
} else {
MILLISECONDS.sleep(500);
}
} else {
break;
}
}
}

@Test
public void testConnectionLost(TestContext ctx) throws Exception {
SqlConnectOptions connectOptions = connectOptions();
ProxyServer proxy = ProxyServer.create(vertx, connectOptions.getPort(), connectOptions.getHost());
AtomicReference<ProxyServer.Connection> firstConnection = new AtomicReference<>();
proxy.proxyHandler(proxiedConn -> {
if (firstConnection.compareAndSet(null, proxiedConn)) {
proxiedConn.connect();
}
});
// Start proxy
Async listenLatch = ctx.async();
proxy.listen(8080, "localhost", ctx.asyncAssertSuccess(res -> listenLatch.complete()));
listenLatch.awaitSuccess(20_000);


AtomicInteger queueSize = new AtomicInteger();
poolMetrics = new PoolMetrics() {
@Override
public Object enqueue() {
queueSize.incrementAndGet();
return null;
}

@Override
public void dequeue(Object taskMetric) {
queueSize.decrementAndGet();
}
};
PoolOptions poolOptions = new PoolOptions()
.setConnectionTimeout(500)
.setConnectionTimeoutUnit(MILLISECONDS)
.setMaxSize(1)
.setName("the-pool");
Pool pool = poolBuilder()
.with(poolOptions)
.using(vertx)
.connectingTo(connectOptions.setHost("localhost").setPort(8080))
.build();
SqlConnection conn = pool.getConnection().await(20, SECONDS);
int num = 16;
Async async = ctx.async(num + 1);
for (int i = 0; i < num; i++) {
pool.withConnection(sqlConn -> sqlConn.query("SELECT * FROM immutable WHERE id=1").execute())
.onComplete(ctx.asyncAssertFailure(t -> async.countDown()));
}
conn.closeHandler(v -> async.countDown());
awaitQueueSize(ctx, queueSize, 16);
firstConnection.get().clientSocket().close();
async.await(20_000);
ctx.assertEquals(0, queueSize.get());
}

@Test
public void testSimpleQuery(TestContext ctx) {
Function<SqlConnection, Future<?>> fn = conn -> conn.query("SELECT * FROM immutable WHERE id=1").execute();
Expand Down