diff --git a/.github/workflows/ci-4.x.yml b/.github/workflows/ci-4.x.yml index 3a157f8de..55a5339ae 100644 --- a/.github/workflows/ci-4.x.yml +++ b/.github/workflows/ci-4.x.yml @@ -28,12 +28,13 @@ jobs: - os: ubuntu-latest jdk: 8 profile: 'MariaDB-10.4' - - os: ubuntu-latest - jdk: 8 - profile: 'MSSQL-2017-latest' - - os: ubuntu-latest - jdk: 8 - profile: 'MSSQL-2019-latest' + # Currently does not pass in CI due to a container issue +# - os: ubuntu-latest +# jdk: 8 +# profile: 'MSSQL-2017-latest' +# - os: ubuntu-latest +# jdk: 8 +# profile: 'MSSQL-2019-latest' - os: ubuntu-latest jdk: 8 profile: 'DB2-11.5' @@ -49,9 +50,10 @@ jobs: - os: ubuntu-latest jdk: 17 profile: 'MariaDB-10.4' - - os: ubuntu-latest - jdk: 17 - profile: 'MSSQL-2019-latest' + # Currently does not pass in CI due to a container issue +# - os: ubuntu-latest +# jdk: 17 +# profile: 'MSSQL-2019-latest' - os: ubuntu-latest jdk: 17 profile: 'Oracle-23' diff --git a/.github/workflows/ci-5.x.yml b/.github/workflows/ci-5.x.yml index 040a924b8..dd00c066a 100644 --- a/.github/workflows/ci-5.x.yml +++ b/.github/workflows/ci-5.x.yml @@ -34,12 +34,13 @@ jobs: - os: ubuntu-latest jdk: 11 profile: 'MariaDB-10.4' - - os: ubuntu-latest - jdk: 11 - profile: 'MSSQL-2017-latest' - - os: ubuntu-latest - jdk: 11 - profile: 'MSSQL-2019-latest' + # Currently does not pass in CI due to a container issue +# - os: ubuntu-latest +# jdk: 11 +# profile: 'MSSQL-2017-latest' +# - os: ubuntu-latest +# jdk: 11 +# profile: 'MSSQL-2019-latest' - os: ubuntu-latest jdk: 11 profile: 'DB2-11.5' @@ -55,9 +56,10 @@ jobs: - os: ubuntu-latest jdk: 17 profile: 'MariaDB-10.4' - - os: ubuntu-latest - jdk: 17 - profile: 'MSSQL-2019-latest' + # Currently does not pass in CI due to a container issue +# - os: ubuntu-latest +# jdk: 17 +# profile: 'MSSQL-2019-latest' - os: ubuntu-latest jdk: 17 profile: 'Oracle-23' diff --git a/vertx-db2-client/src/main/java/io/vertx/db2client/DB2ConnectOptions.java b/vertx-db2-client/src/main/java/io/vertx/db2client/DB2ConnectOptions.java index 7f6564074..e550d487b 100644 --- a/vertx-db2-client/src/main/java/io/vertx/db2client/DB2ConnectOptions.java +++ b/vertx-db2-client/src/main/java/io/vertx/db2client/DB2ConnectOptions.java @@ -232,6 +232,7 @@ public DB2ConnectOptions setSslOptions(ClientSSLOptions sslOptions) { protected void init() { this.setHost(DEFAULT_HOST); this.setPort(DEFAULT_PORT); + this.setMetricsName(DEFAULT_METRICS_NAME); this.setProperties(new HashMap<>(DEFAULT_CONNECTION_ATTRIBUTES)); } diff --git a/vertx-db2-client/src/main/java/io/vertx/db2client/impl/DB2ConnectionFactory.java b/vertx-db2-client/src/main/java/io/vertx/db2client/impl/DB2ConnectionFactory.java index a6912e432..d0404a5ce 100644 --- a/vertx-db2-client/src/main/java/io/vertx/db2client/impl/DB2ConnectionFactory.java +++ b/vertx-db2-client/src/main/java/io/vertx/db2client/impl/DB2ConnectionFactory.java @@ -20,6 +20,7 @@ import io.vertx.core.Promise; import io.vertx.core.internal.ContextInternal; import io.vertx.core.internal.VertxInternal; +import io.vertx.core.net.NetClientOptions; import io.vertx.core.net.SocketAddress; import io.vertx.core.internal.net.NetSocketInternal; import io.vertx.core.spi.metrics.ClientMetrics; @@ -38,6 +39,10 @@ public DB2ConnectionFactory(VertxInternal vertx) { super(vertx); } + public DB2ConnectionFactory(VertxInternal vertx, NetClientOptions tcpOptions) { + super(vertx, tcpOptions); + } + @Override protected Future doConnectInternal(DB2ConnectOptions options, ContextInternal context) { SocketAddress server = options.getSocketAddress(); @@ -51,7 +56,7 @@ protected Future doConnectInternal(DB2ConnectOptions options, Contex int pipeliningLimit = options.getPipeliningLimit(); return client.connect(server).flatMap(so -> { VertxMetrics vertxMetrics = vertx.metricsSPI(); - ClientMetrics metrics = vertxMetrics != null ? vertxMetrics.createClientMetrics(options.getSocketAddress(), "sql", tcpOptions.getMetricsName()) : null; + ClientMetrics metrics = vertxMetrics != null ? vertxMetrics.createClientMetrics(options.getSocketAddress(), "sql", options.getMetricsName()) : null; DB2SocketConnection conn = new DB2SocketConnection((NetSocketInternal) so, metrics, options, cachePreparedStatements, preparedStatementCacheSize, preparedStatementCacheSqlFilter, pipeliningLimit, context); conn.init(); diff --git a/vertx-db2-client/src/main/java/io/vertx/db2client/spi/DB2Driver.java b/vertx-db2-client/src/main/java/io/vertx/db2client/spi/DB2Driver.java index f86ce5615..d5a64f18a 100644 --- a/vertx-db2-client/src/main/java/io/vertx/db2client/spi/DB2Driver.java +++ b/vertx-db2-client/src/main/java/io/vertx/db2client/spi/DB2Driver.java @@ -83,7 +83,7 @@ public boolean acceptsOptions(SqlConnectOptions options) { @Override public ConnectionFactory createConnectionFactory(Vertx vertx, NetClientOptions transportOptions) { - return new DB2ConnectionFactory((VertxInternal) vertx); + return new DB2ConnectionFactory((VertxInternal) vertx, transportOptions); } @Override diff --git a/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/MSSQLConnectOptions.java b/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/MSSQLConnectOptions.java index 7aba9cdc3..41bead3a6 100644 --- a/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/MSSQLConnectOptions.java +++ b/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/MSSQLConnectOptions.java @@ -229,6 +229,7 @@ protected void init() { this.setUser(DEFAULT_USER); this.setPassword(DEFAULT_PASSWORD); this.setDatabase(DEFAULT_DATABASE); + this.setMetricsName(DEFAULT_METRICS_NAME); this.setProperties(new HashMap<>(DEFAULT_PROPERTIES)); packetSize = DEFAULT_PACKET_SIZE; ssl = DEFAULT_SSL; diff --git a/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/MSSQLConnectionFactory.java b/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/MSSQLConnectionFactory.java index 2f683d47f..b0802b68b 100644 --- a/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/MSSQLConnectionFactory.java +++ b/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/MSSQLConnectionFactory.java @@ -77,7 +77,7 @@ private Future connectOrRedirect(MSSQLConnectOptions options, Contex private MSSQLSocketConnection createSocketConnection(NetSocket so, MSSQLConnectOptions options, ContextInternal context) { VertxMetrics vertxMetrics = vertx.metricsSPI(); - ClientMetrics metrics = vertxMetrics != null ? vertxMetrics.createClientMetrics(options.getSocketAddress(), "sql", tcpOptions.getMetricsName()) : null; + ClientMetrics metrics = vertxMetrics != null ? vertxMetrics.createClientMetrics(options.getSocketAddress(), "sql", options.getMetricsName()) : null; MSSQLSocketConnection conn = new MSSQLSocketConnection((NetSocketInternal) so, sslContextManager, metrics, options, false, 0, sql -> true, 1, context); conn.init(); return conn; diff --git a/vertx-mssql-client/src/test/java/io/vertx/mssqlclient/tck/MSSQLMetricsTest.java b/vertx-mssql-client/src/test/java/io/vertx/mssqlclient/tck/MSSQLMetricsTest.java index c8be77ff4..d8d9c3b5e 100644 --- a/vertx-mssql-client/src/test/java/io/vertx/mssqlclient/tck/MSSQLMetricsTest.java +++ b/vertx-mssql-client/src/test/java/io/vertx/mssqlclient/tck/MSSQLMetricsTest.java @@ -11,10 +11,11 @@ package io.vertx.mssqlclient.tck; -import io.vertx.core.Vertx; import io.vertx.mssqlclient.MSSQLBuilder; import io.vertx.mssqlclient.junit.MSSQLRule; +import io.vertx.sqlclient.ClientBuilder; import io.vertx.sqlclient.Pool; +import io.vertx.sqlclient.SqlConnectOptions; import io.vertx.sqlclient.tck.MetricsTestBase; import org.junit.ClassRule; @@ -24,8 +25,13 @@ public class MSSQLMetricsTest extends MetricsTestBase { public static MSSQLRule rule = MSSQLRule.SHARED_INSTANCE; @Override - protected Pool createPool(Vertx vertx) { - return MSSQLBuilder.pool(builder -> builder.connectingTo(rule.options()).using(vertx)); + protected SqlConnectOptions connectOptions() { + return rule.options(); + } + + @Override + protected ClientBuilder poolBuilder() { + return MSSQLBuilder.pool(); } @Override diff --git a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/MySQLConnectOptions.java b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/MySQLConnectOptions.java index 0ab0d00b9..740032246 100644 --- a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/MySQLConnectOptions.java +++ b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/MySQLConnectOptions.java @@ -408,6 +408,7 @@ protected void init() { this.setUser(DEFAULT_USER); this.setPassword(DEFAULT_PASSWORD); this.setDatabase(DEFAULT_SCHEMA); + this.setMetricsName(DEFAULT_METRICS_NAME); this.setProperties(new HashMap<>(DEFAULT_CONNECTION_ATTRIBUTES)); } diff --git a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/MySQLConnectionFactory.java b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/MySQLConnectionFactory.java index f350fb6ba..309cc1593 100644 --- a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/MySQLConnectionFactory.java +++ b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/MySQLConnectionFactory.java @@ -134,7 +134,7 @@ private Future doConnect(MySQLConnectOptions options, SslMode sslMod Future fut = client.connect(connectOptions); return fut.flatMap(so -> { VertxMetrics vertxMetrics = vertx.metricsSPI(); - ClientMetrics metrics = vertxMetrics != null ? vertxMetrics.createClientMetrics(options.getSocketAddress(), "sql", tcpOptions.getMetricsName()) : null; + ClientMetrics metrics = vertxMetrics != null ? vertxMetrics.createClientMetrics(options.getSocketAddress(), "sql", options.getMetricsName()) : null; MySQLSocketConnection conn = new MySQLSocketConnection((NetSocketInternal) so, metrics, options, cachePreparedStatements, preparedStatementCacheMaxSize, preparedStatementCacheSqlFilter, pipeliningLimit, context); conn.init(); return Future.future(promise -> conn.sendStartupMessage(username, password, database, collation, serverRsaPublicKey, properties, sslMode, sslOptions, initialCapabilitiesFlags, charsetEncoding, authenticationPlugin, promise)); diff --git a/vertx-oracle-client/src/main/java/io/vertx/oracleclient/OracleConnectOptions.java b/vertx-oracle-client/src/main/java/io/vertx/oracleclient/OracleConnectOptions.java index 7340ac2bc..5909e6971 100644 --- a/vertx-oracle-client/src/main/java/io/vertx/oracleclient/OracleConnectOptions.java +++ b/vertx-oracle-client/src/main/java/io/vertx/oracleclient/OracleConnectOptions.java @@ -367,6 +367,7 @@ protected void init() { this.setUser(DEFAULT_USER); this.setPassword(DEFAULT_PASSWORD); this.setDatabase(DEFAULT_DATABASE); + this.setMetricsName(DEFAULT_METRICS_NAME); } @Override diff --git a/vertx-oracle-client/src/main/java/io/vertx/oracleclient/impl/OracleConnectionFactory.java b/vertx-oracle-client/src/main/java/io/vertx/oracleclient/impl/OracleConnectionFactory.java index 6d9856839..7bc761ba7 100644 --- a/vertx-oracle-client/src/main/java/io/vertx/oracleclient/impl/OracleConnectionFactory.java +++ b/vertx-oracle-client/src/main/java/io/vertx/oracleclient/impl/OracleConnectionFactory.java @@ -61,7 +61,7 @@ private OracleDataSource getDatasource(SqlConnectOptions options) { public Future connect(Context context, OracleConnectOptions options) { OracleDataSource datasource = getDatasource(options); VertxMetrics vertxMetrics = ((VertxInternal)context.owner()).metricsSPI(); - ClientMetrics metrics = vertxMetrics != null ? vertxMetrics.createClientMetrics(options.getSocketAddress(), "sql", null) : null; + ClientMetrics metrics = vertxMetrics != null ? vertxMetrics.createClientMetrics(options.getSocketAddress(), "sql", options.getMetricsName()) : null; ContextInternal ctx = (ContextInternal) context; return executeBlocking(context, () -> { OracleConnection orac = datasource.createConnectionBuilder().build(); diff --git a/vertx-oracle-client/src/test/java/io/vertx/oracleclient/test/tck/OracleMetricsTest.java b/vertx-oracle-client/src/test/java/io/vertx/oracleclient/test/tck/OracleMetricsTest.java index 702181755..4c4294e7f 100644 --- a/vertx-oracle-client/src/test/java/io/vertx/oracleclient/test/tck/OracleMetricsTest.java +++ b/vertx-oracle-client/src/test/java/io/vertx/oracleclient/test/tck/OracleMetricsTest.java @@ -11,11 +11,12 @@ package io.vertx.oracleclient.test.tck; -import io.vertx.core.Vertx; import io.vertx.ext.unit.TestContext; import io.vertx.oracleclient.OracleBuilder; import io.vertx.oracleclient.test.junit.OracleRule; +import io.vertx.sqlclient.ClientBuilder; import io.vertx.sqlclient.Pool; +import io.vertx.sqlclient.SqlConnectOptions; import io.vertx.sqlclient.tck.MetricsTestBase; import org.junit.ClassRule; import org.junit.Ignore; @@ -27,10 +28,13 @@ public class OracleMetricsTest extends MetricsTestBase { public static OracleRule rule = OracleRule.SHARED_INSTANCE; @Override - protected Pool createPool(Vertx vertx) { - return OracleBuilder.pool(builder -> builder - .connectingTo(rule.options()) - .using(vertx)); + protected SqlConnectOptions connectOptions() { + return rule.options(); + } + + @Override + protected ClientBuilder poolBuilder() { + return OracleBuilder.pool(); } @Override diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/PgConnectOptions.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/PgConnectOptions.java index 46c2709a1..d9cb20e41 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/PgConnectOptions.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/PgConnectOptions.java @@ -287,6 +287,7 @@ protected void init() { this.setUser(DEFAULT_USER); this.setPassword(DEFAULT_PASSWORD); this.setDatabase(DEFAULT_DATABASE); + this.setMetricsName(DEFAULT_METRICS_NAME); this.setProperties(new HashMap<>(DEFAULT_PROPERTIES)); } diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionFactory.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionFactory.java index 3ea07f7f1..53e5e2784 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionFactory.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionFactory.java @@ -176,7 +176,7 @@ private PgSocketConnection newSocketConnection(ContextInternal context, NetSocke int pipeliningLimit = options.getPipeliningLimit(); boolean useLayer7Proxy = options.getUseLayer7Proxy(); VertxMetrics vertxMetrics = vertx.metricsSPI(); - ClientMetrics metrics = vertxMetrics != null ? vertxMetrics.createClientMetrics(options.getSocketAddress(), "sql", tcpOptions.getMetricsName()) : null; + ClientMetrics metrics = vertxMetrics != null ? vertxMetrics.createClientMetrics(options.getSocketAddress(), "sql", options.getMetricsName()) : null; PgSocketConnection conn = new PgSocketConnection(socket, metrics, options, cachePreparedStatements, preparedStatementCacheMaxSize, preparedStatementCacheSqlFilter, pipeliningLimit, useLayer7Proxy, context); return conn; } diff --git a/vertx-pg-client/src/test/java/io/vertx/pgclient/PgMetricsTest.java b/vertx-pg-client/src/test/java/io/vertx/pgclient/PgMetricsTest.java index 644025b13..8ce5c685b 100644 --- a/vertx-pg-client/src/test/java/io/vertx/pgclient/PgMetricsTest.java +++ b/vertx-pg-client/src/test/java/io/vertx/pgclient/PgMetricsTest.java @@ -11,9 +11,10 @@ package io.vertx.pgclient; -import io.vertx.core.Vertx; import io.vertx.pgclient.junit.ContainerPgRule; +import io.vertx.sqlclient.ClientBuilder; import io.vertx.sqlclient.Pool; +import io.vertx.sqlclient.SqlConnectOptions; import io.vertx.sqlclient.tck.MetricsTestBase; import org.junit.ClassRule; @@ -23,8 +24,12 @@ public class PgMetricsTest extends MetricsTestBase { public static ContainerPgRule rule = new ContainerPgRule(); @Override - protected Pool createPool(Vertx vertx) { - return PgBuilder.pool().connectingTo(rule.options()).using(vertx).build(); + protected SqlConnectOptions connectOptions() { + return rule.options(); + } + + protected ClientBuilder poolBuilder() { + return PgBuilder.pool(); } @Override diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/SqlConnectOptions.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/SqlConnectOptions.java index eeeb598a9..0783c7084 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/SqlConnectOptions.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/SqlConnectOptions.java @@ -15,6 +15,7 @@ import io.vertx.codegen.annotations.GenIgnore; import io.vertx.codegen.json.annotations.JsonGen; import io.vertx.core.json.JsonObject; +import io.vertx.core.net.ClientOptionsBase; import io.vertx.core.net.ClientSSLOptions; import io.vertx.core.net.SocketAddress; import io.vertx.core.tracing.TracingPolicy; @@ -70,6 +71,7 @@ public static SqlConnectOptions fromUri(String connectionUri) throws IllegalArgu public static final int DEFAULT_PREPARED_STATEMENT_CACHE_MAX_SIZE = 256; public static final int DEFAULT_PREPARED_STATEMENT_CACHE_SQL_LIMIT = 2048; public static final Predicate DEFAULT_PREPARED_STATEMENT_CACHE_FILTER = sql -> sql.length() < DEFAULT_PREPARED_STATEMENT_CACHE_SQL_LIMIT; + public static final String DEFAULT_METRICS_NAME = ""; private String host; private int port; @@ -83,6 +85,7 @@ public static SqlConnectOptions fromUri(String connectionUri) throws IllegalArgu private TracingPolicy tracingPolicy; private int reconnectAttempts; private long reconnectInterval; + private String metricsName; private ClientSSLOptions sslOptions; public SqlConnectOptions() { @@ -109,6 +112,7 @@ public SqlConnectOptions(SqlConnectOptions other) { this.tracingPolicy = other.tracingPolicy; this.reconnectAttempts = other.reconnectAttempts; this.reconnectInterval = other.reconnectInterval; + this.metricsName = other.metricsName; ClientSSLOptions sslOptions = other.sslOptions; this.sslOptions = sslOptions != null ? sslOptions.copy() : null; } @@ -398,6 +402,25 @@ public SqlConnectOptions setReconnectInterval(long interval) { return this; } + /** + * @return the metrics name identifying the reported metrics. + */ + public String getMetricsName() { + return metricsName; + } + + /** + * Set the metrics name identifying the reported metrics, useful for grouping metrics + * with the same name. + * + * @param metricsName the metrics name + * @return a reference to this, so the API can be used fluently + */ + public SqlConnectOptions setMetricsName(String metricsName) { + this.metricsName = metricsName; + return this; + } + public ClientSSLOptions getSslOptions() { return sslOptions; } diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java index e9534bfcb..de4a84fe2 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java @@ -16,6 +16,7 @@ import io.vertx.core.internal.net.NetSocketInternal; import io.vertx.core.net.SocketAddress; import io.vertx.core.spi.metrics.ClientMetrics; +import io.vertx.core.spi.metrics.PoolMetrics; import io.vertx.core.spi.tracing.VertxTracer; import io.vertx.core.tracing.TracingPolicy; import io.vertx.core.internal.pool.*; @@ -43,8 +44,11 @@ */ public class SqlConnectionPool { + private static final Object NO_METRICS = new Object(); + private final Function> connectionProvider; private final VertxInternal vertx; + private final PoolMetrics metrics; private final ConnectionPool pool; private final Handler hook; private final Function> afterAcquire; @@ -55,6 +59,7 @@ public class SqlConnectionPool { private final int maxSize; public SqlConnectionPool(Function> connectionProvider, + PoolMetrics metrics, Handler hook, Function> afterAcquire, Function> beforeRecycle, @@ -72,6 +77,7 @@ public SqlConnectionPool(Function> connectionProv throw new IllegalArgumentException("afterAcquire and beforeRecycle hooks must be both not null"); } this.pool = ConnectionPool.pool(connector, new int[]{maxSize}, maxWaitQueueSize); + this.metrics = metrics; this.vertx = vertx; this.pipelined = pipelined; this.idleTimeout = idleTimeout; @@ -154,10 +160,33 @@ public void evict() { }); } + private Object enqueueMetric() { + if (metrics != null) { + try { + return metrics.enqueue(); + } catch (Exception e) { + // Log + } + } + return NO_METRICS; + } + + private void dequeueMetric(Object metric) { + if (metrics != null && metric != NO_METRICS) { + try { + metrics.dequeue(metric); + } catch (Exception e) { + // + } + } + } + public Future execute(ContextInternal context, CommandBase cmd) { Promise> p = context.promise(); + Object metric = enqueueMetric(); pool.acquire(context, 0, p); return p.future().compose(lease -> { + dequeueMetric(metric); PooledConnection pooled = lease.get(); Connection conn = pooled.conn; Future future; @@ -177,8 +206,14 @@ public Future execute(ContextInternal context, CommandBase cmd) { public void acquire(ContextInternal context, long timeout, Handler> handler) { class PoolRequest implements PoolWaiter.Listener, Handler>> { + + private final Object metric; private long timerID = -1L; + PoolRequest(Object metric) { + this.metric = metric; + } + @Override public void handle(AsyncResult> ar) { if (timerID != -1L) { @@ -204,6 +239,7 @@ public void handle(AsyncResult> ar) { } private void handle(Lease lease) { + dequeueMetric(metric); PooledConnection pooled = lease.get(); pooled.lease = lease; handler.handle(Future.succeededFuture(pooled)); @@ -231,7 +267,8 @@ public void onConnect(PoolWaiter waiter) { onEnqueue(waiter); } } - PoolRequest request = new PoolRequest(); + Object metric = enqueueMetric(); + PoolRequest request = new PoolRequest(metric); pool.acquire(context, request, 0, request); } diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/pool/PoolImpl.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/pool/PoolImpl.java index bb41d33d6..2fad432bf 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/pool/PoolImpl.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/pool/PoolImpl.java @@ -23,6 +23,8 @@ import io.vertx.core.internal.ContextInternal; import io.vertx.core.internal.PromiseInternal; import io.vertx.core.internal.VertxInternal; +import io.vertx.core.spi.metrics.PoolMetrics; +import io.vertx.core.spi.metrics.VertxMetrics; import io.vertx.sqlclient.*; import io.vertx.sqlclient.impl.pool.SqlConnectionPool; import io.vertx.sqlclient.internal.Connection; @@ -67,6 +69,14 @@ public PoolImpl(VertxInternal vertx, Handler hook = connectionInitializer != null ? this::initializeConnection : null; + VertxMetrics metrics = vertx.metricsSPI(); + PoolMetrics poolMetrics; + if (metrics != null) { + poolMetrics = metrics.createPoolMetrics("sql", poolOptions.getName(), poolOptions.getMaxSize()); + } else { + poolMetrics = null; + } + this.idleTimeout = MILLISECONDS.convert(poolOptions.getIdleTimeout(), poolOptions.getIdleTimeoutUnit()); this.connectionTimeout = MILLISECONDS.convert(poolOptions.getConnectionTimeout(), poolOptions.getConnectionTimeoutUnit()); this.maxLifetime = MILLISECONDS.convert(poolOptions.getMaxLifetime(), poolOptions.getMaxLifetimeUnit()); @@ -74,7 +84,7 @@ public PoolImpl(VertxInternal vertx, this.timerID = -1L; this.pipelined = pipelined; this.vertx = vertx; - this.pool = new SqlConnectionPool(connectionProvider, hook, afterAcquire, beforeRecycle, vertx, idleTimeout, maxLifetime, poolOptions.getMaxSize(), pipelined, poolOptions.getMaxWaitQueueSize(), poolOptions.getEventLoopSize()); + this.pool = new SqlConnectionPool(connectionProvider, poolMetrics, hook, afterAcquire, beforeRecycle, vertx, idleTimeout, maxLifetime, poolOptions.getMaxSize(), pipelined, poolOptions.getMaxWaitQueueSize(), poolOptions.getEventLoopSize()); this.closeFuture = closeFuture; this.connectionInitializer = connectionInitializer; } diff --git a/vertx-sql-client/src/test/java/io/vertx/sqlclient/tck/MetricsTestBase.java b/vertx-sql-client/src/test/java/io/vertx/sqlclient/tck/MetricsTestBase.java index 76c7b73a6..8819e63cb 100644 --- a/vertx-sql-client/src/test/java/io/vertx/sqlclient/tck/MetricsTestBase.java +++ b/vertx-sql-client/src/test/java/io/vertx/sqlclient/tck/MetricsTestBase.java @@ -17,19 +17,20 @@ import io.vertx.core.metrics.MetricsOptions; import io.vertx.core.net.SocketAddress; import io.vertx.core.spi.metrics.ClientMetrics; +import io.vertx.core.spi.metrics.PoolMetrics; import io.vertx.core.spi.metrics.VertxMetrics; import io.vertx.ext.unit.Async; import io.vertx.ext.unit.TestContext; import io.vertx.ext.unit.junit.VertxUnitRunner; -import io.vertx.sqlclient.Pool; -import io.vertx.sqlclient.SqlConnection; -import io.vertx.sqlclient.Tuple; +import io.vertx.sqlclient.*; import org.junit.After; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -39,8 +40,13 @@ public abstract class MetricsTestBase { Vertx vertx; - ClientMetrics metrics; + ClientMetrics clientMetrics; + PoolMetrics poolMetrics; Pool pool; + String clientType; + String clientNamespace; + String poolType; + String poolName; @Before public void setup() { @@ -50,8 +56,16 @@ public void setup() { new MetricsOptions().setEnabled(true))) .withMetrics(tracingOptions -> new VertxMetrics() { @Override - public ClientMetrics createClientMetrics(SocketAddress remoteAddress, String type, String namespace) { - return metrics; + public ClientMetrics createClientMetrics(SocketAddress remoteAddress, String type, String namespace) { + clientType = type; + clientNamespace = namespace; + return clientMetrics; + } + @Override + public PoolMetrics createPoolMetrics(String type, String name, int maxSize) { + poolType = type; + poolName = name; + return poolMetrics; } }) .build(); @@ -69,14 +83,24 @@ protected Pool getPool() { return pool; } - protected abstract Pool createPool(Vertx vertx); + protected abstract SqlConnectOptions connectOptions(); + + protected abstract ClientBuilder poolBuilder(); + + protected Pool createPool(Vertx vertx) { + return createPool(vertx, new PoolOptions()); + } + + protected Pool createPool(Vertx vertx, PoolOptions options) { + return poolBuilder().with(options).using(vertx).connectingTo(connectOptions()).build(); + } protected abstract String statement(String... parts); @Test public void testClosePool(TestContext ctx) throws Exception { AtomicInteger closeCount = new AtomicInteger(); - metrics = new ClientMetrics() { + clientMetrics = new ClientMetrics() { @Override public void close() { closeCount.incrementAndGet(); @@ -93,6 +117,45 @@ public void close() { } } + @Test + public void testQueuing(TestContext ctx) throws Exception { + AtomicInteger queueSize = new AtomicInteger(); + List enqueueMetrics = Collections.synchronizedList(new ArrayList<>()); + List dequeueMetrics = Collections.synchronizedList(new ArrayList<>()); + poolMetrics = new PoolMetrics() { + @Override + public Object enqueue() { + Object metric = new Object(); + enqueueMetrics.add(metric); + queueSize.incrementAndGet(); + return metric; + } + @Override + public void dequeue(Object taskMetric) { + dequeueMetrics.add(taskMetric); + queueSize.decrementAndGet(); + } + }; + Pool pool = createPool(vertx, new PoolOptions().setMaxSize(1).setName("the-pool")); + SqlConnection conn = pool.getConnection().toCompletionStage().toCompletableFuture().get(20, TimeUnit.SECONDS); + int num = 16; + List> futures = new ArrayList<>(); + for (int i = 0;i < num;i++) { + futures.add(pool.query("SELECT * FROM immutable WHERE id=1").execute()); + } + long now = System.currentTimeMillis(); + while (queueSize.get() != num) { + ctx.assertTrue(System.currentTimeMillis() - now < 20_000); + Thread.sleep(100); + } + conn.close(); + Future.join(futures).toCompletionStage().toCompletableFuture().get(20, TimeUnit.SECONDS); + ctx.assertEquals(0, queueSize.get()); + ctx.assertEquals(enqueueMetrics, dequeueMetrics); + ctx.assertEquals("sql", poolType); + ctx.assertEquals("the-pool", poolName); + } + @Test public void testSimpleQuery(TestContext ctx) { Function> fn = conn -> conn.query("SELECT * FROM immutable WHERE id=1").execute(); @@ -135,10 +198,9 @@ public void testFailure(TestContext ctx) { private void testMetrics(TestContext ctx, boolean fail, Function> fn) { Object metric = new Object(); - Object queueMetric = new Object(); AtomicReference responseMetric = new AtomicReference<>(); AtomicReference failureMetric = new AtomicReference<>(); - metrics = new ClientMetrics() { + clientMetrics = new ClientMetrics() { @Override public Object requestBegin(String uri, Object request) { return metric; @@ -156,7 +218,7 @@ public void requestReset(Object requestMetric) { failureMetric.set(requestMetric); } }; - Pool pool = getPool(); + pool = poolBuilder().using(vertx).connectingTo(connectOptions().setMetricsName("the-client-metrics")).build(); Async async = ctx.async(); vertx.runOnContext(v1 -> { pool @@ -173,6 +235,8 @@ public void requestReset(Object requestMetric) { ctx.assertEquals(metric, responseMetric.get()); ctx.assertNull(failureMetric.get()); } + ctx.assertEquals("sql", clientType); + ctx.assertEquals("the-client-metrics", clientNamespace); async.complete(); }); }));