@@ -48,7 +48,7 @@ public class QueueingConsumer extends DefaultConsumer {
4848
4949 // Marker object used to signal the queue is in shutdown mode.
5050 // Invariant: This is never on _queue unless _shutdown != null.
51- public static final Delivery POISON = new Delivery (null , null , null );
51+ private static final Delivery POISON = new Delivery (null , null , null );
5252
5353 public QueueingConsumer (Channel ch ) {
5454 this (ch , new LinkedBlockingQueue <Delivery >());
@@ -113,6 +113,10 @@ public byte[] getBody() {
113113 }
114114 }
115115
116+ private void checkShutdown (){
117+ if (_shutdown != null ) throw _shutdown ;
118+ }
119+
116120 private Delivery handle (Delivery delivery )
117121 {
118122 if (delivery == POISON || (delivery == null && _shutdown != null )){
@@ -131,6 +135,7 @@ private Delivery handle(Delivery delivery)
131135 public Delivery nextDelivery ()
132136 throws InterruptedException , ShutdownSignalException
133137 {
138+ checkShutdown ();
134139 return handle (_queue .take ());
135140 }
136141
@@ -144,14 +149,7 @@ public Delivery nextDelivery()
144149 public Delivery nextDelivery (long timeout )
145150 throws InterruptedException , ShutdownSignalException
146151 {
152+ checkShutdown ();
147153 return handle (_queue .poll (timeout , TimeUnit .MILLISECONDS ));
148154 }
149-
150- /**
151- * Retrieve the underlying blocking queue.
152- * @return the queue where incoming messages are stored
153- */
154- public BlockingQueue <Delivery > getQueue () {
155- return _queue ;
156- }
157155}
0 commit comments