@@ -229,24 +229,24 @@ void handleMessage(Envelope envelope, BasicProperties properties, byte[] body, C
229229
230230 stats .handleRecv (id .equals (envelope .getRoutingKey ()) ? diff_time : 0L );
231231
232- consumerLatency .simulateLatency ();
233-
234- ackIfNecessary (envelope , currentMessageCount , ch );
235- commitTransactionIfNecessary (currentMessageCount , ch );
236-
237- long now = System .currentTimeMillis ();
238- if (this .rateLimitation ) {
239- // if rate is limited, we need to reset stats every second
240- // otherwise pausing to throttle rate will be based on the whole history
241- // which is broken when rate varies
242- // as consumer does not choose the rate at which messages arrive,
243- // we can consider the rate is always subject to change,
244- // so we'd better off always resetting the stats
245- if (now - state .getLastStatsTime () > 1000 ) {
246- state .setLastStatsTime (now );
247- state .setMsgCount (0 );
232+ if (consumerLatency .simulateLatency ()) {
233+ ackIfNecessary (envelope , currentMessageCount , ch );
234+ commitTransactionIfNecessary (currentMessageCount , ch );
235+
236+ long now = System .currentTimeMillis ();
237+ if (this .rateLimitation ) {
238+ // if rate is limited, we need to reset stats every second
239+ // otherwise pausing to throttle rate will be based on the whole history
240+ // which is broken when rate varies
241+ // as consumer does not choose the rate at which messages arrive,
242+ // we can consider the rate is always subject to change,
243+ // so we'd better off always resetting the stats
244+ if (now - state .getLastStatsTime () > 1000 ) {
245+ state .setLastStatsTime (now );
246+ state .setMsgCount (0 );
247+ }
248+ delay (now , state );
248249 }
249- delay (now , state );
250250 }
251251 }
252252 if (msgLimit != 0 && currentMessageCount >= msgLimit ) { // NB: not quite the inverse of above
@@ -366,15 +366,19 @@ public int incrementMessageCount() {
366366
367367 private interface ConsumerLatency {
368368
369- void simulateLatency ();
369+ /**
370+ *
371+ * @return true if normal completion, false if not
372+ */
373+ boolean simulateLatency ();
370374
371375 }
372376
373377 private static class NoWaitConsumerLatency implements ConsumerLatency {
374378
375379 @ Override
376- public void simulateLatency () {
377- // NO OP
380+ public boolean simulateLatency () {
381+ return true ;
378382 }
379383
380384 }
@@ -388,12 +392,13 @@ private ThreadSleepConsumerLatency(int waitTime) {
388392 }
389393
390394 @ Override
391- public void simulateLatency () {
395+ public boolean simulateLatency () {
392396 try {
393397 Thread .sleep (waitTime );
398+ return true ;
394399 } catch (InterruptedException e ) {
395400 Thread .currentThread ().interrupt ();
396- throw new RuntimeException ( "Exception while simulating latency" , e ) ;
401+ return false ;
397402 }
398403 }
399404 }
@@ -408,9 +413,10 @@ private BusyWaitConsumerLatency(long delay) {
408413 }
409414
410415 @ Override
411- public void simulateLatency () {
416+ public boolean simulateLatency () {
412417 long start = System .nanoTime ();
413418 while (System .nanoTime () - start < delay );
419+ return true ;
414420 }
415421 }
416422
0 commit comments