Skip to content
This repository was archived by the owner on Sep 26, 2025. It is now read-only.

Commit 8bb9e0b

Browse files
authored
Merge pull request #47 from pmackowski/sender-channel-pool
Sender channel pool
2 parents 8033d81 + 3a083cd commit 8bb9e0b

File tree

12 files changed

+620
-26
lines changed

12 files changed

+620
-26
lines changed

build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ buildscript {
2525
}
2626
plugins {
2727
id 'org.asciidoctor.convert' version '1.5.6'
28+
id 'me.champeau.gradle.jmh' version '0.4.7'
2829
}
2930

3031
ext {
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package reactor.rabbitmq;
2+
3+
import com.rabbitmq.client.Connection;
4+
import org.openjdk.jmh.annotations.*;
5+
import org.openjdk.jmh.infra.Blackhole;
6+
import reactor.core.publisher.Flux;
7+
8+
import java.util.concurrent.TimeUnit;
9+
10+
@BenchmarkMode(Mode.Throughput)
11+
@OutputTimeUnit(TimeUnit.SECONDS)
12+
@State(Scope.Benchmark)
13+
@Warmup(iterations = 5, time = 5, timeUnit = TimeUnit.SECONDS)
14+
@Measurement(iterations = 1, time = 5, timeUnit = TimeUnit.SECONDS)
15+
@Fork(1)
16+
@Threads(2)
17+
public class SenderBenchmark {
18+
19+
Connection connection;
20+
Sender sender;
21+
String queue;
22+
Flux<OutboundMessage> msgFlux;
23+
24+
@Param({"1", "10", "100", "1000"})
25+
public int nbMessages;
26+
27+
@Setup
28+
public void setupConnection() throws Exception {
29+
connection = SenderBenchmarkUtils.newConnection();
30+
}
31+
32+
@TearDown
33+
public void closeConnection() throws Exception {
34+
connection.close();
35+
}
36+
37+
@Setup(Level.Iteration)
38+
public void setupSender() throws Exception {
39+
queue = SenderBenchmarkUtils.declareQueue(connection);
40+
sender = RabbitFlux.createSender();
41+
msgFlux = SenderBenchmarkUtils.outboundMessageFlux(queue, nbMessages);
42+
}
43+
44+
@TearDown(Level.Iteration)
45+
public void tearDownSender() throws Exception {
46+
SenderBenchmarkUtils.deleteQueue(connection, queue);
47+
if (sender != null) {
48+
sender.close();
49+
}
50+
}
51+
52+
@Benchmark
53+
public void send(Blackhole blackhole) {
54+
blackhole.consume(sender.send(msgFlux).block());
55+
}
56+
57+
@Benchmark
58+
public void sendWithPublishConfirms(Blackhole blackhole) {
59+
blackhole.consume(sender.sendWithPublishConfirms(msgFlux).blockLast());
60+
}
61+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package reactor.rabbitmq;
2+
3+
import com.rabbitmq.client.Channel;
4+
import com.rabbitmq.client.Connection;
5+
import com.rabbitmq.client.ConnectionFactory;
6+
import reactor.core.publisher.Flux;
7+
8+
import java.util.UUID;
9+
10+
public class SenderBenchmarkUtils {
11+
12+
public static Flux<OutboundMessage> outboundMessageFlux(String queue, int nbMessages) {
13+
return Flux.range(0, nbMessages).map(i -> new OutboundMessage("", queue, "".getBytes()));
14+
}
15+
16+
public static Connection newConnection() throws Exception {
17+
ConnectionFactory connectionFactory = new ConnectionFactory();
18+
connectionFactory.useNio();
19+
return connectionFactory.newConnection();
20+
}
21+
22+
public static String declareQueue(Connection connection) throws Exception {
23+
String queueName = UUID.randomUUID().toString();
24+
Channel channel = connection.createChannel();
25+
String queue = channel.queueDeclare(queueName, false, false, false, null).getQueue();
26+
channel.close();
27+
return queue;
28+
}
29+
30+
public static void deleteQueue(Connection connection, String queue) throws Exception {
31+
Channel channel = connection.createChannel();
32+
channel.queueDelete(queue);
33+
channel.close();
34+
}
35+
36+
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package reactor.rabbitmq;
2+
3+
import com.rabbitmq.client.Connection;
4+
import org.openjdk.jmh.annotations.*;
5+
import org.openjdk.jmh.infra.Blackhole;
6+
import reactor.core.publisher.Flux;
7+
import reactor.core.publisher.Mono;
8+
9+
import java.util.concurrent.TimeUnit;
10+
11+
@BenchmarkMode(Mode.Throughput)
12+
@OutputTimeUnit(TimeUnit.SECONDS)
13+
@State(Scope.Benchmark)
14+
@Warmup(iterations = 5, time = 5, timeUnit = TimeUnit.SECONDS)
15+
@Measurement(iterations = 1, time = 5, timeUnit = TimeUnit.SECONDS)
16+
@Fork(1)
17+
@Threads(2)
18+
public class SenderWithLazyChannelPoolBenchmark {
19+
20+
Connection connection;
21+
ChannelPool channelPool;
22+
Sender sender;
23+
String queue;
24+
Flux<OutboundMessage> msgFlux;
25+
26+
@Param({"1", "10", "25"})
27+
public int channelPoolSize;
28+
29+
@Param({"1", "10", "100", "1000"})
30+
public int nbMessages;
31+
32+
@Setup
33+
public void setupConnection() throws Exception {
34+
connection = SenderBenchmarkUtils.newConnection();
35+
channelPool = ChannelPoolFactory.createChannelPool(Mono.just(connection), new ChannelPoolOptions().maxCacheSize(channelPoolSize));
36+
}
37+
38+
@TearDown
39+
public void closeConnection() throws Exception {
40+
connection.close();
41+
channelPool.close();
42+
}
43+
44+
@Setup(Level.Iteration)
45+
public void setupSender() throws Exception {
46+
queue = SenderBenchmarkUtils.declareQueue(connection);
47+
sender = RabbitFlux.createSender();
48+
msgFlux = SenderBenchmarkUtils.outboundMessageFlux(queue, nbMessages);
49+
}
50+
51+
@TearDown(Level.Iteration)
52+
public void tearDownSender() throws Exception {
53+
SenderBenchmarkUtils.deleteQueue(connection, queue);
54+
if (sender != null) {
55+
sender.close();
56+
}
57+
}
58+
59+
@Benchmark
60+
public void send(Blackhole blackhole) {
61+
blackhole.consume(sender.send(msgFlux, new SendOptions().channelPool(channelPool)).block());
62+
}
63+
64+
@Benchmark
65+
public void sendWithPublishConfirms(Blackhole blackhole) {
66+
blackhole.consume(sender.sendWithPublishConfirms(msgFlux, new SendOptions().channelPool(channelPool)).blockLast());
67+
}
68+
69+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Copyright (c) 2018-2019 Pivotal Software Inc, All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package reactor.rabbitmq;
18+
19+
import com.rabbitmq.client.Channel;
20+
import reactor.core.publisher.Mono;
21+
import reactor.core.publisher.SignalType;
22+
23+
import java.util.function.BiConsumer;
24+
25+
public interface ChannelPool {
26+
27+
Mono<? extends Channel> getChannelMono();
28+
29+
BiConsumer<SignalType, Channel> getChannelCloseHandler();
30+
31+
void close();
32+
33+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Copyright (c) 2018-2019 Pivotal Software Inc, All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package reactor.rabbitmq;
18+
19+
import com.rabbitmq.client.Connection;
20+
import reactor.core.publisher.Mono;
21+
22+
public class ChannelPoolFactory {
23+
24+
public static ChannelPool createChannelPool(Mono<? extends Connection> connectionMono) {
25+
return createChannelPool(connectionMono, new ChannelPoolOptions());
26+
}
27+
28+
public static ChannelPool createChannelPool(Mono<? extends Connection> connectionMono, ChannelPoolOptions channelPoolOptions) {
29+
return new LazyChannelPool(connectionMono, channelPoolOptions);
30+
}
31+
32+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Copyright (c) 2018-2019 Pivotal Software Inc, All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package reactor.rabbitmq;
18+
19+
import reactor.core.scheduler.Scheduler;
20+
21+
public class ChannelPoolOptions {
22+
23+
private Integer maxCacheSize;
24+
25+
private Scheduler subscriptionScheduler;
26+
27+
public ChannelPoolOptions maxCacheSize(int maxCacheSize) {
28+
this.maxCacheSize = maxCacheSize;
29+
return this;
30+
}
31+
32+
public Integer getMaxCacheSize() {
33+
return maxCacheSize;
34+
}
35+
36+
public ChannelPoolOptions subscriptionScheduler(Scheduler subscriptionScheduler) {
37+
this.subscriptionScheduler = subscriptionScheduler;
38+
return this;
39+
}
40+
41+
public Scheduler getSubscriptionScheduler() {
42+
return subscriptionScheduler;
43+
}
44+
}
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/*
2+
* Copyright (c) 2018-2019 Pivotal Software Inc, All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package reactor.rabbitmq;
18+
19+
import com.rabbitmq.client.Channel;
20+
import com.rabbitmq.client.Connection;
21+
import reactor.core.publisher.Mono;
22+
import reactor.core.publisher.SignalType;
23+
import reactor.core.scheduler.Scheduler;
24+
import reactor.core.scheduler.Schedulers;
25+
26+
import java.io.IOException;
27+
import java.util.ArrayList;
28+
import java.util.List;
29+
import java.util.concurrent.BlockingQueue;
30+
import java.util.concurrent.LinkedBlockingQueue;
31+
import java.util.function.BiConsumer;
32+
33+
import static reactor.rabbitmq.ChannelCloseHandlers.SENDER_CHANNEL_CLOSE_HANDLER_INSTANCE;
34+
35+
/**
36+
* This channel pool is lazy initialized. It might even not reach its maximum size {@link ChannelPoolOptions#getMaxCacheSize()} in low-concurrency environments.
37+
* It always tries to obtain channel from the pool. However, in case of high-concurrency environments, number of channels might exceeds channel pool maximum size.
38+
*
39+
* Channels are added to the pool after their use {@link ChannelPool#getChannelCloseHandler()} and obtained from the pool when channel is requested {@link ChannelPool#getChannelMono()}.
40+
*
41+
* If pool is empty, new channel is created.
42+
* If channel is no longer needed and the channel pool is full, then channel is being closed.
43+
* If channel is no longer needed and the channel pool has not reached its capacity, then channel is added to the pool.
44+
*
45+
* It uses {@link BlockingQueue} internally in a non-blocking way.
46+
*
47+
*/
48+
class LazyChannelPool implements ChannelPool {
49+
50+
private static final int DEFAULT_CHANNEL_POOL_SIZE = 5;
51+
52+
private final Mono<? extends Connection> connectionMono;
53+
private final BlockingQueue<Channel> channelsQueue;
54+
private final Scheduler subscriptionScheduler;
55+
56+
LazyChannelPool(Mono<? extends Connection> connectionMono, ChannelPoolOptions channelPoolOptions) {
57+
int channelsQueueCapacity = channelPoolOptions.getMaxCacheSize() == null ?
58+
DEFAULT_CHANNEL_POOL_SIZE : channelPoolOptions.getMaxCacheSize();
59+
this.channelsQueue = new LinkedBlockingQueue<>(channelsQueueCapacity);
60+
this.connectionMono = connectionMono;
61+
this.subscriptionScheduler = channelPoolOptions.getSubscriptionScheduler() == null ?
62+
Schedulers.newElastic("sender-channel-pool") : channelPoolOptions.getSubscriptionScheduler();
63+
}
64+
65+
public Mono<? extends Channel> getChannelMono() {
66+
return connectionMono.map(connection -> {
67+
Channel channel = channelsQueue.poll();
68+
if (channel == null) {
69+
channel = createChannel(connection);
70+
}
71+
return channel;
72+
})
73+
.subscribeOn(subscriptionScheduler);
74+
}
75+
76+
@Override
77+
public BiConsumer<SignalType, Channel> getChannelCloseHandler() {
78+
return (signalType, channel) -> {
79+
if (!channel.isOpen()) {
80+
return;
81+
}
82+
boolean offer = signalType == SignalType.ON_COMPLETE && channelsQueue.offer(channel);
83+
if (!offer) {
84+
SENDER_CHANNEL_CLOSE_HANDLER_INSTANCE.accept(signalType, channel);
85+
}
86+
};
87+
}
88+
89+
@Override
90+
public void close() {
91+
List<Channel> channels = new ArrayList<>();
92+
channelsQueue.drainTo(channels);
93+
channels.forEach(channel -> {
94+
SENDER_CHANNEL_CLOSE_HANDLER_INSTANCE.accept(SignalType.ON_COMPLETE, channel);
95+
});
96+
}
97+
98+
private Channel createChannel(Connection connection) {
99+
try {
100+
return connection.createChannel();
101+
} catch (IOException e) {
102+
throw new RabbitFluxException("Error while creating channel", e);
103+
}
104+
}
105+
106+
}

0 commit comments

Comments
 (0)