4646import com .rabbitmq .client .ConnectionFactory ;
4747import com .rabbitmq .client .Envelope ;
4848import com .rabbitmq .client .MessageProperties ;
49+ import com .rabbitmq .client .NackListener ;
4950import com .rabbitmq .client .QueueingConsumer ;
5051import com .rabbitmq .client .ReturnListener ;
5152import com .rabbitmq .client .ShutdownSignalException ;
@@ -140,6 +141,7 @@ public static void main(String[] args) {
140141 confirm , confirmMax );
141142 channel .setReturnListener (p );
142143 channel .setAckListener (p );
144+ channel .setNackListener (p );
143145 Thread t = new Thread (p );
144146 producerThreads [i ] = t ;
145147 t .start ();
@@ -214,7 +216,8 @@ private static List lstArg(CommandLine cmd, char opt) {
214216 return Arrays .asList (vals );
215217 }
216218
217- public static class Producer implements Runnable , ReturnListener , AckListener {
219+ public static class Producer implements Runnable , ReturnListener , AckListener ,
220+ NackListener {
218221
219222 private Channel channel ;
220223 private String exchangeName ;
@@ -236,6 +239,7 @@ public static class Producer implements Runnable, ReturnListener, AckListener {
236239
237240 private boolean confirm ;
238241 private long confirmCount ;
242+ private long nackCount ;
239243 private Semaphore confirmPool ;
240244 private volatile SortedSet <Long > ackSet =
241245 Collections .synchronizedSortedSet (new TreeSet <Long >());
@@ -274,6 +278,15 @@ public synchronized void handleBasicReturn(int replyCode,
274278 }
275279
276280 public void handleAck (long seqNo , boolean multiple ) {
281+ handleAckNack (seqNo , multiple , false );
282+ }
283+
284+ public void handleNack (long seqNo , boolean multiple ) {
285+ handleAckNack (seqNo , multiple , true );
286+ }
287+
288+ private void handleAckNack (long seqNo , boolean multiple ,
289+ boolean nack ) {
277290 int numConfirms = 0 ;
278291 if (multiple ) {
279292 SortedSet <Long > confirmed = ackSet .headSet (seqNo + 1 );
@@ -284,14 +297,19 @@ public void handleAck(long seqNo, boolean multiple) {
284297 numConfirms = 1 ;
285298 }
286299 synchronized (this ) {
287- confirmCount += numConfirms ;
300+ if (nack ) {
301+ nackCount += numConfirms ;
302+ } else {
303+ confirmCount += numConfirms ;
304+ }
288305 }
289306
290307 if (confirmPool != null ) {
291308 for (int i = 0 ; i < numConfirms ; ++i ) {
292309 confirmPool .release ();
293310 }
294311 }
312+
295313 }
296314
297315 public void run () {
@@ -354,21 +372,26 @@ private void delay(long now)
354372 Thread .sleep (pause );
355373 }
356374 if (elapsed > interval ) {
357- long sendRate , returnRate , confirmRate ;
375+ long sendRate , returnRate , confirmRate , nackRate ;
358376 synchronized (this ) {
359377 sendRate = msgCount * 1000L / elapsed ;
360378 returnRate = returnCount * 1000L / elapsed ;
361379 confirmRate = confirmCount * 1000L / elapsed ;
380+ nackRate = nackCount * 1000L / elapsed ;
362381 msgCount = 0 ;
363382 returnCount = 0 ;
364383 confirmCount = 0 ;
384+ nackCount = 0 ;
365385 }
366386 System .out .print ("sending rate: " + sendRate + " msg/s" );
367387 if (mandatory || immediate ) {
368388 System .out .print (", returns: " + returnRate + " ret/s" );
369389 }
370390 if (confirm ) {
371391 System .out .print (", confirms: " + confirmRate + " c/s" );
392+ if (nackRate > 0 ) {
393+ System .out .print (", nacks: " + nackRate + " n/s" );
394+ }
372395 }
373396 System .out .println ();
374397 lastStatsTime = now ;
0 commit comments