Skip to content

Commit b914bb3

Browse files
authored
Merge pull request #204 from rabbitmq/rabbitmq-perf-test-198-nack
Add option to nack messages
2 parents 6efb64b + 82f5fcc commit b914bb3

File tree

5 files changed

+117
-2
lines changed

5 files changed

+117
-2
lines changed

src/main/java/com/rabbitmq/perf/Consumer.java

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,12 @@ public class Consumer extends AgentBase implements Runnable {
3535

3636
private static final Logger LOGGER = LoggerFactory.getLogger(Consumer.class);
3737

38+
private static final AckNackOperation ACK_OPERATION =
39+
(ch, envelope, multiple) -> ch.basicAck(envelope.getDeliveryTag(), multiple);
40+
41+
private static final AckNackOperation NACK_OPERATION =
42+
(ch, envelope, multiple) -> ch.basicNack(envelope.getDeliveryTag(), multiple, true);
43+
3844
private volatile ConsumerImpl q;
3945
private final Channel channel;
4046
private final String id;
@@ -64,6 +70,8 @@ public class Consumer extends AgentBase implements Runnable {
6470

6571
private final int pollingInterval;
6672

73+
private final AckNackOperation ackNackOperation;
74+
6775
public Consumer(ConsumerParameters parameters) {
6876
this.channel = parameters.getChannel();
6977
this.id = parameters.getId();
@@ -108,6 +116,13 @@ public Consumer(ConsumerParameters parameters) {
108116
};
109117
}
110118

119+
if (parameters.isNack()) {
120+
this.ackNackOperation = NACK_OPERATION;
121+
} else {
122+
this.ackNackOperation = ACK_OPERATION;
123+
}
124+
125+
111126
this.state = new ConsumerState(parameters.getRateLimit());
112127
this.recoveryProcess = parameters.getRecoveryProcess();
113128
this.recoveryProcess.init(this);
@@ -243,9 +258,9 @@ private void ackIfNecessary(Envelope envelope, int currentMessageCount, final Ch
243258
if (!autoAck) {
244259
dealWithWriteOperation(() -> {
245260
if (multiAckEvery == 0) {
246-
ch.basicAck(envelope.getDeliveryTag(), false);
261+
ackNackOperation.apply(ch, envelope, false);
247262
} else if (currentMessageCount % multiAckEvery == 0) {
248-
ch.basicAck(envelope.getDeliveryTag(), true);
263+
ackNackOperation.apply(ch, envelope, true);
249264
}
250265
}, recoveryProcess);
251266
}
@@ -399,4 +414,11 @@ public void simulateLatency() {
399414
}
400415
}
401416

417+
@FunctionalInterface
418+
private interface AckNackOperation {
419+
420+
void apply(Channel channel, Envelope envelope, boolean multiple) throws IOException;
421+
422+
}
423+
402424
}

src/main/java/com/rabbitmq/perf/ConsumerParameters.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ public class ConsumerParameters {
4141
private Recovery.RecoveryProcess recoveryProcess;
4242
private ExecutorService executorService;
4343
private boolean polling;
44+
private boolean nack = false;
4445

4546
private int pollingInterval;
4647

@@ -187,4 +188,13 @@ public ConsumerParameters setPollingInterval(int pollingInterval) {
187188
this.pollingInterval = pollingInterval;
188189
return this;
189190
}
191+
192+
public boolean isNack() {
193+
return nack;
194+
}
195+
196+
public ConsumerParameters setNack(boolean nack) {
197+
this.nack = nack;
198+
return this;
199+
}
190200
}

src/main/java/com/rabbitmq/perf/MulticastParams.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,8 @@ public class MulticastParams {
107107

108108
private int pollingInterval = -1;
109109

110+
private boolean nack = false;
111+
110112
public void setExchangeType(String exchangeType) {
111113
this.exchangeType = exchangeType;
112114
}
@@ -396,6 +398,10 @@ public void setPollingInterval(int pollingInterval) {
396398
this.pollingInterval = pollingInterval;
397399
}
398400

401+
public void setNack(boolean nack) {
402+
this.nack = nack;
403+
}
404+
399405
public Producer createProducer(Connection connection, Stats stats, MulticastSet.CompletionHandler completionHandler,
400406
ValueIndicator<Float> rateIndicator, ValueIndicator<Integer> messageSizeIndicator) throws IOException {
401407
Channel channel = connection.createChannel(); //NOSONAR
@@ -471,6 +477,7 @@ public Consumer createConsumer(Connection connection, Stats stats, MulticastSet.
471477
.setExecutorService(executorService)
472478
.setPolling(this.polling)
473479
.setPollingInterval(this.pollingInterval)
480+
.setNack(this.nack)
474481
);
475482
this.topologyHandler.next();
476483
return consumer;

src/main/java/com/rabbitmq/perf/PerfTest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,8 @@ public static void main(String [] args, PerfTestOptions perfTestOptions) {
167167
boolean polling = hasOption(cmd, "po");
168168
int pollingInterval = intArg(cmd, "pi", -1);
169169

170+
boolean nack = hasOption(cmd, "na");
171+
170172
String uri = strArg(cmd, 'h', "amqp://localhost");
171173
String urisParameter = strArg(cmd, 'H', null);
172174
String outputFile = strArg(cmd, 'o', null);
@@ -298,6 +300,7 @@ public static void main(String [] args, PerfTestOptions perfTestOptions) {
298300
p.setMessageSizes(variableSizes);
299301
p.setPolling(polling);
300302
p.setPollingInterval(pollingInterval);
303+
p.setNack(nack);
301304

302305
MulticastSet.CompletionHandler completionHandler = getCompletionHandler(p);
303306

@@ -567,6 +570,8 @@ public static Options getOptions() {
567570
options.addOption(new Option("po", "polling",false,"use basic.get to consume messages"));
568571
options.addOption(new Option("pi", "polling-interval",true, "time to wait before polling with basic.get, " +
569572
"in millisecond, default is 0."));
573+
574+
options.addOption(new Option("na", "nack",false,"nack and requeue messages"));
570575
return options;
571576
}
572577

src/test/java/com/rabbitmq/perf/MessageCountTimeLimitAndPublishingIntervalRateTest.java

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@
2424
import org.junit.jupiter.api.TestInfo;
2525
import org.junit.jupiter.params.ParameterizedTest;
2626
import org.junit.jupiter.params.provider.Arguments;
27+
import org.junit.jupiter.params.provider.ArgumentsSource;
2728
import org.junit.jupiter.params.provider.MethodSource;
29+
import org.junit.jupiter.params.provider.ValueSource;
2830
import org.slf4j.Logger;
2931
import org.slf4j.LoggerFactory;
3032

@@ -666,6 +668,75 @@ public void pollingOnlyDoesNotStop() throws Exception {
666668
waitAtMost(20, () -> testIsDone.get());
667669
}
668670

671+
@ParameterizedTest
672+
@ValueSource(strings = {"false", "true"})
673+
public void ackNack(String nackParameter) throws Exception {
674+
boolean nack = Boolean.valueOf(nackParameter);
675+
int messagesCount = 100;
676+
countsAndTimeLimit(0, messagesCount, 0);
677+
params.setConsumerCount(1);
678+
params.setProducerCount(0);
679+
params.setQueueNames(asList("queue"));
680+
params.setNack(nack);
681+
682+
CountDownLatch consumersLatch = new CountDownLatch(1);
683+
AtomicInteger consumerTagCounter = new AtomicInteger(0);
684+
List<Consumer> consumers = new CopyOnWriteArrayList<>();
685+
AtomicInteger acks = new AtomicInteger(0);
686+
AtomicInteger nacks = new AtomicInteger(0);
687+
Channel channel = proxy(Channel.class,
688+
callback("basicConsume", (proxy, method, args) -> {
689+
consumers.add((Consumer) args[2]);
690+
consumersLatch.countDown();
691+
return consumerTagCounter.getAndIncrement() + "";
692+
}),
693+
callback("basicAck", (proxy, method, args) -> {
694+
acks.incrementAndGet();
695+
return null;
696+
}),
697+
callback("basicNack", (proxy, method, args) -> {
698+
nacks.incrementAndGet();
699+
return null;
700+
})
701+
);
702+
703+
Connection connection = proxy(Connection.class,
704+
callback("createChannel", (proxy, method, args) -> channel)
705+
);
706+
707+
MulticastSet multicastSet = getMulticastSet(connectionFactoryThatReturns(connection));
708+
run(multicastSet);
709+
710+
waitForRunToStart();
711+
712+
assertThat("consumer should have been registered by now",
713+
consumersLatch.await(5, TimeUnit.SECONDS), is(true));
714+
715+
waitAtMost(20, () -> consumers.size() == 1);
716+
717+
Collection<Future<?>> sendTasks = new ArrayList<>(consumers.size());
718+
for (Consumer consumer : consumers) {
719+
Collection<Future<?>> tasks = sendMessagesToConsumer(messagesCount, consumer);
720+
sendTasks.addAll(tasks);
721+
}
722+
723+
for (Future<?> latch : sendTasks) {
724+
latch.get(10, TimeUnit.SECONDS);
725+
}
726+
727+
waitAtMost(20, () -> testIsDone.get());
728+
729+
if (nack) {
730+
Assertions.assertThat(acks).hasValue(0);
731+
Assertions.assertThat(nacks).hasValue(messagesCount);
732+
} else {
733+
Assertions.assertThat(acks).hasValue(messagesCount);
734+
Assertions.assertThat(nacks).hasValue(0);
735+
}
736+
737+
738+
}
739+
669740
private Collection<Future<?>> sendMessagesToConsumer(int messagesCount, Consumer consumer) {
670741
final Collection<Future<?>> tasks = new ArrayList<>(messagesCount);
671742
IntStream.range(0, messagesCount).forEach(i -> {

0 commit comments

Comments
 (0)