Skip to content

Commit 2e63a08

Browse files
authored
Merge pull request #288 from rabbitmq/cmessages
Add --cmessages to limit the number of consumed messages
2 parents 03fff38 + 6e04bef commit 2e63a08

File tree

3 files changed

+46
-2
lines changed

3 files changed

+46
-2
lines changed

ci/start-broker.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ wait_for_message() {
1414

1515
make -C "${PWD}"/tls-gen/basic
1616

17+
rm -rf rabbitmq-configuration
1718
mkdir -p rabbitmq-configuration/tls
1819
cp -R "${PWD}"/tls-gen/basic/result/* rabbitmq-configuration/tls
1920
chmod o+r rabbitmq-configuration/tls/*

src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -705,6 +705,13 @@ void setTcpNoDelay(String input) throws Exception {
705705
converter = Converters.GreaterThanOrEqualToZeroIntegerTypeConverter.class)
706706
private long pmessages;
707707

708+
@CommandLine.Option(
709+
names = {"--cmessages", "-D"},
710+
description = "consumer message count, default is 0 (no limit)",
711+
defaultValue = "0",
712+
converter = Converters.GreaterThanOrEqualToZeroIntegerTypeConverter.class)
713+
private long cmessages;
714+
708715
private MetricsCollector metricsCollector;
709716
private PerformanceMetrics performanceMetrics;
710717
private List<Monitoring> monitorings;
@@ -1094,10 +1101,12 @@ public Integer call() throws Exception {
10941101

10951102
CompletionHandler completionHandler;
10961103
ConcurrentMap<String, Integer> completionReasons = new ConcurrentHashMap<>();
1097-
if (isRunTimeLimited() || this.pmessages > 0) {
1104+
if (isRunTimeLimited() || this.pmessages > 0 || this.cmessages > 0) {
1105+
int countLimit = this.pmessages > 0 ? this.producers : 0;
1106+
countLimit += this.cmessages > 0 ? this.consumers : 0;
10981107
completionHandler =
10991108
new CompletionHandler.DefaultCompletionHandler(
1100-
this.time, this.producers, completionReasons);
1109+
this.time, countLimit, completionReasons);
11011110
} else {
11021111
completionHandler = new CompletionHandler.NoLimitCompletionHandler(completionReasons);
11031112
}
@@ -1319,6 +1328,20 @@ public Integer call() throws Exception {
13191328
.builder();
13201329
}
13211330

1331+
java.util.function.Consumer<MessageHandler.Context> messageReceivedCallback;
1332+
if (this.cmessages > 0) {
1333+
AtomicLong messageCount = new AtomicLong(0);
1334+
messageReceivedCallback =
1335+
ctx -> {
1336+
if (messageCount.incrementAndGet() == this.cmessages) {
1337+
completionHandler.countDown("Consumer reached message limit");
1338+
ctx.consumer().close();
1339+
}
1340+
};
1341+
} else {
1342+
messageReceivedCallback = ctx -> {};
1343+
}
1344+
13221345
Runnable latencyWorker = Utils.latencyWorker(this.consumerLatency);
13231346
consumerBuilder =
13241347
consumerBuilder.messageHandler(
@@ -1333,6 +1356,7 @@ public Integer call() throws Exception {
13331356
// tool
13341357
}
13351358
metrics.offset(context.offset());
1359+
messageReceivedCallback.accept(context);
13361360
latencyWorker.run();
13371361
});
13381362

src/test/java/com/rabbitmq/stream/perf/StreamPerfTestTest.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -573,6 +573,20 @@ void shouldPublishExpectedNumberOfMessagesIfOptionIsSet(long pmessages, int prod
573573
}
574574
}
575575

576+
@ParameterizedTest
577+
@CsvSource({"100, 1", "100, 2"})
578+
void shouldConsumeExpectedNumberOfMessagesIfOptionIsSet(long cmessages, int consumerCount)
579+
throws Exception {
580+
run(
581+
builder()
582+
.cmessages(cmessages)
583+
.consumers(consumerCount)
584+
.producers(1)
585+
.pmessages(cmessages * 2));
586+
waitUntilStreamExists(s);
587+
waitRunEnds();
588+
}
589+
576590
private static <T> Consumer<T> wrap(CallableConsumer<T> action) {
577591
return t -> {
578592
try {
@@ -820,6 +834,11 @@ ArgumentsBuilder pmessages(long messages) {
820834
return this;
821835
}
822836

837+
ArgumentsBuilder cmessages(long messages) {
838+
arguments.put("cmessages", String.valueOf(messages));
839+
return this;
840+
}
841+
823842
String build() {
824843
return this.arguments.entrySet().stream()
825844
.map(e -> "--" + e.getKey() + (e.getValue().isEmpty() ? "" : (" " + e.getValue())))

0 commit comments

Comments
 (0)