28
28
import io .inverno .mod .http .base .Parameter ;
29
29
import io .inverno .mod .http .base .Status ;
30
30
import io .inverno .mod .http .server .Exchange ;
31
- import io .inverno .mod .http .server .ExchangeHandler ;
31
+ import io .inverno .mod .http .server .ExchangeContext ;
32
+ import io .inverno .mod .http .server .RootExchangeHandler ;
32
33
import io .inverno .mod .sql .SqlClient ;
34
+ import io .inverno .mod .sql .UnsafeSqlOperations ;
33
35
import io .netty .buffer .ByteBuf ;
34
36
import io .netty .buffer .Unpooled ;
35
37
import io .netty .channel .EventLoopGroup ;
40
42
import reactor .core .publisher .Mono ;
41
43
42
44
@ Bean ( visibility = Visibility .PRIVATE )
43
- public class Handler implements ExchangeHandler < Exchange > {
45
+ public class Handler implements RootExchangeHandler < ExchangeContext , Exchange < ExchangeContext > > {
44
46
45
47
private static final String PATH_PLAINTEXT = "/plaintext" ;
46
48
private static final String PATH_JSON = "/json" ;
@@ -49,26 +51,27 @@ public class Handler implements ExchangeHandler<Exchange> {
49
51
private static final String PATH_UPDATES = "/updates" ;
50
52
private static final String PATH_FORTUNES = "/fortunes" ;
51
53
54
+ public static final String DB_SELECT_WORLD = "SELECT id, randomnumber from WORLD where id = $1" ;
55
+ public static final String DB_UPDATE_WORLD = "UPDATE world SET randomnumber=$1 WHERE id=$2" ;
56
+ public static final String DB_SELECT_FORTUNE = "SELECT id, message from FORTUNE" ;
57
+
52
58
private static final CharSequence STATIC_SERVER = AsciiString .cached ("inverno" );
53
59
54
60
private final Reactor reactor ;
55
61
private final ObjectMapper mapper ;
56
- private final ReactorScope <SqlClient > pooledClientSqlClient ;
57
- private final ReactorScope <SqlClient > poolSqlClient ;
62
+ private final ReactorScope <Mono <SqlClient >> sqlClient ;
58
63
59
64
private EventLoopGroup dateEventLoopGroup ;
60
65
61
66
private CharSequence date ;
62
67
63
68
public Handler (Reactor reactor ,
64
69
ObjectMapper mapper ,
65
- ReactorScope <SqlClient > pooledClientSqlClient ,
66
- ReactorScope <SqlClient > poolSqlClient
70
+ ReactorScope <Mono <SqlClient >> sqlClient
67
71
) {
68
72
this .reactor = reactor ;
69
73
this .mapper = mapper ;
70
- this .pooledClientSqlClient = pooledClientSqlClient ;
71
- this .poolSqlClient = poolSqlClient ;
74
+ this .sqlClient = sqlClient ;
72
75
}
73
76
74
77
@ Init
@@ -85,7 +88,7 @@ public void destroy() {
85
88
}
86
89
87
90
@ Override
88
- public void handle (Exchange exchange ) throws HttpException {
91
+ public void handle (Exchange < ExchangeContext > exchange ) throws HttpException {
89
92
switch (exchange .request ().getPath ()) {
90
93
case PATH_PLAINTEXT : {
91
94
this .handle_plaintext (exchange );
@@ -145,7 +148,7 @@ public ByteBuf get() {
145
148
146
149
private static final Mono <ByteBuf > PLAIN_TEXT_MONO = Mono .fromSupplier (new PlaintextSupplier ());
147
150
148
- public void handle_plaintext (Exchange exchange ) throws HttpException {
151
+ public void handle_plaintext (Exchange < ExchangeContext > exchange ) throws HttpException {
149
152
exchange .response ()
150
153
.headers (h -> h
151
154
.add (HttpHeaderNames .SERVER , STATIC_SERVER )
@@ -158,7 +161,7 @@ public void handle_plaintext(Exchange exchange) throws HttpException {
158
161
.stream (PLAIN_TEXT_MONO );
159
162
}
160
163
161
- public void handle_json (Exchange exchange ) throws HttpException {
164
+ public void handle_json (Exchange < ExchangeContext > exchange ) throws HttpException {
162
165
try {
163
166
exchange .response ()
164
167
.headers (h -> h
@@ -175,21 +178,20 @@ public void handle_json(Exchange exchange) throws HttpException {
175
178
}
176
179
}
177
180
178
- private static final String DB_SELECT_WORLD = "SELECT id, randomnumber from WORLD where id = $1" ;
179
-
180
181
private static int randomWorldId () {
181
182
return 1 + ThreadLocalRandom .current ().nextInt (10000 );
182
183
}
183
184
184
- public void handle_db (Exchange exchange ) throws HttpException {
185
+ public void handle_db (Exchange < ExchangeContext > exchange ) throws HttpException {
185
186
exchange .response ()
186
187
.headers (h -> h
187
188
.add (HttpHeaderNames .SERVER , STATIC_SERVER )
188
189
.add (HttpHeaderNames .DATE , this .date )
189
190
.add (HttpHeaderNames .CONTENT_TYPE , HttpHeaderValues .APPLICATION_JSON )
190
191
)
191
192
.body ()
192
- .raw ().stream (this .pooledClientSqlClient .get ().queryForObject (
193
+ .raw ().stream (this .sqlClient .get ().flatMap (client ->
194
+ client .queryForObject (
193
195
DB_SELECT_WORLD ,
194
196
row -> {
195
197
try {
@@ -201,21 +203,21 @@ public void handle_db(Exchange exchange) throws HttpException {
201
203
},
202
204
randomWorldId ()
203
205
)
204
- );
206
+ )) ;
205
207
}
206
208
207
209
private static final String PARAMETER_QUERIES = "queries" ;
208
210
209
- private int extractQueriesParameter (Exchange exchange ) {
211
+ private int extractQueriesParameter (Exchange < ExchangeContext > exchange ) {
210
212
try {
211
213
return Math .min (500 , Math .max (1 , exchange .request ().queryParameters ().get (PARAMETER_QUERIES ).map (Parameter ::asInteger ).orElse (1 )));
212
214
}
213
- catch (ConverterException e ) { // TODO
215
+ catch (ConverterException e ) {
214
216
return 1 ;
215
217
}
216
218
}
217
219
218
- public void handle_queries (Exchange exchange ) throws HttpException {
220
+ public void handle_queries (Exchange < ExchangeContext > exchange ) throws HttpException {
219
221
int queries = this .extractQueriesParameter (exchange );
220
222
exchange .response ()
221
223
.headers (h -> h
@@ -224,12 +226,17 @@ public void handle_queries(Exchange exchange) throws HttpException {
224
226
.add (HttpHeaderNames .CONTENT_TYPE , HttpHeaderValues .APPLICATION_JSON )
225
227
)
226
228
.body ()
227
- .raw ().stream (Flux .range (0 , queries )
228
- .flatMap (ign -> this .pooledClientSqlClient .get ().queryForObject (
229
- DB_SELECT_WORLD ,
230
- row -> new World (row .getInteger (0 ), row .getInteger (1 )),
231
- randomWorldId ()
232
- ))
229
+ .raw ().stream (this .sqlClient .get ()
230
+ .flatMapMany (client -> ((UnsafeSqlOperations )client )
231
+ .batchQueries (ops ->
232
+ Flux .range (0 , queries )
233
+ .map (ign -> ops .queryForObject (
234
+ DB_SELECT_WORLD ,
235
+ row -> new World (row .getInteger (0 ), row .getInteger (1 )),
236
+ randomWorldId ()
237
+ ))
238
+ )
239
+ )
233
240
.collectList ()
234
241
.map (worlds -> {
235
242
try {
@@ -242,27 +249,28 @@ public void handle_queries(Exchange exchange) throws HttpException {
242
249
);
243
250
}
244
251
245
- private static final String DB_UPDATE_WORLD = "UPDATE world SET randomnumber=$1 WHERE id=$2" ;
246
-
247
- public void handle_updates (Exchange exchange ) throws HttpException {
252
+ public void handle_updates (Exchange <ExchangeContext > exchange ) throws HttpException {
248
253
int queries = this .extractQueriesParameter (exchange );
249
254
250
255
exchange .response ()
251
- .headers (h -> h
252
- .add (HttpHeaderNames .SERVER , STATIC_SERVER )
253
- .add (HttpHeaderNames .DATE , this .date )
254
- .add (HttpHeaderNames .CONTENT_TYPE , HttpHeaderValues .APPLICATION_JSON )
255
- )
256
- .body ()
257
- .raw ().stream (this .poolSqlClient .get ().connection (ops -> Flux .range (0 , queries )
258
- .flatMap (ign -> ops .queryForObject (
259
- DB_SELECT_WORLD ,
260
- row -> new World (row .getInteger (0 ), randomWorldId ()),
261
- randomWorldId ()
262
- )
263
- )
256
+ .headers (h -> h
257
+ .add (HttpHeaderNames .SERVER , STATIC_SERVER )
258
+ .add (HttpHeaderNames .DATE , this .date )
259
+ .add (HttpHeaderNames .CONTENT_TYPE , HttpHeaderValues .APPLICATION_JSON )
260
+ )
261
+ .body ()
262
+ .raw ().stream (this .sqlClient .get ()
263
+ .flatMapMany (client -> Flux .from (((UnsafeSqlOperations )client )
264
+ .batchQueries (ops ->
265
+ Flux .range (0 , queries )
266
+ .map (ign -> ops .queryForObject (
267
+ DB_SELECT_WORLD ,
268
+ row -> new World (row .getInteger (0 ), randomWorldId ()),
269
+ randomWorldId ()
270
+ ))
271
+ ))
264
272
.collectSortedList ()
265
- .delayUntil (worlds -> ops .batchUpdate (
273
+ .delayUntil (worlds -> client .batchUpdate (
266
274
DB_UPDATE_WORLD ,
267
275
worlds .stream ().map (world -> new Object [] { world .getRandomNumber (), world .getId () })
268
276
)
@@ -274,27 +282,29 @@ public void handle_updates(Exchange exchange) throws HttpException {
274
282
catch (JsonProcessingException e ) {
275
283
throw new InternalServerErrorException (e );
276
284
}
277
- }))
278
- );
285
+ })
286
+ )
287
+ );
279
288
}
280
289
281
- private static final String DB_SELECT_FORTUNE = "SELECT id, message from FORTUNE" ;
282
290
private static final CharSequence MEDIA_TEXT_HTML_UTF8 = AsciiString .cached ("text/html; charset=utf-8" );
283
291
284
292
private static final FortunesTemplate .Renderer <CompletableFuture <ByteBuf >> FORTUNES_RENDERER = FortunesTemplate .bytebuf (() -> Unpooled .unreleasableBuffer (Unpooled .buffer ()));
285
293
286
- public void handle_fortunes (Exchange exchange ) throws HttpException {
294
+ public void handle_fortunes (Exchange < ExchangeContext > exchange ) throws HttpException {
287
295
exchange .response ()
288
296
.headers (h -> h
289
297
.add (HttpHeaderNames .SERVER , STATIC_SERVER )
290
298
.add (HttpHeaderNames .DATE , this .date )
291
299
.add (HttpHeaderNames .CONTENT_TYPE , MEDIA_TEXT_HTML_UTF8 )
292
300
)
293
301
.body ()
294
- .raw ().stream (Flux .from (this .pooledClientSqlClient .get ().query (
295
- DB_SELECT_FORTUNE ,
296
- row -> new Fortune (row .getInteger (0 ), row .getString (1 ))
297
- ))
302
+ .raw ().stream (this .sqlClient .get ().flatMapMany (client ->
303
+ client .query (
304
+ DB_SELECT_FORTUNE ,
305
+ row -> new Fortune (row .getInteger (0 ), row .getString (1 ))
306
+ )
307
+ )
298
308
.collectList ()
299
309
.flatMap (fortunes -> {
300
310
fortunes .add (new Fortune (0 , "Additional fortune added at request time." ));
0 commit comments