Skip to content

Commit 2bbec92

Browse files
author
tramchamploo
committed
Adding support for jedis
1 parent ed39e88 commit 2bbec92

File tree

25 files changed

+4218
-17
lines changed

25 files changed

+4218
-17
lines changed

.circleci/config.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ jobs:
66

77
docker:
88
- image: circleci/openjdk:8-jdk-browsers
9+
- image: circleci/redis:latest
910

1011
steps:
1112
- checkout

benchmark/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,11 @@
6565
<optional>true</optional>
6666
</dependency>
6767

68+
<dependency>
69+
<groupId>${project.groupId}</groupId>
70+
<artifactId>bufferslayer-jedis</artifactId>
71+
</dependency>
72+
6873
<dependency>
6974
<groupId>ch.qos.logback</groupId>
7075
<artifactId>logback-classic</artifactId>

benchmark/src/main/java/io/github/tramchamploo/bufferslayer/AbstractBatchJdbcTemplateBenchmark.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ public abstract class AbstractBatchJdbcTemplateBenchmark {
2525
private BatchJdbcTemplate batch;
2626
private JdbcTemplate unbatch;
2727
private Reporter<Sql, Integer> reporter;
28-
private static SenderProxy proxy;
28+
private static SenderProxy<Sql, Integer> proxy;
2929
private static AtomicLong counter = new AtomicLong();
3030

3131
private static final String CREATE_DATABASE = "CREATE DATABASE IF NOT EXISTS test";
@@ -51,7 +51,7 @@ public void setup() {
5151
JdbcTemplate delegate = new JdbcTemplate(dataSource);
5252
delegate.setDataSource(dataSource);
5353

54-
proxy = new SenderProxy(new JdbcTemplateSender(delegate));
54+
proxy = new SenderProxy<>(new JdbcTemplateSender(delegate));
5555
proxy.onMessages(updated -> counter.addAndGet(updated.size()));
5656

5757
reporter = reporter(proxy);
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
package io.github.tramchamploo.bufferslayer;
2+
3+
import java.util.concurrent.ThreadLocalRandom;
4+
import java.util.concurrent.TimeUnit;
5+
import java.util.concurrent.atomic.AtomicLong;
6+
import org.openjdk.jmh.annotations.AuxCounters;
7+
import org.openjdk.jmh.annotations.Benchmark;
8+
import org.openjdk.jmh.annotations.Group;
9+
import org.openjdk.jmh.annotations.GroupThreads;
10+
import org.openjdk.jmh.annotations.Level;
11+
import org.openjdk.jmh.annotations.Scope;
12+
import org.openjdk.jmh.annotations.Setup;
13+
import org.openjdk.jmh.annotations.State;
14+
import org.openjdk.jmh.annotations.TearDown;
15+
import redis.clients.jedis.Jedis;
16+
import redis.clients.jedis.JedisPool;
17+
18+
/**
19+
* Jedis benchmark that executing jedis commands.
20+
*/
21+
public abstract class AbstractBatchJedisBenchmark {
22+
23+
private JedisPool unbatch;
24+
private BatchJedis batch;
25+
private Reporter<RedisCommand, Object> reporter;
26+
private static SenderProxy<RedisCommand, Object> proxy;
27+
private static AtomicLong counter = new AtomicLong();
28+
29+
static String propertyOr(String key, String fallback) {
30+
return System.getProperty(key, fallback);
31+
}
32+
33+
protected abstract Reporter<RedisCommand, Object> reporter(Sender<RedisCommand, Object> sender);
34+
35+
@Setup
36+
public void setup() {
37+
unbatch = new JedisPool(propertyOr("redisHost", "127.0.0.1"),
38+
Integer.parseInt(propertyOr("redisPort", "6379")));
39+
40+
proxy = new SenderProxy<>(new JedisSender(unbatch));
41+
proxy.onMessages(updated -> counter.addAndGet(updated.size()));
42+
43+
reporter = reporter(proxy);
44+
batch = new BatchJedis(unbatch, reporter);
45+
}
46+
47+
@TearDown(Level.Iteration)
48+
public void flushDB() {
49+
try (Jedis jedis = unbatch.getResource()) {
50+
jedis.flushDB();
51+
}
52+
}
53+
54+
@AuxCounters
55+
@State(Scope.Thread)
56+
public static class AtomicLongCounter {
57+
58+
public long updated() {
59+
return counter.get();
60+
}
61+
62+
@Setup(Level.Iteration)
63+
public void clean() {
64+
counter.set(0);
65+
}
66+
}
67+
68+
@State(Scope.Benchmark)
69+
public static class Lagging {
70+
71+
@Setup(Level.Iteration)
72+
public void lag() throws InterruptedException {
73+
TimeUnit.SECONDS.sleep(1);
74+
}
75+
}
76+
77+
static String randomString() {
78+
return String.valueOf(ThreadLocalRandom.current().nextLong());
79+
}
80+
81+
@Benchmark @Group("no_contention_batched") @GroupThreads(1)
82+
public void no_contention_batched_set(Lagging l, AtomicLongCounter counters) {
83+
batch.set(randomString(), randomString());
84+
}
85+
86+
@Benchmark @Group("no_contention_unbatched") @GroupThreads(1)
87+
public void no_contention_unbatched_set(Lagging l) {
88+
try (Jedis jedis = unbatch.getResource()) {
89+
jedis.set(randomString(), randomString());
90+
}
91+
}
92+
93+
@Benchmark @Group("mild_contention_batched") @GroupThreads(2)
94+
public void mild_contention_batched_set(Lagging l, AtomicLongCounter counters) {
95+
batch.set(randomString(), randomString());
96+
}
97+
98+
@Benchmark @Group("mild_contention_unbatched") @GroupThreads(2)
99+
public void mild_contention_unbatched_set(Lagging l) {
100+
try (Jedis jedis = unbatch.getResource()) {
101+
jedis.set(randomString(), randomString());
102+
}
103+
}
104+
105+
@Benchmark @Group("high_contention_batched") @GroupThreads(8)
106+
public void high_contention_batched_set(Lagging l, AtomicLongCounter counters) {
107+
batch.set(randomString(), randomString());
108+
}
109+
110+
@Benchmark @Group("high_contention_unbatched") @GroupThreads(8)
111+
public void high_contention_unbatched_set(Lagging l) {
112+
try (Jedis jedis = unbatch.getResource()) {
113+
jedis.set(randomString(), randomString());
114+
}
115+
}
116+
}

benchmark/src/main/java/io/github/tramchamploo/bufferslayer/AbstractTimeUsedComparison.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ static String propertyOr(String key, String fallback) {
2121
protected void run() throws Exception {
2222
BatchJdbcTemplate batch;
2323
JdbcTemplate unbatch;
24-
SenderProxy proxy;
24+
SenderProxy<Sql, Integer> proxy;
2525

2626
DriverManagerDataSource dataSource = new DriverManagerDataSource();
2727
dataSource.setDriverClassName("com.mysql.jdbc.Driver");
@@ -42,7 +42,7 @@ protected void run() throws Exception {
4242

4343
CountDownLatch countDown = new CountDownLatch(1);
4444

45-
proxy = new SenderProxy(new JdbcTemplateSender(delegate));
45+
proxy = new SenderProxy<>(new JdbcTemplateSender(delegate));
4646
proxy.onMessages(updated -> {
4747
if (counter.addAndGet(updated.size()) == 5050) {
4848
countDown.countDown();
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package io.github.tramchamploo.bufferslayer;
2+
3+
import java.util.concurrent.TimeUnit;
4+
import org.openjdk.jmh.annotations.BenchmarkMode;
5+
import org.openjdk.jmh.annotations.Fork;
6+
import org.openjdk.jmh.annotations.Measurement;
7+
import org.openjdk.jmh.annotations.Mode;
8+
import org.openjdk.jmh.annotations.OutputTimeUnit;
9+
import org.openjdk.jmh.annotations.Scope;
10+
import org.openjdk.jmh.annotations.State;
11+
import org.openjdk.jmh.annotations.Warmup;
12+
import org.openjdk.jmh.runner.Runner;
13+
import org.openjdk.jmh.runner.RunnerException;
14+
import org.openjdk.jmh.runner.options.Options;
15+
import org.openjdk.jmh.runner.options.OptionsBuilder;
16+
17+
@Measurement(iterations = 5, time = 1)
18+
@Warmup(iterations = 3, time = 1)
19+
@Fork(3)
20+
@BenchmarkMode(Mode.Throughput)
21+
@OutputTimeUnit(TimeUnit.SECONDS)
22+
@State(Scope.Group)
23+
public class AsyncBatchJedisBenchmark extends AbstractBatchJedisBenchmark {
24+
25+
protected Reporter<RedisCommand, Object> reporter(Sender<RedisCommand, Object> sender) {
26+
return AsyncReporter.builder(sender)
27+
.pendingKeepalive(1, TimeUnit.SECONDS)
28+
.build();
29+
}
30+
31+
public static void main(String[] args) throws RunnerException {
32+
Options opt = new OptionsBuilder()
33+
.include(".*" + AsyncBatchJedisBenchmark.class.getSimpleName() + ".*")
34+
.build();
35+
36+
new Runner(opt).run();
37+
}
38+
}

boundedqueue/src/test/java/io/github/tramchamploo/bufferslayer/AsyncReporterTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@ public void flushIfExceedMaxSize() throws InterruptedException {
6767
assertTrue(countDown.await(300, TimeUnit.MILLISECONDS));
6868
assertEquals(50, sender.sent.size());
6969

70+
// wait for the queue to be released
71+
Thread.sleep(100);
7072
// make sure the queue is released
7173
assertEquals(0, reporter.synchronizer.deque.size());
7274
assertEquals(0, reporter.synchronizer.keyToReady.size());

circle.yml

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
machine:
2+
java:
3+
version: oraclejdk8
4+
services:
5+
- redis
6+
7+
dependencies:
8+
pre:
9+
- openssl aes-256-cbc -d -in .buildscript/secret-env-cipher -k $KEY >> ~/.circlerc
10+
override:
11+
- mvn --fail-never dependency:go-offline || true
12+
cache_directories:
13+
- "~/.m2"
14+
15+
test:
16+
post:
17+
- mkdir -p $CIRCLE_TEST_REPORTS/junit/
18+
- find . -type f -regex ".*/target/surefire-reports/.*xml" -exec cp {} $CIRCLE_TEST_REPORTS/junit/ \;
19+
- .buildscript/release.sh

core/src/main/java/io/github/tramchamploo/bufferslayer/internal/CompositeFuture.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ public abstract class CompositeFuture extends AbstractFuture<CompositeFuture> {
1414
* When the list is empty, the returned future will be already completed.
1515
*/
1616
public static CompositeFuture all(List<? extends Future<?>> futures) {
17-
return DefaultCompositeFuture.all(futures.toArray(new Future[futures.size()]));
17+
return DefaultCompositeFuture.all(futures.toArray(new Future[0]));
1818
}
1919

2020
/**

core/src/main/java/io/github/tramchamploo/bufferslayer/internal/Promises.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import io.github.tramchamploo.bufferslayer.Message;
77
import io.github.tramchamploo.bufferslayer.MessageDroppedException;
88
import io.github.tramchamploo.bufferslayer.OverflowStrategy.Strategy;
9+
import java.util.Collections;
910
import java.util.LinkedList;
1011
import java.util.List;
1112

@@ -19,7 +20,12 @@ public static <R> void allSuccess(List<R> result, List<MessagePromise<R>> promis
1920
for (int i = 0; i < result.size(); i++) {
2021
MessagePromise<R> promise = promises.get(i);
2122
R ret = result.get(i);
22-
promise.setSuccess(ret);
23+
if (ret instanceof Throwable) {
24+
promise.setFailure(MessageDroppedException.dropped((Throwable) ret,
25+
Collections.singletonList(promise.message())));
26+
} else {
27+
promise.setSuccess(ret);
28+
}
2329
}
2430
}
2531

0 commit comments

Comments
 (0)