Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@

package io.helidon.benchmark.nima.models;

import io.vertx.pgclient.PgConnection;
import io.vertx.sqlclient.PreparedQuery;
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.RowSet;

public class PgClientConnection implements AutoCloseable {
static final int UPDATE_QUERIES = 500;
private static String SELECT_WORLD = "SELECT id, randomnumber from WORLD where id=$1";
private static String SELECT_FORTUNE = "SELECT * from FORTUNE";

private PreparedQuery<RowSet<Row>> worldQuery;
private PreparedQuery<RowSet<Row>> fortuneQuery;
private PreparedQuery<RowSet<Row>>[] updateQuery;

private final PgConnection conn;

PgClientConnection(PgConnection conn) {
this.conn = conn;
}

public PgConnection pgConnection() {
return conn;
}

@Override
public void close() {
conn.close();
}

public PreparedQuery<RowSet<Row>> worldQuery() {
return worldQuery;
}

public PreparedQuery<RowSet<Row>> fortuneQuery() {
return fortuneQuery;
}

public PreparedQuery<RowSet<Row>> updateQuery(int queryCount) {
return updateQuery[queryCount - 1];
}

@SuppressWarnings("unchecked")
void prepare() {
try {
worldQuery = conn.prepare(SELECT_WORLD)
.toCompletionStage().toCompletableFuture().get().query();
fortuneQuery = conn.prepare(SELECT_FORTUNE)
.toCompletionStage().toCompletableFuture().get().query();
updateQuery = (PreparedQuery<RowSet<Row>>[]) new PreparedQuery<?>[UPDATE_QUERIES];
for (int i = 0; i < UPDATE_QUERIES; i++) {
updateQuery[i] = conn.prepare(singleUpdate(i + 1))
.toCompletionStage().toCompletableFuture().get().query();
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}

private static String singleUpdate(int count) {
StringBuilder sql = new StringBuilder();
sql.append("UPDATE WORLD SET RANDOMNUMBER = CASE ID");
for (int i = 0; i < count; i++) {
int k = i * 2 + 1;
sql.append(" WHEN $").append(k).append(" THEN $").append(k + 1);
}
sql.append(" ELSE RANDOMNUMBER");
sql.append(" END WHERE ID IN ($1");
for (int i = 1; i < count; i++) {
int k = i * 2 + 1;
sql.append(",$").append(k);
}
sql.append(")");
return sql.toString();
}
}
Original file line number Diff line number Diff line change
@@ -1,59 +1,30 @@

package io.helidon.benchmark.nima.models;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

import io.helidon.config.Config;
import io.vertx.core.Vertx;
import io.vertx.pgclient.PgConnectOptions;
import io.vertx.pgclient.PgConnection;
import io.vertx.sqlclient.PreparedQuery;
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.RowSet;

class PgClientConnectionPool implements AutoCloseable {
abstract class PgClientConnectionPool implements AutoCloseable {

private final Config config;
private final Vertx vertx;
private final PgConnectOptions options;
private final ReentrantLock lock = new ReentrantLock();
private final Map<String, PgClientConnection> connectionMap = new HashMap<>();

public PgClientConnectionPool(Vertx vertx, PgConnectOptions options) {
static PgClientConnectionPool create(Vertx vertx, PgConnectOptions options, Config config) {
return new PgClientConnectionPoolArray(vertx, options, config);
}

PgClientConnectionPool(Vertx vertx, PgConnectOptions options, Config config) {
this.vertx = vertx;
this.options = options;
this.config = config;
}

public PgClientConnection clientConnection() {
String carrierThread = carrierThread();
PgClientConnection connection = connectionMap.get(carrierThread);
if (connection == null) {
try {
lock.lock();
connection = connectionMap.get(carrierThread);
if (connection == null) {
connection = newConnection();
connectionMap.put(carrierThread, connection);
}
} finally {
lock.unlock();
}
}
return connection;
}
abstract PgClientConnection clientConnection();

@Override
public void close() {
try {
for (PgClientConnection connection : connectionMap.values()) {
connection.close();
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}

private PgClientConnection newConnection() {
protected PgClientConnection newConnection() {
try {
PgConnection conn = PgConnection.connect(vertx, options)
.toCompletionStage().toCompletableFuture().get();
Expand All @@ -64,80 +35,4 @@ private PgClientConnection newConnection() {
throw new RuntimeException(e);
}
}

static String carrierThread() {
String threadName = Thread.currentThread().toString();
return threadName.substring(threadName.indexOf('@') + 1);
}

public static class PgClientConnection implements AutoCloseable {
static final int UPDATE_QUERIES = 500;
private static String SELECT_WORLD = "SELECT id, randomnumber from WORLD where id=$1";
private static String SELECT_FORTUNE = "SELECT * from FORTUNE";

private PreparedQuery<RowSet<Row>> worldQuery;
private PreparedQuery<RowSet<Row>> fortuneQuery;
private PreparedQuery<RowSet<Row>>[] updateQuery;

private final PgConnection conn;

PgClientConnection(PgConnection conn) {
this.conn = conn;
}

public PgConnection pgConnection() {
return conn;
}

@Override
public void close() {
conn.close();
}

public PreparedQuery<RowSet<Row>> worldQuery() {
return worldQuery;
}

public PreparedQuery<RowSet<Row>> fortuneQuery() {
return fortuneQuery;
}

public PreparedQuery<RowSet<Row>> updateQuery(int queryCount) {
return updateQuery[queryCount - 1];
}

@SuppressWarnings("unchecked")
void prepare() {
try {
worldQuery = conn.prepare(SELECT_WORLD)
.toCompletionStage().toCompletableFuture().get().query();
fortuneQuery = conn.prepare(SELECT_FORTUNE)
.toCompletionStage().toCompletableFuture().get().query();
updateQuery = (PreparedQuery<RowSet<Row>>[]) new PreparedQuery<?>[UPDATE_QUERIES];
for (int i = 0; i < UPDATE_QUERIES; i++) {
updateQuery[i] = conn.prepare(singleUpdate(i + 1))
.toCompletionStage().toCompletableFuture().get().query();
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}

private static String singleUpdate(int count) {
StringBuilder sql = new StringBuilder();
sql.append("UPDATE WORLD SET RANDOMNUMBER = CASE ID");
for (int i = 0; i < count; i++) {
int k = i * 2 + 1;
sql.append(" WHEN $").append(k).append(" THEN $").append(k + 1);
}
sql.append(" ELSE RANDOMNUMBER");
sql.append(" END WHERE ID IN ($1");
for (int i = 1; i < count; i++) {
int k = i * 2 + 1;
sql.append(",$").append(k);
}
sql.append(")");
return sql.toString();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@

package io.helidon.benchmark.nima.models;

import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Logger;

import io.helidon.config.Config;
import io.vertx.core.Vertx;
import io.vertx.pgclient.PgConnectOptions;

class PgClientConnectionPoolArray extends PgClientConnectionPool {
private static final Logger LOGGER = Logger.getLogger(PgClientConnectionPoolArray.class.getName());

private final int connections;
private final PgClientConnection[] connectionArray;
private final ReentrantLock lock = new ReentrantLock();

PgClientConnectionPoolArray(Vertx vertx, PgConnectOptions options, Config config) {
super(vertx, options, config);
double sizeFactor = config.get("pgclient-connection-pool.size-factor")
.asDouble()
.orElse(1.0);
connections = (int) (Runtime.getRuntime().availableProcessors() * sizeFactor);
connectionArray = new PgClientConnection[connections];
LOGGER.info("Connection pool is " + getClass().getSimpleName());
LOGGER.info("Size of connection pool is " + connections);
}

@Override
public PgClientConnection clientConnection() {
int index = Thread.currentThread().hashCode() % connections;
PgClientConnection connection = connectionArray[index];
if (connection == null) {
try {
lock.lock();
connection = connectionArray[index];
if (connection == null) {
connection = newConnection();
connectionArray[index] = connection;
}
} finally {
lock.unlock();
}
}
return connection;
}

@Override
public void close() {
try {
for (PgClientConnection connection : connectionArray) {
if (connection != null) {
connection.close();
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import java.util.ArrayList;
import java.util.List;
import java.util.logging.Logger;

import io.helidon.config.Config;
import io.vertx.core.Future;
Expand All @@ -15,14 +14,12 @@
import io.vertx.sqlclient.Tuple;

import static io.helidon.benchmark.nima.models.DbRepository.randomWorldNumber;
import static io.helidon.benchmark.nima.models.PgClientConnectionPool.PgClientConnection.UPDATE_QUERIES;
import static io.helidon.benchmark.nima.models.PgClientConnection.UPDATE_QUERIES;

public class PgClientRepository implements DbRepository {
private static final Logger LOGGER = Logger.getLogger(PgClientRepository.class.getName());

private final PgClientConnectionPool connectionPool;

@SuppressWarnings("unchecked")
public PgClientRepository(Config config) {
VertxOptions vertxOptions = new VertxOptions()
.setPreferNativeTransport(true)
Expand All @@ -41,7 +38,7 @@ public PgClientRepository(Config config) {
.setTcpQuickAck(true)
.setTcpKeepAlive(true)
.setPipeliningLimit(100000);
connectionPool = new PgClientConnectionPool(vertx, connectOptions);
connectionPool = PgClientConnectionPool.create(vertx, connectOptions, config);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,6 @@ db: "hello_world"
username: benchmarkdbuser
password: benchmarkdbpass
db-repository: "pgclient" # "pgclient" (default) or "hikari"
pgclient-connection-pool:
size-factor: 1.0 # size = available-processors * size-factor

Loading