@@ -595,6 +595,18 @@ void setIncludeByteSizeMetric(String input) throws Exception {
595595
596596 volatile boolean includeBatchSizeMetric ;
597597
598+ @ CommandLine .Option (
599+ names = {"--no-latency" , "-nl" },
600+ description = "no latency calculation" ,
601+ arity = "0..1" ,
602+ fallbackValue = "true" ,
603+ defaultValue = "false" )
604+ void setNoLatency (String input ) throws Exception {
605+ this .noLatency = Converters .BOOLEAN_TYPE_CONVERTER .convert (input );
606+ }
607+
608+ volatile boolean noLatency ;
609+
598610 static class InstanceSyncOptions {
599611
600612 @ CommandLine .Option (
@@ -1342,19 +1354,29 @@ public Integer call() throws Exception {
13421354 messageReceivedCallback = ctx -> {};
13431355 }
13441356
1357+ java .util .function .Consumer <Message > latencyCallback ;
1358+ if (this .noLatency ) {
1359+ latencyCallback = ignored -> {};
1360+ } else {
1361+ latencyCallback =
1362+ msg -> {
1363+ try {
1364+ long time = Utils .readLong (msg .getBodyAsBinary ());
1365+ // see above why we use current time to measure latency
1366+ metrics .latency (
1367+ System .currentTimeMillis () - time , TimeUnit .MILLISECONDS );
1368+ } catch (Exception e ) {
1369+ // not able to read the body, maybe not a message from the
1370+ // tool
1371+ }
1372+ };
1373+ }
1374+
13451375 Runnable latencyWorker = Utils .latencyWorker (this .consumerLatency );
13461376 consumerBuilder =
13471377 consumerBuilder .messageHandler (
13481378 (context , message ) -> {
1349- try {
1350- long time = Utils .readLong (message .getBodyAsBinary ());
1351- // see above why we use current time to measure latency
1352- metrics .latency (
1353- System .currentTimeMillis () - time , TimeUnit .MILLISECONDS );
1354- } catch (Exception e ) {
1355- // not able to read the body, maybe not a message from the
1356- // tool
1357- }
1379+ latencyCallback .accept (message );
13581380 metrics .offset (context .offset ());
13591381 messageReceivedCallback .accept (context );
13601382 latencyWorker .run ();
0 commit comments