Skip to content
This repository was archived by the owner on Sep 26, 2025. It is now read-only.

Commit b79bf22

Browse files
committed
Add test for channel mono choice when sending messages
References #41
1 parent 54ce9a1 commit b79bf22

File tree

4 files changed

+67
-12
lines changed

4 files changed

+67
-12
lines changed

src/main/java/reactor/rabbitmq/SendOptions.java

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2018 Pivotal Software Inc, All Rights Reserved.
2+
* Copyright (c) 2018-2019 Pivotal Software Inc, All Rights Reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -29,11 +29,21 @@
2929
public class SendOptions {
3030

3131
private BiConsumer<Sender.SendContext, Exception> exceptionHandler = new ExceptionHandlers.RetrySendingExceptionHandler(
32-
Duration.ofSeconds(10), Duration.ofMillis(200), ExceptionHandlers.CONNECTION_RECOVERY_PREDICATE
32+
Duration.ofSeconds(10), Duration.ofMillis(200), ExceptionHandlers.CONNECTION_RECOVERY_PREDICATE
3333
);
3434

35+
/**
36+
* Channel mono to use to send messages.
37+
*
38+
* @since 1.1.0
39+
*/
3540
private Mono<? extends Channel> channelMono;
3641

42+
/**
43+
* Channel closing logic.
44+
*
45+
* @since 1.1.0
46+
*/
3747
private BiConsumer<SignalType, Channel> channelCloseHandler;
3848

3949
public BiConsumer<Sender.SendContext, Exception> getExceptionHandler() {
@@ -45,19 +55,45 @@ public SendOptions exceptionHandler(BiConsumer<Sender.SendContext, Exception> ex
4555
return this;
4656
}
4757

58+
/**
59+
* Return the channel mono to use to send messages.
60+
*
61+
* @return
62+
* @since 1.1.0
63+
*/
4864
public Mono<? extends Channel> getChannelMono() {
4965
return channelMono;
5066
}
5167

68+
/**
69+
* Set the channel mono to use to send messages.
70+
*
71+
* @param channelMono
72+
* @return this {@link SendOptions} instance
73+
* @since 1.1.0
74+
*/
5275
public SendOptions channelMono(Mono<? extends Channel> channelMono) {
5376
this.channelMono = channelMono;
5477
return this;
5578
}
5679

80+
/**
81+
* Return the channel closing logic.
82+
*
83+
* @return
84+
* @since 1.1.0
85+
*/
5786
public BiConsumer<SignalType, Channel> getChannelCloseHandler() {
5887
return channelCloseHandler;
5988
}
6089

90+
/**
91+
* Set the channel closing logic.
92+
*
93+
* @param channelCloseHandler
94+
* @return this {@link SendOptions} instance
95+
* @since 1.1.0
96+
*/
6197
public SendOptions channelCloseHandler(BiConsumer<SignalType, Channel> channelCloseHandler) {
6298
this.channelCloseHandler = channelCloseHandler;
6399
return this;

src/main/java/reactor/rabbitmq/Sender.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2017-2018 Pivotal Software Inc, All Rights Reserved.
2+
* Copyright (c) 2017-2019 Pivotal Software Inc, All Rights Reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -110,6 +110,7 @@ public Mono<Void> send(Publisher<OutboundMessage> messages) {
110110
}
111111

112112
public Mono<Void> send(Publisher<OutboundMessage> messages, SendOptions options) {
113+
options = options == null ? new SendOptions() : options;
113114
// TODO using a pool of channels?
114115
// would be much more efficient if send is called very often
115116
// less useful if seldom called, only for long or infinite message flux
@@ -141,12 +142,14 @@ public Flux<OutboundMessageResult> sendWithPublishConfirms(Publisher<OutboundMes
141142
}
142143

143144
public Flux<OutboundMessageResult> sendWithPublishConfirms(Publisher<OutboundMessage> messages, SendOptions options) {
145+
options = options == null ? new SendOptions() : options;
144146
// TODO using a pool of channels?
145147
// would be much more efficient if send is called very often
146148
// less useful if seldom called, only for long or infinite message flux
147149
final Mono<? extends Channel> currentChannelMono = getChannelMono(options);
148150
final BiConsumer<SignalType, Channel> channelCloseHandler = getChannelCloseHandler(options);
149151

152+
SendOptions sendOptions = options;
150153
return currentChannelMono.map(channel -> {
151154
try {
152155
channel.confirmSelect();
@@ -155,10 +158,11 @@ public Flux<OutboundMessageResult> sendWithPublishConfirms(Publisher<OutboundMes
155158
}
156159
return channel;
157160
})
158-
.flatMapMany(channel -> new PublishConfirmOperator(messages, channel, channelCloseHandler, options));
161+
.flatMapMany(channel -> new PublishConfirmOperator(messages, channel, channelCloseHandler, sendOptions));
159162
}
160163

161-
private Mono<? extends Channel> getChannelMono(SendOptions options) {
164+
// package-protected for testing
165+
Mono<? extends Channel> getChannelMono(SendOptions options) {
162166
return Stream.of(options.getChannelMono(), channelMono)
163167
.filter(Objects::nonNull)
164168
.findFirst().orElse(connectionMono.map(CHANNEL_CREATION_FUNCTION));
@@ -511,12 +515,12 @@ public void onSubscribe(Subscription subscription) {
511515
channel.addConfirmListener(new ConfirmListener() {
512516

513517
@Override
514-
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
518+
public void handleAck(long deliveryTag, boolean multiple) {
515519
handleAckNack(deliveryTag, multiple, true);
516520
}
517521

518522
@Override
519-
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
523+
public void handleNack(long deliveryTag, boolean multiple) {
520524
handleAckNack(deliveryTag, multiple, false);
521525
}
522526

src/test/java/reactor/rabbitmq/RabbitFluxTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2017-2018 Pivotal Software Inc, All Rights Reserved.
2+
* Copyright (c) 2017-2019 Pivotal Software Inc, All Rights Reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -550,7 +550,7 @@ public void senderWithCustomChannelCloseHandler() throws Exception {
550550
}
551551

552552
@Test
553-
public void senderWithCustomChannelCloseHandlerPriority() throws Exception {
553+
public void senderWithCustomChannelCloseHandlerPriority() {
554554
int nbMessages = 10;
555555
Flux<OutboundMessage> msgFlux = Flux.range(0, nbMessages).map(i -> new OutboundMessage("", queue, "".getBytes()));
556556

src/test/java/reactor/rabbitmq/SenderTests.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2018 Pivotal Software Inc, All Rights Reserved.
2+
* Copyright (c) 2018-2019 Pivotal Software Inc, All Rights Reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -23,10 +23,12 @@
2323
import org.junit.jupiter.api.AfterEach;
2424
import org.junit.jupiter.api.BeforeEach;
2525
import org.junit.jupiter.api.Test;
26+
import org.mockito.Mockito;
27+
import reactor.core.publisher.Mono;
2628

2729
import java.util.UUID;
2830

29-
import static org.junit.jupiter.api.Assertions.fail;
31+
import static org.junit.jupiter.api.Assertions.*;
3032
import static reactor.rabbitmq.RabbitFlux.createSender;
3133

3234
/**
@@ -64,7 +66,7 @@ public void tearDown() throws Exception {
6466
}
6567

6668
@Test
67-
void cannotReuseChannelOnError() {
69+
void canReuseChannelOnError() {
6870
sender = createSender();
6971
try {
7072
sender.declare(QueueSpecification.queue(queue).autoDelete(true)).block();
@@ -74,4 +76,17 @@ void cannotReuseChannelOnError() {
7476
}
7577
sender.declare(QueueSpecification.queue()).block();
7678
}
79+
80+
@Test
81+
void channelMonoPriority() {
82+
Mono<Channel> senderChannelMono = Mono.just(Mockito.mock(Channel.class));
83+
Mono<Channel> sendChannelMono = Mono.just(Mockito.mock(Channel.class));
84+
sender = createSender();
85+
assertNotNull(sender.getChannelMono(new SendOptions()));
86+
assertSame(sendChannelMono, sender.getChannelMono(new SendOptions().channelMono(sendChannelMono)));
87+
88+
sender = createSender(new SenderOptions().channelMono(senderChannelMono));
89+
assertSame(senderChannelMono, sender.getChannelMono(new SendOptions()));
90+
assertSame(sendChannelMono, sender.getChannelMono(new SendOptions().channelMono(sendChannelMono)));
91+
}
7792
}

0 commit comments

Comments
 (0)