Skip to content

Commit 8de0827

Browse files
committed
Add --consumer-latency
1 parent 51acb81 commit 8de0827

File tree

4 files changed

+73
-2
lines changed

4 files changed

+73
-2
lines changed

src/main/java/com/rabbitmq/stream/perf/Converters.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,23 @@ public Duration convert(String value) {
304304
}
305305
}
306306

307+
static class MicroSecondsToDurationTypeConverter implements CommandLine.ITypeConverter<Duration> {
308+
309+
@Override
310+
public Duration convert(String value) {
311+
try {
312+
Duration duration = Duration.ofNanos(Long.parseLong(value) * 1_000);
313+
if (duration.isNegative()) {
314+
throw new CommandLine.TypeConversionException(
315+
"'" + value + "' is not valid, it must be greater than or equal to 0");
316+
}
317+
return duration;
318+
} catch (NumberFormatException e) {
319+
throw new CommandLine.TypeConversionException("'" + value + "' is not a valid number");
320+
}
321+
}
322+
}
323+
307324
static class LeaderLocatorTypeConverter
308325
implements CommandLine.ITypeConverter<StreamCreator.LeaderLocator> {
309326

src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -556,6 +556,13 @@ static class InstanceSyncOptions {
556556
defaultValue = "true")
557557
private boolean tcpNoDelay;
558558

559+
@CommandLine.Option(
560+
names = {"--consumer-latency", "-L"},
561+
description = "consumer latency in microseconds",
562+
converter = Converters.MicroSecondsToDurationTypeConverter.class,
563+
defaultValue = "0")
564+
private Duration consumerLatency;
565+
559566
private MetricsCollector metricsCollector;
560567
private PerformanceMetrics performanceMetrics;
561568
private List<Monitoring> monitorings;
@@ -1124,6 +1131,7 @@ public Integer call() throws Exception {
11241131
.builder();
11251132
}
11261133

1134+
Runnable latencyWorker = Utils.latencyWorker(this.consumerLatency);
11271135
consumerBuilder =
11281136
consumerBuilder.messageHandler(
11291137
(context, message) -> {
@@ -1137,12 +1145,12 @@ public Integer call() throws Exception {
11371145
// tool
11381146
}
11391147
metrics.offset(context.offset());
1148+
latencyWorker.run();
11401149
});
11411150

11421151
consumerBuilder = maybeConfigureForFiltering(consumerBuilder);
11431152

1144-
Consumer consumer = consumerBuilder.build();
1145-
return consumer;
1153+
return consumerBuilder.build();
11461154
})
11471155
.collect(Collectors.toList()));
11481156

src/main/java/com/rabbitmq/stream/perf/Utils.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -481,4 +481,30 @@ static int filteringSubSetSize(int setSize) {
481481
return setSize - 3;
482482
}
483483
}
484+
485+
static Runnable latencyWorker(Duration latency) {
486+
if (latency.isZero()) {
487+
return () -> {};
488+
} else if (latency.toMillis() >= 1) {
489+
long latencyInMs = latency.toMillis();
490+
return () -> latencySleep(latencyInMs);
491+
} else {
492+
long latencyInNs = latency.toNanos();
493+
return () -> latencyBusyWait(latencyInNs);
494+
}
495+
}
496+
497+
private static void latencySleep(long delayInMs) {
498+
try {
499+
Thread.sleep(delayInMs);
500+
} catch (InterruptedException e) {
501+
Thread.currentThread().interrupt();
502+
}
503+
}
504+
505+
private static void latencyBusyWait(long delayInNs) {
506+
long start = System.nanoTime();
507+
while (System.nanoTime() - start < delayInNs)
508+
;
509+
}
484510
}

src/test/java/com/rabbitmq/stream/perf/ConvertersTest.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,26 @@ void sniServerNamesConverter() {
207207
.contains(new SNIHostName("dummy"));
208208
}
209209

210+
@ParameterizedTest
211+
@CsvSource({"5000,5", "0,0", "1000,1"})
212+
void microSecondsToDurationTypeConverterOk(String value, long expectedInMs) {
213+
Converters.MicroSecondsToDurationTypeConverter converter =
214+
new Converters.MicroSecondsToDurationTypeConverter();
215+
Duration duration = converter.convert(value);
216+
assertThat(duration).isNotNull();
217+
assertThat(duration).isEqualTo(Duration.ofMillis(expectedInMs));
218+
}
219+
220+
@ParameterizedTest
221+
@ValueSource(strings = {"-1000000", "abc", "1.5"})
222+
void microSecondsToDurationTypeConverterKo(String value) {
223+
Converters.MicroSecondsToDurationTypeConverter converter =
224+
new Converters.MicroSecondsToDurationTypeConverter();
225+
assertThatThrownBy(() -> converter.convert(value))
226+
.isInstanceOf(CommandLine.TypeConversionException.class)
227+
.hasMessageContaining("valid");
228+
}
229+
210230
private static Tag tag(String key, String value) {
211231
return Tag.of(key, value);
212232
}

0 commit comments

Comments
 (0)