11package benchmark .repository ;
22
3- import benchmark .model .Fortune ;
4- import benchmark .model .World ;
53import org .springframework .context .annotation .Profile ;
64import org .springframework .r2dbc .core .DatabaseClient ;
75import org .springframework .stereotype .Component ;
6+
7+ import benchmark .model .Fortune ;
8+ import benchmark .model .World ;
9+ import io .r2dbc .spi .Connection ;
10+ import io .r2dbc .spi .ConnectionFactory ;
811import reactor .core .publisher .Flux ;
912import reactor .core .publisher .Mono ;
1013
1114@ Component
1215@ Profile ("r2dbc" )
1316public class R2dbcDbRepository implements DbRepository {
1417 private final DatabaseClient databaseClient ;
18+ private final ConnectionFactory connectionFactory ;
19+ private final ThreadLocal <Mono <? extends Connection >> conn = new ThreadLocal <>();
1520
1621 public R2dbcDbRepository (DatabaseClient databaseClient ) {
1722 this .databaseClient = databaseClient ;
23+ this .connectionFactory = databaseClient .getConnectionFactory ();
1824 }
1925
2026 @ Override
2127 public Mono <World > getWorld (int id ) {
22- return databaseClient
23- .sql ("SELECT id, randomnumber FROM world WHERE id = $1" )
24- .bind ("$1" , id )
25- .mapProperties (World .class )
26- .first ();
27-
28+ return getConnection ().flatMap (conn -> Mono
29+ .from (conn .createStatement ("SELECT id, randomnumber FROM world WHERE id = $1" )
30+ .bind ("$1" , id )
31+ .execute ())
32+ .flatMap (result -> Mono .from (result .map ((row , rowMetadata ) ->
33+ new World (row .get ("id" , Integer .class ), row .get ("randomnumber" , Integer .class )))))
34+ .single ());
2835 }
2936
30- public Mono <World > updateWorld (World world ) {
37+ private Mono <World > updateWorld (World world ) {
3138 return databaseClient
3239 .sql ("UPDATE world SET randomnumber=$2 WHERE id = $1" )
3340 .bind ("$1" , world .id )
@@ -37,18 +44,31 @@ public Mono<World> updateWorld(World world) {
3744 .map (count -> world );
3845 }
3946
47+
48+ @ Override
4049 public Mono <World > findAndUpdateWorld (int id , int randomNumber ) {
41- return getWorld (id ).flatMap (world -> {
42- world .randomnumber = randomNumber ;
43- return updateWorld (world );
44- });
50+ return databaseClient .sql ("SELECT id, randomnumber FROM world WHERE id = $1" )
51+ .bind ("$1" , id )
52+ .map ((row , rowMetadata ) -> new World (row .get ("id" , Integer .class ),
53+ row .get ("randomnumber" , Integer .class )))
54+ .first ().flatMap (world -> {
55+ world .randomnumber = randomNumber ;
56+ return updateWorld (world );
57+ });
4558 }
4659
4760 @ Override
4861 public Flux <Fortune > fortunes () {
49- return databaseClient
50- .sql ("SELECT id, message FROM fortune" )
51- .mapProperties (Fortune .class )
52- .all ();
62+ return getConnection ()
63+ .flatMapMany (conn -> conn .createStatement ("SELECT id, message FROM " + "fortune" ).execute ())
64+ .flatMap (result -> result .map (r -> new Fortune (r .get (0 , Integer .class ), r .get (1 , String .class ))));
5365 }
66+
67+ private Mono <? extends Connection > getConnection () {
68+ if (this .conn .get () == null ) {
69+ this .conn .set (Mono .from (connectionFactory .create ()).cache ());
70+ }
71+ return this .conn .get ();
72+ }
73+
5474}
0 commit comments