Skip to content

Commit dab117a

Browse files
committed
New connection pool.
Signed-off-by: Santiago Pericas-Geertsen <[email protected]>
1 parent c4a735d commit dab117a

File tree

4 files changed

+154
-67
lines changed

4 files changed

+154
-67
lines changed
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
2+
package io.helidon.benchmark.nima.models;
3+
4+
import io.vertx.core.Vertx;
5+
import io.vertx.pgclient.PgConnectOptions;
6+
import io.vertx.pgclient.PgConnection;
7+
import io.vertx.sqlclient.PreparedQuery;
8+
import io.vertx.sqlclient.Row;
9+
import io.vertx.sqlclient.RowSet;
10+
11+
class PgClientConnectionPool implements AutoCloseable {
12+
13+
private final int size;
14+
private final Vertx vertx;
15+
private final PgConnectOptions options;
16+
private final PgClientConnection[] connections;
17+
18+
public PgClientConnectionPool(Vertx vertx, int size, PgConnectOptions options) {
19+
this.size = size;
20+
this.vertx = vertx;
21+
this.options = options;
22+
this.connections = new PgClientConnection[size];
23+
}
24+
25+
public PgClientConnection clientConnection() {
26+
int bucket = Thread.currentThread().hashCode() % size;
27+
return connections[bucket];
28+
}
29+
30+
public void connect() {
31+
try {
32+
for (int i = 0; i < size; i++) {
33+
PgConnection conn = PgConnection.connect(vertx, options)
34+
.toCompletionStage().toCompletableFuture().get();
35+
PgClientConnection clientConn = new PgClientConnection(conn);
36+
clientConn.prepare();
37+
connections[i] = clientConn;
38+
}
39+
} catch (Exception e) {
40+
throw new RuntimeException(e);
41+
}
42+
}
43+
44+
@Override
45+
public void close() {
46+
try {
47+
for (PgClientConnection connection : connections) {
48+
connection.close();
49+
}
50+
} catch (Exception e) {
51+
throw new RuntimeException(e);
52+
}
53+
}
54+
55+
public static class PgClientConnection implements AutoCloseable {
56+
private static final int UPDATE_QUERIES = 500;
57+
private static String SELECT_WORLD = "SELECT id, randomnumber from WORLD where id=$1";
58+
private static String SELECT_FORTUNE = "SELECT id, message from FORTUNE";
59+
60+
private PreparedQuery<RowSet<Row>> worldQuery;
61+
private PreparedQuery<RowSet<Row>> fortuneQuery;
62+
private PreparedQuery<RowSet<Row>>[] updateQuery;
63+
64+
private final PgConnection conn;
65+
66+
PgClientConnection(PgConnection conn) {
67+
this.conn = conn;
68+
}
69+
70+
public PgConnection pgConnection() {
71+
return conn;
72+
}
73+
74+
@Override
75+
public void close() {
76+
conn.close();
77+
}
78+
79+
public PreparedQuery<RowSet<Row>> worldQuery() {
80+
return worldQuery;
81+
}
82+
83+
public PreparedQuery<RowSet<Row>> fortuneQuery() {
84+
return fortuneQuery;
85+
}
86+
87+
public PreparedQuery<RowSet<Row>> updateQuery(int queryCount) {
88+
return updateQuery[queryCount - 1];
89+
}
90+
91+
@SuppressWarnings("unchecked")
92+
void prepare() {
93+
try {
94+
worldQuery = conn.prepare(SELECT_WORLD)
95+
.toCompletionStage().toCompletableFuture().get().query();
96+
fortuneQuery = conn.prepare(SELECT_FORTUNE)
97+
.toCompletionStage().toCompletableFuture().get().query();
98+
updateQuery = (PreparedQuery<RowSet<Row>>[]) new PreparedQuery<?>[UPDATE_QUERIES];
99+
for (int i = 0; i < UPDATE_QUERIES; i++) {
100+
updateQuery[i] = conn.prepare(singleUpdate(i + 1))
101+
.toCompletionStage().toCompletableFuture().get().query();
102+
}
103+
} catch (Exception e) {
104+
throw new RuntimeException(e);
105+
}
106+
}
107+
108+
private static String singleUpdate(int count) {
109+
StringBuilder sql = new StringBuilder();
110+
sql.append("UPDATE WORLD SET RANDOMNUMBER = CASE ID");
111+
for (int i = 0; i < count; i++) {
112+
int k = i * 2 + 1;
113+
sql.append(" WHEN $").append(k).append(" THEN $").append(k + 1);
114+
}
115+
sql.append(" ELSE RANDOMNUMBER");
116+
sql.append(" END WHERE ID IN ($1");
117+
for (int i = 1; i < count; i++) {
118+
int k = i * 2 + 1;
119+
sql.append(",$").append(k);
120+
}
121+
sql.append(")");
122+
return sql.toString();
123+
}
124+
}
125+
}

