Skip to content

Commit 527c38d

Browse files
authored
jooby: upgrade 3.2.4 (#9156)
- get back threadlocal buffer for fortunes - implements vertx like sql updates
1 parent e1dcb73 commit 527c38d

File tree

5 files changed

+134
-37
lines changed

5 files changed

+134
-37
lines changed

frameworks/Java/jooby/pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@
1111
<name>jooby</name>
1212

1313
<properties>
14-
<jooby.version>3.1.2</jooby.version>
15-
<netty.version>4.1.110.Final</netty.version>
14+
<jooby.version>3.2.4</jooby.version>
15+
<netty.version>4.1.111.Final</netty.version>
1616
<dsl-json.version>2.0.2</dsl-json.version>
1717
<postgresql.version>42.7.3</postgresql.version>
1818
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

frameworks/Java/jooby/src/main/java/com/techempower/App.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,8 @@ public class App extends Jooby {
2828

2929
private static final byte[] MESSAGE_BYTES = MESSAGE.getBytes(StandardCharsets.US_ASCII);
3030

31-
private static final ByteBuffer MESSAGE_BUFFER = (ByteBuffer) ByteBuffer
32-
.allocateDirect(MESSAGE_BYTES.length)
33-
.put(MESSAGE_BYTES)
34-
.flip();
35-
3631
{
37-
32+
var bufferFactory = getBufferFactory();
3833
/** Database: */
3934
install(new HikariModule());
4035
DataSource ds = require(DataSource.class);
@@ -43,7 +38,7 @@ public class App extends Jooby {
4338
install(new RockerModule());
4439

4540
get("/plaintext", ctx ->
46-
ctx.send(MESSAGE_BUFFER.duplicate())
41+
ctx.send(bufferFactory.wrap(MESSAGE_BYTES))
4742
);
4843

4944
get("/json", ctx -> ctx

frameworks/Java/jooby/src/main/java/com/techempower/PgClient.java

Lines changed: 61 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package com.techempower;
22

3+
import java.util.ArrayList;
4+
import java.util.Arrays;
35
import java.util.List;
46
import java.util.concurrent.ExecutionException;
57
import java.util.function.BiConsumer;
@@ -32,10 +34,13 @@ public class PgClient {
3234
private static final String SELECT_FORTUNE = "SELECT id, message from FORTUNE";
3335

3436
private static class DbConnection {
37+
private SqlClientInternal queries;
3538
private PreparedQuery<RowSet<Row>> SELECT_WORLD_QUERY;
3639
private PreparedQuery<RowSet<Row>> SELECT_FORTUNE_QUERY;
3740
private PreparedQuery<RowSet<Row>> UPDATE_WORLD_QUERY;
38-
private SqlClientInternal connection;
41+
private SqlClientInternal updates;
42+
@SuppressWarnings("unchecked")
43+
private PreparedQuery<RowSet<Row>>[] AGGREGATED_UPDATE_WORLD_QUERY = new PreparedQuery[128];
3944
}
4045

4146
private static class DbConnectionFactory extends ThreadLocal<DbConnection> {
@@ -64,20 +69,32 @@ private <T> Handler<AsyncResult<T>> onSuccess(Handler<T> handler) {
6469
.setWorkerPoolSize(1)
6570
.setInternalBlockingPoolSize(1)
6671
);
67-
var future = PgConnection.connect(vertx, options)
72+
var client1 = PgConnection.connect(vertx, options)
6873
.flatMap(conn -> {
69-
result.connection = (SqlClientInternal) conn;
74+
result.queries = (SqlClientInternal) conn;
7075
Future<PreparedStatement> f1 = conn.prepare(SELECT_WORLD)
7176
.andThen(onSuccess(ps -> result.SELECT_WORLD_QUERY = ps.query()));
7277
Future<PreparedStatement> f2 = conn.prepare(SELECT_FORTUNE)
7378
.andThen(onSuccess(ps -> result.SELECT_FORTUNE_QUERY = ps.query()));
74-
Future<PreparedStatement> f3 = conn.prepare(UPDATE_WORLD)
75-
.andThen(onSuccess(ps -> result.UPDATE_WORLD_QUERY = ps.query()));
76-
return Future.join(f1, f2, f3);
77-
})
78-
.toCompletionStage()
79-
.toCompletableFuture()
80-
.get();
79+
return Future.join(f1, f2);
80+
});
81+
82+
var client2 = PgConnection.connect(vertx, options)
83+
.flatMap(conn -> {
84+
result.updates = (SqlClientInternal) conn;
85+
List<Future<?>> list = new ArrayList<>();
86+
Future<PreparedStatement> f1 = conn.prepare(UPDATE_WORLD)
87+
.andThen(onSuccess(ps -> result.UPDATE_WORLD_QUERY = ps.query()));
88+
list.add(f1);
89+
for (int i = 0; i < result.AGGREGATED_UPDATE_WORLD_QUERY.length; i++) {
90+
int idx = i;
91+
list.add(conn
92+
.prepare(buildAggregatedUpdateQuery(1 + idx))
93+
.andThen(onSuccess(ps -> result.AGGREGATED_UPDATE_WORLD_QUERY[idx] = ps.query())));
94+
}
95+
return Future.join(list);
96+
});
97+
var future = Future.join(client1, client2).toCompletionStage().toCompletableFuture().get();
8198

8299
Throwable cause = future.cause();
83100
if (cause != null) {
@@ -91,6 +108,18 @@ private <T> Handler<AsyncResult<T>> onSuccess(Handler<T> handler) {
91108
throw SneakyThrows.propagate(ex.getCause());
92109
}
93110
}
111+
112+
private static String buildAggregatedUpdateQuery(int len) {
113+
StringBuilder sb = new StringBuilder();
114+
sb.append("UPDATE world SET randomNumber = update_data.randomNumber FROM (VALUES");
115+
char sep = ' ';
116+
for (int i = 1;i <= len;i++) {
117+
sb.append(sep).append("($").append(2 * i - 1).append("::int,$").append(2 * i).append("::int)");
118+
sep = ',';
119+
}
120+
sb.append(") AS update_data (id, randomNumber) WHERE world.id = update_data.id");
121+
return sb.toString();
122+
}
94123
}
95124

96125
private final ThreadLocal<DbConnection> sqlClient;
@@ -104,7 +133,7 @@ public void selectWorld(Tuple row, Handler<AsyncResult<RowSet<Row>>> handler) {
104133
}
105134

106135
public void selectWorlds(int queries, Handler<AsyncResult<RowSet<Row>>> handler) {
107-
this.sqlClient.get().connection.group(c -> {
136+
this.sqlClient.get().queries.group(c -> {
108137
for (int i = 0; i < queries; i++) {
109138
c.preparedQuery(SELECT_WORLD).execute(Tuple.of(Util.randomWorld()), handler);
110139
}
@@ -117,16 +146,34 @@ public void fortunes(Handler<AsyncResult<RowSet<Row>>> handler) {
117146

118147
public void selectWorldForUpdate(int queries,
119148
BiConsumer<Integer, PreparedQuery<RowSet<Row>>> consumer) {
120-
this.sqlClient.get().connection.group(c -> {
149+
this.sqlClient.get().queries.group(c -> {
121150
PreparedQuery<RowSet<Row>> statement = c.preparedQuery(SELECT_WORLD);
122151
for (int i = 0; i < queries; i++) {
123152
consumer.accept(i, statement);
124153
}
125154
});
126155
}
127156

128-
public void updateWorld(List<Tuple> batch, Handler<AsyncResult<RowSet<Row>>> handler) {
129-
this.sqlClient.get().UPDATE_WORLD_QUERY.executeBatch(batch, handler);
157+
public void updateWorld(World[] worlds, Handler<AsyncResult<RowSet<Row>>> handler) {
158+
Arrays.sort(worlds);
159+
int len = worlds.length;
160+
var connection = this.sqlClient.get();
161+
if (0 < len && len <= connection.AGGREGATED_UPDATE_WORLD_QUERY.length) {
162+
List<Object> arguments = new ArrayList<>();
163+
for (World world : worlds) {
164+
arguments.add(world.getId());
165+
arguments.add(world.getRandomNumber());
166+
}
167+
Tuple tuple = Tuple.tuple(arguments);
168+
PreparedQuery<RowSet<Row>> query = connection.AGGREGATED_UPDATE_WORLD_QUERY[len - 1];
169+
query.execute(tuple, handler);
170+
} else {
171+
List<Tuple> batch = new ArrayList<>();
172+
for (World world : worlds) {
173+
batch.add(Tuple.of(world.getRandomNumber(), world.getId()));
174+
}
175+
connection.UPDATE_WORLD_QUERY.executeBatch(batch, handler);
176+
}
130177
}
131178

132179
private PgConnectOptions pgPoolOptions(Config config) {

frameworks/Java/jooby/src/main/java/com/techempower/ReactivePg.java

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -7,18 +7,15 @@
77
import java.util.*;
88

99
import com.fizzed.rocker.RockerOutputFactory;
10-
import io.jooby.Context;
11-
import io.jooby.Jooby;
12-
import io.jooby.MediaType;
13-
import io.jooby.ServerOptions;
10+
import com.techempower.rocker.BufferRockerOutput;
11+
import io.jooby.*;
1412
import io.jooby.rocker.DataBufferOutput;
1513
import io.jooby.rocker.RockerModule;
1614
import io.vertx.sqlclient.Row;
1715
import io.vertx.sqlclient.RowIterator;
1816
import io.vertx.sqlclient.Tuple;
1917

2018
public class ReactivePg extends Jooby {
21-
2219
{
2320
/** Reduce the number of resources due we do reactive processing. */
2421
setServerOptions(
@@ -84,14 +81,7 @@ public class ReactivePg extends Jooby {
8481
selectCallback.result().iterator().next().getInteger(0),
8582
randomWorld());
8683
if (index == queries - 1) {
87-
// Sort results... avoid dead locks
88-
Arrays.sort(result);
89-
List<Tuple> batch = new ArrayList<>(queries);
90-
for (World world : result) {
91-
batch.add(Tuple.of(world.getRandomNumber(), world.getId()));
92-
}
93-
94-
client.updateWorld(batch, updateCallback -> {
84+
client.updateWorld(result, updateCallback -> {
9585
if (updateCallback.failed()) {
9686
sendError(ctx, updateCallback.cause());
9787
} else {
@@ -106,7 +96,7 @@ public class ReactivePg extends Jooby {
10696
}).setNonBlocking(true);
10797

10898
/** Fortunes: */
109-
RockerOutputFactory<DataBufferOutput> factory = require(RockerOutputFactory.class);
99+
var factory = BufferRockerOutput.factory();
110100
get("/fortunes", ctx -> {
111101
client.fortunes(rsp -> {
112102
if (rsp.succeeded()) {
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package com.techempower.rocker;
2+
3+
import com.fizzed.rocker.ContentType;
4+
import com.fizzed.rocker.RockerOutput;
5+
import com.fizzed.rocker.RockerOutputFactory;
6+
7+
import java.io.IOException;
8+
import java.nio.Buffer;
9+
import java.nio.ByteBuffer;
10+
import java.nio.charset.Charset;
11+
import java.nio.charset.StandardCharsets;
12+
13+
public class BufferRockerOutput implements RockerOutput<BufferRockerOutput> {
14+
private final ByteBuffer buffer;
15+
16+
public BufferRockerOutput(ByteBuffer buffer) {
17+
this.buffer = buffer;
18+
}
19+
20+
@Override
21+
public ContentType getContentType() {
22+
return ContentType.RAW;
23+
}
24+
25+
@Override
26+
public Charset getCharset() {
27+
return StandardCharsets.UTF_8;
28+
}
29+
30+
@Override
31+
public BufferRockerOutput w(String string) throws IOException {
32+
buffer.put(string.getBytes(getCharset()));
33+
return this;
34+
}
35+
36+
@Override
37+
public BufferRockerOutput w(byte[] bytes) throws IOException {
38+
buffer.put(bytes);
39+
return this;
40+
}
41+
42+
@Override
43+
public int getByteLength() {
44+
return buffer.remaining();
45+
}
46+
47+
public ByteBuffer toBuffer() {
48+
return buffer.flip();
49+
}
50+
51+
public static RockerOutputFactory<BufferRockerOutput> factory() {
52+
var cache = new ThreadLocal<BufferRockerOutput>() {
53+
@Override
54+
protected BufferRockerOutput initialValue() {
55+
return new BufferRockerOutput(ByteBuffer.allocateDirect(2048));
56+
}
57+
};
58+
return (contentType, charsetName) -> cache.get().reset();
59+
}
60+
61+
private BufferRockerOutput reset() {
62+
buffer.clear();
63+
return this;
64+
}
65+
}

0 commit comments

Comments
 (0)