3737import java .util .concurrent .TimeUnit ;
3838
3939import com .rabbitmq .client .AMQP .BasicProperties ;
40- import com .rabbitmq .utility .ValueOrException ;
40+ import com .rabbitmq .utility .Utility ;
4141
4242/**
4343 * Convenience class: an implementation of {@link Consumer} with straightforward blocking semantics
4444 */
4545public class QueueingConsumer extends DefaultConsumer {
46- public BlockingQueue <ValueOrException <Delivery , ShutdownSignalException >> _queue ;
46+ private final BlockingQueue <Delivery > _queue ;
47+
48+ // When this is non-null the queue is in shutdown mode and nextDelivery should
49+ // throw a shutdown signal exception.
50+ private volatile ShutdownSignalException _shutdown ;
51+
52+ // Marker object used to signal the queue is in shutdown mode.
53+ // It is only there to wake up consumers. The canonical representation
54+ // of shutting down is the presence of _shutdown.
55+ // Invariant: This is never on _queue unless _shutdown != null.
56+ private static final Delivery POISON = new Delivery (null , null , null );
4757
4858 public QueueingConsumer (Channel ch ) {
49- this (ch ,
50- new LinkedBlockingQueue <ValueOrException <Delivery , ShutdownSignalException >>());
59+ this (ch , new LinkedBlockingQueue <Delivery >());
5160 }
5261
53- public QueueingConsumer (Channel ch ,
54- BlockingQueue <ValueOrException <Delivery , ShutdownSignalException >> q )
62+ public QueueingConsumer (Channel ch , BlockingQueue <Delivery > q )
5563 {
5664 super (ch );
5765 this ._queue = q ;
5866 }
5967
6068 @ Override public void handleShutdownSignal (String consumerTag , ShutdownSignalException sig ) {
61- this ._queue .add (ValueOrException . <Delivery , ShutdownSignalException > makeException (sig ));
69+ _shutdown = sig ;
70+ _queue .add (POISON );
6271 }
6372
6473 @ Override public void handleDelivery (String consumerTag ,
@@ -67,8 +76,8 @@ public QueueingConsumer(Channel ch,
6776 byte [] body )
6877 throws IOException
6978 {
70- this . _queue . add ( ValueOrException . < Delivery , ShutdownSignalException > makeValue
71- (new Delivery (envelope , properties , body ) ));
79+ checkShutdown ();
80+ this . _queue . add (new Delivery (envelope , properties , body ));
7281 }
7382
7483 /**
@@ -110,6 +119,27 @@ public byte[] getBody() {
110119 }
111120 }
112121
122+ /**
123+ * Check if we are in shutdown mode and if so throw an exception.
124+ */
125+ private void checkShutdown (){
126+ if (_shutdown != null ) throw Utility .fixStackTrace (_shutdown );
127+ }
128+
129+ /**
130+ * If this is a non-POISON non-null delivery simply return it.
131+ * If this is POISON we are in shutdown mode, throw _shutdown
132+ * If this is null, we may be in shutdown mode. Check and see.
133+ */
134+ private Delivery handle (Delivery delivery )
135+ {
136+ if (delivery == POISON || (delivery == null && _shutdown != null )){
137+ if (delivery == POISON ) _queue .add (POISON );
138+ throw Utility .fixStackTrace (_shutdown );
139+ }
140+ return delivery ;
141+ }
142+
113143 /**
114144 * Main application-side API: wait for the next message delivery and return it.
115145 * @return the next message
@@ -119,7 +149,7 @@ public byte[] getBody() {
119149 public Delivery nextDelivery ()
120150 throws InterruptedException , ShutdownSignalException
121151 {
122- return _queue .take (). getValue ( );
152+ return handle ( _queue .take ());
123153 }
124154
125155 /**
@@ -132,16 +162,7 @@ public Delivery nextDelivery()
132162 public Delivery nextDelivery (long timeout )
133163 throws InterruptedException , ShutdownSignalException
134164 {
135- ValueOrException <Delivery , ShutdownSignalException > r =
136- _queue .poll (timeout , TimeUnit .MILLISECONDS );
137- return r == null ? null : r .getValue ();
138- }
139-
140- /**
141- * Retrieve the underlying blocking queue.
142- * @return the queue where incoming messages are stored
143- */
144- public BlockingQueue <ValueOrException <Delivery , ShutdownSignalException >> getQueue () {
145- return _queue ;
165+ checkShutdown ();
166+ return handle (_queue .poll (timeout , TimeUnit .MILLISECONDS ));
146167 }
147168}
0 commit comments