Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,23 +1,15 @@

package io.helidon.benchmark.nima.models;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

import jakarta.json.Json;
import jakarta.json.JsonBuilderFactory;

public interface DbRepository {

JsonBuilderFactory JSON = Json.createBuilderFactory(Collections.emptyMap());

World getWorld(int id);

List<World> getWorlds(int count);

World updateWorld(World world);

List<World> updateWorlds(int count);

List<Fortune> getFortunes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,6 @@ public List<World> getWorlds(int count) {
}
}

@Override
public World updateWorld(World world) {
try (Connection c = getConnection()) {
return updateWorld(world, c);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Override
public List<World> updateWorlds(int count) {
try (Connection c = getConnection()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,13 @@

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Logger;

import io.helidon.config.Config;

import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.Future;
import io.vertx.pgclient.PgConnectOptions;
import io.vertx.pgclient.PgPool;
import io.vertx.sqlclient.PoolOptions;
Expand All @@ -26,45 +22,40 @@

public class PgClientRepository implements DbRepository {
private static final Logger LOGGER = Logger.getLogger(PgClientRepository.class.getName());
private static final int UPDATE_QUERIES = 500;

private final SqlClient queryPool;
private final SqlClient updatePool;

private final int batchSize;
private final long updateTimeout;
private final int maxRetries;

private final PreparedQuery<RowSet<Row>> getFortuneQuery;
private final PreparedQuery<RowSet<Row>> getWorldQuery;
private final PreparedQuery<RowSet<Row>> updateWorldQuery;
private final PreparedQuery<RowSet<Row>>[] updateWorldSingleQuery;

@SuppressWarnings("unchecked")
public PgClientRepository(Config config) {
Vertx vertx = Vertx.vertx(new VertxOptions()
.setPreferNativeTransport(true));
Vertx vertx = Vertx.vertx(new VertxOptions().setPreferNativeTransport(true));
PgConnectOptions connectOptions = new PgConnectOptions()
.setPort(config.get("port").asInt().orElse(5432))
.setCachePreparedStatements(config.get("cache-prepared-statements").asBoolean().orElse(true))
.setHost(config.get("host").asString().orElse("tfb-database"))
.setDatabase(config.get("db").asString().orElse("hello_world"))
.setUser(config.get("username").asString().orElse("benchmarkdbuser"))
.setPassword(config.get("password").asString().orElse("benchmarkdbpass"));
.setPassword(config.get("password").asString().orElse("benchmarkdbpass"))
.setPipeliningLimit(100000);

int sqlPoolSize = config.get("sql-pool-size").asInt().orElse(64);
PoolOptions clientOptions = new PoolOptions().setMaxSize(sqlPoolSize);
LOGGER.info("sql-pool-size is " + sqlPoolSize);
batchSize = config.get("update-batch-size").asInt().orElse(20);
LOGGER.info("update-batch-size is " + batchSize);
updateTimeout = config.get("update-timeout-millis").asInt().orElse(5000);
LOGGER.info("update-timeout-millis is " + updateTimeout);
maxRetries = config.get("update-max-retries").asInt().orElse(3);
LOGGER.info("update-max-retries is " + maxRetries);

queryPool = PgPool.client(vertx, connectOptions, clientOptions);

SqlClient queryPool = PgPool.client(vertx, connectOptions, clientOptions);
updatePool = PgPool.client(vertx, connectOptions, clientOptions);

getWorldQuery = queryPool.preparedQuery("SELECT id, randomnumber FROM world WHERE id = $1");
updateWorldQuery = queryPool.preparedQuery("UPDATE world SET randomnumber = $1 WHERE id = $2");
getFortuneQuery = queryPool.preparedQuery("SELECT id, message FROM fortune");

updateWorldSingleQuery = new PreparedQuery[UPDATE_QUERIES];
for (int i = 0; i < UPDATE_QUERIES; i++) {
updateWorldSingleQuery[i] = queryPool.preparedQuery(singleUpdate(i + 1));
}
}

@Override
Expand Down Expand Up @@ -97,60 +88,11 @@ public List<World> getWorlds(int count) {
}
}

@Override
public World updateWorld(World world) {
try {
return updateWorldQuery.execute(Tuple.of(world.id, world.id))
.toCompletionStage()
.thenApply(rows -> world)
.toCompletableFuture().get();
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Override
public List<World> updateWorlds(int count) {
List<World> worlds = getWorlds(count);
if (batchSize > 1) { // batching updates
for (World w : worlds) {
w.randomNumber = randomWorldNumber();
}
if (count <= batchSize) {
LOGGER.finest(() -> "Updating single batch of size " + count);
updateWorldsRetry(worlds, 0, 0);
} else {
int batches = count / batchSize + (count % batchSize == 0 ? 0 : 1);
for (int i = 0; i < batches; i++) {
final int from = i * batchSize;
LOGGER.finest(() -> "Updating batch from " + from + " to " + (from + batchSize));
updateWorldsRetry(worlds, from, 0);
}
}
} else { // no batching for size 1
for (World w : worlds) {
w.randomNumber = randomWorldNumber();
updateWorld(w);
}
}
return worlds;
}

private List<World> updateWorldsRetry(List<World> worlds, int from, int retries) {
if (retries > maxRetries) {
throw new RuntimeException("Too many transaction retries");
}
CompletableFuture<List<World>> cf = null;
try {
cf = updateWorlds(worlds, from, updatePool);
cf.get(updateTimeout, TimeUnit.MILLISECONDS);
return worlds;
} catch (ExecutionException | TimeoutException e) {
cf.cancel(true);
retries++;
final int finalRetries = retries;
LOGGER.fine(() -> "Retrying batch update after cancellation (retries=" + finalRetries + ")");
return updateWorldsRetry(worlds, from, retries); // retry
return updateWorlds(worlds, count, updatePool);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand All @@ -172,16 +114,36 @@ public List<Fortune> getFortunes() {
}
}

private CompletableFuture<List<World>> updateWorlds(List<World> worlds, int from, SqlClient pool) {
List<Tuple> tuples = new ArrayList<>();
int to = Math.min(from + batchSize, worlds.size());
for (int i = from; i < to; i++) {
World w = worlds.get(i);
tuples.add(Tuple.of(w.randomNumber, w.id));
private List<World> updateWorlds(List<World> worlds, int count, SqlClient pool)
throws ExecutionException, InterruptedException {
int size = worlds.size();
List<Integer> updateParams = new ArrayList<>(size * 2);
for (World world : worlds) {
updateParams.add(world.id);
world.randomNumber = randomWorldNumber();
updateParams.add(world.randomNumber);
}
return updateWorldQuery.executeBatch(tuples)
return updateWorldSingleQuery[count - 1].execute(Tuple.wrap(updateParams))
.toCompletionStage()
.thenApply(rows -> worlds)
.toCompletableFuture();
.toCompletableFuture()
.get();
}

private static String singleUpdate(int count) {
StringBuilder sql = new StringBuilder();
sql.append("UPDATE WORLD SET RANDOMNUMBER = CASE ID");
for (int i = 0; i < count; i++) {
int k = i * 2 + 1;
sql.append(" WHEN $").append(k).append(" THEN $").append(k + 1);
}
sql.append(" ELSE RANDOMNUMBER");
sql.append(" END WHERE ID IN ($1");
for (int i = 1; i < count; i++) {
int k = i * 2 + 1;
sql.append(",$").append(k);
}
sql.append(")");
return sql.toString();
}
}
Original file line number Diff line number Diff line change
@@ -1,27 +1,13 @@

package io.helidon.benchmark.nima.models;

import java.util.Collections;

import jakarta.json.Json;
import jakarta.json.JsonBuilderFactory;
import jakarta.json.JsonObject;

public final class World {

static final String ID_KEY = "id";
static final String ID_RANDOM_NUMBER = "randomNumber";
static final JsonBuilderFactory JSON = Json.createBuilderFactory(Collections.emptyMap());

public int id;
public int randomNumber;

public World(int id, int randomNumber) {
this.id = id;
this.randomNumber = randomNumber;
}

public JsonObject toJson() {
return JSON.createObjectBuilder().add(ID_KEY, id).add(ID_RANDOM_NUMBER, randomNumber).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,3 @@ password: benchmarkdbpass
sql-pool-size: 300
db-repository: "pgclient" # "pgclient" (default) or "hikari"

# The following for pgclient only
update-batch-size: 4
update-timeout-millis: 10000
update-max-retries: 3
Loading