Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
[ ![Download](https://api.bintray.com/packages/tramchamploo/tramchamploo/buffer-slayer/images/download.svg) ](https://bintray.com/tramchamploo/tramchamploo/buffer-slayer/_latestVersion)

# buffer-slayer
buffer-slayer is tool that buffers requests and send them in batch, of which client supports batch operation. Such as `Spring-JdbcTemplate`(batchUpdate), `Redis`(pipeline).
buffer-slayer is tool that buffers requests and send them in batch, of which client supports batch operation. Such as `spring-jdbc`(batchUpdate), `jedis`(pipeline), `vertx-mysql-postgresql-client`(batch, batchWithParams).

It has a queue that allows multiple producers to send to, and limited so to keep application away from Overflowing.

Expand Down Expand Up @@ -88,16 +88,16 @@ This is where you configure all properties.
## Benchmark
Here is a simple jdbc benchmark result on my MacBook Pro (Retina, 13-inch, Late 2013).

Using mysql 5.7.18, keeps executing a simple `INSERT INTO test.benchmark(data, time) VALUES(?, ?);`
Using mysql 5.7.18, keeps executing a simple `INSERT INTO test.benchmark(data, time) VALUES(?, ?);`. With `rewriteBatchedStatements=true`

```
Benchmark Mode Cnt Score Units
BatchJdbcTemplateBenchmark.high_contention_batched thrpt 15 27917.086 ops/s
BatchJdbcTemplateBenchmark.high_contention_unbatched thrpt 15 316.562 ops/s
BatchJdbcTemplateBenchmark.mild_contention_batched thrpt 15 18672.057 ops/s
BatchJdbcTemplateBenchmark.mild_contention_unbatched thrpt 15 280.970 ops/s
BatchJdbcTemplateBenchmark.no_contention_batched thrpt 15 9053.288 ops/s
BatchJdbcTemplateBenchmark.no_contention_unbatched thrpt 15 198.581 ops/s
Benchmark Mode Cnt Score Units
AsyncBatchJdbcTemplateBenchmark.high_contention_batched thrpt 15 223123.122 ops/s
AsyncBatchJdbcTemplateBenchmark.high_contention_unbatched thrpt 15 308.868 ops/s
AsyncBatchJdbcTemplateBenchmark.mild_contention_batched thrpt 15 60781.806 ops/s
AsyncBatchJdbcTemplateBenchmark.mild_contention_unbatched thrpt 15 283.915 ops/s
AsyncBatchJdbcTemplateBenchmark.no_contention_batched thrpt 15 29199.430 ops/s
AsyncBatchJdbcTemplateBenchmark.no_contention_unbatched thrpt 15 177.196 ops/s
```

## Components
Expand Down
11 changes: 11 additions & 0 deletions benchmark/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,17 @@
<type>test-jar</type>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>bufferslayer-vertx-jdbc</artifactId>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>bufferslayer-vertx-jdbc</artifactId>
<type>test-jar</type>
</dependency>

<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.github.tramchamploo.bufferslayer;

import static io.github.tramchamploo.bufferslayer.TestUtil.propertyOr;

import java.beans.PropertyVetoException;
import java.util.Date;
import java.util.concurrent.ThreadLocalRandom;
Expand All @@ -25,7 +27,7 @@ public abstract class AbstractBatchJdbcTemplateBenchmark {
private DriverManagerDataSource dataSource;
private BatchJdbcTemplate batch;
private JdbcTemplate unbatch;
private Reporter<Sql, Integer> reporter;
private Reporter<SQL, Integer> reporter;
private static SenderProxy proxy;
private static AtomicLong counter = new AtomicLong();

Expand All @@ -35,11 +37,7 @@ public abstract class AbstractBatchJdbcTemplateBenchmark {
private static final String TRUNCATE_TABLE = "TRUNCATE TABLE test.benchmark";
private static final String INSERTION = "INSERT INTO test.benchmark(data, time) VALUES(?, ?)";

static String propertyOr(String key, String fallback) {
return System.getProperty(key, fallback);
}

protected abstract Reporter<Sql, Integer> reporter(Sender<Sql, Integer> sender);
protected abstract Reporter<SQL, Integer> reporter(Sender<SQL, Integer> sender);

@Setup
public void setup() throws PropertyVetoException {
Expand Down Expand Up @@ -90,7 +88,7 @@ public static class Lagging {

@Setup(Level.Iteration)
public void lag() throws InterruptedException {
TimeUnit.SECONDS.sleep(3);
TimeUnit.SECONDS.sleep(1);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,31 +1,28 @@
package io.github.tramchamploo.bufferslayer;

import static io.github.tramchamploo.bufferslayer.TestUtil.propertyOr;
import static io.github.tramchamploo.bufferslayer.TestUtil.randomString;

import java.util.Date;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.DriverManagerDataSource;

/**
* Compare time used for batched and non batched sql updates.
*/
public abstract class AbstractTimeUsedComparison {

static String randomString() {
return String.valueOf(ThreadLocalRandom.current().nextLong());
}

static String propertyOr(String key, String fallback) {
return System.getProperty(key, fallback);
}

protected void run() throws Exception {
BatchJdbcTemplate batch;
JdbcTemplate unbatch;
SenderProxy proxy;

DriverManagerDataSource dataSource = new DriverManagerDataSource();
dataSource.setDriverClassName("com.mysql.jdbc.Driver");
dataSource.setUrl(propertyOr("jdbcUrl", "jdbc:mysql://127.0.0.1:3306?useSSL=false"));
dataSource.setUrl(propertyOr("jdbcUrl", "jdbc:mysql://127.0.0.1:3306?useSSL=false&rewriteBatchedStatements=true"));
dataSource.setUsername(propertyOr("username", "root"));
dataSource.setPassword(propertyOr("password", "root"));

Expand All @@ -49,7 +46,7 @@ protected void run() throws Exception {
}
});

Reporter<Sql, Integer> reporter = reporter(proxy);
Reporter<SQL, Integer> reporter = reporter(proxy);
batch = new BatchJdbcTemplate(delegate, reporter);
batch.setDataSource(dataSource);

Expand Down Expand Up @@ -88,5 +85,5 @@ protected void run() throws Exception {
unbatch.update(DROP_TABLE);
}

protected abstract Reporter<Sql, Integer> reporter(Sender<Sql, Integer> actual);
protected abstract <S extends Message, R> Reporter<S, R> reporter(Sender<S, R> actual);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
package io.github.tramchamploo.bufferslayer;

import static io.github.tramchamploo.bufferslayer.TestUtil.propertyOr;
import static io.github.tramchamploo.bufferslayer.TestUtil.randomString;

import com.github.mauricio.async.db.Configuration;
import com.github.mauricio.async.db.mysql.util.URLParser;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.asyncsql.AsyncSQLClient;
import io.vertx.ext.asyncsql.MySQLClient;
import io.vertx.ext.sql.SQLConnection;
import io.vertx.ext.sql.UpdateResult;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

public abstract class AbstractVertxTimeUsedComparison extends AbstractTimeUsedComparison {

private static final String CREATE_DATABASE = "CREATE DATABASE IF NOT EXISTS test";
private static final String CREATE_TABLE = "CREATE TABLE test.benchmark(id INT PRIMARY KEY AUTO_INCREMENT, data VARCHAR(32), time TIMESTAMP)";
private static final String DROP_TABLE = "DROP TABLE IF EXISTS test.benchmark";
private static final String TRUNCATE_TABLE = "TRUNCATE TABLE test.benchmark";
private static final String INSERTION = "INSERT INTO test.benchmark(data, time) VALUES(?, now())";
private static final String MODIFICATION = "UPDATE test.benchmark SET data = ? WHERE id = ?";

private static final int ITERATIONS = 10;
private static final int WARMUPS = 3;

private static final Configuration jdbcUrl = URLParser.parse(propertyOr("jdbcUrl", "jdbc:mysql://127.0.0.1:3306?useSSL=false"), Charset.forName("UTF-8"));
private static final JsonObject mySQLClientConfig = new JsonObject()
.put("host", jdbcUrl.host())
.put("port", jdbcUrl.port())
.put("username", propertyOr("username", "root"))
.put("password", propertyOr("password", "root"))
.put("database", "");

protected void run() throws Exception {
Vertx vertx = Vertx.vertx();
AsyncSQLClient unbatch = MySQLClient.createShared(vertx, mySQLClientConfig);

final CountDownLatch countDown = new CountDownLatch(1);

unbatch.getConnection(connection -> {
SQLConnection conn = connection.result();
conn.update(CREATE_DATABASE, cd -> {
System.out.println("CREATE DATABASE: " + cd.succeeded());

conn.update(DROP_TABLE, dt -> {
System.out.println("DROP TABLE: " + dt.succeeded());

conn.update(CREATE_TABLE, ct -> {
System.out.println("CREATE TABLE: " + ct.succeeded());
conn.close();
countDown.countDown();
});
});
});
});
countDown.await();

avgTime(WARMUPS, unbatch, this::measureTimeInNanos);

long avgTime = (long) avgTime(ITERATIONS, unbatch, this::measureTimeInNanos);
System.out.println("unbatched time used: " + avgTime);

SQLConnectionSender sender = new SQLConnectionSender(unbatch);

Reporter<Statement, UpdateResult> reporter = reporter(sender);
AsyncSQLClient batch = BatchMySQLClient.wrap(vertx, unbatch, reporter);
avgTime = (long) avgTime(ITERATIONS, batch, this::measureTimeInNanos);
System.out.println("batched time used: " + avgTime);

vertx.close();
}

private void truncateTable(AsyncSQLClient client) {
final CountDownLatch countDown = new CountDownLatch(1);

client.getConnection(connection -> {
SQLConnection conn = connection.result();
conn.update(TRUNCATE_TABLE, tt -> {
System.out.println("TRUNCATE TABLE: " + tt.succeeded());
conn.close();
countDown.countDown();
});
});
try {
countDown.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

private double avgTime(int iterations, AsyncSQLClient client, Function<AsyncSQLClient, Long> measurement) {
List<Long> times = new ArrayList<>();
for (int i = 0; i < iterations; i++) {
truncateTable(client);
times.add(measurement.apply(client));
}
return times.stream().mapToLong(value -> value).average().getAsDouble();
}

private long measureTimeInNanos(AsyncSQLClient client) {
CountDownLatch countDown = new CountDownLatch(1);
AtomicLong counter = new AtomicLong();
Random random = new Random(System.currentTimeMillis());

long start = System.nanoTime();

for (int i = 0; i < 5000; i++) {
int finalI = i;
client.getConnection(connection -> {
SQLConnection conn = connection.result();

conn.updateWithParams(INSERTION,
new JsonArray().add(randomString()), update -> {

if (update.succeeded()) {
long count = counter.addAndGet(update.result().getUpdated());
if (count == 5050) countDown.countDown();

if (finalI % 100 == 0) {
conn.updateWithParams(MODIFICATION,
new JsonArray().add(randomString()).add(random.nextInt(finalI + 1) + 1), mod -> {
conn.close();

if (update.succeeded()) {
long ccount = counter.addAndGet(mod.result().getUpdated());
if (ccount == 5050) countDown.countDown();
}
});
} else {
conn.close();
}
}
});
});
}
try {
countDown.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

return System.nanoTime() - start;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
@State(Scope.Group)
public class AsyncBatchJdbcTemplateBenchmark extends AbstractBatchJdbcTemplateBenchmark {

protected Reporter<Sql, Integer> reporter(Sender<Sql, Integer> sender) {
protected Reporter<SQL, Integer> reporter(Sender<SQL, Integer> sender) {
return AsyncReporter.builder(sender)
.pendingKeepalive(1, TimeUnit.SECONDS)
.sharedSenderThreads(10)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ public static void main(String[] args) throws Exception {
}

@Override
protected Reporter<Sql, Integer> reporter(Sender<Sql, Integer> actual) {
protected <S extends Message, R> Reporter<S, R> reporter(Sender<S, R> actual) {
return AsyncReporter.builder(actual)
.pendingMaxMessages(6000)
.bufferedMaxMessages(100)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.github.tramchamploo.bufferslayer;

import java.util.concurrent.TimeUnit;

public class AsyncVertxTimeUsedComparison extends AbstractVertxTimeUsedComparison {

public static void main(String[] args) throws Exception {
new AsyncVertxTimeUsedComparison().run();
}

@Override
protected <S extends Message, R> Reporter<S, R> reporter(Sender<S, R> actual) {
return AsyncReporter.builder(actual)
.pendingMaxMessages(6000)
.bufferedMaxMessages(100)
.messageTimeout(50, TimeUnit.MILLISECONDS)
.pendingKeepalive(10, TimeUnit.MILLISECONDS)
.sharedSenderThreads(10)
.singleKey(true)
.build();
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package io.github.tramchamploo.bufferslayer;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

public class NoopSender<M extends Message> implements Sender<M, M> {
public class NoopSender<M extends Message> implements SyncSender<M, M> {

private AtomicBoolean closed = new AtomicBoolean(false);

Expand All @@ -22,7 +21,7 @@ public CheckResult check() {
}

@Override
public void close() throws IOException {
public void close() {
closed.set(true);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package io.github.tramchamploo.bufferslayer;

import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

@Measurement(iterations = 5, time = 1)
@Warmup(iterations = 3, time = 1)
@Fork(3)
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
@State(Scope.Group)
public class RxBatchJdbcTemplateBenchmark extends AbstractBatchJdbcTemplateBenchmark {

protected Reporter<SQL, Integer> reporter(Sender<SQL, Integer> sender) {
return RxReporter.builder(sender).build();
}

public static void main(String[] args) throws RunnerException {
Options opt = new OptionsBuilder()
.include(".*" + RxBatchJdbcTemplateBenchmark.class.getSimpleName() + ".*")
.build();

new Runner(opt).run();
}
}
Loading