Skip to content
This repository was archived by the owner on Sep 26, 2025. It is now read-only.

Commit d761e0f

Browse files
committed
ChannelPool - some simple fixes
1 parent eee7e20 commit d761e0f

File tree

3 files changed

+27
-15
lines changed

3 files changed

+27
-15
lines changed

src/main/java/reactor/rabbitmq/ChannelPoolOptions.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,21 +17,20 @@
1717
package reactor.rabbitmq;
1818

1919
import reactor.core.scheduler.Scheduler;
20-
import reactor.core.scheduler.Schedulers;
2120

2221
public class ChannelPoolOptions {
2322

24-
private int maxSize = 5;
23+
private Integer maxCacheSize;
2524

26-
private Scheduler subscriptionScheduler = Schedulers.newElastic("sender-channel-pool");
25+
private Scheduler subscriptionScheduler;
2726

28-
public ChannelPoolOptions maxSize(int maxSize) {
29-
this.maxSize = maxSize;
27+
public ChannelPoolOptions maxCacheSize(int maxCacheSize) {
28+
this.maxCacheSize = maxCacheSize;
3029
return this;
3130
}
3231

33-
public int getMaxSize() {
34-
return maxSize;
32+
public Integer getMaxCacheSize() {
33+
return maxCacheSize;
3534
}
3635

3736
public ChannelPoolOptions subscriptionScheduler(Scheduler subscriptionScheduler) {

src/main/java/reactor/rabbitmq/LazyChannelPool.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import reactor.core.publisher.Mono;
2222
import reactor.core.publisher.SignalType;
2323
import reactor.core.scheduler.Scheduler;
24+
import reactor.core.scheduler.Schedulers;
2425

2526
import java.io.IOException;
2627
import java.util.ArrayList;
@@ -32,8 +33,8 @@
3233
import static reactor.rabbitmq.ChannelCloseHandlers.SENDER_CHANNEL_CLOSE_HANDLER_INSTANCE;
3334

3435
/**
35-
* This channel pool is lazy initialized. It might even not reach its maximum size {@link ChannelPoolOptions#getMaxSize()} in low-traffic environments.
36-
* It always tries to obtain channel from the pool. However, in case of high-traffic number of channels might exceeds channel pool maximum size.
36+
* This channel pool is lazy initialized. It might even not reach its maximum size {@link ChannelPoolOptions#getMaxCacheSize()} in low-traffic environments.
37+
* It always tries to obtain channel from the pool. However, in case of high-concurrency environment, number of channels might exceeds channel pool maximum size.
3738
*
3839
* Channels are added to the pool after their use {@link ChannelPool#getChannelCloseHandler()} and obtained from the pool when channel is requested {@link ChannelPool#getChannelMono()}.
3940
*
@@ -46,14 +47,19 @@
4647
*/
4748
class LazyChannelPool implements ChannelPool {
4849

50+
private static final int DEFAULT_CHANNEL_POOL_SIZE = 5;
51+
4952
private final Mono<? extends Connection> connectionMono;
5053
private final BlockingQueue<Channel> channelsQueue;
5154
private final Scheduler subscriptionScheduler;
5255

5356
LazyChannelPool(Mono<? extends Connection> connectionMono, ChannelPoolOptions channelPoolOptions) {
54-
this.channelsQueue = new LinkedBlockingQueue<>(channelPoolOptions.getMaxSize());
57+
int channelsQueueCapacity = channelPoolOptions.getMaxCacheSize() == null ?
58+
DEFAULT_CHANNEL_POOL_SIZE : channelPoolOptions.getMaxCacheSize();
59+
this.channelsQueue = new LinkedBlockingQueue<>(channelsQueueCapacity);
5560
this.connectionMono = connectionMono;
56-
this.subscriptionScheduler = channelPoolOptions.getSubscriptionScheduler();
61+
this.subscriptionScheduler = channelPoolOptions.getSubscriptionScheduler() == null ?
62+
Schedulers.newElastic("sender-channel-pool") : channelPoolOptions.getSubscriptionScheduler();
5763
}
5864

5965
public Mono<? extends Channel> getChannelMono() {

src/test/java/reactor/rabbitmq/LazyChannelPoolTests.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import org.junit.jupiter.api.Test;
88
import reactor.core.publisher.Mono;
99
import reactor.test.StepVerifier;
10+
import reactor.test.scheduler.VirtualTimeScheduler;
1011

1112
import java.io.IOException;
1213
import java.time.Duration;
@@ -34,7 +35,9 @@ void setUp() throws IOException {
3435
@Test
3536
void testChannelPoolLazyInitialization() throws Exception {
3637
int maxChannelPoolSize = 2;
37-
ChannelPoolOptions channelPoolOptions = new ChannelPoolOptions().maxSize(maxChannelPoolSize);
38+
ChannelPoolOptions channelPoolOptions = new ChannelPoolOptions()
39+
.maxCacheSize(maxChannelPoolSize)
40+
.subscriptionScheduler(VirtualTimeScheduler.create());
3841
lazyChannelPool = new LazyChannelPool(Mono.just(connection), channelPoolOptions);
3942

4043
StepVerifier.withVirtualTime(() ->
@@ -67,7 +70,9 @@ void testChannelPoolLazyInitialization() throws Exception {
6770
@Test
6871
void testChannelPoolExceedsMaxPoolSize() throws Exception {
6972
int maxChannelPoolSize = 2;
70-
ChannelPoolOptions channelPoolOptions = new ChannelPoolOptions().maxSize(maxChannelPoolSize);
73+
ChannelPoolOptions channelPoolOptions = new ChannelPoolOptions()
74+
.maxCacheSize(maxChannelPoolSize)
75+
.subscriptionScheduler(VirtualTimeScheduler.create());
7176
lazyChannelPool = new LazyChannelPool(Mono.just(connection), channelPoolOptions);
7277

7378
StepVerifier.withVirtualTime(() ->
@@ -107,7 +112,9 @@ void testChannelPoolExceedsMaxPoolSize() throws Exception {
107112
@Test
108113
void testChannelPool() throws Exception {
109114
int maxChannelPoolSize = 1;
110-
ChannelPoolOptions channelPoolOptions = new ChannelPoolOptions().maxSize(maxChannelPoolSize);
115+
ChannelPoolOptions channelPoolOptions = new ChannelPoolOptions()
116+
.maxCacheSize(maxChannelPoolSize)
117+
.subscriptionScheduler(VirtualTimeScheduler.create());
111118
lazyChannelPool = new LazyChannelPool(Mono.just(connection), channelPoolOptions);
112119

113120
StepVerifier.withVirtualTime(() ->
@@ -170,7 +177,7 @@ private void verifyBasicPublishOnce(Channel channel) throws Exception {
170177
}
171178

172179
private void verifyBasicPublish(Channel channel, int times) throws Exception {
173-
verify(channel, times(times)).basicPublish(anyString(), anyString(), any(AMQP.BasicProperties.class), any(byte[].class));
180+
verify(channel, times(times)).basicPublish(anyString(), anyString(), nullable(AMQP.BasicProperties.class), any(byte[].class));
174181
}
175182

176183
private Channel channel(int channelNumber) {

0 commit comments

Comments
 (0)