66
77import benchmark .model .Fortune ;
88import benchmark .model .World ;
9- import io .r2dbc .spi .Connection ;
10- import io .r2dbc .spi .ConnectionFactory ;
119import reactor .core .publisher .Flux ;
1210import reactor .core .publisher .Mono ;
1311
1614public class R2dbcDbRepository implements DbRepository {
1715
1816 private final DatabaseClient databaseClient ;
19- private final ConnectionFactory connectionFactory ;
20- private final ThreadLocal <Mono <? extends Connection >> conn = new ThreadLocal <>();
2117
2218 public R2dbcDbRepository (DatabaseClient databaseClient ) {
2319 this .databaseClient = databaseClient ;
24- this .connectionFactory = databaseClient .getConnectionFactory ();
2520 }
2621
2722 @ Override
@@ -54,16 +49,10 @@ public Mono<World> findAndUpdateWorld(int id, int randomNumber) {
5449
5550 @ Override
5651 public Flux <Fortune > fortunes () {
57- return getConnection ()
58- .flatMapMany (conn -> conn .createStatement ("SELECT id, message FROM " + "fortune" ).execute ())
59- .flatMap (result -> result .map (r -> new Fortune (r .get (0 , Integer .class ), r .get (1 , String .class ))));
60- }
61-
62- private Mono <? extends Connection > getConnection () {
63- if (this .conn .get () == null ) {
64- this .conn .set (Mono .from (connectionFactory .create ()).cache ());
65- }
66- return this .conn .get ();
52+ return databaseClient
53+ .sql ("SELECT id, message FROM fortune" )
54+ .mapProperties (Fortune .class )
55+ .all ();
6756 }
6857
6958}
0 commit comments