@@ -127,10 +127,12 @@ public static void main(String[] args) {
127127 Channel channel = conn .createChannel ();
128128 if (producerTxSize > 0 ) channel .txSelect ();
129129 channel .exchangeDeclare (exchangeName , exchangeType );
130- Thread t = new Thread ( new Producer (channel , exchangeName , id ,
130+ final Producer p = new Producer (channel , exchangeName , id ,
131131 flags , producerTxSize ,
132132 1000L * samplingInterval ,
133- rateLimit , minMsgSize , timeLimit ));
133+ rateLimit , minMsgSize , timeLimit );
134+ channel .setReturnListener (p );
135+ Thread t = new Thread (p );
134136 producerThreads [i ] = t ;
135137 t .start ();
136138 }
@@ -197,7 +199,7 @@ private static List lstArg(CommandLine cmd, char opt) {
197199 return Arrays .asList (vals );
198200 }
199201
200- public static class Producer implements Runnable {
202+ public static class Producer implements Runnable , ReturnListener {
201203
202204 private Channel channel ;
203205 private String exchangeName ;
@@ -233,18 +235,15 @@ public Producer(Channel channel, String exchangeName, String id,
233235 this .rateLimit = rateLimit ;
234236 this .timeLimit = 1000L * timeLimit ;
235237 this .message = new byte [minMsgSize ];
238+ }
236239
237- channel .setReturnListener (new ReturnListener () {
238- public void handleBasicReturn (int replyCode ,
239- String replyText ,
240- String exchange ,
241- String routingKey ,
242- AMQP .BasicProperties properties ,
243- byte [] body )
244- throws IOException {
245- logBasicReturn ();
246- }
247- });
240+ public void handleBasicReturn (int replyCode ,
241+ String replyText ,
242+ String exchange ,
243+ String routingKey ,
244+ AMQP .BasicProperties properties ,
245+ byte [] body ) throws IOException {
246+ logBasicReturn ();
248247 }
249248
250249 public synchronized void logBasicReturn () {
0 commit comments