Skip to content

Commit 885f6df

Browse files
author
tramchamploo
authored
Merge pull request #13 from tramchamploo/fix
Fix the wrong queued messages num
2 parents d0d8c39 + 664f18b commit 885f6df

File tree

13 files changed

+183
-86
lines changed

13 files changed

+183
-86
lines changed

benchmark/README.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
# Benchmarks
2+
3+
Run `mvn install` to build the benchmarks from parent directory.
4+
5+
```bash
6+
$ java -jar benchmark/target/benchmarks.jar -DjdbcUrl=jdbc:mysql://192.168.99.100:32772/test?useSSL=false -Dusername=root -Dpassword=root
7+
```

benchmark/pom.xml

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -50,14 +50,8 @@
5050
</dependency>
5151

5252
<dependency>
53-
<groupId>com.h2database</groupId>
54-
<artifactId>h2</artifactId>
55-
</dependency>
56-
57-
<dependency>
58-
<groupId>c3p0</groupId>
59-
<artifactId>c3p0</artifactId>
60-
<version>0.9.1.2</version>
53+
<groupId>mysql</groupId>
54+
<artifactId>mysql-connector-java</artifactId>
6155
</dependency>
6256

6357
<dependency>

benchmark/src/main/java/io/bufferslayer/AsyncReporterBenchmark.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.bufferslayer;
22

3+
import java.io.IOException;
34
import java.util.concurrent.TimeUnit;
45
import org.openjdk.jmh.annotations.AuxCounters;
56
import org.openjdk.jmh.annotations.Benchmark;
@@ -14,6 +15,7 @@
1415
import org.openjdk.jmh.annotations.Scope;
1516
import org.openjdk.jmh.annotations.Setup;
1617
import org.openjdk.jmh.annotations.State;
18+
import org.openjdk.jmh.annotations.TearDown;
1719
import org.openjdk.jmh.annotations.Warmup;
1820
import org.openjdk.jmh.runner.Runner;
1921
import org.openjdk.jmh.runner.RunnerException;

benchmark/src/main/java/io/bufferslayer/BatchedJdbcTemplateBenchmark.java

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package io.bufferslayer;
22

3-
import com.mchange.v2.c3p0.ComboPooledDataSource;
43
import java.beans.PropertyVetoException;
4+
import java.io.IOException;
55
import java.util.Date;
66
import java.util.concurrent.ThreadLocalRandom;
77
import java.util.concurrent.TimeUnit;
@@ -26,6 +26,7 @@
2626
import org.openjdk.jmh.runner.options.Options;
2727
import org.openjdk.jmh.runner.options.OptionsBuilder;
2828
import org.springframework.jdbc.core.JdbcTemplate;
29+
import org.springframework.jdbc.datasource.DriverManagerDataSource;
2930

