5656import com .rabbitmq .client .Envelope ;
5757import com .rabbitmq .client .MessageProperties ;
5858import com .rabbitmq .client .QueueingConsumer ;
59+ import com .rabbitmq .client .ReturnListener ;
5960import com .rabbitmq .client .ShutdownSignalException ;
6061import com .rabbitmq .client .AMQP .Queue ;
6162import com .rabbitmq .client .QueueingConsumer .Delivery ;
@@ -126,11 +127,22 @@ public static void main(String[] args) {
126127 Channel channel = conn .createChannel ();
127128 if (producerTxSize > 0 ) channel .txSelect ();
128129 channel .exchangeDeclare (exchangeName , exchangeType );
129- Thread t =
130- new Thread (new Producer (channel , exchangeName , id ,
131- flags , producerTxSize ,
132- 1000L * samplingInterval ,
133- rateLimit , minMsgSize , timeLimit ));
130+ final Producer p = new Producer (channel , exchangeName , id ,
131+ flags , producerTxSize ,
132+ 1000L * samplingInterval ,
133+ rateLimit , minMsgSize , timeLimit );
134+ channel .setReturnListener (new ReturnListener () {
135+ public void handleBasicReturn (int replyCode ,
136+ String replyText ,
137+ String exchange ,
138+ String routingKey ,
139+ AMQP .BasicProperties properties ,
140+ byte [] body )
141+ throws IOException {
142+ p .logBasicReturn ();
143+ }
144+ });
145+ Thread t = new Thread (p );
134146 producerThreads [i ] = t ;
135147 t .start ();
136148 }
@@ -215,6 +227,7 @@ public static class Producer implements Runnable {
215227 private long startTime ;
216228 private long lastStatsTime ;
217229 private int msgCount ;
230+ private int basicReturnCount ;
218231
219232 public Producer (Channel channel , String exchangeName , String id ,
220233 List flags , int txSize ,
@@ -234,6 +247,14 @@ public Producer(Channel channel, String exchangeName, String id,
234247 this .message = new byte [minMsgSize ];
235248 }
236249
250+ public synchronized void logBasicReturn () {
251+ basicReturnCount ++;
252+ }
253+
254+ public synchronized void resetBasicReturns () {
255+ basicReturnCount = 0 ;
256+ }
257+
237258 public void run () {
238259
239260 long now ;
@@ -292,7 +313,11 @@ private void delay(long now)
292313 if (elapsed > interval ) {
293314 System .out .println ("sending rate: " +
294315 (msgCount * 1000L / elapsed ) +
295- " msg/s" );
316+ " msg/s" +
317+ ", basic returns: " +
318+ (basicReturnCount * 1000L / elapsed ) +
319+ " ret/s" );
320+ resetBasicReturns ();
296321 msgCount = 0 ;
297322 lastStatsTime = now ;
298323 }
0 commit comments