Skip to content

Commit c832c65

Browse files
author
Steve Powell
committed
Merge in default.
2 parents ee3da36 + 591e684 commit c832c65

File tree

11 files changed

+249
-131
lines changed

11 files changed

+249
-131
lines changed

src/com/rabbitmq/client/Channel.java

Lines changed: 39 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -111,40 +111,61 @@ public interface Channel extends ShutdownNotifier {
111111
void abort(int closeCode, String closeMessage) throws IOException;
112112

113113
/**
114-
* Return the current {@link ReturnListener}.
115-
* @return an interface to the current return listener
114+
* Add a {@link ReturnListener}.
115+
* @param listener the listener to add
116116
*/
117-
ReturnListener getReturnListener();
117+
void addReturnListener(ReturnListener listener);
118118

119119
/**
120-
* Set the current {@link ReturnListener}.
121-
* @param listener the listener to use, or null indicating "don't use one".
120+
* Remove a {@link ReturnListener}.
121+
* @param listener the listener to remove
122+
* @return <code><b>true</b></code> if the listener was found and removed,
123+
* <code><b>false</b></code> otherwise
122124
*/
123-
void setReturnListener(ReturnListener listener);
125+
boolean removeReturnListener(ReturnListener listener);
124126

125127
/**
126-
* Return the current {@link FlowListener}.
127-
* @return an interface to the current flow listener.
128+
* Remove all {@link ReturnListener}s.
128129
*/
129-
FlowListener getFlowListener();
130+
void clearReturnListeners();
130131

131132
/**
132-
* Set the current {@link FlowListener}.
133-
* @param listener the listener to use, or null indicating "don't use one".
133+
* Add a {@link FlowListener}.
134+
* @param listener the listener to add
134135
*/
135-
void setFlowListener(FlowListener listener);
136+
void addFlowListener(FlowListener listener);
136137

137138
/**
138-
* Return the current {@link ConfirmListener}.
139-
* @return an interface to the current ack listener.
139+
* Remove a {@link FlowListener}.
140+
* @param listener the listener to remove
141+
* @return <code><b>true</b></code> if the listener was found and removed,
142+
* <code><b>false</b></code> otherwise
140143
*/
141-
ConfirmListener getConfirmListener();
144+
boolean removeFlowListener(FlowListener listener);
142145

143146
/**
144-
* Set the current {@link ConfirmListener}.
145-
* @param listener the listener to use, or null indicating "don't use one".
147+
* Remove all {@link FlowListener}s.
146148
*/
147-
void setConfirmListener(ConfirmListener listener);
149+
void clearFlowListeners();
150+
151+
/**
152+
* Add a {@link ConfirmListener}.
153+
* @param listener the listener to add
154+
*/
155+
void addConfirmListener(ConfirmListener listener);
156+
157+
/**
158+
* Remove a {@link ConfirmListener}.
159+
* @param listener the listener to remove
160+
* @return <code><b>true</b></code> if the listener was found and removed,
161+
* <code><b>false</b></code> otherwise
162+
*/
163+
boolean removeConfirmListener(ConfirmListener listener);
164+
165+
/**
166+
* Remove all {@link ConfirmListener}s.
167+
*/
168+
void clearConfirmListeners();
148169

149170
/**
150171
* Get the current default consumer. @see setDefaultConsumer for rationale.

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 90 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,14 @@
1818
package com.rabbitmq.client.impl;
1919

2020
import java.io.IOException;
21+
import java.util.Collection;
2122
import java.util.Collections;
2223
import java.util.HashMap;
2324
import java.util.Map;
2425
import java.util.SortedSet;
2526
import java.util.TreeSet;
27+
import java.util.concurrent.CopyOnWriteArrayList;
2628
import java.util.concurrent.TimeoutException;
27-
2829
import com.rabbitmq.client.AMQP;
2930
import com.rabbitmq.client.Command;
3031
import com.rabbitmq.client.ConfirmListener;
@@ -70,37 +71,31 @@ public class ChannelN extends AMQChannel implements com.rabbitmq.client.Channel
7071
* BlockingRpcContinuation to inject code into the reader thread
7172
* in basicConsume and basicCancel.
7273
*/
73-
public final Map<String, Consumer> _consumers =
74+
private final Map<String, Consumer> _consumers =
7475
Collections.synchronizedMap(new HashMap<String, Consumer>());
7576

76-
/** Reference to the currently-active ReturnListener, or null if there is none.
77-
*/
78-
public volatile ReturnListener returnListener = null;
79-
80-
/** Reference to the currently-active FlowListener, or null if there is none.
81-
*/
82-
public volatile FlowListener flowListener = null;
83-
84-
/** Reference to the currently-active ConfirmListener, or null if there is none.
85-
*/
86-
public volatile ConfirmListener confirmListener = null;
77+
/* All listeners collections are in CopyOnWriteArrayList objects */
78+
/** The ReturnListener collection. */
79+
private final Collection<ReturnListener> returnListeners = new CopyOnWriteArrayList<ReturnListener>();
80+
/** The FlowListener collection. */
81+
private final Collection<FlowListener> flowListeners = new CopyOnWriteArrayList<FlowListener>();
82+
/** The ConfirmListener collection. */
83+
private final Collection<ConfirmListener> confirmListeners = new CopyOnWriteArrayList<ConfirmListener>();
8784

8885
/** Sequence number of next published message requiring confirmation. */
8986
private long nextPublishSeqNo = 0L;
9087

91-
/** Reference to the currently-active default consumer, or null if there is
92-
* none.
93-
*/
94-
public volatile Consumer defaultConsumer = null;
88+
/** The current default consumer, or null if there is none. */
89+
private volatile Consumer defaultConsumer = null;
9590

9691
/** Set of currently unconfirmed messages (i.e. messages that have
9792
* not been ack'd or nack'd by the server yet. */
98-
protected volatile SortedSet<Long> unconfirmedSet =
93+
private volatile SortedSet<Long> unconfirmedSet =
9994
Collections.synchronizedSortedSet(new TreeSet<Long>());
10095

10196
/** Whether any nacks have been received since the last
10297
* waitForConfirms(). */
103-
protected volatile boolean onlyAcksReceived = true;
98+
private volatile boolean onlyAcksReceived = true;
10499

105100
/**
106101
* Construct a new channel on the given connection with the given
@@ -126,35 +121,40 @@ public void open() throws IOException {
126121
Utility.use(openOk);
127122
}
128123

129-
/** Returns the current ReturnListener. */
130-
public ReturnListener getReturnListener() {
131-
return returnListener;
124+
public void addReturnListener(ReturnListener listener) {
125+
returnListeners.add(listener);
132126
}
133127

134-
/**
135-
* Sets the current ReturnListener.
136-
* A null argument is interpreted to mean "do not use a return listener".
137-
*/
138-
public void setReturnListener(ReturnListener listener) {
139-
returnListener = listener;
128+
public boolean removeReturnListener(ReturnListener listener) {
129+
return returnListeners.remove(listener);
140130
}
141131

142-
/** Returns the current {@link FlowListener}. */
143-
public FlowListener getFlowListener() {
144-
return flowListener;
132+
public void clearReturnListeners() {
133+
returnListeners.clear();
145134
}
146135

147-
/**
148-
* Sets the current {@link FlowListener}.
149-
* A null argument is interpreted to mean "do not use a flow listener".
150-
*/
151-
public void setFlowListener(FlowListener listener) {
152-
flowListener = listener;
136+
public void addFlowListener(FlowListener listener) {
137+
flowListeners.add(listener);
138+
}
139+
140+
public boolean removeFlowListener(FlowListener listener) {
141+
return flowListeners.remove(listener);
142+
}
143+
144+
public void clearFlowListeners() {
145+
flowListeners.clear();
153146
}
154147

155-
/** Returns the current {@link ConfirmListener}. */
156-
public ConfirmListener getConfirmListener() {
157-
return confirmListener;
148+
public void addConfirmListener(ConfirmListener listener) {
149+
confirmListeners.add(listener);
150+
}
151+
152+
public boolean removeConfirmListener(ConfirmListener listener) {
153+
return confirmListeners.remove(listener);
154+
}
155+
156+
public void clearConfirmListeners() {
157+
confirmListeners.clear();
158158
}
159159

160160
/** {@inheritDoc} */
@@ -187,14 +187,6 @@ public void waitForConfirmsOrDie()
187187
}
188188
}
189189

190-
/**
191-
* Sets the current {@link ConfirmListener}.
192-
* A null argument is interpreted to mean "do not use a confirm listener".
193-
*/
194-
public void setConfirmListener(ConfirmListener listener) {
195-
confirmListener = listener;
196-
}
197-
198190
/** Returns the current default consumer. */
199191
public Consumer getDefaultConsumer() {
200192
return defaultConsumer;
@@ -311,22 +303,7 @@ public void releaseChannelNumber() {
311303
}
312304
return true;
313305
} else if (method instanceof Basic.Return) {
314-
ReturnListener l = getReturnListener();
315-
if (l != null) {
316-
Basic.Return basicReturn = (Basic.Return) method;
317-
try {
318-
l.handleReturn(basicReturn.getReplyCode(),
319-
basicReturn.getReplyText(),
320-
basicReturn.getExchange(),
321-
basicReturn.getRoutingKey(),
322-
(BasicProperties)
323-
command.getContentHeader(),
324-
command.getContentBody());
325-
} catch (Throwable ex) {
326-
_connection.getExceptionHandler().handleReturnListenerException(this,
327-
ex);
328-
}
329-
}
306+
callReturnListeners(command, (Basic.Return) method);
330307
return true;
331308
} else if (method instanceof Channel.Flow) {
332309
Channel.Flow channelFlow = (Channel.Flow) method;
@@ -335,34 +312,17 @@ public void releaseChannelNumber() {
335312
transmit(new Channel.FlowOk(!_blockContent));
336313
_channelMutex.notifyAll();
337314
}
338-
FlowListener l = getFlowListener();
339-
if (l != null) {
340-
try {
341-
l.handleFlow(channelFlow.getActive());
342-
} catch (Throwable ex) {
343-
_connection.getExceptionHandler().handleFlowListenerException(this, ex);
344-
}
345-
}
315+
callFlowListeners(command, channelFlow);
346316
return true;
347317
} else if (method instanceof Basic.Ack) {
348318
Basic.Ack ack = (Basic.Ack) method;
349-
try {
350-
if (confirmListener != null)
351-
confirmListener.handleAck(ack.getDeliveryTag(), ack.getMultiple());
352-
} catch (Throwable ex) {
353-
_connection.getExceptionHandler().handleConfirmListenerException(this, ex);
354-
}
319+
callConfirmListeners(command, ack);
355320
handleAckNack(ack.getDeliveryTag(), ack.getMultiple(), false);
356321
return true;
357322
} else if (method instanceof Basic.Nack) {
358323
Basic.Nack nack = (Basic.Nack) method;
359-
try {
360-
if (confirmListener != null)
361-
confirmListener.handleNack(nack.getDeliveryTag(), nack.getMultiple());
362-
} catch (Throwable ex) {
363-
_connection.getExceptionHandler().handleConfirmListenerException(this, ex);
364-
}
365-
handleAckNack(nack.getDeliveryTag(), nack.getMultiple(), true);
324+
callConfirmListeners(command, nack);
325+
handleAckNack(nack.getDeliveryTag(), nack.getMultiple(), false);
366326
return true;
367327
} else if (method instanceof Basic.RecoverOk) {
368328
for (Consumer callback: _consumers.values()) {
@@ -413,6 +373,51 @@ public void releaseChannelNumber() {
413373
}
414374
}
415375

376+
private void callReturnListeners(Command command, Basic.Return basicReturn) {
377+
try {
378+
for (ReturnListener l : this.returnListeners) {
379+
l.handleReturn(basicReturn.getReplyCode(),
380+
basicReturn.getReplyText(),
381+
basicReturn.getExchange(),
382+
basicReturn.getRoutingKey(),
383+
(BasicProperties) command.getContentHeader(),
384+
command.getContentBody());
385+
}
386+
} catch (Throwable ex) {
387+
_connection.getExceptionHandler().handleReturnListenerException(this, ex);
388+
}
389+
}
390+
391+
private void callFlowListeners(Command command, Channel.Flow channelFlow) {
392+
try {
393+
for (FlowListener l : this.flowListeners) {
394+
l.handleFlow(channelFlow.getActive());
395+
}
396+
} catch (Throwable ex) {
397+
_connection.getExceptionHandler().handleFlowListenerException(this, ex);
398+
}
399+
}
400+
401+
private void callConfirmListeners(Command command, Basic.Ack ack) {
402+
try {
403+
for (ConfirmListener l : this.confirmListeners) {
404+
l.handleAck(ack.getDeliveryTag(), ack.getMultiple());
405+
}
406+
} catch (Throwable ex) {
407+
_connection.getExceptionHandler().handleConfirmListenerException(this, ex);
408+
}
409+
}
410+
411+
private void callConfirmListeners(Command command, Basic.Nack nack) {
412+
try {
413+
for (ConfirmListener l : this.confirmListeners) {
414+
l.handleNack(nack.getDeliveryTag(), nack.getMultiple());
415+
}
416+
} catch (Throwable ex) {
417+
_connection.getExceptionHandler().handleConfirmListenerException(this, ex);
418+
}
419+
}
420+
416421
private void asyncShutdown(Command command) throws IOException {
417422
releaseChannelNumber();
418423
ShutdownSignalException signal = new ShutdownSignalException(false,

0 commit comments

Comments
 (0)