diff --git a/src/main/java/com/rabbitmq/stream/perf/CompletionHandler.java b/src/main/java/com/rabbitmq/stream/perf/CompletionHandler.java new file mode 100644 index 0000000..e78593f --- /dev/null +++ b/src/main/java/com/rabbitmq/stream/perf/CompletionHandler.java @@ -0,0 +1,111 @@ +// Copyright (c) 2025 Broadcom. All Rights Reserved. +// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. +// +// This software, the RabbitMQ Stream Performance Testing Tool, is dual-licensed under the +// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). +// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. +package com.rabbitmq.stream.perf; + +import java.time.Duration; +import java.util.Map; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import org.slf4j.Logger; + +interface CompletionHandler { + + Logger LOGGER = org.slf4j.LoggerFactory.getLogger(CompletionHandler.class); + + void waitForCompletion() throws InterruptedException; + + void countDown(String reason); + + final class DefaultCompletionHandler implements CompletionHandler { + + private static final String STOP_REASON_REACHED_TIME_LIMIT = "Reached time limit"; + + private final Duration timeLimit; + private final CountDownLatch latch; + private final ConcurrentMap reasons; + private final AtomicBoolean completed = new AtomicBoolean(false); + + DefaultCompletionHandler( + int timeLimitSeconds, int countLimit, ConcurrentMap reasons) { + this.timeLimit = Duration.ofSeconds(timeLimitSeconds); + int count = countLimit <= 0 ? 1 : countLimit; + LOGGER.debug("Count completion limit is {}", count); + this.latch = new CountDownLatch(count); + this.reasons = reasons; + } + + @Override + public void waitForCompletion() throws InterruptedException { + if (timeLimit.isNegative() || timeLimit.isZero()) { + this.latch.await(); + completed.set(true); + } else { + boolean countedDown = this.latch.await(timeLimit.toMillis(), TimeUnit.MILLISECONDS); + completed.set(true); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Completed, counted down? {}", countedDown); + } + if (!countedDown) { + recordReason(reasons, STOP_REASON_REACHED_TIME_LIMIT); + } + } + } + + @Override + public void countDown(String reason) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Counting down ({})", reason); + } + if (!completed.get()) { + recordReason(reasons, reason); + latch.countDown(); + } + } + } + + /** + * This completion handler waits forever, but it can be counted down, typically when a producer or + * a consumer fails. This avoids PerfTest hanging after a failure. + */ + final class NoLimitCompletionHandler implements CompletionHandler { + + private final CountDownLatch latch = new CountDownLatch(1); + private final ConcurrentMap reasons; + + NoLimitCompletionHandler(ConcurrentMap reasons) { + this.reasons = reasons; + } + + @Override + public void waitForCompletion() throws InterruptedException { + latch.await(); + } + + @Override + public void countDown(String reason) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Counting down ({})", reason); + } + recordReason(reasons, reason); + latch.countDown(); + } + } + + private static void recordReason(Map reasons, String reason) { + reasons.compute(reason, (keyReason, count) -> count == null ? 1 : count + 1); + } +} diff --git a/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java b/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java index 37967f8..5d931fe 100644 --- a/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java +++ b/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java @@ -68,6 +68,8 @@ import java.util.Locale; import java.util.Map; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -668,6 +670,13 @@ void setTcpNoDelay(String input) throws Exception { defaultValue = "0") private Duration consumerLatency; + @CommandLine.Option( + names = {"--pmessages", "-C"}, + description = "producer message count, default is 0 (no limit)", + defaultValue = "0", + converter = Converters.GreaterThanOrEqualToZeroIntegerTypeConverter.class) + private long pmessages; + private MetricsCollector metricsCollector; private PerformanceMetrics performanceMetrics; private List monitorings; @@ -1046,6 +1055,16 @@ public Integer call() throws Exception { })); } + CompletionHandler completionHandler; + ConcurrentMap completionReasons = new ConcurrentHashMap<>(); + if (isRunTimeLimited() || this.pmessages > 0) { + completionHandler = + new CompletionHandler.DefaultCompletionHandler( + this.time, this.producers, completionReasons); + } else { + completionHandler = new CompletionHandler.NoLimitCompletionHandler(completionReasons); + } + List producers = Collections.synchronizedList(new ArrayList<>(this.producers)); List producerRunnables = IntStream.range(0, this.producers) @@ -1135,11 +1154,38 @@ public Integer call() throws Exception { } else { latencyCallback = msg -> {}; } + + if (this.confirmLatency) { + producerBuilder.confirmTimeout(Duration.ofSeconds(0)); + } + + Runnable messagePublishedCallback, messageConfirmedCallback; + if (this.pmessages > 0) { + AtomicLong messageCount = new AtomicLong(0); + messagePublishedCallback = + () -> { + if (messageCount.incrementAndGet() == this.pmessages) { + Thread.currentThread().interrupt(); + } + }; + AtomicLong messageConfirmedCount = new AtomicLong(0); + messageConfirmedCallback = + () -> { + if (messageConfirmedCount.incrementAndGet() == this.pmessages) { + completionHandler.countDown("Producer reached message limit"); + } + }; + } else { + messagePublishedCallback = () -> {}; + messageConfirmedCallback = () -> {}; + } + ConfirmationHandler confirmationHandler = confirmationStatus -> { if (confirmationStatus.isConfirmed()) { producerConfirm.increment(); latencyCallback.accept(confirmationStatus.getMessage()); + messageConfirmedCallback.run(); } }; @@ -1164,6 +1210,7 @@ public Integer call() throws Exception { messageBuilderConsumer.accept(messageBuilder); producer.send( messageBuilder.addData(payload).build(), confirmationHandler); + messagePublishedCallback.run(); } } catch (Exception e) { if (e.getCause() != null @@ -1312,12 +1359,8 @@ public Integer call() throws Exception { Thread shutdownHook = new Thread(latch::countDown); Runtime.getRuntime().addShutdownHook(shutdownHook); try { - if (isRunTimeLimited()) { - boolean completed = latch.await(this.time, TimeUnit.SECONDS); - LOGGER.debug("Completion latch reached 0: {}", completed); - } else { - latch.await(); - } + completionHandler.waitForCompletion(); + LOGGER.debug("Completion with reason(s): {}", completionReasons); Runtime.getRuntime().removeShutdownHook(shutdownHook); } catch (InterruptedException e) { // moving on to the closing sequence diff --git a/src/test/java/com/rabbitmq/stream/perf/StreamPerfTestTest.java b/src/test/java/com/rabbitmq/stream/perf/StreamPerfTestTest.java index e2b8a89..d54d61a 100644 --- a/src/test/java/com/rabbitmq/stream/perf/StreamPerfTestTest.java +++ b/src/test/java/com/rabbitmq/stream/perf/StreamPerfTestTest.java @@ -21,6 +21,8 @@ import com.rabbitmq.stream.AddressResolver; import com.rabbitmq.stream.ByteCapacity; import com.rabbitmq.stream.Constants; +import com.rabbitmq.stream.Environment; +import com.rabbitmq.stream.OffsetSpecification; import com.rabbitmq.stream.StreamCreator.LeaderLocator; import com.rabbitmq.stream.compression.Compression; import com.rabbitmq.stream.impl.Client; @@ -51,6 +53,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -65,6 +68,8 @@ import org.junit.jupiter.api.condition.EnabledOnOs; import org.junit.jupiter.api.condition.OS; import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; @ExtendWith(TestUtils.StreamTestInfrastructureExtension.class) public class StreamPerfTestTest { @@ -527,6 +532,32 @@ void shouldNotFailWhenFilteringIsActivated() throws Exception { assertThat(consoleOutput()).containsIgnoringCase("summary:"); } + @ParameterizedTest + @CsvSource({ + "200, 1", "200, 2", + }) + void shouldPublishExpectedNumberOfMessagesIfOptionIsSet(long pmessages, int producerCount) + throws Exception { + long expectedMessageCount = pmessages * producerCount; + run(builder().pmessages(pmessages).producers(producerCount)); + waitUntilStreamExists(s); + waitRunEnds(); + AtomicLong receivedCount = new AtomicLong(); + AtomicLong lastCommittedChunkId = new AtomicLong(); + try (Environment env = Environment.builder().build()) { + env.consumerBuilder().stream(s) + .offset(OffsetSpecification.first()) + .messageHandler( + (ctx, msg) -> { + lastCommittedChunkId.set(ctx.committedChunkId()); + receivedCount.incrementAndGet(); + }) + .build(); + waitAtMost(() -> receivedCount.get() == expectedMessageCount); + waitAtMost(() -> lastCommittedChunkId.get() == env.queryStreamStats(s).committedChunkId()); + } + } + private static Consumer wrap(CallableConsumer action) { return t -> { try { @@ -759,6 +790,11 @@ ArgumentsBuilder filterValues(String... values) { return this; } + ArgumentsBuilder pmessages(long messages) { + arguments.put("pmessages", String.valueOf(messages)); + return this; + } + String build() { return this.arguments.entrySet().stream() .map(e -> "--" + e.getKey() + (e.getValue().isEmpty() ? "" : (" " + e.getValue())))