Skip to content
This repository was archived by the owner on Sep 26, 2025. It is now read-only.

Commit 36d8cb3

Browse files
committed
Document custom channel mono for sending
References #41
1 parent 1f62660 commit 36d8cb3

File tree

5 files changed

+87
-20
lines changed

5 files changed

+87
-20
lines changed

src/docs/asciidoc/api-guide.adoc

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,7 @@ Each instance of `Receiver` is associated with a single instance
260260
of `Connection` created by the options-provided `ConnectionFactory`.
261261

262262
A receiver is created with an instance of receiver configuration options
263-
`reactor.rabbitmq.ReceiverOptions`. The properties of `SenderOptions`
263+
`reactor.rabbitmq.ReceiverOptions`. The properties of `ReceiverOptions`
264264
contains the `ConnectionFactory` that creates connections to the broker
265265
and a Reactor `Scheduler` used for the connection creation.
266266

@@ -442,6 +442,12 @@ include::{test-examples}/AdvancedFeatures.java[tag=shared-connection]
442442
Be aware that closing the first `Sender` or `Receiver` will close the underlying
443443
AMQP connection for all the others.
444444

445+
==== Creating channels with a custom `Mono` in `Sender`
446+
447+
`SenderOptions` provides a `channelMono` property that is called when creating the `Channel` used
448+
in sending methods. This is a convenient way to provide any custom logic when
449+
creating the `Channel`, e.g. retry logic.
450+
445451
==== Threading considerations for resource management
446452

447453
A `Sender` instance maintains a `Mono<Channel>` to manage resources and by default

src/docs/asciidoc/new.adoc

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,18 @@
11
== New & Noteworthy
22

33
[[new]]
4+
5+
=== What's new in Reactor RabbitMQ 1.1
6+
7+
* Let user provide `Mono<Channel>` for sending messages
8+
49
=== What's new in Reactor RabbitMQ 1.0
510

6-
* 1.0.0.M1
7-
** Introduction of the `Sender` and `Receiver` API
8-
* 1.0.0.M2
9-
** Support for request/reply
10-
** Exception handling
11-
* 1.0.0.M3
12-
** Bump Reactor to 3.2.0.RELEASE
13-
** Let user provide `Mono<Channel>` for resource management
14-
* 1.0.0.RC1
15-
** Bump Reactor to 3.2.1.RELEASE
16-
** Bump RabbitMQ Java client to 5.5.0
17-
* 1.0.0.RC2
18-
** Bump Reactor to 3.2.3.RELEASE
19-
** Bump RabbitMQ Java client to 5.5.1 for better topology recovery support
20-
** Complete receiving flux on channel termination
21-
** Handle error signal of `connectionMono` subscription to enable proper error handling
22-
** Rename `ReactorRabbitMq` to `RabbitFlux` and `ReactorRabbitMqException` to `RabbitFluxException`
11+
* Introduction of the `Sender` and `Receiver` API
12+
* Support for request/reply
13+
* Exception handling
14+
* Let user provide `Mono<Channel>` for resource management
15+
* Complete receiving flux on channel termination
16+
* Handle error signal of `connectionMono` subscription to enable proper error handling
17+
* Use Reactor 3.2.3.RELEASE
18+
* Use Java client 5.5.1

src/main/java/reactor/rabbitmq/ChannelCloseHandlers.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,19 @@
1+
/*
2+
* Copyright (c) 2018-2019 Pivotal Software Inc, All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
117
package reactor.rabbitmq;
218

319
import com.rabbitmq.client.Channel;
@@ -7,8 +23,20 @@
723

824
import java.util.function.BiConsumer;
925

26+
/**
27+
* Helper class to close channels.
28+
*
29+
* @since 1.1.0
30+
*/
1031
public class ChannelCloseHandlers {
1132

33+
public static final BiConsumer<SignalType, Channel> SENDER_CHANNEL_CLOSE_HANDLER_INSTANCE = new SenderChannelCloseHandler();
34+
35+
/**
36+
* Default closing strategy in {@link Sender}.
37+
* <p>
38+
* Closes the channel and emits a warn-level log message in case of error.
39+
*/
1240
public static class SenderChannelCloseHandler implements BiConsumer<SignalType, Channel> {
1341

1442
private static final Logger LOGGER = LoggerFactory.getLogger(SenderChannelCloseHandler.class);

src/main/java/reactor/rabbitmq/Sender.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ public Sender(SenderOptions options) {
9292
.cache();
9393
this.channelMono = options.getChannelMono();
9494
this.channelCloseHandler = options.getChannelCloseHandler() == null ?
95-
new ChannelCloseHandlers.SenderChannelCloseHandler() :
95+
ChannelCloseHandlers.SENDER_CHANNEL_CLOSE_HANDLER_INSTANCE :
9696
options.getChannelCloseHandler();
9797
this.privateResourceManagementScheduler = options.getResourceManagementScheduler() == null;
9898
this.resourceManagementScheduler = options.getResourceManagementScheduler() == null ?

src/main/java/reactor/rabbitmq/SenderOptions.java

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2017-2018 Pivotal Software Inc, All Rights Reserved.
2+
* Copyright (c) 2017-2019 Pivotal Software Inc, All Rights Reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -39,8 +39,19 @@ public class SenderOptions {
3939

4040
private Mono<? extends Connection> connectionMono;
4141

42+
/**
43+
* Channel mono used in sending methods.
44+
*
45+
* @since 1.1.0
46+
*/
4247
private Mono<? extends Channel> channelMono;
4348

49+
/**
50+
* Logic to close channels.
51+
*
52+
* @see ChannelCloseHandlers.SenderChannelCloseHandler
53+
* @since 1.1.0
54+
*/
4455
private BiConsumer<SignalType, Channel> channelCloseHandler;
4556

4657
private Scheduler resourceManagementScheduler;
@@ -106,19 +117,45 @@ public Mono<? extends Connection> getConnectionMono() {
106117
return connectionMono;
107118
}
108119

120+
/**
121+
* Sets the channel mono to use in send methods.
122+
*
123+
* @param channelMono the channel mono to use
124+
* @return this {@link SenderOptions} instance
125+
* @since 1.1.0
126+
*/
109127
public SenderOptions channelMono(Mono<? extends Channel> channelMono) {
110128
this.channelMono = channelMono;
111129
return this;
112130
}
113131

132+
/**
133+
* Returns the channel mono to use in send methods.
134+
*
135+
* @return
136+
* @since 1.1.0
137+
*/
114138
public Mono<? extends Channel> getChannelMono() {
115139
return channelMono;
116140
}
117141

142+
/**
143+
* Returns the channel closing logic.
144+
*
145+
* @return
146+
* @since 1.1.0
147+
*/
118148
public BiConsumer<SignalType, Channel> getChannelCloseHandler() {
119149
return channelCloseHandler;
120150
}
121151

152+
/**
153+
* Set the channel closing logic.
154+
*
155+
* @param channelCloseHandler the closing logic
156+
* @return this {@link SenderOptions} instance
157+
* @since 1.1.0
158+
*/
122159
public SenderOptions channelCloseHandler(BiConsumer<SignalType, Channel> channelCloseHandler) {
123160
this.channelCloseHandler = channelCloseHandler;
124161
return this;

0 commit comments

Comments
 (0)