Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions benchmark/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Benchmarks

Run `mvn install` to build the benchmarks from parent directory.

```bash
$ java -jar benchmark/target/benchmarks.jar -DjdbcUrl=jdbc:mysql://192.168.99.100:32772/test?useSSL=false -Dusername=root -Dpassword=root
```
10 changes: 2 additions & 8 deletions benchmark/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,8 @@
</dependency>

<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
</dependency>

<dependency>
<groupId>c3p0</groupId>
<artifactId>c3p0</artifactId>
<version>0.9.1.2</version>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.bufferslayer;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.AuxCounters;
import org.openjdk.jmh.annotations.Benchmark;
Expand All @@ -14,6 +15,7 @@
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.bufferslayer;

import com.mchange.v2.c3p0.ComboPooledDataSource;
import java.beans.PropertyVetoException;
import java.io.IOException;
import java.util.Date;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
Expand All @@ -26,6 +26,7 @@
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.DriverManagerDataSource;

/**
* Created by guohang.bao on 2017/3/16.
Expand All @@ -38,7 +39,7 @@
@State(Scope.Group)
public class BatchedJdbcTemplateBenchmark {

private ComboPooledDataSource dataSource;
private DriverManagerDataSource dataSource;
private BatchJdbcTemplate batch;
private JdbcTemplate unbatch;
private AsyncReporter reporter;
Expand All @@ -50,13 +51,17 @@ public class BatchedJdbcTemplateBenchmark {
private static final String TRUNCATE_TABLE = "TRUNCATE TABLE benchmark;";
private static final String INSERTION = "INSERT INTO benchmark(data, time) VALUES(?, ?);";

static String envOr(String key, String fallback) {
return System.getenv(key) != null ? System.getenv(key) : fallback;
}

@Setup
public void setup() throws PropertyVetoException {
dataSource = new ComboPooledDataSource();
dataSource.setDriverClass("org.h2.Driver");
dataSource.setMinPoolSize(10);
dataSource.setMaxPoolSize(50);
dataSource.setJdbcUrl("jdbc:h2:~/benchmark");
dataSource = new DriverManagerDataSource();
dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver");
dataSource.setUrl(envOr("jdbcUrl", "jdbc:mysql://192.168.99.100:32772/test?useSSL=false"));
dataSource.setUsername(envOr("username", "root"));
dataSource.setPassword(envOr("password", "root"));

JdbcTemplate delegate = new JdbcTemplate(dataSource);
delegate.setDataSource(dataSource);
Expand All @@ -79,7 +84,9 @@ public void setup() throws PropertyVetoException {

@TearDown(Level.Iteration)
public void dropTable() {
reporter.flush();
for (SizeBoundedQueue pending : reporter.pendings.values()) {
pending.doClear();
}
unbatch.update(TRUNCATE_TABLE);
}

Expand Down
48 changes: 27 additions & 21 deletions benchmark/src/main/java/io/bufferslayer/TimeUsedComparison.java
Original file line number Diff line number Diff line change
@@ -1,31 +1,38 @@
package io.bufferslayer;

import com.mchange.v2.c3p0.ComboPooledDataSource;
import java.beans.PropertyVetoException;
import java.util.Date;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.DriverManagerDataSource;

/**
* Created by guohang.bao on 2017/3/30.
*/
public class TimeUsedComparison {

public static void main(String[] args) throws PropertyVetoException, InterruptedException {
ComboPooledDataSource dataSource;
static String randomString() {
return String.valueOf(ThreadLocalRandom.current().nextLong());
}

static String envOr(String key, String fallback) {
return System.getenv(key) != null ? System.getenv(key) : fallback;
}

public static void main(String[] args) throws Exception {
BatchJdbcTemplate batch;
JdbcTemplate unbatch;
SenderProxy proxy;

dataSource = new ComboPooledDataSource();
dataSource.setDriverClass("org.h2.Driver");
dataSource.setMinPoolSize(10);
dataSource.setMaxPoolSize(50);
dataSource.setJdbcUrl("jdbc:h2:~/test");
DriverManagerDataSource dataSource = new DriverManagerDataSource();
dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver");
dataSource.setUrl(envOr("jdbcUrl", "jdbc:mysql://192.168.99.100:32772/test?useSSL=false"));
dataSource.setUsername(envOr("username", "root"));
dataSource.setPassword(envOr("password", "root"));

JdbcTemplate delegate = new JdbcTemplate(dataSource);
delegate.setDataSource(dataSource);
Expand All @@ -41,14 +48,16 @@ public static void main(String[] args) throws PropertyVetoException, Interrupted

proxy = new SenderProxy(new JdbcTemplateSender(delegate));
proxy.onMessages(updated -> {
if (counter.addAndGet(updated.size()) == 500500) {
if (counter.addAndGet(updated.size()) == 5050) {
countDown.countDown();
}
});

AsyncReporter reporter = AsyncReporter.builder(proxy)
.pendingMaxMessages(6000000)
.bufferedMaxMessages(1000)
.pendingMaxMessages(6000)
.bufferedMaxMessages(100)
.messageTimeout(50, TimeUnit.MILLISECONDS)
.flushThreadKeepalive(10, TimeUnit.MILLISECONDS)
.senderExecutor(Executors.newCachedThreadPool())
.parallelismPerBatch(10)
.strictOrder(true)
Expand All @@ -64,32 +73,29 @@ public static void main(String[] args) throws PropertyVetoException, Interrupted
Random random = new Random(System.currentTimeMillis());

long start = System.nanoTime();
for (int i = 0; i < 500000; i++) {
for (int i = 0; i < 5000; i++) {
batch.update(INSERTION, new Object[] {randomString(), new Date()});
if (i % 1000 == 0) {
if (i % 100 == 0) {
batch.update(MODIFICATION, new Object[] {randomString(), random.nextInt(i + 1) + 1});
}
}
countDown.await();
long used = System.nanoTime() - start;
System.out.println("batch time used: " + used);
reporter.sender.close();
reporter.close();

unbatch.update(DROP_TABLE);
unbatch.update(CREATE_TABLE);
start = System.nanoTime();
for (int i = 0; i < 500000; i++) {
for (int i = 0; i < 5000; i++) {
unbatch.update(INSERTION, new Object[] {randomString(), new Date()});
if (i % 1000 == 0) {
if (i % 100 == 0) {
unbatch.update(MODIFICATION, new Object[] {randomString(), random.nextInt(i + 1) + 1});
}
}
used = System.nanoTime() - start;
System.out.println("unbatch time used: " + used);
reporter.close();
unbatch.update(DROP_TABLE);
}

static String randomString() {
return String.valueOf(ThreadLocalRandom.current().nextLong());
}
}
32 changes: 17 additions & 15 deletions boundedqueue/src/main/java/io/bufferslayer/AsyncReporter.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ public class AsyncReporter implements Reporter, Flushable, Component {
static AtomicLong idGenerator = new AtomicLong();
private final Long id = idGenerator.getAndIncrement();

private final AsyncSender sender;
private final ReporterMetrics metrics;
final AsyncSender sender;
private final ReporterMetrics<SizeBoundedQueue> metrics;

private final long messageTimeoutNanos;
private final long flushThreadKeepaliveNanos;
Expand All @@ -66,6 +66,7 @@ public class AsyncReporter implements Reporter, Flushable, Component {
private final ThreadFactory flushThreadFactory;
private ScheduledExecutorService flushThreadsMonitor;

@SuppressWarnings("unchecked")
private AsyncReporter(Builder builder) {
this.sender = new DefaultSenderToAsyncSenderAdaptor(builder.sender,
builder.senderExecutor, builder.parallelismPerBatch);
Expand Down Expand Up @@ -244,8 +245,8 @@ public void run() {
long lastDrained = System.nanoTime();
try {
while (!closed.get()) {
// check if exceeds keepAlive
if (!strictOrder && flushThreadKeepaliveNanos > 0 &&
if (!strictOrder && // check if exceeds keepAlive
flushThreadKeepaliveNanos > 0 &&
System.nanoTime() - lastDrained >= flushThreadKeepaliveNanos) {
return;
}
Expand All @@ -255,20 +256,18 @@ public void run() {
}
}
} finally {
// flush messages left in queue
for (Message message : consumer.drain()) {
for (Message message : consumer.drain()) { // flush messages left in queue
pending.offer(message, DeferredHolder.newDeferred(message.id));
}
flushTillEmpty(pending);
// remove queue from pendings
pendings.remove(key);
// wake up notice thread
if (closed.get()) {
close.countDown();
}

pendings.remove(key); // remove queue from pendings
metrics.removeQueuedMessages(pending); // remove metrics
if (closed.get()) close.countDown(); // wake up notice thread
}
}
});

flushThread.start();
return flushThread;
}
Expand All @@ -282,14 +281,16 @@ void flushTillEmpty(SizeBoundedQueue pending) {
@Override
public void flush() {
for (SizeBoundedQueue pending : pendings.values()) {
flushTillEmpty(pending);
BufferNextMessage rightNow =
new BufferNextMessage(bufferedMaxMessages, 0, strictOrder);
flush(pending, rightNow);
}
}

@SuppressWarnings("unchecked")
int flush(SizeBoundedQueue pending, BufferNextMessage consumer) {
int drainedCount = pending.drainTo(consumer, consumer.remainingNanos());
metrics.updateQueuedMessages(pending.count);
metrics.updateQueuedMessages(pending, pending.count);

if (!consumer.isReady()) {
return drainedCount;
Expand Down Expand Up @@ -338,7 +339,6 @@ public CheckResult check() {
public void close() {
close = new CountDownLatch(messageTimeoutNanos > 0 ? flushThreads.size() : 0);
closed.set(true);
if (flushThreadsMonitor != null) flushThreadsMonitor.shutdown();
try {
if (!close.await(messageTimeoutNanos, TimeUnit.NANOSECONDS)) {
logger.warn("Timed out waiting for close");
Expand All @@ -348,6 +348,8 @@ public void close() {
Thread.currentThread().interrupt();
}

if (flushThreadsMonitor != null) flushThreadsMonitor.shutdown();

int count = 0;
for (SizeBoundedQueue pending : pendings.values()) {
count += pending.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import org.jdeferred.Deferred;
import org.jdeferred.DeferredManager;
import org.jdeferred.Promise;
Expand Down Expand Up @@ -68,6 +69,9 @@ public CheckResult check() {

@Override
public void close() throws IOException {
if (executor instanceof ExecutorService) {
((ExecutorService) executor).shutdown();
}
delegate.close();
}
}
25 changes: 25 additions & 0 deletions boundedqueue/src/test/java/io/bufferslayer/AsyncReporterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,9 @@ public void flushIncrementMetrics() throws InterruptedException {
reporter.report(newMessage(0));
reporter.report(newMessage(0));

reporter.flush();
assertEquals(1, metrics.queuedMessages());

reporter.flush();
assertEquals(0, metrics.queuedMessages());
}
Expand Down Expand Up @@ -314,4 +317,26 @@ public void flushThreadName() {
assertTrue(iter.hasNext());
assertEquals("AsyncReporter-1-flush-thread-0", iter.next().getName());
}

@Test
public void closeShouldRemoveMetrics() throws InterruptedException {
FakeSender sender = new FakeSender();

reporter = AsyncReporter.builder(sender)
.metrics(metrics)
.flushThreadKeepalive(10, TimeUnit.MILLISECONDS)
.build();

CountDownLatch countDown = new CountDownLatch(1);
Promise<Object, MessageDroppedException, Integer> promise = reporter.report(newMessage(0));
promise.done(d -> {
assertEquals(1, metrics.queuedMessages.size());
countDown.countDown();
});
countDown.await();
reporter.flush();

reporter.close();
assertEquals(0, metrics.queuedMessages.size());
}
}
Loading