@@ -90,7 +90,7 @@ public static void main(String[] args) {
9090
9191 //setup
9292 String id = UUID .randomUUID ().toString ();
93- Stats stats = new Stats (1000L * samplingInterval );
93+ Stats stats = new Stats (1000L * samplingInterval );
9494 ConnectionFactory factory = new ConnectionFactory ();
9595 factory .setHost (hostName );
9696 factory .setPort (portNumber );
@@ -127,22 +127,10 @@ public static void main(String[] args) {
127127 Channel channel = conn .createChannel ();
128128 if (producerTxSize > 0 ) channel .txSelect ();
129129 channel .exchangeDeclare (exchangeName , exchangeType );
130- final Producer p = new Producer (channel , exchangeName , id ,
131- flags , producerTxSize ,
132- 1000L * samplingInterval ,
133- rateLimit , minMsgSize , timeLimit );
134- channel .setReturnListener (new ReturnListener () {
135- public void handleBasicReturn (int replyCode ,
136- String replyText ,
137- String exchange ,
138- String routingKey ,
139- AMQP .BasicProperties properties ,
140- byte [] body )
141- throws IOException {
142- p .logBasicReturn ();
143- }
144- });
145- Thread t = new Thread (p );
130+ Thread t = new Thread (new Producer (channel , exchangeName , id ,
131+ flags , producerTxSize ,
132+ 1000L * samplingInterval ,
133+ rateLimit , minMsgSize , timeLimit ));
146134 producerThreads [i ] = t ;
147135 t .start ();
148136 }
@@ -245,6 +233,18 @@ public Producer(Channel channel, String exchangeName, String id,
245233 this .rateLimit = rateLimit ;
246234 this .timeLimit = 1000L * timeLimit ;
247235 this .message = new byte [minMsgSize ];
236+
237+ channel .setReturnListener (new ReturnListener () {
238+ public void handleBasicReturn (int replyCode ,
239+ String replyText ,
240+ String exchange ,
241+ String routingKey ,
242+ AMQP .BasicProperties properties ,
243+ byte [] body )
244+ throws IOException {
245+ logBasicReturn ();
246+ }
247+ });
248248 }
249249
250250 public synchronized void logBasicReturn () {
0 commit comments