22
33import java .util .ArrayList ;
44import java .util .List ;
5- import java .util .concurrent .CompletableFuture ;
65import java .util .concurrent .ExecutionException ;
7- import java .util .concurrent .TimeUnit ;
8- import java .util .concurrent .TimeoutException ;
96import java .util .logging .Logger ;
107
118import io .helidon .config .Config ;
12-
9+ import io . vertx . core . Future ;
1310import io .vertx .core .Vertx ;
1411import io .vertx .core .VertxOptions ;
15- import io .vertx .core .Future ;
1612import io .vertx .pgclient .PgConnectOptions ;
1713import io .vertx .pgclient .PgPool ;
1814import io .vertx .sqlclient .PoolOptions ;
2622
2723public class PgClientRepository implements DbRepository {
2824 private static final Logger LOGGER = Logger .getLogger (PgClientRepository .class .getName ());
25+ private static final int UPDATE_QUERIES = 500 ;
2926
30- private final SqlClient queryPool ;
3127 private final SqlClient updatePool ;
3228
33- private final int batchSize ;
34- private final long updateTimeout ;
35- private final int maxRetries ;
36-
3729 private final PreparedQuery <RowSet <Row >> getFortuneQuery ;
3830 private final PreparedQuery <RowSet <Row >> getWorldQuery ;
39- private final PreparedQuery <RowSet <Row >> updateWorldQuery ;
31+ private final PreparedQuery <RowSet <Row >>[] updateWorldSingleQuery ;
4032
33+ @ SuppressWarnings ("unchecked" )
4134 public PgClientRepository (Config config ) {
42- Vertx vertx = Vertx .vertx (new VertxOptions ()
43- .setPreferNativeTransport (true ));
35+ Vertx vertx = Vertx .vertx (new VertxOptions ().setPreferNativeTransport (true ));
4436 PgConnectOptions connectOptions = new PgConnectOptions ()
4537 .setPort (config .get ("port" ).asInt ().orElse (5432 ))
4638 .setCachePreparedStatements (config .get ("cache-prepared-statements" ).asBoolean ().orElse (true ))
4739 .setHost (config .get ("host" ).asString ().orElse ("tfb-database" ))
4840 .setDatabase (config .get ("db" ).asString ().orElse ("hello_world" ))
4941 .setUser (config .get ("username" ).asString ().orElse ("benchmarkdbuser" ))
50- .setPassword (config .get ("password" ).asString ().orElse ("benchmarkdbpass" ));
42+ .setPassword (config .get ("password" ).asString ().orElse ("benchmarkdbpass" ))
43+ .setPipeliningLimit (100000 );
5144
5245 int sqlPoolSize = config .get ("sql-pool-size" ).asInt ().orElse (64 );
5346 PoolOptions clientOptions = new PoolOptions ().setMaxSize (sqlPoolSize );
5447 LOGGER .info ("sql-pool-size is " + sqlPoolSize );
55- batchSize = config .get ("update-batch-size" ).asInt ().orElse (20 );
56- LOGGER .info ("update-batch-size is " + batchSize );
57- updateTimeout = config .get ("update-timeout-millis" ).asInt ().orElse (5000 );
58- LOGGER .info ("update-timeout-millis is " + updateTimeout );
59- maxRetries = config .get ("update-max-retries" ).asInt ().orElse (3 );
60- LOGGER .info ("update-max-retries is " + maxRetries );
61-
62- queryPool = PgPool .client (vertx , connectOptions , clientOptions );
48+
49+ SqlClient queryPool = PgPool .client (vertx , connectOptions , clientOptions );
6350 updatePool = PgPool .client (vertx , connectOptions , clientOptions );
6451
6552 getWorldQuery = queryPool .preparedQuery ("SELECT id, randomnumber FROM world WHERE id = $1" );
66- updateWorldQuery = queryPool .preparedQuery ("UPDATE world SET randomnumber = $1 WHERE id = $2" );
6753 getFortuneQuery = queryPool .preparedQuery ("SELECT id, message FROM fortune" );
54+
55+ updateWorldSingleQuery = new PreparedQuery [UPDATE_QUERIES ];
56+ for (int i = 0 ; i < UPDATE_QUERIES ; i ++) {
57+ updateWorldSingleQuery [i ] = queryPool .preparedQuery (singleUpdate (i + 1 ));
58+ }
6859 }
6960
7061 @ Override
@@ -97,60 +88,11 @@ public List<World> getWorlds(int count) {
9788 }
9889 }
9990
100- @ Override
101- public World updateWorld (World world ) {
102- try {
103- return updateWorldQuery .execute (Tuple .of (world .id , world .id ))
104- .toCompletionStage ()
105- .thenApply (rows -> world )
106- .toCompletableFuture ().get ();
107- } catch (Exception e ) {
108- throw new RuntimeException (e );
109- }
110- }
111-
11291 @ Override
11392 public List <World > updateWorlds (int count ) {
11493 List <World > worlds = getWorlds (count );
115- if (batchSize > 1 ) { // batching updates
116- for (World w : worlds ) {
117- w .randomNumber = randomWorldNumber ();
118- }
119- if (count <= batchSize ) {
120- LOGGER .finest (() -> "Updating single batch of size " + count );
121- updateWorldsRetry (worlds , 0 , 0 );
122- } else {
123- int batches = count / batchSize + (count % batchSize == 0 ? 0 : 1 );
124- for (int i = 0 ; i < batches ; i ++) {
125- final int from = i * batchSize ;
126- LOGGER .finest (() -> "Updating batch from " + from + " to " + (from + batchSize ));
127- updateWorldsRetry (worlds , from , 0 );
128- }
129- }
130- } else { // no batching for size 1
131- for (World w : worlds ) {
132- w .randomNumber = randomWorldNumber ();
133- updateWorld (w );
134- }
135- }
136- return worlds ;
137- }
138-
139- private List <World > updateWorldsRetry (List <World > worlds , int from , int retries ) {
140- if (retries > maxRetries ) {
141- throw new RuntimeException ("Too many transaction retries" );
142- }
143- CompletableFuture <List <World >> cf = null ;
14494 try {
145- cf = updateWorlds (worlds , from , updatePool );
146- cf .get (updateTimeout , TimeUnit .MILLISECONDS );
147- return worlds ;
148- } catch (ExecutionException | TimeoutException e ) {
149- cf .cancel (true );
150- retries ++;
151- final int finalRetries = retries ;
152- LOGGER .fine (() -> "Retrying batch update after cancellation (retries=" + finalRetries + ")" );
153- return updateWorldsRetry (worlds , from , retries ); // retry
95+ return updateWorlds (worlds , count , updatePool );
15496 } catch (Exception e ) {
15597 throw new RuntimeException (e );
15698 }
@@ -172,16 +114,36 @@ public List<Fortune> getFortunes() {
172114 }
173115 }
174116
175- private CompletableFuture <List <World >> updateWorlds (List <World > worlds , int from , SqlClient pool ) {
176- List <Tuple > tuples = new ArrayList <>();
177- int to = Math .min (from + batchSize , worlds .size ());
178- for (int i = from ; i < to ; i ++) {
179- World w = worlds .get (i );
180- tuples .add (Tuple .of (w .randomNumber , w .id ));
117+ private List <World > updateWorlds (List <World > worlds , int count , SqlClient pool )
118+ throws ExecutionException , InterruptedException {
119+ int size = worlds .size ();
120+ List <Integer > updateParams = new ArrayList <>(size * 2 );
121+ for (World world : worlds ) {
122+ updateParams .add (world .id );
123+ world .randomNumber = randomWorldNumber ();
124+ updateParams .add (world .randomNumber );
181125 }
182- return updateWorldQuery . executeBatch ( tuples )
126+ return updateWorldSingleQuery [ count - 1 ]. execute ( Tuple . wrap ( updateParams ) )
183127 .toCompletionStage ()
184128 .thenApply (rows -> worlds )
185- .toCompletableFuture ();
129+ .toCompletableFuture ()
130+ .get ();
131+ }
132+
133+ private static String singleUpdate (int count ) {
134+ StringBuilder sql = new StringBuilder ();
135+ sql .append ("UPDATE WORLD SET RANDOMNUMBER = CASE ID" );
136+ for (int i = 0 ; i < count ; i ++) {
137+ int k = i * 2 + 1 ;
138+ sql .append (" WHEN $" ).append (k ).append (" THEN $" ).append (k + 1 );
139+ }
140+ sql .append (" ELSE RANDOMNUMBER" );
141+ sql .append (" END WHERE ID IN ($1" );
142+ for (int i = 1 ; i < count ; i ++) {
143+ int k = i * 2 + 1 ;
144+ sql .append (",$" ).append (k );
145+ }
146+ sql .append (")" );
147+ return sql .toString ();
186148 }
187149}
0 commit comments