Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 111 additions & 0 deletions src/main/java/com/rabbitmq/stream/perf/CompletionHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Copyright (c) 2025 Broadcom. All Rights Reserved.
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
//
// This software, the RabbitMQ Stream Performance Testing Tool, is dual-licensed under the
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
// please see LICENSE-APACHE2.
//
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
// either express or implied. See the LICENSE file for specific language governing
// rights and limitations of this software.
//
// If you have any questions regarding licensing, please contact us at
// [email protected].
package com.rabbitmq.stream.perf;

import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;

interface CompletionHandler {

Logger LOGGER = org.slf4j.LoggerFactory.getLogger(CompletionHandler.class);

void waitForCompletion() throws InterruptedException;

void countDown(String reason);

final class DefaultCompletionHandler implements CompletionHandler {

private static final String STOP_REASON_REACHED_TIME_LIMIT = "Reached time limit";

private final Duration timeLimit;
private final CountDownLatch latch;
private final ConcurrentMap<String, Integer> reasons;
private final AtomicBoolean completed = new AtomicBoolean(false);

DefaultCompletionHandler(
int timeLimitSeconds, int countLimit, ConcurrentMap<String, Integer> reasons) {
this.timeLimit = Duration.ofSeconds(timeLimitSeconds);
int count = countLimit <= 0 ? 1 : countLimit;
LOGGER.debug("Count completion limit is {}", count);
this.latch = new CountDownLatch(count);
this.reasons = reasons;
}

@Override
public void waitForCompletion() throws InterruptedException {
if (timeLimit.isNegative() || timeLimit.isZero()) {
this.latch.await();
completed.set(true);
} else {
boolean countedDown = this.latch.await(timeLimit.toMillis(), TimeUnit.MILLISECONDS);
completed.set(true);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Completed, counted down? {}", countedDown);
}
if (!countedDown) {
recordReason(reasons, STOP_REASON_REACHED_TIME_LIMIT);
}
}
}

@Override
public void countDown(String reason) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Counting down ({})", reason);
}
if (!completed.get()) {
recordReason(reasons, reason);
latch.countDown();
}
}
}

/**
* This completion handler waits forever, but it can be counted down, typically when a producer or
* a consumer fails. This avoids PerfTest hanging after a failure.
*/
final class NoLimitCompletionHandler implements CompletionHandler {

private final CountDownLatch latch = new CountDownLatch(1);
private final ConcurrentMap<String, Integer> reasons;

NoLimitCompletionHandler(ConcurrentMap<String, Integer> reasons) {
this.reasons = reasons;
}

@Override
public void waitForCompletion() throws InterruptedException {
latch.await();
}

@Override
public void countDown(String reason) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Counting down ({})", reason);
}
recordReason(reasons, reason);
latch.countDown();
}
}

private static void recordReason(Map<String, Integer> reasons, String reason) {
reasons.compute(reason, (keyReason, count) -> count == null ? 1 : count + 1);
}
}
55 changes: 49 additions & 6 deletions src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -668,6 +670,13 @@ void setTcpNoDelay(String input) throws Exception {
defaultValue = "0")
private Duration consumerLatency;

@CommandLine.Option(
names = {"--pmessages", "-C"},
description = "producer message count, default is 0 (no limit)",
defaultValue = "0",
converter = Converters.GreaterThanOrEqualToZeroIntegerTypeConverter.class)
private long pmessages;

private MetricsCollector metricsCollector;
private PerformanceMetrics performanceMetrics;
private List<Monitoring> monitorings;
Expand Down Expand Up @@ -1046,6 +1055,16 @@ public Integer call() throws Exception {
}));
}

CompletionHandler completionHandler;
ConcurrentMap<String, Integer> completionReasons = new ConcurrentHashMap<>();
if (isRunTimeLimited() || this.pmessages > 0) {
completionHandler =
new CompletionHandler.DefaultCompletionHandler(
this.time, this.producers, completionReasons);
} else {
completionHandler = new CompletionHandler.NoLimitCompletionHandler(completionReasons);
}

List<Producer> producers = Collections.synchronizedList(new ArrayList<>(this.producers));
List<Runnable> producerRunnables =
IntStream.range(0, this.producers)
Expand Down Expand Up @@ -1135,11 +1154,38 @@ public Integer call() throws Exception {
} else {
latencyCallback = msg -> {};
}

if (this.confirmLatency) {
producerBuilder.confirmTimeout(Duration.ofSeconds(0));
}

