diff --git a/pom.xml b/pom.xml
index 763c359..693b7f4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -55,6 +55,7 @@
33.4.0-jre
5.4.1.Final
2.0.2.Final
+ 1.8.0
5.11.4
3.27.3
true
@@ -161,6 +162,12 @@
${jgroups-kubernetes.version}
+
+ org.threeten
+ threeten-extra
+ ${threeten-extra.version}
+
+
org.junit.jupiter
junit-jupiter-engine
diff --git a/src/main/java/com/rabbitmq/stream/perf/Converters.java b/src/main/java/com/rabbitmq/stream/perf/Converters.java
index ce815b8..b6ef0db 100644
--- a/src/main/java/com/rabbitmq/stream/perf/Converters.java
+++ b/src/main/java/com/rabbitmq/stream/perf/Converters.java
@@ -35,10 +35,14 @@
import java.util.stream.IntStream;
import javax.net.ssl.SNIHostName;
import javax.net.ssl.SNIServerName;
+import org.threeten.extra.AmountFormats;
import picocli.CommandLine;
final class Converters {
+ private static final CommandLine.ITypeConverter DURATION_TYPE_CONVERTER =
+ new DurationTypeConverter();
+
private Converters() {}
static void typeConversionException(String message) {
@@ -290,17 +294,48 @@ static class DurationTypeConverter implements CommandLine.ITypeConverter {
+
+ @Override
+ public Duration convert(String value) throws Exception {
+ Duration duration = DURATION_TYPE_CONVERTER.convert(value);
+ if (duration.isNegative() || duration.isZero()) {
+ throw new CommandLine.TypeConversionException(
+ "'" + value + "' is not valid, it must be positive");
+ }
+ return duration;
+ }
+ }
+
+ static class GreaterThanOrEqualToZeroDurationTypeConverter
+ implements CommandLine.ITypeConverter {
+
+ @Override
+ public Duration convert(String value) throws Exception {
+ Duration duration = DURATION_TYPE_CONVERTER.convert(value);
+ if (duration.isNegative()) {
throw new CommandLine.TypeConversionException(
- "'" + value + "' is not valid, valid example values: PT15M, PT10H");
+ "'" + value + "' is not valid, it must be greater than or equal to 0");
}
+ return duration;
}
}
diff --git a/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java b/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java
index eaacaad..d9f78f5 100644
--- a/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java
+++ b/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java
@@ -235,9 +235,10 @@ public class StreamPerfTest implements Callable {
@CommandLine.Option(
names = {"--max-age", "-ma"},
description =
- "max age of segments using the ISO 8601 duration format, "
- + "e.g. PT10M30S for 10 minutes 30 seconds, P5DT8H for 5 days 8 hours.",
- converter = Converters.DurationTypeConverter.class)
+ "max age of segments using Golang parseDuration syntax or the ISO 8601 duration format, "
+ + "e.g. 10m30s for 10 minutes 30 seconds (d, w, y not supported) or "
+ + "PT10M30S for 10 minutes 30 seconds, P5DT8H for 5 days 8 hours.",
+ converter = Converters.PositiveDurationTypeConverter.class)
private Duration maxAge;
@CommandLine.Option(
@@ -556,6 +557,15 @@ static class InstanceSyncOptions {
defaultValue = "true")
private boolean tcpNoDelay;
+ @CommandLine.Option(
+ names = {"--consumer-latency", "-L"},
+ description =
+ "consumer latency using Golang parseDuration syntax, "
+ + "e.g. 5 ms (d, w, y are not supported)",
+ converter = Converters.GreaterThanOrEqualToZeroDurationTypeConverter.class,
+ defaultValue = "0")
+ private Duration consumerLatency;
+
private MetricsCollector metricsCollector;
private PerformanceMetrics performanceMetrics;
private List monitorings;
@@ -1124,6 +1134,7 @@ public Integer call() throws Exception {
.builder();
}
+ Runnable latencyWorker = Utils.latencyWorker(this.consumerLatency);
consumerBuilder =
consumerBuilder.messageHandler(
(context, message) -> {
@@ -1137,12 +1148,12 @@ public Integer call() throws Exception {
// tool
}
metrics.offset(context.offset());
+ latencyWorker.run();
});
consumerBuilder = maybeConfigureForFiltering(consumerBuilder);
- Consumer consumer = consumerBuilder.build();
- return consumer;
+ return consumerBuilder.build();
})
.collect(Collectors.toList()));
diff --git a/src/main/java/com/rabbitmq/stream/perf/Utils.java b/src/main/java/com/rabbitmq/stream/perf/Utils.java
index a0fe3c5..683492e 100644
--- a/src/main/java/com/rabbitmq/stream/perf/Utils.java
+++ b/src/main/java/com/rabbitmq/stream/perf/Utils.java
@@ -481,4 +481,30 @@ static int filteringSubSetSize(int setSize) {
return setSize - 3;
}
}
+
+ static Runnable latencyWorker(Duration latency) {
+ if (latency.isZero()) {
+ return () -> {};
+ } else if (latency.toMillis() >= 1) {
+ long latencyInMs = latency.toMillis();
+ return () -> latencySleep(latencyInMs);
+ } else {
+ long latencyInNs = latency.toNanos();
+ return () -> latencyBusyWait(latencyInNs);
+ }
+ }
+
+ private static void latencySleep(long delayInMs) {
+ try {
+ Thread.sleep(delayInMs);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ private static void latencyBusyWait(long delayInNs) {
+ long start = System.nanoTime();
+ while (System.nanoTime() - start < delayInNs)
+ ;
+ }
}
diff --git a/src/test/java/com/rabbitmq/stream/perf/ConvertersTest.java b/src/test/java/com/rabbitmq/stream/perf/ConvertersTest.java
index 6f312b5..5f41363 100644
--- a/src/test/java/com/rabbitmq/stream/perf/ConvertersTest.java
+++ b/src/test/java/com/rabbitmq/stream/perf/ConvertersTest.java
@@ -207,6 +207,24 @@ void sniServerNamesConverter() {
.contains(new SNIHostName("dummy"));
}
+ @ParameterizedTest
+ @CsvSource({"50ms,50", "0,0", "1s,1000", "10m30s,630000", "PT1M30S,90000"})
+ void durationTypeConverterOk(String value, long expectedInMs) {
+ Converters.DurationTypeConverter converter = new Converters.DurationTypeConverter();
+ Duration duration = converter.convert(value);
+ assertThat(duration).isNotNull();
+ assertThat(duration).isEqualTo(Duration.ofMillis(expectedInMs));
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = {"1", "abc", "1.5"})
+ void durationTypeConverterKo(String value) {
+ Converters.DurationTypeConverter converter = new Converters.DurationTypeConverter();
+ assertThatThrownBy(() -> converter.convert(value))
+ .isInstanceOf(CommandLine.TypeConversionException.class)
+ .hasMessageContaining("valid");
+ }
+
private static Tag tag(String key, String value) {
return Tag.of(key, value);
}