diff --git a/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java b/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java index b312048..04fac18 100644 --- a/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java +++ b/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java @@ -595,6 +595,18 @@ void setIncludeByteSizeMetric(String input) throws Exception { volatile boolean includeBatchSizeMetric; + @CommandLine.Option( + names = {"--no-latency", "-nl"}, + description = "no latency calculation", + arity = "0..1", + fallbackValue = "true", + defaultValue = "false") + void setNoLatency(String input) throws Exception { + this.noLatency = Converters.BOOLEAN_TYPE_CONVERTER.convert(input); + } + + volatile boolean noLatency; + static class InstanceSyncOptions { @CommandLine.Option( @@ -1342,19 +1354,29 @@ public Integer call() throws Exception { messageReceivedCallback = ctx -> {}; } + java.util.function.Consumer latencyCallback; + if (this.noLatency) { + latencyCallback = ignored -> {}; + } else { + latencyCallback = + msg -> { + try { + long time = Utils.readLong(msg.getBodyAsBinary()); + // see above why we use current time to measure latency + metrics.latency( + System.currentTimeMillis() - time, TimeUnit.MILLISECONDS); + } catch (Exception e) { + // not able to read the body, maybe not a message from the + // tool + } + }; + } + Runnable latencyWorker = Utils.latencyWorker(this.consumerLatency); consumerBuilder = consumerBuilder.messageHandler( (context, message) -> { - try { - long time = Utils.readLong(message.getBodyAsBinary()); - // see above why we use current time to measure latency - metrics.latency( - System.currentTimeMillis() - time, TimeUnit.MILLISECONDS); - } catch (Exception e) { - // not able to read the body, maybe not a message from the - // tool - } + latencyCallback.accept(message); metrics.offset(context.offset()); messageReceivedCallback.accept(context); latencyWorker.run();