Skip to content
This repository was archived by the owner on Sep 26, 2025. It is now read-only.

Commit 54ce9a1

Browse files
authored
Merge pull request #45 from pmackowski/channel-pool-send-options
Added channelMono and channelCloseHandler properties to SendOptions
2 parents 36d8cb3 + f44018b commit 54ce9a1

File tree

3 files changed

+57
-4
lines changed

3 files changed

+57
-4
lines changed

src/main/java/reactor/rabbitmq/SendOptions.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,10 @@
1616

1717
package reactor.rabbitmq;
1818

19+
import com.rabbitmq.client.Channel;
20+
import reactor.core.publisher.Mono;
21+
import reactor.core.publisher.SignalType;
22+
1923
import java.time.Duration;
2024
import java.util.function.BiConsumer;
2125

@@ -28,6 +32,10 @@ public class SendOptions {
2832
Duration.ofSeconds(10), Duration.ofMillis(200), ExceptionHandlers.CONNECTION_RECOVERY_PREDICATE
2933
);
3034

35+
private Mono<? extends Channel> channelMono;
36+
37+
private BiConsumer<SignalType, Channel> channelCloseHandler;
38+
3139
public BiConsumer<Sender.SendContext, Exception> getExceptionHandler() {
3240
return exceptionHandler;
3341
}
@@ -37,4 +45,21 @@ public SendOptions exceptionHandler(BiConsumer<Sender.SendContext, Exception> ex
3745
return this;
3846
}
3947

48+
public Mono<? extends Channel> getChannelMono() {
49+
return channelMono;
50+
}
51+
52+
public SendOptions channelMono(Mono<? extends Channel> channelMono) {
53+
this.channelMono = channelMono;
54+
return this;
55+
}
56+
57+
public BiConsumer<SignalType, Channel> getChannelCloseHandler() {
58+
return channelCloseHandler;
59+
}
60+
61+
public SendOptions channelCloseHandler(BiConsumer<SignalType, Channel> channelCloseHandler) {
62+
this.channelCloseHandler = channelCloseHandler;
63+
return this;
64+
}
4065
}

src/main/java/reactor/rabbitmq/Sender.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -113,8 +113,9 @@ public Mono<Void> send(Publisher<OutboundMessage> messages, SendOptions options)
113113
// TODO using a pool of channels?
114114
// would be much more efficient if send is called very often
115115
// less useful if seldom called, only for long or infinite message flux
116-
final Mono<? extends Channel> currentChannelMono = getChannelMono();
116+
final Mono<? extends Channel> currentChannelMono = getChannelMono(options);
117117
final BiConsumer<SendContext, Exception> exceptionHandler = options.getExceptionHandler();
118+
final BiConsumer<SignalType, Channel> channelCloseHandler = getChannelCloseHandler(options);
118119

119120
return currentChannelMono.flatMapMany(channel ->
120121
Flux.from(messages)
@@ -143,7 +144,8 @@ public Flux<OutboundMessageResult> sendWithPublishConfirms(Publisher<OutboundMes
143144
// TODO using a pool of channels?
144145
// would be much more efficient if send is called very often
145146
// less useful if seldom called, only for long or infinite message flux
146-
final Mono<? extends Channel> currentChannelMono = getChannelMono();
147+
final Mono<? extends Channel> currentChannelMono = getChannelMono(options);
148+
final BiConsumer<SignalType, Channel> channelCloseHandler = getChannelCloseHandler(options);
147149

148150
return currentChannelMono.map(channel -> {
149151
try {
@@ -156,8 +158,15 @@ public Flux<OutboundMessageResult> sendWithPublishConfirms(Publisher<OutboundMes
156158
.flatMapMany(channel -> new PublishConfirmOperator(messages, channel, channelCloseHandler, options));
157159
}
158160

159-
private Mono<? extends Channel> getChannelMono() {
160-
return channelMono != null ? channelMono : connectionMono.map(CHANNEL_CREATION_FUNCTION);
161+
private Mono<? extends Channel> getChannelMono(SendOptions options) {
162+
return Stream.of(options.getChannelMono(), channelMono)
163+
.filter(Objects::nonNull)
164+
.findFirst().orElse(connectionMono.map(CHANNEL_CREATION_FUNCTION));
165+
}
166+
167+
private BiConsumer<SignalType, Channel> getChannelCloseHandler(SendOptions options) {
168+
return options.getChannelCloseHandler() != null ?
169+
options.getChannelCloseHandler() : this.channelCloseHandler;
161170
}
162171

163172
public RpcClient rpcClient(String exchange, String routingKey) {

src/test/java/reactor/rabbitmq/RabbitFluxTests.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -549,6 +549,25 @@ public void senderWithCustomChannelCloseHandler() throws Exception {
549549
.verifyComplete();
550550
}
551551

552+
@Test
553+
public void senderWithCustomChannelCloseHandlerPriority() throws Exception {
554+
int nbMessages = 10;
555+
Flux<OutboundMessage> msgFlux = Flux.range(0, nbMessages).map(i -> new OutboundMessage("", queue, "".getBytes()));
556+
557+
SenderChannelCloseHandler channelCloseHandlerInSenderOptions = mock(SenderChannelCloseHandler.class);
558+
SenderChannelCloseHandler channelCloseHandlerInSendOptions = mock(SenderChannelCloseHandler.class);
559+
560+
SenderOptions senderOptions = new SenderOptions().channelCloseHandler(channelCloseHandlerInSenderOptions);
561+
sender = createSender(senderOptions);
562+
SendOptions sendOptions = new SendOptions().channelCloseHandler(channelCloseHandlerInSendOptions);
563+
564+
StepVerifier.create(sender.send(msgFlux, sendOptions))
565+
.verifyComplete();
566+
567+
verify(channelCloseHandlerInSenderOptions, never()).accept(any(SignalType.class), any(Channel.class));
568+
verify(channelCloseHandlerInSendOptions, times(1)).accept(any(SignalType.class), any(Channel.class));
569+
}
570+
552571
@Test
553572
public void publishConfirms() throws Exception {
554573
int nbMessages = 10;

0 commit comments

Comments
 (0)