Skip to content

Commit 1085d50

Browse files
authored
Merge pull request #274 from RafalGoslawski/add-nack-requeue-false
Added --requeue (-re) option to set the requeue flag for nack operation
2 parents 7be2502 + 9fb1f9c commit 1085d50

File tree

4 files changed

+27
-6
lines changed

4 files changed

+27
-6
lines changed

src/main/java/com/rabbitmq/perf/Consumer.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,10 @@ public class Consumer extends AgentBase implements Runnable {
3636
private static final Logger LOGGER = LoggerFactory.getLogger(Consumer.class);
3737

3838
private static final AckNackOperation ACK_OPERATION =
39-
(ch, envelope, multiple) -> ch.basicAck(envelope.getDeliveryTag(), multiple);
39+
(ch, envelope, multiple, requeue) -> ch.basicAck(envelope.getDeliveryTag(), multiple);
4040

4141
private static final AckNackOperation NACK_OPERATION =
42-
(ch, envelope, multiple) -> ch.basicNack(envelope.getDeliveryTag(), multiple, true);
42+
(ch, envelope, multiple, requeue) -> ch.basicNack(envelope.getDeliveryTag(), multiple, requeue);
4343
static final String STOP_REASON_CONSUMER_REACHED_MESSAGE_LIMIT = "Consumer reached message limit";
4444

4545
private volatile ConsumerImpl q;
@@ -48,6 +48,7 @@ public class Consumer extends AgentBase implements Runnable {
4848
private final int txSize;
4949
private final boolean autoAck;
5050
private final int multiAckEvery;
51+
private final boolean requeue;
5152
private final Stats stats;
5253
private final int msgLimit;
5354
private final Map<String, String> consumerTagBranchMap = Collections.synchronizedMap(new HashMap<>());
@@ -81,6 +82,7 @@ public Consumer(ConsumerParameters parameters) {
8182
this.txSize = parameters.getTxSize();
8283
this.autoAck = parameters.isAutoAck();
8384
this.multiAckEvery = parameters.getMultiAckEvery();
85+
this.requeue = parameters.isRequeue();
8486
this.stats = parameters.getStats();
8587
this.msgLimit = parameters.getMsgLimit();
8688
this.timestampProvider = parameters.getTimestampProvider();
@@ -266,9 +268,9 @@ private void ackIfNecessary(Envelope envelope, int currentMessageCount, final Ch
266268
if (!autoAck) {
267269
dealWithWriteOperation(() -> {
268270
if (multiAckEvery == 0) {
269-
ackNackOperation.apply(ch, envelope, false);
271+
ackNackOperation.apply(ch, envelope, false, requeue);
270272
} else if (currentMessageCount % multiAckEvery == 0) {
271-
ackNackOperation.apply(ch, envelope, true);
273+
ackNackOperation.apply(ch, envelope, true, requeue);
272274
}
273275
}, recoveryProcess);
274276
}
@@ -473,7 +475,7 @@ public boolean simulateLatency() {
473475
@FunctionalInterface
474476
private interface AckNackOperation {
475477

476-
void apply(Channel channel, Envelope envelope, boolean multiple) throws IOException;
478+
void apply(Channel channel, Envelope envelope, boolean multiple, boolean requeue) throws IOException;
477479

478480
}
479481

src/main/java/com/rabbitmq/perf/ConsumerParameters.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ public class ConsumerParameters {
4343
private ExecutorService executorService;
4444
private boolean polling;
4545
private boolean nack = false;
46+
private boolean requeue = true;
4647

4748
private int pollingInterval;
4849

@@ -201,6 +202,15 @@ public ConsumerParameters setNack(boolean nack) {
201202
return this;
202203
}
203204

205+
public boolean isRequeue() {
206+
return requeue;
207+
}
208+
209+
public ConsumerParameters setRequeue(boolean requeue) {
210+
this.requeue = requeue;
211+
return this;
212+
}
213+
204214
public ConsumerParameters setConsumerArguments(Map<String, Object> consumerArguments) {
205215
this.consumerArguments = consumerArguments;
206216
return this;

src/main/java/com/rabbitmq/perf/MulticastParams.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ public class MulticastParams {
112112
private int pollingInterval = -1;
113113

114114
private boolean nack = false;
115+
private boolean requeue = true;
115116

116117
private boolean jsonBody = false;
117118
private int bodyFieldCount = 1000;
@@ -437,6 +438,10 @@ public void setNack(boolean nack) {
437438
this.nack = nack;
438439
}
439440

441+
public void setRequeue(boolean requeue) {
442+
this.requeue = requeue;
443+
}
444+
440445
public void setJsonBody(boolean jsonBody) {
441446
this.jsonBody = jsonBody;
442447
}
@@ -539,6 +544,7 @@ public Consumer createConsumer(Connection connection,
539544
.setPolling(this.polling)
540545
.setPollingInterval(this.pollingInterval)
541546
.setNack(this.nack)
547+
.setRequeue(this.requeue)
542548
.setConsumerArguments(this.consumerArguments)
543549
);
544550
this.topologyHandler.next();

src/main/java/com/rabbitmq/perf/PerfTest.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,7 @@ public static void main(String [] args, PerfTestOptions perfTestOptions) {
181181
int pollingInterval = intArg(cmd, "pi", -1);
182182

183183
boolean nack = hasOption(cmd, "na");
184+
boolean requeue = boolArg(cmd, "re", true);
184185

185186
boolean jsonBody = hasOption(cmd, "jb");
186187
int bodyFieldCount = intArg(cmd, "bfc", 1000);
@@ -372,6 +373,7 @@ public static void main(String [] args, PerfTestOptions perfTestOptions) {
372373
p.setPolling(polling);
373374
p.setPollingInterval(pollingInterval);
374375
p.setNack(nack);
376+
p.setRequeue(requeue);
375377
p.setJsonBody(jsonBody);
376378
p.setBodyFieldCount(bodyFieldCount);
377379
p.setBodyCount(bodyCount);
@@ -680,7 +682,8 @@ public static Options getOptions() {
680682
options.addOption(new Option("pi", "polling-interval",true, "time to wait before polling with basic.get, " +
681683
"in millisecond, default is 0."));
682684

683-
options.addOption(new Option("na", "nack",false,"nack and requeue messages"));
685+
options.addOption(new Option("na", "nack",false,"nack messages"));
686+
options.addOption(new Option("re", "requeue",true,"should nacked messages be requeued"));
684687

685688
options.addOption(new Option("jb", "json-body", false, "generate a random JSON document for message body. " +
686689
"Use with --size."));

0 commit comments

Comments
 (0)