Skip to content
This repository was archived by the owner on Sep 26, 2025. It is now read-only.

Commit 2a057e6

Browse files
committed
Merge #168 into 1.5.4
Conflicts: build.gradle gradle.properties
2 parents e60d40a + e5f9fe9 commit 2a057e6

File tree

4 files changed

+53
-7
lines changed

4 files changed

+53
-7
lines changed

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
11
version=1.5.4-SNAPSHOT
2-
reactorCoreVersion=3.4.11-SNAPSHOT
2+
reactorCoreVersion=3.4.11-SNAPSHOT

src/main/java/reactor/rabbitmq/ConsumeOptions.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
import com.rabbitmq.client.Channel;
2020
import com.rabbitmq.client.Delivery;
21+
import java.util.Collections;
22+
import java.util.Map;
2123
import reactor.core.publisher.FluxSink;
2224

2325
import java.time.Duration;
@@ -60,6 +62,11 @@ public class ConsumeOptions {
6062
private Consumer<Channel> channelCallback = ch -> {
6163
};
6264

65+
/**
66+
* Arguments for the call to <code>basic.consume</code>.
67+
*/
68+
private Map<String, Object> arguments = Collections.emptyMap();
69+
6370
public int getQos() {
6471
return qos;
6572
}
@@ -129,4 +136,13 @@ public ConsumeOptions channelCallback(Consumer<Channel> channelCallback) {
129136
public Consumer<Channel> getChannelCallback() {
130137
return channelCallback;
131138
}
139+
140+
public ConsumeOptions arguments(Map<String, Object> arguments) {
141+
this.arguments = arguments;
142+
return this;
143+
}
144+
145+
public Map<String, Object> getArguments() {
146+
return arguments;
147+
}
132148
}

src/main/java/reactor/rabbitmq/Receiver.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,15 @@ public Flux<Delivery> consumeNoAck(final String queue, ConsumeOptions options) {
132132

133133
completeOnChannelShutdown(channel, emitter);
134134

135-
final String consumerTag = channel.basicConsume(queue, true, options.getConsumerTag(), deliverCallback, cancelCallback);
135+
final String consumerTag = channel.basicConsume(
136+
queue,
137+
true, // auto-ack
138+
options.getConsumerTag(),
139+
false, // noLocal (not supported by RabbitMQ)
140+
false, // not exclusive
141+
options.getArguments(),
142+
deliverCallback,
143+
cancelCallback);
136144
AtomicBoolean cancelled = new AtomicBoolean(false);
137145
LOGGER.info("Consumer {} consuming from {} has been registered", consumerTag, queue);
138146
emitter.onDispose(() -> {
@@ -221,7 +229,15 @@ public Flux<AcknowledgableDelivery> consumeManualAck(final String queue, Consume
221229

222230
completeOnChannelShutdown(channel, emitter);
223231

224-
final String consumerTag = channel.basicConsume(queue, false, options.getConsumerTag(), deliverCallback, cancelCallback);
232+
final String consumerTag = channel.basicConsume(
233+
queue,
234+
false, // no auto-ack
235+
options.getConsumerTag(),
236+
false, // noLocal (not supported by RabbitMQ)
237+
false, // not exclusive
238+
options.getArguments(),
239+
deliverCallback,
240+
cancelCallback);
225241
AtomicBoolean cancelled = new AtomicBoolean(false);
226242
LOGGER.info("Consumer {} consuming from {} has been registered", consumerTag, queue);
227243
emitter.onDispose(() -> {

src/test/java/reactor/rabbitmq/ConnectionRecoveryTests.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -170,9 +170,16 @@ void consumeAutoAckRetryOnAck() throws Exception {
170170
AtomicReference<DeliverCallback> deliverCallbackAtomicReference = new AtomicReference<>();
171171

172172
when(mockChannel.basicConsume(
173-
anyString(), anyBoolean(), anyString(), any(DeliverCallback.class), any(CancelCallback.class)
173+
anyString(), // queue
174+
anyBoolean(), // auto-ack
175+
anyString(), // consumer tag
176+
anyBoolean(), // noLocal (always false)
177+
anyBoolean(), // exclusive (always false)
178+
anyMap(), // arguments
179+
any(DeliverCallback.class),
180+
any(CancelCallback.class)
174181
)).thenAnswer(answer -> {
175-
deliverCallbackAtomicReference.set(answer.getArgument(3));
182+
deliverCallbackAtomicReference.set(answer.getArgument(6));
176183
consumerRegisteredLatch.countDown();
177184
return "ctag";
178185
});
@@ -228,9 +235,16 @@ void consumeManualAckRetryOnAck() throws Exception {
228235
AtomicReference<DeliverCallback> deliverCallbackAtomicReference = new AtomicReference<>();
229236

230237
when(mockChannel.basicConsume(
231-
anyString(), anyBoolean(), anyString(), any(DeliverCallback.class), any(CancelCallback.class)
238+
anyString(), // queue
239+
anyBoolean(), // auto-ack
240+
anyString(), // consumer tag
241+
anyBoolean(), // noLocal (always false)
242+
anyBoolean(), // exclusive (always false)
243+
anyMap(), // arguments
244+
any(DeliverCallback.class),
245+
any(CancelCallback.class)
232246
)).thenAnswer(answer -> {
233-
deliverCallbackAtomicReference.set(answer.getArgument(3));
247+
deliverCallbackAtomicReference.set(answer.getArgument(6));
234248
consumerRegisteredLatch.countDown();
235249
return "ctag";
236250
});

0 commit comments

Comments
 (0)