Skip to content
This repository was archived by the owner on Sep 26, 2025. It is now read-only.

Commit 0bb04a7

Browse files
committed
JMH Microbenchamrks (conclusions from tests - channel caching improved Sender#send and destroyed Sender#sendWithPublishConfirms)
1 parent 4774baa commit 0bb04a7

File tree

4 files changed

+167
-0
lines changed

4 files changed

+167
-0
lines changed

build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ buildscript {
2525
}
2626
plugins {
2727
id 'org.asciidoctor.convert' version '1.5.6'
28+
id 'me.champeau.gradle.jmh' version '0.4.7'
2829
}
2930

3031
ext {
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package reactor.rabbitmq;
2+
3+
import com.rabbitmq.client.Connection;
4+
import org.openjdk.jmh.annotations.*;
5+
import org.openjdk.jmh.infra.Blackhole;
6+
import reactor.core.publisher.Flux;
7+
8+
import java.util.concurrent.TimeUnit;
9+
10+
@BenchmarkMode(Mode.Throughput)
11+
@OutputTimeUnit(TimeUnit.SECONDS)
12+
@State(Scope.Benchmark)
13+
@Warmup(iterations = 5, time = 5, timeUnit = TimeUnit.SECONDS)
14+
@Measurement(iterations = 1, time = 5, timeUnit = TimeUnit.SECONDS)
15+
@Fork(1)
16+
@Threads(2)
17+
public class SenderBenchmark {
18+
19+
Connection connection;
20+
Sender sender;
21+
String queue;
22+
Flux<OutboundMessage> msgFlux;
23+
24+
@Param({"1", "10", "100", "1000"})
25+
public int nbMessages;
26+
27+
@Setup
28+
public void setupConnection() throws Exception {
29+
connection = SenderBenchmarkUtils.newConnection();
30+
}
31+
32+
@TearDown
33+
public void closeConnection() throws Exception {
34+
connection.close();
35+
}
36+
37+
@Setup(Level.Iteration)
38+
public void setupSender() throws Exception {
39+
queue = SenderBenchmarkUtils.declareQueue(connection);
40+
sender = RabbitFlux.createSender();
41+
msgFlux = SenderBenchmarkUtils.outboundMessageFlux(queue, nbMessages);
42+
}
43+
44+
@TearDown(Level.Iteration)
45+
public void tearDownSender() throws Exception {
46+
SenderBenchmarkUtils.deleteQueue(connection, queue);
47+
if (sender != null) {
48+
sender.close();
49+
}
50+
}
51+
52+
@Benchmark
53+
public void send(Blackhole blackhole) {
54+
blackhole.consume(sender.send(msgFlux).block());
55+
}
56+
57+
@Benchmark
58+
public void sendWithPublishConfirms(Blackhole blackhole) {
59+
blackhole.consume(sender.sendWithPublishConfirms(msgFlux).blockLast());
60+
}
61+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package reactor.rabbitmq;
2+
3+
import com.rabbitmq.client.Channel;
4+
import com.rabbitmq.client.Connection;
5+
import com.rabbitmq.client.ConnectionFactory;
6+
import reactor.core.publisher.Flux;
7+
8+
import java.util.UUID;
9+
10+
public class SenderBenchmarkUtils {
11+
12+
public static Flux<OutboundMessage> outboundMessageFlux(String queue, int nbMessages) {
13+
return Flux.range(0, nbMessages).map(i -> new OutboundMessage("", queue, "".getBytes()));
14+
}
15+
16+
public static Connection newConnection() throws Exception {
17+
ConnectionFactory connectionFactory = new ConnectionFactory();
18+
connectionFactory.useNio();
19+
return connectionFactory.newConnection();
20+
}
21+
22+
public static String declareQueue(Connection connection) throws Exception {
23+
String queueName = UUID.randomUUID().toString();
24+
Channel channel = connection.createChannel();
25+
String queue = channel.queueDeclare(queueName, false, false, false, null).getQueue();
26+
channel.close();
27+
return queue;
28+
}
29+
30+
public static void deleteQueue(Connection connection, String queue) throws Exception {
31+
Channel channel = connection.createChannel();
32+
channel.queueDelete(queue);
33+
channel.close();
34+
}
35+
36+
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package reactor.rabbitmq;
2+
3+
import com.rabbitmq.client.Connection;
4+
import org.openjdk.jmh.annotations.*;
5+
import org.openjdk.jmh.infra.Blackhole;
6+
import reactor.core.publisher.Flux;
7+
import reactor.core.publisher.Mono;
8+
9+
import java.util.concurrent.TimeUnit;
10+
11+
@BenchmarkMode(Mode.Throughput)
12+
@OutputTimeUnit(TimeUnit.SECONDS)
13+
@State(Scope.Benchmark)
14+
@Warmup(iterations = 5, time = 5, timeUnit = TimeUnit.SECONDS)
15+
@Measurement(iterations = 1, time = 5, timeUnit = TimeUnit.SECONDS)
16+
@Fork(1)
17+
@Threads(2)
18+
public class SenderWithLazyChannelPoolBenchmark {
19+
20+
Connection connection;
21+
ChannelPool channelPool;
22+
Sender sender;
23+
String queue;
24+
Flux<OutboundMessage> msgFlux;
25+
26+
@Param({"1", "10", "25"})
27+
public int channelPoolSize;
28+
29+
@Param({"1", "10", "100", "1000"})
30+
public int nbMessages;
31+
32+
@Setup
33+
public void setupConnection() throws Exception {
34+
connection = SenderBenchmarkUtils.newConnection();
35+
channelPool = ChannelPoolFactory.createChannelPool(Mono.just(connection), new ChannelPoolOptions().maxCacheSize(channelPoolSize));
36+
}
37+
38+
@TearDown
39+
public void closeConnection() throws Exception {
40+
connection.close();
41+
channelPool.close();
42+
}
43+
44+
@Setup(Level.Iteration)
45+
public void setupSender() throws Exception {
46+
queue = SenderBenchmarkUtils.declareQueue(connection);
47+
sender = RabbitFlux.createSender();
48+
msgFlux = SenderBenchmarkUtils.outboundMessageFlux(queue, nbMessages);
49+
}
50+
51+
@TearDown(Level.Iteration)
52+
public void tearDownSender() throws Exception {
53+
SenderBenchmarkUtils.deleteQueue(connection, queue);
54+
if (sender != null) {
55+
sender.close();
56+
}
57+
}
58+
59+
// @Benchmark
60+
public void send(Blackhole blackhole) {
61+
blackhole.consume(sender.send(msgFlux, new SendOptions().channelPool(channelPool)).block());
62+
}
63+
64+
// @Benchmark
65+
public void sendWithPublishConfirms(Blackhole blackhole) {
66+
blackhole.consume(sender.sendWithPublishConfirms(msgFlux, new SendOptions().channelPool(channelPool)).blockLast());
67+
}
68+
69+
}

0 commit comments

Comments
 (0)