4343import com .rabbitmq .client .ConfirmListener ;
4444import com .rabbitmq .client .Connection ;
4545import com .rabbitmq .client .ConnectionFactory ;
46+ import com .rabbitmq .client .ConsumerCancelledException ;
4647import com .rabbitmq .client .Envelope ;
4748import com .rabbitmq .client .MessageProperties ;
4849import com .rabbitmq .client .QueueingConsumer ;
@@ -108,12 +109,10 @@ public static void main(String[] args) {
108109 flags .contains ("persistent" ),
109110 exclusive , autoDelete ,
110111 null ).getQueue ();
111- QueueingConsumer consumer = new QueueingConsumer (channel );
112112 if (prefetchCount > 0 ) channel .basicQos (prefetchCount );
113- channel .basicConsume (qName , autoAck , consumer );
114113 channel .queueBind (qName , exchangeName , id );
115114 Thread t =
116- new Thread (new Consumer (consumer , id ,
115+ new Thread (new Consumer (channel , id , qName ,
117116 consumerTxSize , autoAck ,
118117 stats , timeLimit ));
119118 consumerThreads [i ] = t ;
@@ -419,18 +418,21 @@ private byte[] createMessage(int sequenceNumber)
419418 public static class Consumer implements Runnable {
420419
421420 private QueueingConsumer q ;
421+ private Channel channel ;
422422 private String id ;
423+ private String queueName ;
423424 private int txSize ;
424425 private boolean autoAck ;
425426 private Stats stats ;
426427 private long timeLimit ;
427428
428- public Consumer (QueueingConsumer q , String id ,
429- int txSize , boolean autoAck ,
429+ public Consumer (Channel channel , String id ,
430+ String queueName , int txSize , boolean autoAck ,
430431 Stats stats , int timeLimit ) {
431432
432- this .q = q ;
433+ this .channel = channel ;
433434 this .id = id ;
435+ this .queueName = queueName ;
434436 this .txSize = txSize ;
435437 this .autoAck = autoAck ;
436438 this .stats = stats ;
@@ -444,17 +446,24 @@ public void run() {
444446 startTime = now = System .currentTimeMillis ();
445447 int totalMsgCount = 0 ;
446448
447- Channel channel = q .getChannel ();
448-
449449 try {
450+ q = new QueueingConsumer (channel );
451+ channel .basicConsume (queueName , autoAck , q );
450452
451453 while (timeLimit == 0 || now < startTime + timeLimit ) {
452454 Delivery delivery ;
453- if (timeLimit == 0 ) {
454- delivery = q .nextDelivery ();
455- } else {
456- delivery = q .nextDelivery (startTime + timeLimit - now );
457- if (delivery == null ) break ;
455+ try {
456+ if (timeLimit == 0 ) {
457+ delivery = q .nextDelivery ();
458+ } else {
459+ delivery = q .nextDelivery (startTime + timeLimit - now );
460+ if (delivery == null ) break ;
461+ }
462+ } catch (ConsumerCancelledException e ) {
463+ System .out .println ("Consumer cancelled by broker. Re-consuming." );
464+ q = new QueueingConsumer (channel );
465+ channel .basicConsume (queueName , autoAck , q );
466+ continue ;
458467 }
459468 totalMsgCount ++;
460469
0 commit comments