Skip to content

Commit c2288bd

Browse files
author
tramchamploo
committed
Add vert.x async jdbc implementation
1 parent 55190dd commit c2288bd

File tree

38 files changed

+1293
-117
lines changed

38 files changed

+1293
-117
lines changed

README.md

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
[ ![Download](https://api.bintray.com/packages/tramchamploo/tramchamploo/buffer-slayer/images/download.svg) ](https://bintray.com/tramchamploo/tramchamploo/buffer-slayer/_latestVersion)
44

55
# buffer-slayer
6-
buffer-slayer is tool that buffers requests and send them in batch, of which client supports batch operation. Such as `Spring-JdbcTemplate`(batchUpdate), `Redis`(pipeline).
6+
buffer-slayer is tool that buffers requests and send them in batch, of which client supports batch operation. Such as `spring-jdbc`(batchUpdate), `jedis`(pipeline), `vertx-mysql-postgresql-client`(batch, batchWithParams).
77

88
It has a queue that allows multiple producers to send to, and limited so to keep application away from Overflowing.
99

@@ -88,16 +88,16 @@ This is where you configure all properties.
8888
## Benchmark
8989
Here is a simple jdbc benchmark result on my MacBook Pro (Retina, 13-inch, Late 2013).
9090

91-
Using mysql 5.7.18, keeps executing a simple `INSERT INTO test.benchmark(data, time) VALUES(?, ?);`
91+
Using mysql 5.7.18, keeps executing a simple `INSERT INTO test.benchmark(data, time) VALUES(?, ?);`. With `rewriteBatchedStatements=true`
9292

9393
```
94-
Benchmark Mode Cnt Score Units
95-
BatchJdbcTemplateBenchmark.high_contention_batched thrpt 15 27917.086 ops/s
96-
BatchJdbcTemplateBenchmark.high_contention_unbatched thrpt 15 316.562 ops/s
97-
BatchJdbcTemplateBenchmark.mild_contention_batched thrpt 15 18672.057 ops/s
98-
BatchJdbcTemplateBenchmark.mild_contention_unbatched thrpt 15 280.970 ops/s
99-
BatchJdbcTemplateBenchmark.no_contention_batched thrpt 15 9053.288 ops/s
100-
BatchJdbcTemplateBenchmark.no_contention_unbatched thrpt 15 198.581 ops/s
94+
Benchmark Mode Cnt Score Units
95+
AsyncBatchJdbcTemplateBenchmark.high_contention_batched thrpt 15 223123.122 ops/s
96+
AsyncBatchJdbcTemplateBenchmark.high_contention_unbatched thrpt 15 308.868 ops/s
97+
AsyncBatchJdbcTemplateBenchmark.mild_contention_batched thrpt 15 60781.806 ops/s
98+
AsyncBatchJdbcTemplateBenchmark.mild_contention_unbatched thrpt 15 283.915 ops/s
99+
AsyncBatchJdbcTemplateBenchmark.no_contention_batched thrpt 15 29199.430 ops/s
100+
AsyncBatchJdbcTemplateBenchmark.no_contention_unbatched thrpt 15 177.196 ops/s
101101
```
102102

103103
## Components

benchmark/pom.xml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,17 @@
5454
<type>test-jar</type>
5555
</dependency>
5656

57+
<dependency>
58+
<groupId>${project.groupId}</groupId>
59+
<artifactId>bufferslayer-vertx-jdbc</artifactId>
60+
</dependency>
61+
62+
<dependency>
63+
<groupId>${project.groupId}</groupId>
64+
<artifactId>bufferslayer-vertx-jdbc</artifactId>
65+
<type>test-jar</type>
66+
</dependency>
67+
5768
<dependency>
5869
<groupId>mysql</groupId>
5970
<artifactId>mysql-connector-java</artifactId>

benchmark/src/main/java/io/github/tramchamploo/bufferslayer/AbstractBatchJdbcTemplateBenchmark.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package io.github.tramchamploo.bufferslayer;
22

3+
import static io.github.tramchamploo.bufferslayer.TestUtil.propertyOr;
4+
35
import java.beans.PropertyVetoException;
46
import java.util.Date;
57
import java.util.concurrent.ThreadLocalRandom;
@@ -25,7 +27,7 @@ public abstract class AbstractBatchJdbcTemplateBenchmark {
2527
private DriverManagerDataSource dataSource;
2628
private BatchJdbcTemplate batch;
2729
private JdbcTemplate unbatch;
28-
private Reporter<Sql, Integer> reporter;
30+
private Reporter<SQL, Integer> reporter;
2931
private static SenderProxy proxy;
3032
private static AtomicLong counter = new AtomicLong();
3133

@@ -35,16 +37,12 @@ public abstract class AbstractBatchJdbcTemplateBenchmark {
3537
private static final String TRUNCATE_TABLE = "TRUNCATE TABLE test.benchmark";
3638
private static final String INSERTION = "INSERT INTO test.benchmark(data, time) VALUES(?, ?)";
3739

38-
static String propertyOr(String key, String fallback) {
39-
return System.getProperty(key, fallback);
40-
}
41-
42-
protected abstract Reporter<Sql, Integer> reporter(Sender<Sql, Integer> sender);
40+
protected abstract Reporter<SQL, Integer> reporter(Sender<SQL, Integer> sender);
4341

4442
@Setup
4543
public void setup() throws PropertyVetoException {
4644
dataSource = new DriverManagerDataSource();
47-
dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver");
45+
dataSource.setDriverClassName("com.mysql.jdbc.Driver");
4846
dataSource.setUrl(propertyOr("jdbcUrl", "jdbc:mysql://127.0.0.1:3306?useSSL=false"));
4947
dataSource.setUsername(propertyOr("username", "root"));
5048
dataSource.setPassword(propertyOr("password", "root"));
@@ -90,7 +88,7 @@ public static class Lagging {
9088

9189
@Setup(Level.Iteration)
9290
public void lag() throws InterruptedException {
93-
TimeUnit.SECONDS.sleep(3);
91+
TimeUnit.SECONDS.sleep(1);
9492
}
9593
}
9694

benchmark/src/main/java/io/github/tramchamploo/bufferslayer/AbstractTimeUsedComparison.java

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,28 @@
11
package io.github.tramchamploo.bufferslayer;
22

3+
import static io.github.tramchamploo.bufferslayer.TestUtil.propertyOr;
4+
import static io.github.tramchamploo.bufferslayer.TestUtil.randomString;
5+
36
import java.util.Date;
47
import java.util.Random;
58
import java.util.concurrent.CountDownLatch;
6-
import java.util.concurrent.ThreadLocalRandom;
79
import java.util.concurrent.atomic.AtomicLong;
810
import org.springframework.jdbc.core.JdbcTemplate;
911
import org.springframework.jdbc.datasource.DriverManagerDataSource;
1012

13+
/**
14+
* Compare time used for batched and non batched sql updates.
15+
*/
1116
public abstract class AbstractTimeUsedComparison {
1217

13-
static String randomString() {
14-
return String.valueOf(ThreadLocalRandom.current().nextLong());
15-
}
16-
17-
static String propertyOr(String key, String fallback) {
18-
return System.getProperty(key, fallback);
19-
}
20-
2118
protected void run() throws Exception {
2219
BatchJdbcTemplate batch;
2320
JdbcTemplate unbatch;
2421
SenderProxy proxy;
2522

2623
DriverManagerDataSource dataSource = new DriverManagerDataSource();
27-
dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver");
28-
dataSource.setUrl(propertyOr("jdbcUrl", "jdbc:mysql://127.0.0.1:3306?useSSL=false"));
24+
dataSource.setDriverClassName("com.mysql.jdbc.Driver");
25+
dataSource.setUrl(propertyOr("jdbcUrl", "jdbc:mysql://127.0.0.1:3306?useSSL=false&rewriteBatchedStatements=true"));
2926
dataSource.setUsername(propertyOr("username", "root"));
3027
dataSource.setPassword(propertyOr("password", "root"));
3128

@@ -49,7 +46,7 @@ protected void run() throws Exception {
4946
}
5047
});
5148

52-
Reporter<Sql, Integer> reporter = reporter(proxy);
49+
Reporter<SQL, Integer> reporter = reporter(proxy);
5350
batch = new BatchJdbcTemplate(delegate, reporter);
5451
batch.setDataSource(dataSource);
5552

@@ -88,5 +85,5 @@ protected void run() throws Exception {
8885
unbatch.update(DROP_TABLE);
8986
}
9087

91-
protected abstract Reporter<Sql, Integer> reporter(Sender<Sql, Integer> actual);
88+
protected abstract <S extends Message, R> Reporter<S, R> reporter(Sender<S, R> actual);
9289
}
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
package io.github.tramchamploo.bufferslayer;
2+
3+
import static io.github.tramchamploo.bufferslayer.TestUtil.propertyOr;
4+
import static io.github.tramchamploo.bufferslayer.TestUtil.randomString;
5+
6+
import com.github.mauricio.async.db.Configuration;
7+
import com.github.mauricio.async.db.mysql.util.URLParser;
8+
import io.vertx.core.Vertx;
9+
import io.vertx.core.json.JsonArray;
10+
import io.vertx.core.json.JsonObject;
11+
import io.vertx.ext.asyncsql.AsyncSQLClient;
12+
import io.vertx.ext.asyncsql.MySQLClient;
13+
import io.vertx.ext.sql.SQLConnection;
14+
import io.vertx.ext.sql.UpdateResult;
15+
import java.nio.charset.Charset;
16+
import java.util.ArrayList;
17+
import java.util.List;
18+
import java.util.Random;
19+
import java.util.concurrent.CountDownLatch;
20+
import java.util.concurrent.atomic.AtomicLong;
21+
import java.util.function.Function;
22+
23+
public abstract class AbstractVertxTimeUsedComparison extends AbstractTimeUsedComparison {
24+
25+
private static final String CREATE_DATABASE = "CREATE DATABASE IF NOT EXISTS test";
26+
private static final String CREATE_TABLE = "CREATE TABLE test.benchmark(id INT PRIMARY KEY AUTO_INCREMENT, data VARCHAR(32), time TIMESTAMP)";
27+
private static final String DROP_TABLE = "DROP TABLE IF EXISTS test.benchmark";
28+
private static final String TRUNCATE_TABLE = "TRUNCATE TABLE test.benchmark";
29+
private static final String INSERTION = "INSERT INTO test.benchmark(data, time) VALUES(?, now())";
30+
private static final String MODIFICATION = "UPDATE test.benchmark SET data = ? WHERE id = ?";
31+
32+
private static final int ITERATIONS = 10;
33+
private static final int WARMUPS = 3;
34+
35+
private static final Configuration jdbcUrl = URLParser.parse(propertyOr("jdbcUrl", "jdbc:mysql://127.0.0.1:3306?useSSL=false"), Charset.forName("UTF-8"));
36+
private static final JsonObject mySQLClientConfig = new JsonObject()
37+
.put("host", jdbcUrl.host())
38+
.put("port", jdbcUrl.port())
39+
.put("username", propertyOr("username", "root"))
40+
.put("password", propertyOr("password", "root"))
41+
.put("database", "");
42+
43+
protected void run() throws Exception {
44+
Vertx vertx = Vertx.vertx();
45+
AsyncSQLClient unbatch = MySQLClient.createShared(vertx, mySQLClientConfig);
46+
47+
final CountDownLatch countDown = new CountDownLatch(1);
48+
49+
unbatch.getConnection(connection -> {
50+
SQLConnection conn = connection.result();
51+
conn.update(CREATE_DATABASE, cd -> {
52+
System.out.println("CREATE DATABASE: " + cd.succeeded());
53+
54+
conn.update(DROP_TABLE, dt -> {
55+
System.out.println("DROP TABLE: " + dt.succeeded());
56+
57+
conn.update(CREATE_TABLE, ct -> {
58+
System.out.println("CREATE TABLE: " + ct.succeeded());
59+
conn.close();
60+
countDown.countDown();
61+
});
62+
});
63+
});
64+
});
65+
countDown.await();
66+
67+
avgTime(WARMUPS, unbatch, this::measureTimeInNanos);
68+
69+
long avgTime = (long) avgTime(ITERATIONS, unbatch, this::measureTimeInNanos);
70+
System.out.println("unbatched time used: " + avgTime);
71+
72+
SQLConnectionSender sender = new SQLConnectionSender(unbatch);
73+
74+
Reporter<Statement, UpdateResult> reporter = reporter(sender);
75+
AsyncSQLClient batch = BatchMySQLClient.wrap(vertx, unbatch, reporter);
76+
avgTime = (long) avgTime(ITERATIONS, batch, this::measureTimeInNanos);
77+
System.out.println("batched time used: " + avgTime);
78+
79+
vertx.close();
80+
}
81+
82+
private void truncateTable(AsyncSQLClient client) {
83+
final CountDownLatch countDown = new CountDownLatch(1);
84+
85+
client.getConnection(connection -> {
86+
SQLConnection conn = connection.result();
87+
conn.update(TRUNCATE_TABLE, tt -> {
88+
System.out.println("TRUNCATE TABLE: " + tt.succeeded());
89+
conn.close();
90+
countDown.countDown();
91+
});
92+
});
93+
try {
94+
countDown.await();
95+
} catch (InterruptedException e) {
96+
throw new RuntimeException(e);
97+
}
98+
}
99+
100+
private double avgTime(int iterations, AsyncSQLClient client, Function<AsyncSQLClient, Long> measurement) {
101+
List<Long> times = new ArrayList<>();
102+
for (int i = 0; i < iterations; i++) {
103+
truncateTable(client);
104+
times.add(measurement.apply(client));
105+
}
106+
return times.stream().mapToLong(value -> value).average().getAsDouble();
107+
}
108+
109+
private long measureTimeInNanos(AsyncSQLClient client) {
110+
CountDownLatch countDown = new CountDownLatch(1);
111+
AtomicLong counter = new AtomicLong();
112+
Random random = new Random(System.currentTimeMillis());
113+
114+
long start = System.nanoTime();
115+
116+
for (int i = 0; i < 5000; i++) {
117+
int finalI = i;
118+
client.getConnection(connection -> {
119+
SQLConnection conn = connection.result();
120+
121+
conn.updateWithParams(INSERTION,
122+
new JsonArray().add(randomString()), update -> {
123+
124+
if (update.succeeded()) {
125+
long count = counter.addAndGet(update.result().getUpdated());
126+
if (count == 5050) countDown.countDown();
127+
128+
if (finalI % 100 == 0) {
129+
conn.updateWithParams(MODIFICATION,
130+
new JsonArray().add(randomString()).add(random.nextInt(finalI + 1) + 1), mod -> {
131+
conn.close();
132+
133+
if (update.succeeded()) {
134+
long ccount = counter.addAndGet(mod.result().getUpdated());
135+
if (ccount == 5050) countDown.countDown();
136+
}
137+
});
138+
} else {
139+
conn.close();
140+
}
141+
}
142+
});
143+
});
144+
}
145+
try {
146+
countDown.await();
147+
} catch (InterruptedException e) {
148+
throw new RuntimeException(e);
149+
}
150+
151+
return System.nanoTime() - start;
152+
}
153+
}

benchmark/src/main/java/io/github/tramchamploo/bufferslayer/AsyncBatchJdbcTemplateBenchmark.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
@State(Scope.Group)
2323
public class AsyncBatchJdbcTemplateBenchmark extends AbstractBatchJdbcTemplateBenchmark {
2424

25-
protected Reporter<Sql, Integer> reporter(Sender<Sql, Integer> sender) {
25+
protected Reporter<SQL, Integer> reporter(Sender<SQL, Integer> sender) {
2626
return AsyncReporter.builder(sender)
2727
.pendingKeepalive(1, TimeUnit.SECONDS)
2828
.sharedSenderThreads(10)

benchmark/src/main/java/io/github/tramchamploo/bufferslayer/AsyncTimeUsedComparison.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ public static void main(String[] args) throws Exception {
99
}
1010

1111
@Override
12-
protected Reporter<Sql, Integer> reporter(Sender<Sql, Integer> actual) {
12+
protected <S extends Message, R> Reporter<S, R> reporter(Sender<S, R> actual) {
1313
return AsyncReporter.builder(actual)
1414
.pendingMaxMessages(6000)
1515
.bufferedMaxMessages(100)
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package io.github.tramchamploo.bufferslayer;
2+
3+
import java.util.concurrent.TimeUnit;
4+
5+
public class AsyncVertxTimeUsedComparison extends AbstractVertxTimeUsedComparison {
6+
7+
public static void main(String[] args) throws Exception {
8+
new AsyncVertxTimeUsedComparison().run();
9+
}
10+
11+
@Override
12+
protected <S extends Message, R> Reporter<S, R> reporter(Sender<S, R> actual) {
13+
return AsyncReporter.builder(actual)
14+
.pendingMaxMessages(6000)
15+
.bufferedMaxMessages(100)
16+
.messageTimeout(50, TimeUnit.MILLISECONDS)
17+
.pendingKeepalive(10, TimeUnit.MILLISECONDS)
18+
.sharedSenderThreads(10)
19+
.singleKey(true)
20+
.build();
21+
}
22+
}

benchmark/src/main/java/io/github/tramchamploo/bufferslayer/NoopSender.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@
44
import java.util.List;
55
import java.util.concurrent.atomic.AtomicBoolean;
66

7-
public class NoopSender implements Sender {
7+
public class NoopSender implements SyncSender {
88

9-
AtomicBoolean closed = new AtomicBoolean(false);
9+
private AtomicBoolean closed = new AtomicBoolean(false);
1010

1111
@Override
1212
public List send(List messages) {
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package io.github.tramchamploo.bufferslayer;
2+
3+
import java.util.concurrent.TimeUnit;
4+
import org.openjdk.jmh.annotations.BenchmarkMode;
5+
import org.openjdk.jmh.annotations.Fork;
6+
import org.openjdk.jmh.annotations.Measurement;
7+
import org.openjdk.jmh.annotations.Mode;
8+
import org.openjdk.jmh.annotations.OutputTimeUnit;
9+
import org.openjdk.jmh.annotations.Scope;
10+
import org.openjdk.jmh.annotations.State;
11+
import org.openjdk.jmh.annotations.Warmup;
12+
import org.openjdk.jmh.runner.Runner;
13+
import org.openjdk.jmh.runner.RunnerException;
14+
import org.openjdk.jmh.runner.options.Options;
15+
import org.openjdk.jmh.runner.options.OptionsBuilder;
16+
17+
@Measurement(iterations = 5, time = 1)
18+
@Warmup(iterations = 3, time = 1)
19+
@Fork(3)
20+
@BenchmarkMode(Mode.Throughput)
21+
@OutputTimeUnit(TimeUnit.SECONDS)
22+
@State(Scope.Group)
23+
public class RxBatchJdbcTemplateBenchmark extends AbstractBatchJdbcTemplateBenchmark {
24+
25+
protected Reporter<SQL, Integer> reporter(Sender<SQL, Integer> sender) {
26+
return RxReporter.builder(sender).build();
27+
}
28+
29+
public static void main(String[] args) throws RunnerException {
30+
Options opt = new OptionsBuilder()
31+
.include(".*" + RxBatchJdbcTemplateBenchmark.class.getSimpleName() + ".*")
32+
.build();
33+
34+
new Runner(opt).run();
35+
}
36+
}

0 commit comments

Comments
 (0)