11package benchmark .repository ;
22
3- import benchmark .model .Fortune ;
4- import benchmark .model .World ;
53import org .springframework .context .annotation .Profile ;
64import org .springframework .r2dbc .core .DatabaseClient ;
75import org .springframework .stereotype .Component ;
6+
7+ import benchmark .model .Fortune ;
8+ import benchmark .model .World ;
9+ import io .r2dbc .spi .Connection ;
10+ import io .r2dbc .spi .ConnectionFactory ;
811import reactor .core .publisher .Flux ;
912import reactor .core .publisher .Mono ;
1013
1114@ Component
1215@ Profile ("r2dbc" )
1316public class R2dbcDbRepository implements DbRepository {
17+
1418 private final DatabaseClient databaseClient ;
19+ private final ConnectionFactory connectionFactory ;
20+ private final ThreadLocal <Mono <? extends Connection >> conn = new ThreadLocal <>();
1521
1622 public R2dbcDbRepository (DatabaseClient databaseClient ) {
1723 this .databaseClient = databaseClient ;
24+ this .connectionFactory = databaseClient .getConnectionFactory ();
1825 }
1926
2027 @ Override
2128 public Mono <World > getWorld (int id ) {
22- return databaseClient
23- .sql ("SELECT id, randomnumber FROM world WHERE id = $1" )
24- .bind ("$1" , id )
25- .mapProperties (World .class )
26- .first ();
27-
29+ return getConnection ().flatMap (conn -> Mono
30+ .from (conn .createStatement ("SELECT id, randomnumber FROM world WHERE id = $1" )
31+ .bind ("$1" , id )
32+ .execute ())
33+ .flatMap (result -> Mono .from (result .map ((row , rowMetadata ) ->
34+ new World (row .get ("id" , Integer .class ), row .get ("randomnumber" , Integer .class )))))
35+ .single ());
2836 }
2937
30- public Mono <World > updateWorld (World world ) {
38+ private Mono <World > updateWorld (World world ) {
3139 return databaseClient
3240 .sql ("UPDATE world SET randomnumber=$2 WHERE id = $1" )
3341 .bind ("$1" , world .id )
@@ -37,18 +45,31 @@ public Mono<World> updateWorld(World world) {
3745 .map (count -> world );
3846 }
3947
48+
49+ @ Override
4050 public Mono <World > findAndUpdateWorld (int id , int randomNumber ) {
41- return getWorld (id ).flatMap (world -> {
42- world .randomnumber = randomNumber ;
43- return updateWorld (world );
44- });
51+ return databaseClient .sql ("SELECT id, randomnumber FROM world WHERE id = $1" )
52+ .bind ("$1" , id )
53+ .map ((row , rowMetadata ) -> new World (row .get ("id" , Integer .class ),
54+ row .get ("randomnumber" , Integer .class )))
55+ .first ().flatMap (world -> {
56+ world .randomnumber = randomNumber ;
57+ return updateWorld (world );
58+ });
4559 }
4660
4761 @ Override
4862 public Flux <Fortune > fortunes () {
49- return databaseClient
50- .sql ("SELECT id, message FROM fortune" )
51- .mapProperties (Fortune .class )
52- .all ();
63+ return getConnection ()
64+ .flatMapMany (conn -> conn .createStatement ("SELECT id, message FROM " + "fortune" ).execute ())
65+ .flatMap (result -> result .map (r -> new Fortune (r .get (0 , Integer .class ), r .get (1 , String .class ))));
66+ }
67+
68+ private Mono <? extends Connection > getConnection () {
69+ if (this .conn .get () == null ) {
70+ this .conn .set (Mono .from (connectionFactory .create ()).cache ());
71+ }
72+ return this .conn .get ();
5373 }
74+
5475}
0 commit comments