1818package com .rabbitmq .client .impl ;
1919
2020import java .io .IOException ;
21+ import java .util .Collection ;
2122import java .util .Collections ;
2223import java .util .HashMap ;
2324import java .util .Map ;
2425import java .util .SortedSet ;
2526import java .util .TreeSet ;
27+ import java .util .concurrent .CopyOnWriteArrayList ;
2628import java .util .concurrent .TimeoutException ;
27-
2829import com .rabbitmq .client .AMQP ;
2930import com .rabbitmq .client .Command ;
3031import com .rabbitmq .client .ConfirmListener ;
@@ -70,37 +71,31 @@ public class ChannelN extends AMQChannel implements com.rabbitmq.client.Channel
7071 * BlockingRpcContinuation to inject code into the reader thread
7172 * in basicConsume and basicCancel.
7273 */
73- public final Map <String , Consumer > _consumers =
74+ private final Map <String , Consumer > _consumers =
7475 Collections .synchronizedMap (new HashMap <String , Consumer >());
7576
76- /** Reference to the currently-active ReturnListener, or null if there is none.
77- */
78- public volatile ReturnListener returnListener = null ;
79-
80- /** Reference to the currently-active FlowListener, or null if there is none.
81- */
82- public volatile FlowListener flowListener = null ;
83-
84- /** Reference to the currently-active ConfirmListener, or null if there is none.
85- */
86- public volatile ConfirmListener confirmListener = null ;
77+ /* All listeners collections are in CopyOnWriteArrayList objects */
78+ /** The ReturnListener collection. */
79+ private final Collection <ReturnListener > returnListeners = new CopyOnWriteArrayList <ReturnListener >();
80+ /** The FlowListener collection. */
81+ private final Collection <FlowListener > flowListeners = new CopyOnWriteArrayList <FlowListener >();
82+ /** The ConfirmListener collection. */
83+ private final Collection <ConfirmListener > confirmListeners = new CopyOnWriteArrayList <ConfirmListener >();
8784
8885 /** Sequence number of next published message requiring confirmation. */
8986 private long nextPublishSeqNo = 0L ;
9087
91- /** Reference to the currently-active default consumer, or null if there is
92- * none.
93- */
94- public volatile Consumer defaultConsumer = null ;
88+ /** The current default consumer, or null if there is none. */
89+ private volatile Consumer defaultConsumer = null ;
9590
9691 /** Set of currently unconfirmed messages (i.e. messages that have
9792 * not been ack'd or nack'd by the server yet. */
98- protected volatile SortedSet <Long > unconfirmedSet =
93+ private volatile SortedSet <Long > unconfirmedSet =
9994 Collections .synchronizedSortedSet (new TreeSet <Long >());
10095
10196 /** Whether any nacks have been received since the last
10297 * waitForConfirms(). */
103- protected volatile boolean onlyAcksReceived = true ;
98+ private volatile boolean onlyAcksReceived = true ;
10499
105100 /**
106101 * Construct a new channel on the given connection with the given
@@ -126,48 +121,52 @@ public void open() throws IOException {
126121 Utility .use (openOk );
127122 }
128123
129- /** Returns the current ReturnListener. */
130- public ReturnListener getReturnListener () {
131- return returnListener ;
124+ public void addReturnListener (ReturnListener listener ) {
125+ returnListeners .add (listener );
132126 }
133127
134- /**
135- * Sets the current ReturnListener.
136- * A null argument is interpreted to mean "do not use a return listener".
137- */
138- public void setReturnListener (ReturnListener listener ) {
139- returnListener = listener ;
128+ public boolean removeReturnListener (ReturnListener listener ) {
129+ return returnListeners .remove (listener );
140130 }
141131
142- /** Returns the current {@link FlowListener}. */
143- public FlowListener getFlowListener () {
144- return flowListener ;
132+ public void clearReturnListeners () {
133+ returnListeners .clear ();
145134 }
146135
147- /**
148- * Sets the current {@link FlowListener}.
149- * A null argument is interpreted to mean "do not use a flow listener".
150- */
151- public void setFlowListener (FlowListener listener ) {
152- flowListener = listener ;
136+ public void addFlowListener (FlowListener listener ) {
137+ flowListeners .add (listener );
138+ }
139+
140+ public boolean removeFlowListener (FlowListener listener ) {
141+ return flowListeners .remove (listener );
142+ }
143+
144+ public void clearFlowListeners () {
145+ flowListeners .clear ();
153146 }
154147
155- /** Returns the current {@link ConfirmListener}. */
156- public ConfirmListener getConfirmListener () {
157- return confirmListener ;
148+ public void addConfirmListener (ConfirmListener listener ) {
149+ confirmListeners .add (listener );
150+ }
151+
152+ public boolean removeConfirmListener (ConfirmListener listener ) {
153+ return confirmListeners .remove (listener );
154+ }
155+
156+ public void clearConfirmListeners () {
157+ confirmListeners .clear ();
158158 }
159159
160160 /** {@inheritDoc} */
161161 public boolean waitForConfirms ()
162162 throws InterruptedException
163163 {
164- long seqHead = this .getNextPublishSeqNo ();
165164 synchronized (unconfirmedSet ) {
166165 while (true ) {
167166 if (getCloseReason () != null ) {
168167 throw Utility .fixStackTrace (getCloseReason ());
169168 }
170- if (unconfirmedSet .headSet ( seqHead ). isEmpty ()) {
169+ if (unconfirmedSet .isEmpty ()) {
171170 boolean aux = onlyAcksReceived ;
172171 onlyAcksReceived = true ;
173172 return aux ;
@@ -187,14 +186,6 @@ public void waitForConfirmsOrDie()
187186 }
188187 }
189188
190- /**
191- * Sets the current {@link ConfirmListener}.
192- * A null argument is interpreted to mean "do not use a confirm listener".
193- */
194- public void setConfirmListener (ConfirmListener listener ) {
195- confirmListener = listener ;
196- }
197-
198189 /** Returns the current default consumer. */
199190 public Consumer getDefaultConsumer () {
200191 return defaultConsumer ;
@@ -311,22 +302,7 @@ public void releaseChannelNumber() {
311302 }
312303 return true ;
313304 } else if (method instanceof Basic .Return ) {
314- ReturnListener l = getReturnListener ();
315- if (l != null ) {
316- Basic .Return basicReturn = (Basic .Return ) method ;
317- try {
318- l .handleReturn (basicReturn .getReplyCode (),
319- basicReturn .getReplyText (),
320- basicReturn .getExchange (),
321- basicReturn .getRoutingKey (),
322- (BasicProperties )
323- command .getContentHeader (),
324- command .getContentBody ());
325- } catch (Throwable ex ) {
326- _connection .getExceptionHandler ().handleReturnListenerException (this ,
327- ex );
328- }
329- }
305+ callReturnListeners (command , (Basic .Return ) method );
330306 return true ;
331307 } else if (method instanceof Channel .Flow ) {
332308 Channel .Flow channelFlow = (Channel .Flow ) method ;
@@ -335,34 +311,17 @@ public void releaseChannelNumber() {
335311 transmit (new Channel .FlowOk (!_blockContent ));
336312 _channelMutex .notifyAll ();
337313 }
338- FlowListener l = getFlowListener ();
339- if (l != null ) {
340- try {
341- l .handleFlow (channelFlow .getActive ());
342- } catch (Throwable ex ) {
343- _connection .getExceptionHandler ().handleFlowListenerException (this , ex );
344- }
345- }
314+ callFlowListeners (command , channelFlow );
346315 return true ;
347316 } else if (method instanceof Basic .Ack ) {
348317 Basic .Ack ack = (Basic .Ack ) method ;
349- try {
350- if (confirmListener != null )
351- confirmListener .handleAck (ack .getDeliveryTag (), ack .getMultiple ());
352- } catch (Throwable ex ) {
353- _connection .getExceptionHandler ().handleConfirmListenerException (this , ex );
354- }
318+ callConfirmListeners (command , ack );
355319 handleAckNack (ack .getDeliveryTag (), ack .getMultiple (), false );
356320 return true ;
357321 } else if (method instanceof Basic .Nack ) {
358322 Basic .Nack nack = (Basic .Nack ) method ;
359- try {
360- if (confirmListener != null )
361- confirmListener .handleNack (nack .getDeliveryTag (), nack .getMultiple ());
362- } catch (Throwable ex ) {
363- _connection .getExceptionHandler ().handleConfirmListenerException (this , ex );
364- }
365- handleAckNack (nack .getDeliveryTag (), nack .getMultiple (), true );
323+ callConfirmListeners (command , nack );
324+ handleAckNack (nack .getDeliveryTag (), nack .getMultiple (), false );
366325 return true ;
367326 } else if (method instanceof Basic .RecoverOk ) {
368327 for (Consumer callback : _consumers .values ()) {
@@ -413,6 +372,51 @@ public void releaseChannelNumber() {
413372 }
414373 }
415374
375+ private void callReturnListeners (Command command , Basic .Return basicReturn ) {
376+ try {
377+ for (ReturnListener l : this .returnListeners ) {
378+ l .handleReturn (basicReturn .getReplyCode (),
379+ basicReturn .getReplyText (),
380+ basicReturn .getExchange (),
381+ basicReturn .getRoutingKey (),
382+ (BasicProperties ) command .getContentHeader (),
383+ command .getContentBody ());
384+ }
385+ } catch (Throwable ex ) {
386+ _connection .getExceptionHandler ().handleReturnListenerException (this , ex );
387+ }
388+ }
389+
390+ private void callFlowListeners (Command command , Channel .Flow channelFlow ) {
391+ try {
392+ for (FlowListener l : this .flowListeners ) {
393+ l .handleFlow (channelFlow .getActive ());
394+ }
395+ } catch (Throwable ex ) {
396+ _connection .getExceptionHandler ().handleFlowListenerException (this , ex );
397+ }
398+ }
399+
400+ private void callConfirmListeners (Command command , Basic .Ack ack ) {
401+ try {
402+ for (ConfirmListener l : this .confirmListeners ) {
403+ l .handleAck (ack .getDeliveryTag (), ack .getMultiple ());
404+ }
405+ } catch (Throwable ex ) {
406+ _connection .getExceptionHandler ().handleConfirmListenerException (this , ex );
407+ }
408+ }
409+
410+ private void callConfirmListeners (Command command , Basic .Nack nack ) {
411+ try {
412+ for (ConfirmListener l : this .confirmListeners ) {
413+ l .handleNack (nack .getDeliveryTag (), nack .getMultiple ());
414+ }
415+ } catch (Throwable ex ) {
416+ _connection .getExceptionHandler ().handleConfirmListenerException (this , ex );
417+ }
418+ }
419+
416420 private void asyncShutdown (Command command ) throws IOException {
417421 releaseChannelNumber ();
418422 ShutdownSignalException signal = new ShutdownSignalException (false ,
0 commit comments