diff --git a/frameworks/Java/jooby/conf/application.vertx.conf b/frameworks/Java/jooby/conf/application.vertx.conf new file mode 100644 index 00000000000..d3a286c4b71 --- /dev/null +++ b/frameworks/Java/jooby/conf/application.vertx.conf @@ -0,0 +1,6 @@ +# Add/Override some properties to matches Vertx pg Driver properties +db.database = ${db.databaseName} +db.host = ${db.serverName} +db.cachePreparedStatements = true +db.preparedStatementCacheMaxSize = 1024 +db.pipeliningLimit = 256 diff --git a/frameworks/Java/jooby/jooby-pgclient.dockerfile b/frameworks/Java/jooby/jooby-pgclient.dockerfile index 77eb7d3bea5..e5917ae2af5 100644 --- a/frameworks/Java/jooby/jooby-pgclient.dockerfile +++ b/frameworks/Java/jooby/jooby-pgclient.dockerfile @@ -4,8 +4,8 @@ COPY pom.xml pom.xml COPY src src COPY public public COPY conf conf -RUN mvn package -q -P netty +RUN mvn package -q -P vertx EXPOSE 8080 -CMD ["java", "-server", "-Xms2g", "-Xmx2g", "-XX:+UseNUMA", "-XX:+UseParallelGC", "--enable-native-access=ALL-UNNAMED", "--add-opens=java.base/java.lang=ALL-UNNAMED", "--sun-misc-unsafe-memory-access=allow", "-Dio.netty.disableHttpHeadersValidation=true", "-Dio.netty.buffer.checkBounds=false", "-Dio.netty.buffer.checkAccessible=false", "-Dio.netty.noUnsafe=false", "-Dio.netty.eventLoopGroup=single", "-cp", "target/jooby.jar", "com.techempower.ReactivePg"] +CMD ["java", "-server", "-Xms2g", "-Xmx2g", "-XX:+UseNUMA", "-XX:+UseParallelGC", "--enable-native-access=ALL-UNNAMED", "--add-opens=java.base/java.lang=ALL-UNNAMED", "--sun-misc-unsafe-memory-access=allow", "-Dio.netty.disableHttpHeadersValidation=true", "-Dio.netty.buffer.checkBounds=false", "-Dio.netty.buffer.checkAccessible=false", "-Dio.netty.noUnsafe=false", "-Dio.netty.eventLoopGroup=single", "-Djava.lang.Integer.IntegerCache.high=10000", "-Dvertx.disableMetrics=true", "-Dvertx.disableContextTimings=true", "-cp", "target/jooby.jar", "com.techempower.ReactivePg", "vertx"] diff --git a/frameworks/Java/jooby/pom.xml b/frameworks/Java/jooby/pom.xml index 20c192196cf..1c86733ddc9 100644 --- a/frameworks/Java/jooby/pom.xml +++ b/frameworks/Java/jooby/pom.xml @@ -11,7 +11,7 @@ jooby - 4.0.7 + 4.0.9 2.0.2 42.7.7 UTF-8 @@ -44,10 +44,11 @@ ${postgresql.version} + - io.vertx - vertx-pg-client - 5.0.1 + io.jooby + jooby-vertx-pg-client + ${jooby.version} @@ -205,6 +206,17 @@ + + + vertx + + + io.jooby + jooby-vertx + ${jooby.version} + + + diff --git a/frameworks/Java/jooby/src/main/java/com/techempower/App.java b/frameworks/Java/jooby/src/main/java/com/techempower/App.java index bd43f712c5c..fbc4bdb8164 100644 --- a/frameworks/Java/jooby/src/main/java/com/techempower/App.java +++ b/frameworks/Java/jooby/src/main/java/com/techempower/App.java @@ -27,7 +27,7 @@ public class App extends Jooby { private static final String MESSAGE = "Hello, World!"; - private static final byte[] MESSAGE_BYTES = MESSAGE.getBytes(StandardCharsets.US_ASCII); + private static final byte[] MESSAGE_BYTES = MESSAGE.getBytes(StandardCharsets.UTF_8); { /** Database: */ diff --git a/frameworks/Java/jooby/src/main/java/com/techempower/PgClient.java b/frameworks/Java/jooby/src/main/java/com/techempower/PgClient.java deleted file mode 100644 index 3bb393c8a3d..00000000000 --- a/frameworks/Java/jooby/src/main/java/com/techempower/PgClient.java +++ /dev/null @@ -1,201 +0,0 @@ -package com.techempower; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.ExecutionException; -import java.util.function.BiConsumer; - -import com.typesafe.config.Config; -import io.jooby.SneakyThrows; -import io.vertx.core.AsyncResult; -import io.vertx.core.Future; -import io.vertx.core.Handler; -import io.vertx.core.Vertx; -import io.vertx.core.VertxOptions; -import io.vertx.pgclient.PgConnectOptions; -import io.vertx.pgclient.PgConnection; -import io.vertx.sqlclient.PreparedQuery; -import io.vertx.sqlclient.PreparedStatement; -import io.vertx.sqlclient.Row; -import io.vertx.sqlclient.RowSet; -import io.vertx.sqlclient.Tuple; -import io.vertx.sqlclient.impl.SqlClientInternal; - -public class PgClient { - - static { - // Should be all I/O processing for SQL responses - System.setProperty("vertx.nettyIORatio", "100"); - } - - private static final String UPDATE_WORLD = "UPDATE world SET randomnumber=$1 WHERE id=$2"; - private static final String SELECT_WORLD = "SELECT id, randomnumber from WORLD where id=$1"; - private static final String SELECT_FORTUNE = "SELECT id, message from FORTUNE"; - - private static class DbConnection { - private SqlClientInternal queries; - private PreparedQuery> SELECT_WORLD_QUERY; - private PreparedQuery> SELECT_FORTUNE_QUERY; - private PreparedQuery> UPDATE_WORLD_QUERY; - private SqlClientInternal updates; - @SuppressWarnings("unchecked") - private PreparedQuery>[] AGGREGATED_UPDATE_WORLD_QUERY = new PreparedQuery[500]; - } - - public static class PgUpdate { - private DbConnection connection; - - public PgUpdate(DbConnection connection) { - this.connection = connection; - } - - public void selectWorldForUpdate(int queries, BiConsumer>> consumer) { - connection.queries.group(c -> { - PreparedQuery> statement = c.preparedQuery(SELECT_WORLD); - for (int i = 0; i < queries; i++) { - consumer.accept(i, statement); - } - }); - } - - public void updateWorld(World[] worlds, Handler>> handler) { - Arrays.sort(worlds); - int len = worlds.length; - if (0 < len && len <= connection.AGGREGATED_UPDATE_WORLD_QUERY.length) { - List arguments = new ArrayList<>(); - for (World world : worlds) { - arguments.add(world.getId()); - arguments.add(world.getRandomNumber()); - } - Tuple tuple = Tuple.tuple(arguments); - PreparedQuery> query = connection.AGGREGATED_UPDATE_WORLD_QUERY[len - 1]; - query.execute(tuple).onComplete(handler); - } else { - List batch = new ArrayList<>(); - for (World world : worlds) { - batch.add(Tuple.of(world.getRandomNumber(), world.getId())); - } - connection.UPDATE_WORLD_QUERY.executeBatch(batch).onComplete(handler); - } - } - } - - private static class DbConnectionFactory extends ThreadLocal { - - private final PgConnectOptions options; - - public DbConnectionFactory(PgConnectOptions options) { - this.options = options; - } - - private Handler> onSuccess(Handler handler) { - return ar -> { - if (ar.succeeded()) { - handler.handle(ar.result()); - } - }; - } - - @Override protected DbConnection initialValue() { - try { - DbConnection result = new DbConnection(); - Vertx vertx = Vertx.vertx( - new VertxOptions() - .setPreferNativeTransport(true) - .setEventLoopPoolSize(1) - .setWorkerPoolSize(1) - .setInternalBlockingPoolSize(1) - ); - var client1 = PgConnection.connect(vertx, options) - .flatMap(conn -> { - result.queries = (SqlClientInternal) conn; - Future f1 = conn.prepare(SELECT_WORLD) - .andThen(onSuccess(ps -> result.SELECT_WORLD_QUERY = ps.query())); - Future f2 = conn.prepare(SELECT_FORTUNE) - .andThen(onSuccess(ps -> result.SELECT_FORTUNE_QUERY = ps.query())); - return Future.join(f1, f2); - }); - - var client2 = PgConnection.connect(vertx, options) - .flatMap(conn -> { - result.updates = (SqlClientInternal) conn; - List> list = new ArrayList<>(); - Future f1 = conn.prepare(UPDATE_WORLD) - .andThen(onSuccess(ps -> result.UPDATE_WORLD_QUERY = ps.query())); - list.add(f1); - for (int i = 0; i < result.AGGREGATED_UPDATE_WORLD_QUERY.length; i++) { - int idx = i; - list.add(conn - .prepare(buildAggregatedUpdateQuery(1 + idx)) - .andThen(onSuccess(ps -> result.AGGREGATED_UPDATE_WORLD_QUERY[idx] = ps.query()))); - } - return Future.join(list); - }); - var future = Future.join(client1, client2).toCompletionStage().toCompletableFuture().get(); - - Throwable cause = future.cause(); - if (cause != null) { - throw SneakyThrows.propagate(cause); - } - return result; - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - throw SneakyThrows.propagate(ex); - } catch (ExecutionException ex) { - throw SneakyThrows.propagate(ex.getCause()); - } - } - - private static String buildAggregatedUpdateQuery(int len) { - StringBuilder sb = new StringBuilder(); - sb.append("UPDATE world SET randomNumber = update_data.randomNumber FROM (VALUES"); - char sep = ' '; - for (int i = 1;i <= len;i++) { - sb.append(sep).append("($").append(2 * i - 1).append("::int,$").append(2 * i).append("::int)"); - sep = ','; - } - sb.append(") AS update_data (id, randomNumber) WHERE world.id = update_data.id"); - return sb.toString(); - } - } - - private final ThreadLocal sqlClient; - - public PgClient(Config config) { - this.sqlClient = new DbConnectionFactory(pgPoolOptions(config)); - } - - public void selectWorld(Tuple row, Handler>> handler) { - this.sqlClient.get().SELECT_WORLD_QUERY.execute(row).onComplete(handler); - } - - public void selectWorlds(int queries, Handler>> handler) { - this.sqlClient.get().queries.group(c -> { - for (int i = 0; i < queries; i++) { - c.preparedQuery(SELECT_WORLD).execute(Tuple.of(Util.boxedRandomWorld())).onComplete(handler); - } - }); - } - - public void fortunes(Handler>> handler) { - this.sqlClient.get().SELECT_FORTUNE_QUERY.execute().onComplete(handler); - } - - public PgUpdate updater() { - return new PgUpdate(sqlClient.get()); - } - - private PgConnectOptions pgPoolOptions(Config config) { - PgConnectOptions options = new PgConnectOptions(); - options.setDatabase(config.getString("databaseName")); - options.setHost(config.getString("serverName")); - options.setPort(config.getInt("portNumber")); - options.setUser(config.getString("user")); - options.setPassword(config.getString("password")); - options.setCachePreparedStatements(true); - // Large pipelining means less flushing and we use a single connection anyway - options.setPipeliningLimit(100_000); - return options; - } -} diff --git a/frameworks/Java/jooby/src/main/java/com/techempower/ReactivePg.java b/frameworks/Java/jooby/src/main/java/com/techempower/ReactivePg.java index c14e45e317f..385c7180539 100644 --- a/frameworks/Java/jooby/src/main/java/com/techempower/ReactivePg.java +++ b/frameworks/Java/jooby/src/main/java/com/techempower/ReactivePg.java @@ -1,125 +1,188 @@ package com.techempower; -import static com.techempower.Util.boxedRandomWorld; -import static com.techempower.Util.randomWorld; import static io.jooby.ExecutionMode.EVENT_LOOP; import static io.jooby.MediaType.JSON; +import static io.jooby.Reified.getParameterized; +import static java.util.stream.IntStream.range; import java.util.*; +import java.util.function.Consumer; import io.jooby.*; import io.jooby.rocker.RockerModule; -import io.vertx.sqlclient.Row; -import io.vertx.sqlclient.RowIterator; -import io.vertx.sqlclient.Tuple; +import io.jooby.vertx.pgclient.VertxPgConnectionModule; +import io.vertx.core.AsyncResult; +import io.vertx.core.Handler; +import io.vertx.sqlclient.*; +import io.vertx.sqlclient.impl.SqlClientInternal; public class ReactivePg extends Jooby { - { - /** PG client: */ - PgClient client = new PgClient(getConfig().getConfig("db")); - - /** Template engine: */ - install(new RockerModule()); - Json.configure(getOutputFactory()); - - /** Single query: */ - get("/db", ctx -> { - client.selectWorld(Tuple.of(boxedRandomWorld()), rsp -> { - if (rsp.succeeded()) { - RowIterator rs = rsp.result().iterator(); - Row row = rs.next(); - ctx.setResponseType(JSON) - .send(Json.encode(new World(row.getInteger(0), row.getInteger(1)))); - } else { - ctx.sendError(rsp.cause()); + private final PreparedQuery> selectWorldQuery; + private final PreparedQuery> selectFortuneQuery; + private final List>> updateWorldQuery; + private final SqlClientInternal sqlClient; + + { + /** PG client: */ + install(new VertxPgConnectionModule().prepare(statements())); + + /** Template engine: */ + install(new RockerModule()); + Json.configure(getOutputFactory()); + + this.selectWorldQuery = require(PreparedQueryType, "selectWorld"); + this.selectFortuneQuery = require(PreparedQueryType, "selectFortune"); + this.updateWorldQuery = require(PreparedQueryTypeList, "updateWorld"); + this.sqlClient = require(SqlClientInternal.class); + + /** Single query: */ + get("/db", ctx -> { + selectWorldQuery + .execute(Tuple.of(Util.boxedRandomWorld())) + .onComplete( + rsp -> { + if (rsp.succeeded()) { + var rs = rsp.result().iterator(); + var row = rs.next(); + ctx.setResponseType(JSON) + .send(Json.encode(new World(row.getInteger(0), row.getInteger(1)))); + } else { + ctx.sendError(rsp.cause()); + } + }); + return ctx; + }); + + /** Multiple queries: */ + get("/queries", ctx -> { + int queries = Util.queries(ctx); + selectWorlds(ctx, queries, result -> ctx.setResponseType(JSON).send(Json.encode(result))); + return ctx; + }); + + /** Update queries: */ + get("/updates", ctx -> { + int queries = Util.queries(ctx); + selectWorlds( + ctx, + queries, + result -> { + updateWorld( + result, + ar -> { + if (ar.failed()) { + sendError(ctx, ar.cause()); + } else { + ctx.setResponseType(JSON).send(Json.encode(result)); + } + }); + }); + return ctx; + }); + + /** Fortunes: */ + get("/fortunes", ctx -> { + selectFortuneQuery.execute() + .onComplete(rsp -> { + if (rsp.succeeded()) { + RowIterator rs = rsp.result().iterator(); + List fortunes = new ArrayList<>(); + + while (rs.hasNext()) { + Row row = rs.next(); + fortunes.add(new Fortune(row.getInteger(0), row.getString(1))); + } + + fortunes.add(new Fortune(0, "Additional fortune added at request time.")); + Collections.sort(fortunes); + + /** render view: */ + ctx.setResponseType(MediaType.html) + .render(views.fortunes.template(fortunes)); + } else { + sendError(ctx, rsp.cause()); + } + }); + return ctx; + }); + } + + private void selectWorlds(Context ctx, int queries, Consumer> consumer) { + sqlClient.group( + client -> { + var statement = client.preparedQuery(SELECT_WORLD); + List worlds = new ArrayList<>(queries); + for (int i = 0; i < queries; i++) { + statement + .execute(Tuple.of(Util.boxedRandomWorld())) + .map(rs -> new World(rs.iterator().next().getInteger(0), Util.boxedRandomWorld())) + .onComplete( + ar -> { + if (ar.succeeded()) { + worlds.add(ar.result()); + if (worlds.size() == queries) { + consumer.accept(worlds); + } + } else { + sendError(ctx, ar.cause()); + } + }); + } + }); + } + + private void updateWorld(List worlds, Handler>> handler) { + Collections.sort(worlds); + int len = worlds.size(); + List arguments = new ArrayList<>(); + for (var world : worlds) { + arguments.add(world.getId()); + arguments.add(world.getRandomNumber()); } - }); - return ctx; - }); - - /** Multiple queries: */ - get("/queries", ctx -> { - int queries = Util.queries(ctx); - var result = new ArrayList(queries); - client.selectWorlds(queries, rsp -> { - if (rsp.succeeded()) { - RowIterator rs = rsp.result().iterator(); - Row row = rs.next(); - result.add(new World(row.getInteger(0), row.getInteger(1))); - } else { - sendError(ctx, rsp.cause()); + updateWorldQuery.get(len - 1).execute(Tuple.wrap(arguments)).onComplete(handler); + } + + private void sendError(Context ctx, Throwable cause) { + if (!ctx.isResponseStarted()) { + ctx.sendError(cause); } - // ready? - if (result.size() == queries) { - ctx.setResponseType(JSON) - .send(Json.encode(result)); + } + + private Map> statements() { + return Map.of( + "selectWorld", List.of(SELECT_WORLD), + "selectFortune", List.of(SELECT_FORTUNE), + "updateWorld", + range(0, 500).map(i -> i + 1).mapToObj(this::buildAggregatedUpdateQuery).toList()); + } + + private String buildAggregatedUpdateQuery(int len) { + var sql = new StringBuilder(); + sql.append("UPDATE WORLD SET RANDOMNUMBER = CASE ID"); + for (int i = 0; i < len; i++) { + int offset = (i * 2) + 1; + sql.append(" WHEN $").append(offset).append(" THEN $").append(offset + 1); } - }); - return ctx; - }); - - /** Update queries: */ - get("/updates", ctx -> { - int queries = Util.queries(ctx); - World[] result = new World[queries]; - var updater = client.updater(); - updater.selectWorldForUpdate(queries, (index, statement) -> { - statement.execute(Tuple.of(boxedRandomWorld())).onComplete(rsp -> { - if (rsp.failed()) { - sendError(ctx, rsp.cause()); - return; - } - result[index] = new World( - rsp.result().iterator().next().getInteger(0), - boxedRandomWorld()); - if (index == queries - 1) { - updater.updateWorld(result, updateCallback -> { - if (updateCallback.failed()) { - sendError(ctx, updateCallback.cause()); - } else { - ctx.setResponseType(JSON) - .send(Json.encode(result)); - } - }); - } - }); - }); - return ctx; - }); - - /** Fortunes: */ - get("/fortunes", ctx -> { - client.fortunes(rsp -> { - if (rsp.succeeded()) { - RowIterator rs = rsp.result().iterator(); - List fortunes = new ArrayList<>(); - - while (rs.hasNext()) { - Row row = rs.next(); - fortunes.add(new Fortune(row.getInteger(0), row.getString(1))); - } - - fortunes.add(new Fortune(0, "Additional fortune added at request time.")); - Collections.sort(fortunes); - - /** render view: */ - ctx.setResponseType(MediaType.html) - .render(views.fortunes.template(fortunes)); - } else { - sendError(ctx, rsp.cause()); + sql.append(" ELSE RANDOMNUMBER"); + sql.append(" END WHERE ID IN ($1"); + for (int i = 1; i < len; i++) { + int offset = (i * 2) + 1; + sql.append(",$").append(offset); } - }); - return ctx; - }); - } - - private void sendError(Context ctx, Throwable cause) { - if (!ctx.isResponseStarted()) { - ctx.sendError(cause); + sql.append(")"); + return sql.toString(); } - } - public static void main(String[] args) { - runApp(args, EVENT_LOOP, ReactivePg::new); - } + private static final String SELECT_WORLD = "SELECT id, randomnumber from WORLD where id=$1"; + private static final String SELECT_FORTUNE = "SELECT id, message from FORTUNE"; + + private static final Reified>> PreparedQueryType = + getParameterized(PreparedQuery.class, getParameterized(RowSet.class, Row.class)); + + private static final Reified>>> PreparedQueryTypeList = + Reified.list(PreparedQueryType); + + public static void main(String[] args) { + runApp(args, EVENT_LOOP, ReactivePg::new); + } } diff --git a/frameworks/Kotlin/kooby/pom.xml b/frameworks/Kotlin/kooby/pom.xml index 4e962e35034..a27aaaa2a03 100644 --- a/frameworks/Kotlin/kooby/pom.xml +++ b/frameworks/Kotlin/kooby/pom.xml @@ -12,7 +12,7 @@ kooby: jooby+kotlin - 4.0.7 + 4.0.9 42.7.7 UTF-8 24