Skip to content
This repository was archived by the owner on Sep 26, 2025. It is now read-only.

Commit e5f9fe9

Browse files
authored
Merge pull request #168 from reactor/reactor-rabbitmq-165-consumer-arguments
Add arguments in ConsumeOptions
2 parents 4c560fb + a515660 commit e5f9fe9

File tree

3 files changed

+52
-6
lines changed

3 files changed

+52
-6
lines changed

src/main/java/reactor/rabbitmq/ConsumeOptions.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
import com.rabbitmq.client.Channel;
2020
import com.rabbitmq.client.Delivery;
21+
import java.util.Collections;
22+
import java.util.Map;
2123
import reactor.core.publisher.FluxSink;
2224

2325
import java.time.Duration;
@@ -60,6 +62,11 @@ public class ConsumeOptions {
6062
private Consumer<Channel> channelCallback = ch -> {
6163
};
6264

65+
/**
66+
* Arguments for the call to <code>basic.consume</code>.
67+
*/
68+
private Map<String, Object> arguments = Collections.emptyMap();
69+
6370
public int getQos() {
6471
return qos;
6572
}
@@ -129,4 +136,13 @@ public ConsumeOptions channelCallback(Consumer<Channel> channelCallback) {
129136
public Consumer<Channel> getChannelCallback() {
130137
return channelCallback;
131138
}
139+
140+
public ConsumeOptions arguments(Map<String, Object> arguments) {
141+
this.arguments = arguments;
142+
return this;
143+
}
144+
145+
public Map<String, Object> getArguments() {
146+
return arguments;
147+
}
132148
}

src/main/java/reactor/rabbitmq/Receiver.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,15 @@ public Flux<Delivery> consumeNoAck(final String queue, ConsumeOptions options) {
128128

129129
completeOnChannelShutdown(channel, emitter);
130130

131-
final String consumerTag = channel.basicConsume(queue, true, options.getConsumerTag(), deliverCallback, cancelCallback);
131+
final String consumerTag = channel.basicConsume(
132+
queue,
133+
true, // auto-ack
134+
options.getConsumerTag(),
135+
false, // noLocal (not supported by RabbitMQ)
136+
false, // not exclusive
137+
options.getArguments(),
138+
deliverCallback,
139+
cancelCallback);
132140
AtomicBoolean cancelled = new AtomicBoolean(false);
133141
LOGGER.info("Consumer {} consuming from {} has been registered", consumerTag, queue);
134142
emitter.onDispose(() -> {
@@ -217,7 +225,15 @@ public Flux<AcknowledgableDelivery> consumeManualAck(final String queue, Consume
217225

218226
completeOnChannelShutdown(channel, emitter);
219227

220-
final String consumerTag = channel.basicConsume(queue, false, options.getConsumerTag(), deliverCallback, cancelCallback);
228+
final String consumerTag = channel.basicConsume(
229+
queue,
230+
false, // no auto-ack
231+
options.getConsumerTag(),
232+
false, // noLocal (not supported by RabbitMQ)
233+
false, // not exclusive
234+
options.getArguments(),
235+
deliverCallback,
236+
cancelCallback);
221237
AtomicBoolean cancelled = new AtomicBoolean(false);
222238
LOGGER.info("Consumer {} consuming from {} has been registered", consumerTag, queue);
223239
emitter.onDispose(() -> {

src/test/java/reactor/rabbitmq/ConnectionRecoveryTests.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -170,9 +170,16 @@ void consumeAutoAckRetryOnAck() throws Exception {
170170
AtomicReference<DeliverCallback> deliverCallbackAtomicReference = new AtomicReference<>();
171171

172172
when(mockChannel.basicConsume(
173-
anyString(), anyBoolean(), anyString(), any(DeliverCallback.class), any(CancelCallback.class)
173+
anyString(), // queue
174+
anyBoolean(), // auto-ack
175+
anyString(), // consumer tag
176+
anyBoolean(), // noLocal (always false)
177+
anyBoolean(), // exclusive (always false)
178+
anyMap(), // arguments
179+
any(DeliverCallback.class),
180+
any(CancelCallback.class)
174181
)).thenAnswer(answer -> {
175-
deliverCallbackAtomicReference.set(answer.getArgument(3));
182+
deliverCallbackAtomicReference.set(answer.getArgument(6));
176183
consumerRegisteredLatch.countDown();
177184
return "ctag";
178185
});
@@ -228,9 +235,16 @@ void consumeManualAckRetryOnAck() throws Exception {
228235
AtomicReference<DeliverCallback> deliverCallbackAtomicReference = new AtomicReference<>();
229236

230237
when(mockChannel.basicConsume(
231-
anyString(), anyBoolean(), anyString(), any(DeliverCallback.class), any(CancelCallback.class)
238+
anyString(), // queue
239+
anyBoolean(), // auto-ack
240+
anyString(), // consumer tag
241+
anyBoolean(), // noLocal (always false)
242+
anyBoolean(), // exclusive (always false)
243+
anyMap(), // arguments
244+
any(DeliverCallback.class),
245+
any(CancelCallback.class)
232246
)).thenAnswer(answer -> {
233-
deliverCallbackAtomicReference.set(answer.getArgument(3));
247+
deliverCallbackAtomicReference.set(answer.getArgument(6));
234248
consumerRegisteredLatch.countDown();
235249
return "ctag";
236250
});

0 commit comments

Comments
 (0)