Runnable messagePublishedCallback, messageConfirmedCallback;
if (this.pmessages > 0) {
AtomicLong messageCount = new AtomicLong(0);
messagePublishedCallback =
() -> {
if (messageCount.incrementAndGet() == this.pmessages) {
Thread.currentThread().interrupt();
}
};
AtomicLong messageConfirmedCount = new AtomicLong(0);
messageConfirmedCallback =
() -> {
if (messageConfirmedCount.incrementAndGet() == this.pmessages) {
completionHandler.countDown("Producer reached message limit");
}
};
} else {
messagePublishedCallback = () -> {};
messageConfirmedCallback = () -> {};
}

ConfirmationHandler confirmationHandler =
confirmationStatus -> {
if (confirmationStatus.isConfirmed()) {
producerConfirm.increment();
latencyCallback.accept(confirmationStatus.getMessage());
messageConfirmedCallback.run();
}
};

Expand All @@ -1164,6 +1210,7 @@ public Integer call() throws Exception {
messageBuilderConsumer.accept(messageBuilder);
producer.send(
messageBuilder.addData(payload).build(), confirmationHandler);
messagePublishedCallback.run();
}
} catch (Exception e) {
if (e.getCause() != null
Expand Down Expand Up @@ -1312,12 +1359,8 @@ public Integer call() throws Exception {
Thread shutdownHook = new Thread(latch::countDown);
Runtime.getRuntime().addShutdownHook(shutdownHook);
try {
if (isRunTimeLimited()) {
boolean completed = latch.await(this.time, TimeUnit.SECONDS);
LOGGER.debug("Completion latch reached 0: {}", completed);
} else {
latch.await();
}
completionHandler.waitForCompletion();
LOGGER.debug("Completion with reason(s): {}", completionReasons);
Runtime.getRuntime().removeShutdownHook(shutdownHook);
} catch (InterruptedException e) {
// moving on to the closing sequence
Expand Down
36 changes: 36 additions & 0 deletions src/test/java/com/rabbitmq/stream/perf/StreamPerfTestTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import com.rabbitmq.stream.AddressResolver;
import com.rabbitmq.stream.ByteCapacity;
import com.rabbitmq.stream.Constants;
import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.OffsetSpecification;
import com.rabbitmq.stream.StreamCreator.LeaderLocator;
import com.rabbitmq.stream.compression.Compression;
import com.rabbitmq.stream.impl.Client;
Expand Down Expand Up @@ -51,6 +53,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;
Expand All @@ -65,6 +68,8 @@
import org.junit.jupiter.api.condition.EnabledOnOs;
import org.junit.jupiter.api.condition.OS;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;

@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class)
public class StreamPerfTestTest {
Expand Down Expand Up @@ -527,6 +532,32 @@ void shouldNotFailWhenFilteringIsActivated() throws Exception {
assertThat(consoleOutput()).containsIgnoringCase("summary:");
}

@ParameterizedTest
@CsvSource({
"200, 1", "200, 2",
})
void shouldPublishExpectedNumberOfMessagesIfOptionIsSet(long pmessages, int producerCount)
throws Exception {
long expectedMessageCount = pmessages * producerCount;
run(builder().pmessages(pmessages).producers(producerCount));
waitUntilStreamExists(s);
waitRunEnds();
AtomicLong receivedCount = new AtomicLong();
AtomicLong lastCommittedChunkId = new AtomicLong();
try (Environment env = Environment.builder().build()) {
env.consumerBuilder().stream(s)
.offset(OffsetSpecification.first())
.messageHandler(
(ctx, msg) -> {
lastCommittedChunkId.set(ctx.committedChunkId());
receivedCount.incrementAndGet();
})
.build();
waitAtMost(() -> receivedCount.get() == expectedMessageCount);
waitAtMost(() -> lastCommittedChunkId.get() == env.queryStreamStats(s).committedChunkId());
}
}

private static <T> Consumer<T> wrap(CallableConsumer<T> action) {
return t -> {
try {
Expand Down Expand Up @@ -759,6 +790,11 @@ ArgumentsBuilder filterValues(String... values) {
return this;
}

ArgumentsBuilder pmessages(long messages) {
arguments.put("pmessages", String.valueOf(messages));
return this;
}

String build() {
return this.arguments.entrySet().stream()
.map(e -> "--" + e.getKey() + (e.getValue().isEmpty() ? "" : (" " + e.getValue())))
Expand Down