3031
/**
3132
* Created by guohang.bao on 2017/3/16.
@@ -38,7 +39,7 @@
3839
@State(Scope.Group)
3940
public class BatchedJdbcTemplateBenchmark {
4041

41-
private ComboPooledDataSource dataSource;
42+
private DriverManagerDataSource dataSource;
4243
private BatchJdbcTemplate batch;
4344
private JdbcTemplate unbatch;
4445
private AsyncReporter reporter;
@@ -50,13 +51,17 @@ public class BatchedJdbcTemplateBenchmark {
5051
private static final String TRUNCATE_TABLE = "TRUNCATE TABLE benchmark;";
5152
private static final String INSERTION = "INSERT INTO benchmark(data, time) VALUES(?, ?);";
5253

54+
static String envOr(String key, String fallback) {
55+
return System.getenv(key) != null ? System.getenv(key) : fallback;
56+
}
57+
5358
@Setup
5459
public void setup() throws PropertyVetoException {
55-
dataSource = new ComboPooledDataSource();
56-
dataSource.setDriverClass("org.h2.Driver");
57-
dataSource.setMinPoolSize(10);
58-
dataSource.setMaxPoolSize(50);
59-
dataSource.setJdbcUrl("jdbc:h2:~/benchmark");
60+
dataSource = new DriverManagerDataSource();
61+
dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver");
62+
dataSource.setUrl(envOr("jdbcUrl", "jdbc:mysql://192.168.99.100:32772/test?useSSL=false"));
63+
dataSource.setUsername(envOr("username", "root"));
64+
dataSource.setPassword(envOr("password", "root"));
6065

6166
JdbcTemplate delegate = new JdbcTemplate(dataSource);
6267
delegate.setDataSource(dataSource);
@@ -79,7 +84,9 @@ public void setup() throws PropertyVetoException {
7984

8085
@TearDown(Level.Iteration)
8186
public void dropTable() {
82-
reporter.flush();
87+
for (SizeBoundedQueue pending : reporter.pendings.values()) {
88+
pending.doClear();
89+
}
8390
unbatch.update(TRUNCATE_TABLE);
8491
}
8592

Lines changed: 27 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,38 @@
11
package io.bufferslayer;
22

3-
import com.mchange.v2.c3p0.ComboPooledDataSource;
4-
import java.beans.PropertyVetoException;
53
import java.util.Date;
64
import java.util.Random;
75
import java.util.concurrent.CountDownLatch;
86
import java.util.concurrent.Executors;
97
import java.util.concurrent.ThreadLocalRandom;
8+
import java.util.concurrent.TimeUnit;
109
import java.util.concurrent.atomic.AtomicLong;
1110
import org.springframework.jdbc.core.JdbcTemplate;
11+
import org.springframework.jdbc.datasource.DriverManagerDataSource;
1212

1313
/**
1414
* Created by guohang.bao on 2017/3/30.
1515
*/
1616
public class TimeUsedComparison {
1717

18-
public static void main(String[] args) throws PropertyVetoException, InterruptedException {
19-
ComboPooledDataSource dataSource;
18+
static String randomString() {
19+
return String.valueOf(ThreadLocalRandom.current().nextLong());
20+
}
21+
22+
static String envOr(String key, String fallback) {
23+
return System.getenv(key) != null ? System.getenv(key) : fallback;
24+
}
25+
26+
public static void main(String[] args) throws Exception {
2027
BatchJdbcTemplate batch;
2128
JdbcTemplate unbatch;
2229
SenderProxy proxy;
2330

24-
dataSource = new ComboPooledDataSource();
25-
dataSource.setDriverClass("org.h2.Driver");
26-
dataSource.setMinPoolSize(10);
27-
dataSource.setMaxPoolSize(50);
28-
dataSource.setJdbcUrl("jdbc:h2:~/test");
31+
DriverManagerDataSource dataSource = new DriverManagerDataSource();
32+
dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver");
33+
dataSource.setUrl(envOr("jdbcUrl", "jdbc:mysql://192.168.99.100:32772/test?useSSL=false"));
34+
dataSource.setUsername(envOr("username", "root"));
35+
dataSource.setPassword(envOr("password", "root"));
2936

3037
JdbcTemplate delegate = new JdbcTemplate(dataSource);
3138
delegate.setDataSource(dataSource);
@@ -41,14 +48,16 @@ public static void main(String[] args) throws PropertyVetoException, Interrupted
4148

4249
proxy = new SenderProxy(new JdbcTemplateSender(delegate));
4350
proxy.onMessages(updated -> {
44-
if (counter.addAndGet(updated.size()) == 500500) {
51+
if (counter.addAndGet(updated.size()) == 5050) {
4552
countDown.countDown();
4653
}
4754
});
4855

4956
AsyncReporter reporter = AsyncReporter.builder(proxy)
50-
.pendingMaxMessages(6000000)
51-
.bufferedMaxMessages(1000)
57+
.pendingMaxMessages(6000)
58+
.bufferedMaxMessages(100)
59+
.messageTimeout(50, TimeUnit.MILLISECONDS)
60+
.flushThreadKeepalive(10, TimeUnit.MILLISECONDS)
5261
.senderExecutor(Executors.newCachedThreadPool())
5362
.parallelismPerBatch(10)
5463
.strictOrder(true)
@@ -64,32 +73,29 @@ public static void main(String[] args) throws PropertyVetoException, Interrupted
6473
Random random = new Random(System.currentTimeMillis());
6574

6675
long start = System.nanoTime();
67-
for (int i = 0; i < 500000; i++) {
76+
for (int i = 0; i < 5000; i++) {
6877
batch.update(INSERTION, new Object[] {randomString(), new Date()});
69-
if (i % 1000 == 0) {
78+
if (i % 100 == 0) {
7079
batch.update(MODIFICATION, new Object[] {randomString(), random.nextInt(i + 1) + 1});
7180
}
7281
}
7382
countDown.await();
7483
long used = System.nanoTime() - start;
7584
System.out.println("batch time used: " + used);
85+
reporter.sender.close();
86+
reporter.close();
7687

7788
unbatch.update(DROP_TABLE);
7889
unbatch.update(CREATE_TABLE);
7990
start = System.nanoTime();
80-
for (int i = 0; i < 500000; i++) {
91+
for (int i = 0; i < 5000; i++) {
8192
unbatch.update(INSERTION, new Object[] {randomString(), new Date()});
82-
if (i % 1000 == 0) {
93+
if (i % 100 == 0) {
8394
unbatch.update(MODIFICATION, new Object[] {randomString(), random.nextInt(i + 1) + 1});
8495
}
8596
}
8697
used = System.nanoTime() - start;
8798
System.out.println("unbatch time used: " + used);
88-
reporter.close();
8999
unbatch.update(DROP_TABLE);
90100
}
91-
92-
static String randomString() {
93-
return String.valueOf(ThreadLocalRandom.current().nextLong());
94-
}
95101
}

boundedqueue/src/main/java/io/bufferslayer/AsyncReporter.java

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@ public class AsyncReporter implements Reporter, Flushable, Component {
4646
static AtomicLong idGenerator = new AtomicLong();
4747
private final Long id = idGenerator.getAndIncrement();
4848

49-
private final AsyncSender sender;
50-
private final ReporterMetrics metrics;
49+
final AsyncSender sender;
50+
private final ReporterMetrics<SizeBoundedQueue> metrics;
5151

5252
private final long messageTimeoutNanos;
5353
private final long flushThreadKeepaliveNanos;
@@ -66,6 +66,7 @@ public class AsyncReporter implements Reporter, Flushable, Component {
6666
private final ThreadFactory flushThreadFactory;
6767
private ScheduledExecutorService flushThreadsMonitor;
6868

69+
@SuppressWarnings("unchecked")
6970
private AsyncReporter(Builder builder) {
7071
this.sender = new DefaultSenderToAsyncSenderAdaptor(builder.sender,
7172
builder.senderExecutor, builder.parallelismPerBatch);
@@ -244,8 +245,8 @@ public void run() {
244245
long lastDrained = System.nanoTime();
245246
try {
246247
while (!closed.get()) {
247-
// check if exceeds keepAlive
248-
if (!strictOrder && flushThreadKeepaliveNanos > 0 &&
248+
if (!strictOrder && // check if exceeds keepAlive
249+
flushThreadKeepaliveNanos > 0 &&
249250
System.nanoTime() - lastDrained >= flushThreadKeepaliveNanos) {
250251
return;
251252
}
@@ -255,20 +256,18 @@ public void run() {
255256
}
256257
}
257258
} finally {
258-
// flush messages left in queue
259-
for (Message message : consumer.drain()) {
259+
for (Message message : consumer.drain()) { // flush messages left in queue
260260
pending.offer(message, DeferredHolder.newDeferred(message.id));
261261
}
262262
flushTillEmpty(pending);
263-
// remove queue from pendings
264-
pendings.remove(key);
265-
// wake up notice thread
266-
if (closed.get()) {
267-
close.countDown();
268-
}
263+
264+
pendings.remove(key); // remove queue from pendings
265+
metrics.removeQueuedMessages(pending); // remove metrics
266+
if (closed.get()) close.countDown(); // wake up notice thread
269267
}
270268
}
271269
});
270+
272271
flushThread.start();
273272
return flushThread;
274273
}
@@ -282,14 +281,16 @@ void flushTillEmpty(SizeBoundedQueue pending) {
282281
@Override
283282
public void flush() {
284283
for (SizeBoundedQueue pending : pendings.values()) {
285-
flushTillEmpty(pending);
284+
BufferNextMessage rightNow =
285+
new BufferNextMessage(bufferedMaxMessages, 0, strictOrder);
286+
flush(pending, rightNow);
286287
}
287288
}
288289

289290
@SuppressWarnings("unchecked")
290291
int flush(SizeBoundedQueue pending, BufferNextMessage consumer) {
291292
int drainedCount = pending.drainTo(consumer, consumer.remainingNanos());
292-
metrics.updateQueuedMessages(pending.count);
293+
metrics.updateQueuedMessages(pending, pending.count);
293294

294295
if (!consumer.isReady()) {
295296
return drainedCount;
@@ -338,7 +339,6 @@ public CheckResult check() {
338339
public void close() {
339340
close = new CountDownLatch(messageTimeoutNanos > 0 ? flushThreads.size() : 0);
340341
closed.set(true);
341-
if (flushThreadsMonitor != null) flushThreadsMonitor.shutdown();
342342
try {
343343
if (!close.await(messageTimeoutNanos, TimeUnit.NANOSECONDS)) {
344344
logger.warn("Timed out waiting for close");
@@ -348,6 +348,8 @@ public void close() {
348348
Thread.currentThread().interrupt();
349349
}
350350

351+
if (flushThreadsMonitor != null) flushThreadsMonitor.shutdown();
352+
351353
int count = 0;
352354
for (SizeBoundedQueue pending : pendings.values()) {
353355
count += pending.clear();

boundedqueue/src/main/java/io/bufferslayer/SenderToAsyncSenderAdaptor.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import java.util.ArrayList;
77
import java.util.List;
88
import java.util.concurrent.Executor;
9+
import java.util.concurrent.ExecutorService;
910
import org.jdeferred.Deferred;
1011
import org.jdeferred.DeferredManager;
1112
import org.jdeferred.Promise;
@@ -68,6 +69,9 @@ public CheckResult check() {
6869

6970
@Override
7071
public void close() throws IOException {
72+
if (executor instanceof ExecutorService) {
73+
((ExecutorService) executor).shutdown();
74+
}
7175
delegate.close();
7276
}
7377
}

boundedqueue/src/test/java/io/bufferslayer/AsyncReporterTest.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,9 @@ public void flushIncrementMetrics() throws InterruptedException {
189189
reporter.report(newMessage(0));
190190
reporter.report(newMessage(0));
191191

192+
reporter.flush();
193+
assertEquals(1, metrics.queuedMessages());
194+
192195
reporter.flush();
193196
assertEquals(0, metrics.queuedMessages());
194197
}
@@ -314,4 +317,26 @@ public void flushThreadName() {
314317
assertTrue(iter.hasNext());
315318
assertEquals("AsyncReporter-1-flush-thread-0", iter.next().getName());
316319
}
320+
321+
@Test
322+
public void closeShouldRemoveMetrics() throws InterruptedException {
323+
FakeSender sender = new FakeSender();
324+
325+
reporter = AsyncReporter.builder(sender)
326+
.metrics(metrics)
327+
.flushThreadKeepalive(10, TimeUnit.MILLISECONDS)
328+
.build();
329+
330+
CountDownLatch countDown = new CountDownLatch(1);
331+
Promise<Object, MessageDroppedException, Integer> promise = reporter.report(newMessage(0));
332+
promise.done(d -> {
333+
assertEquals(1, metrics.queuedMessages.size());
334+
countDown.countDown();
335+
});
336+
countDown.await();
337+
reporter.flush();
338+
339+
reporter.close();
340+
assertEquals(0, metrics.queuedMessages.size());
341+
}
317342
}

0 commit comments

Comments
 (0)