Skip to content
This repository was archived by the owner on Sep 26, 2025. It is now read-only.

Commit 4774baa

Browse files
committed
LazyChannelPool (added subscribeOn, signalType) and added channelPool property to SenderOptions
1 parent d761e0f commit 4774baa

File tree

3 files changed

+20
-41
lines changed

3 files changed

+20
-41
lines changed

src/main/java/reactor/rabbitmq/LazyChannelPool.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,13 +33,13 @@
3333
import static reactor.rabbitmq.ChannelCloseHandlers.SENDER_CHANNEL_CLOSE_HANDLER_INSTANCE;
3434

3535
/**
36-
* This channel pool is lazy initialized. It might even not reach its maximum size {@link ChannelPoolOptions#getMaxCacheSize()} in low-traffic environments.
37-
* It always tries to obtain channel from the pool. However, in case of high-concurrency environment, number of channels might exceeds channel pool maximum size.
36+
* This channel pool is lazy initialized. It might even not reach its maximum size {@link ChannelPoolOptions#getMaxCacheSize()} in low-concurrency environments.
37+
* It always tries to obtain channel from the pool. However, in case of high-concurrency environments, number of channels might exceeds channel pool maximum size.
3838
*
3939
* Channels are added to the pool after their use {@link ChannelPool#getChannelCloseHandler()} and obtained from the pool when channel is requested {@link ChannelPool#getChannelMono()}.
4040
*
4141
* If pool is empty, new channel is created.
42-
* If channel is no longer needed and the channel pool is full (high-traffic), then channel is being closed.
42+
* If channel is no longer needed and the channel pool is full, then channel is being closed.
4343
* If channel is no longer needed and the channel pool has not reached its capacity, then channel is added to the pool.
4444
*
4545
* It uses {@link BlockingQueue} internally in a non-blocking way.
@@ -69,7 +69,8 @@ public Mono<? extends Channel> getChannelMono() {
6969
channel = createChannel(connection);
7070
}
7171
return channel;
72-
}).subscribeOn(subscriptionScheduler);
72+
})
73+
.subscribeOn(subscriptionScheduler);
7374
}
7475

7576
@Override
@@ -78,8 +79,7 @@ public BiConsumer<SignalType, Channel> getChannelCloseHandler() {
7879
if (!channel.isOpen()) {
7980
return;
8081
}
81-
// maybe also close channel if signalType == SignalType.ON_ERROR ?
82-
boolean offer = channelsQueue.offer(channel);
82+
boolean offer = signalType == SignalType.ON_COMPLETE && channelsQueue.offer(channel);
8383
if (!offer) {
8484
SENDER_CHANNEL_CLOSE_HANDLER_INSTANCE.accept(signalType, channel);
8585
}

src/main/java/reactor/rabbitmq/SenderOptions.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,20 @@ public SenderOptions channelCloseHandler(BiConsumer<SignalType, Channel> channel
161161
return this;
162162
}
163163

164+
/**
165+
* Set the channel pool to use to send messages.
166+
*
167+
* @param channelPool
168+
* @return this {@link SenderOptions} instance
169+
* @since 1.1.0
170+
*/
171+
public SenderOptions channelPool(ChannelPool channelPool) {
172+
this.channelMono = channelPool.getChannelMono();
173+
this.channelCloseHandler = channelPool.getChannelCloseHandler();
174+
return this;
175+
}
176+
177+
164178
public SenderOptions resourceManagementChannelMono(Mono<? extends Channel> resourceManagementChannelMono) {
165179
this.resourceManagementChannelMono = resourceManagementChannelMono;
166180
return this;

src/test/java/reactor/rabbitmq/RabbitFluxTests.java

Lines changed: 0 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import com.rabbitmq.client.*;
2020
import org.junit.jupiter.api.AfterEach;
2121
import org.junit.jupiter.api.BeforeEach;
22-
import org.junit.jupiter.api.Disabled;
2322
import org.junit.jupiter.api.Test;
2423
import org.junit.jupiter.params.ParameterizedTest;
2524
import org.junit.jupiter.params.provider.Arguments;
@@ -590,40 +589,6 @@ public void senderWithCustomChannelCloseHandlerPriority(int nbMessages,
590589
verify(channelCloseHandlerInSendOptions, times(1)).accept(any(SignalType.class), any(Channel.class));
591590
}
592591

593-
@Test // temporary test, jvm warm-up ignored
594-
@Disabled
595-
public void senderWithChannelPoolPerformance() throws Exception {
596-
int nbMessages = 1;
597-
int count = 500;
598-
int expectedSpeedupRatio = 3;
599-
600-
Flux<OutboundMessage> msgFlux = Flux.range(0, nbMessages).map(i -> new OutboundMessage("", queue, "".getBytes()));
601-
602-
ChannelPool channelPool = ChannelPoolFactory.createChannelPool(Mono.just(connection));
603-
SendOptions sendOptions = new SendOptions().channelPool(channelPool);
604-
605-
sender = createSender();
606-
607-
Mono<Void> sendChannelPoolMono = Flux.range(0, count)
608-
.flatMap(i -> sender.send(msgFlux, sendOptions))
609-
.then();
610-
611-
Mono<Void> sendMono = Flux.range(0, count)
612-
.flatMap(i -> sender.send(msgFlux))
613-
.then();
614-
615-
Duration durationSendChannelPool = StepVerifier.create(sendChannelPoolMono).verifyComplete();
616-
Duration durationSendChannelAlwaysCreated = StepVerifier.create(sendMono).verifyComplete();
617-
618-
int totalMessages = nbMessages * count * 2;
619-
StepVerifier.create(consume(queue, totalMessages))
620-
.expectNextCount(totalMessages)
621-
.verifyComplete();
622-
623-
assertTrue(durationSendChannelPool.toMillis() * expectedSpeedupRatio < durationSendChannelAlwaysCreated.toMillis(),
624-
String.format("Sender with channel pool is not %s times faster. Duration with channel pool is %s and without is %s", expectedSpeedupRatio, durationSendChannelPool, durationSendChannelAlwaysCreated));
625-
}
626-
627592
@Test
628593
public void publishConfirms() throws Exception {
629594
int nbMessages = 10;

0 commit comments

Comments
 (0)