frameworks/Java/helidon/nima/src/main/java/io/helidon/benchmark/nima/models/PgClientRepository.java

Lines changed: 26 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -2,33 +2,24 @@
22

33
import java.util.ArrayList;
44
import java.util.List;
5-
import java.util.concurrent.ExecutionException;
65
import java.util.logging.Logger;
76

87
import io.helidon.config.Config;
98
import io.vertx.core.Future;
109
import io.vertx.core.Vertx;
1110
import io.vertx.core.VertxOptions;
1211
import io.vertx.pgclient.PgConnectOptions;
13-
import io.vertx.pgclient.PgPool;
14-
import io.vertx.sqlclient.PoolOptions;
1512
import io.vertx.sqlclient.PreparedQuery;
1613
import io.vertx.sqlclient.Row;
1714
import io.vertx.sqlclient.RowSet;
18-
import io.vertx.sqlclient.SqlClient;
1915
import io.vertx.sqlclient.Tuple;
2016

2117
import static io.helidon.benchmark.nima.models.DbRepository.randomWorldNumber;
2218

2319
public class PgClientRepository implements DbRepository {
2420
private static final Logger LOGGER = Logger.getLogger(PgClientRepository.class.getName());
25-
private static final int UPDATE_QUERIES = 500;
2621

27-
private final SqlClient updatePool;
28-
29-
private final PreparedQuery<RowSet<Row>> getFortuneQuery;
30-
private final PreparedQuery<RowSet<Row>> getWorldQuery;
31-
private final PreparedQuery<RowSet<Row>>[] updateWorldSingleQuery;
22+
private final PgClientConnectionPool connectionPool;
3223

3324
@SuppressWarnings("unchecked")
3425
public PgClientRepository(Config config) {
@@ -41,27 +32,16 @@ public PgClientRepository(Config config) {
4132
.setUser(config.get("username").asString().orElse("benchmarkdbuser"))
4233
.setPassword(config.get("password").asString().orElse("benchmarkdbpass"))
4334
.setPipeliningLimit(100000);
44-
45-
int sqlPoolSize = config.get("sql-pool-size").asInt().orElse(64);
46-
PoolOptions clientOptions = new PoolOptions().setMaxSize(sqlPoolSize);
47-
LOGGER.info("sql-pool-size is " + sqlPoolSize);
48-
49-
SqlClient queryPool = PgPool.client(vertx, connectOptions, clientOptions);
50-
updatePool = PgPool.client(vertx, connectOptions, clientOptions);
51-
52-
getWorldQuery = queryPool.preparedQuery("SELECT id, randomnumber FROM world WHERE id = $1");
53-
getFortuneQuery = queryPool.preparedQuery("SELECT id, message FROM fortune");
54-
55-
updateWorldSingleQuery = new PreparedQuery[UPDATE_QUERIES];
56-
for (int i = 0; i < UPDATE_QUERIES; i++) {
57-
updateWorldSingleQuery[i] = queryPool.preparedQuery(singleUpdate(i + 1));
58-
}
35+
int sqlPoolSize = config.get("sql-pool-size").asInt().orElse(Runtime.getRuntime().availableProcessors());
36+
connectionPool = new PgClientConnectionPool(vertx, sqlPoolSize, connectOptions);
37+
connectionPool.connect();
5938
}
6039

6140
@Override
6241
public World getWorld(int id) {
6342
try {
64-
return getWorldQuery.execute(Tuple.of(id))
43+
PreparedQuery<RowSet<Row>> worldQuery = connectionPool.clientConnection().worldQuery();
44+
return worldQuery.execute(Tuple.of(id))
6545
.map(rows -> {
6646
Row r = rows.iterator().next();
6747
return new World(r.getInteger(0), r.getInteger(1));
@@ -74,13 +54,14 @@ public World getWorld(int id) {
7454
@Override
7555
public List<World> getWorlds(int count) {
7656
try {
57+
PreparedQuery<RowSet<Row>> worldQuery = connectionPool.clientConnection().worldQuery();
7758
List<Future<?>> futures = new ArrayList<>();
7859
for (int i = 0; i < count; i++) {
79-
futures.add(getWorldQuery.execute(Tuple.of(randomWorldNumber()))
80-
.map(rows -> {
81-
Row r = rows.iterator().next();
82-
return new World(r.getInteger(0), r.getInteger(1));
83-
}));
60+
futures.add(worldQuery.execute(Tuple.of(randomWorldNumber()))
61+
.map(rows -> {
62+
Row r = rows.iterator().next();
63+
return new World(r.getInteger(0), r.getInteger(1));
64+
}));
8465
}
8566
return Future.all(futures).toCompletionStage().toCompletableFuture().get().list();
8667
} catch (Exception e) {
@@ -92,7 +73,18 @@ public List<World> getWorlds(int count) {
9273
public List<World> updateWorlds(int count) {
9374
List<World> worlds = getWorlds(count);
9475
try {
95-
return updateWorlds(worlds, count, updatePool);
76+
PreparedQuery<RowSet<Row>> updateQuery = connectionPool.clientConnection().updateQuery(count);
77+
List<Integer> updateParams = new ArrayList<>(count * 2);
78+
for (World world : worlds) {
79+
updateParams.add(world.id);
80+
world.randomNumber = randomWorldNumber();
81+
updateParams.add(world.randomNumber);
82+
}
83+
return updateQuery.execute(Tuple.wrap(updateParams))
84+
.toCompletionStage()
85+
.thenApply(rows -> worlds)
86+
.toCompletableFuture()
87+
.get();
9688
} catch (Exception e) {
9789
throw new RuntimeException(e);
9890
}
@@ -101,7 +93,8 @@ public List<World> updateWorlds(int count) {
10193
@Override
10294
public List<Fortune> getFortunes() {
10395
try {
104-
return getFortuneQuery.execute()
96+
PreparedQuery<RowSet<Row>> fortuneQuery = connectionPool.clientConnection().fortuneQuery();
97+
return fortuneQuery.execute()
10598
.map(rows -> {
10699
List<Fortune> fortunes = new ArrayList<>(rows.size() + 1);
107100
for (Row r : rows) {
@@ -113,37 +106,4 @@ public List<Fortune> getFortunes() {
113106
throw new RuntimeException(e);
114107
}
115108
}
116-
117-
private List<World> updateWorlds(List<World> worlds, int count, SqlClient pool)
118-
throws ExecutionException, InterruptedException {
119-
int size = worlds.size();
120-
List<Integer> updateParams = new ArrayList<>(size * 2);
121-
for (World world : worlds) {
122-
updateParams.add(world.id);
123-
world.randomNumber = randomWorldNumber();
124-
updateParams.add(world.randomNumber);
125-
}
126-
return updateWorldSingleQuery[count - 1].execute(Tuple.wrap(updateParams))
127-
.toCompletionStage()
128-
.thenApply(rows -> worlds)
129-
.toCompletableFuture()
130-
.get();
131-
}
132-
133-
private static String singleUpdate(int count) {
134-
StringBuilder sql = new StringBuilder();
135-
sql.append("UPDATE WORLD SET RANDOMNUMBER = CASE ID");
136-
for (int i = 0; i < count; i++) {
137-
int k = i * 2 + 1;
138-
sql.append(" WHEN $").append(k).append(" THEN $").append(k + 1);
139-
}
140-
sql.append(" ELSE RANDOMNUMBER");
141-
sql.append(" END WHERE ID IN ($1");
142-
for (int i = 1; i < count; i++) {
143-
int k = i * 2 + 1;
144-
sql.append(",$").append(k);
145-
}
146-
sql.append(")");
147-
return sql.toString();
148-
}
149109
}

frameworks/Java/helidon/nima/src/main/java/io/helidon/benchmark/nima/services/FortuneHandler.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,18 +33,21 @@ public void handle(ServerRequest req, ServerResponse res) {
3333
res.header(SERVER);
3434
res.header(CONTENT_TYPE_HTML);
3535

36+
// render using template and get list of buffers
3637
List<Fortune> fortuneList = repository.getFortunes();
3738
fortuneList.add(ADDITIONAL_FORTUNE);
3839
Collections.sort(fortuneList);
3940
ArrayOfByteArraysOutput output = fortunes.template(fortuneList).render(ArrayOfByteArraysOutput.FACTORY);
4041
List<byte[]> entity = output.getArrays();
4142

43+
// compute entity length and set header
4244
int length = 0;
4345
for (byte[] bytes : entity) {
4446
length += bytes.length;
4547
}
4648
res.header(CONTENT_LENGTH, String.valueOf(length));
4749

50+
// write entity to output
4851
try (var out = res.outputStream()) {
4952
for (byte[] bytes : entity) {
5053
out.write(bytes);

frameworks/Java/helidon/nima/src/main/resources/application.yaml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,5 @@ host: "tfb-database"
3636
db: "hello_world"
3737
username: benchmarkdbuser
3838
password: benchmarkdbpass
39-
sql-pool-size: 300
4039
db-repository: "pgclient" # "pgclient" (default) or "hikari"
4140

0 commit comments

Comments
 (0)