@@ -93,13 +93,17 @@ public Consumer(ConsumerParameters parameters) {
9393 this .queueNames .set (new ArrayList <>(parameters .getQueueNames ()));
9494 this .initialQueueNames = new ArrayList <>(parameters .getQueueNames ());
9595
96- int consumerLatencyInMicroSeconds = parameters .getConsumerLatencyInMicroSeconds ();
97- if (consumerLatencyInMicroSeconds <= 0 ) {
98- this .consumerLatency = new NoWaitConsumerLatency ();
99- } else if (consumerLatencyInMicroSeconds >= 1000 ) {
100- this .consumerLatency = new ThreadSleepConsumerLatency (consumerLatencyInMicroSeconds / 1000 );
96+ if (parameters .getConsumerLatenciesIndicator ().isVariable ()) {
97+ this .consumerLatency = new VariableConsumerLatency (parameters .getConsumerLatenciesIndicator ());
10198 } else {
102- this .consumerLatency = new BusyWaitConsumerLatency (consumerLatencyInMicroSeconds * 1000 );
99+ long consumerLatencyInMicroSeconds = parameters .getConsumerLatenciesIndicator ().getValue ();
100+ if (consumerLatencyInMicroSeconds <= 0 ) {
101+ this .consumerLatency = new NoWaitConsumerLatency ();
102+ } else if (consumerLatencyInMicroSeconds >= 1000 ) {
103+ this .consumerLatency = new ThreadSleepConsumerLatency (parameters .getConsumerLatenciesIndicator ());
104+ } else {
105+ this .consumerLatency = new BusyWaitConsumerLatency (parameters .getConsumerLatenciesIndicator ());
106+ }
103107 }
104108
105109 if (timestampProvider .isTimestampInHeader ()) {
@@ -387,6 +391,46 @@ private interface ConsumerLatency {
387391
388392 }
389393
394+ private static class VariableConsumerLatency implements ConsumerLatency {
395+
396+ private final ValueIndicator <Long > consumerLatenciesIndicator ;
397+
398+ private VariableConsumerLatency (ValueIndicator <Long > consumerLatenciesIndicator ) {
399+ this .consumerLatenciesIndicator = consumerLatenciesIndicator ;
400+ }
401+
402+ @ Override
403+ public boolean simulateLatency ()
404+ {
405+ long consumerLatencyInMicroSeconds = consumerLatenciesIndicator .getValue ();
406+ if (consumerLatencyInMicroSeconds <= 0 ) {
407+ return true ;
408+ } else if (consumerLatencyInMicroSeconds >= 1000 ) {
409+ return sleep ();
410+ } else {
411+ return busyWait ();
412+ }
413+ }
414+
415+ private boolean sleep () {
416+ try {
417+ long ms = consumerLatenciesIndicator .getValue () / 1000 ;
418+ Thread .sleep (ms );
419+ return true ;
420+ } catch (InterruptedException e ) {
421+ Thread .currentThread ().interrupt ();
422+ return false ;
423+ }
424+ }
425+
426+ private boolean busyWait () {
427+ long delay = consumerLatenciesIndicator .getValue () * 1000 ;
428+ long start = System .nanoTime ();
429+ while (System .nanoTime () - start < delay );
430+ return true ;
431+ }
432+ }
433+
390434 private static class NoWaitConsumerLatency implements ConsumerLatency {
391435
392436 @ Override
@@ -398,16 +442,17 @@ public boolean simulateLatency() {
398442
399443 private static class ThreadSleepConsumerLatency implements ConsumerLatency {
400444
401- private final int waitTime ;
445+ private final ValueIndicator < Long > consumerLatenciesIndicator ;
402446
403- private ThreadSleepConsumerLatency (int waitTime ) {
404- this .waitTime = waitTime ;
447+ private ThreadSleepConsumerLatency (ValueIndicator < Long > consumerLatenciesIndicator ) {
448+ this .consumerLatenciesIndicator = consumerLatenciesIndicator ;
405449 }
406450
407451 @ Override
408452 public boolean simulateLatency () {
409453 try {
410- Thread .sleep (waitTime );
454+ long ms = consumerLatenciesIndicator .getValue () / 1000 ;
455+ Thread .sleep (ms );
411456 return true ;
412457 } catch (InterruptedException e ) {
413458 Thread .currentThread ().interrupt ();
@@ -419,14 +464,15 @@ public boolean simulateLatency() {
419464 // from https://stackoverflow.com/a/11499351
420465 private static class BusyWaitConsumerLatency implements ConsumerLatency {
421466
422- private final long delay ;
467+ private final ValueIndicator < Long > consumerLatenciesIndicator ;
423468
424- private BusyWaitConsumerLatency (long delay ) {
425- this .delay = delay ;
469+ private BusyWaitConsumerLatency (ValueIndicator < Long > consumerLatenciesIndicator ) {
470+ this .consumerLatenciesIndicator = consumerLatenciesIndicator ;
426471 }
427472
428473 @ Override
429474 public boolean simulateLatency () {
475+ long delay = consumerLatenciesIndicator .getValue () * 1000 ;
430476 long start = System .nanoTime ();
431477 while (System .nanoTime () - start < delay );
432478 return true ;
0 commit comments