Skip to content

Commit 1ec3840

Browse files
author
Alexandru Scvortov
committed
add NackListener
1 parent eef5cdc commit 1ec3840

File tree

5 files changed

+84
-0
lines changed

5 files changed

+84
-0
lines changed

src/com/rabbitmq/client/Channel.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,18 @@ public interface Channel extends ShutdownNotifier {
146146
*/
147147
void setAckListener(AckListener listener);
148148

149+
/**
150+
* Return the current {@link NackListener}.
151+
* @return an interface to the current nack listener.
152+
*/
153+
NackListener getNackListener();
154+
155+
/**
156+
* Set the current {@link NackListener}.
157+
* @param listener the listener to use, or null indicating "don't use one".
158+
*/
159+
void setNackListener(NackListener listener);
160+
149161
/**
150162
* Get the current default consumer. @see setDefaultConsumer for rationale.
151163
* @return an interface to the current default consumer.
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
// The contents of this file are subject to the Mozilla Public License
2+
// Version 1.1 (the "License"); you may not use this file except in
3+
// compliance with the License. You may obtain a copy of the License
4+
// at http://www.mozilla.org/MPL/
5+
//
6+
// Software distributed under the License is distributed on an "AS IS"
7+
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
8+
// the License for the specific language governing rights and
9+
// limitations under the License.
10+
//
11+
// The Original Code is RabbitMQ.
12+
//
13+
// The Initial Developer of the Original Code is VMware, Inc.
14+
// Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
15+
//
16+
17+
18+
package com.rabbitmq.client;
19+
20+
import java.io.IOException;
21+
22+
/**
23+
* Implement this interface in order to be notified of Basic.Nack
24+
* events.
25+
*/
26+
public interface NackListener {
27+
void handleNack(long deliveryTag, boolean multiple)
28+
throws IOException;
29+
}

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package com.rabbitmq.client.impl;
1919

2020
import com.rabbitmq.client.AckListener;
21+
import com.rabbitmq.client.NackListener;
2122
import com.rabbitmq.client.AMQP.BasicProperties;
2223
import com.rabbitmq.client.AMQP;
2324
import com.rabbitmq.client.Command;
@@ -91,6 +92,10 @@ public class ChannelN extends AMQChannel implements com.rabbitmq.client.Channel
9192
*/
9293
public volatile AckListener ackListener = null;
9394

95+
/** Reference to the currently-active NackListener, or null if there is none.
96+
*/
97+
public volatile NackListener nackListener = null;
98+
9499
/** Sequence number of next published message requiring confirmation.
95100
*/
96101
private long nextPublishSeqNo = 0L;
@@ -163,6 +168,19 @@ public void setAckListener(AckListener listener) {
163168
ackListener = listener;
164169
}
165170

171+
/** Returns the current NackListener. */
172+
public NackListener getNackListener() {
173+
return nackListener;
174+
}
175+
176+
/**
177+
* Sets the current NackListener.
178+
* A null argument is interpreted to mean "do not use a nack listener".
179+
*/
180+
public void setNackListener(NackListener listener) {
181+
nackListener = listener;
182+
}
183+
166184
/** Returns the current default consumer. */
167185
public Consumer getDefaultConsumer() {
168186
return defaultConsumer;
@@ -331,6 +349,17 @@ public void releaseChannelNumber() {
331349
}
332350
}
333351
return true;
352+
} else if (method instanceof Basic.Nack) {
353+
Basic.Nack nack = (Basic.Nack) method;
354+
NackListener l = getNackListener();
355+
if (l != null) {
356+
try {
357+
l.handleNack(nack.getDeliveryTag(), nack.getMultiple());
358+
} catch (Throwable ex) {
359+
_connection.getExceptionHandler().handleNackListenerException(this, ex);
360+
}
361+
}
362+
return true;
334363
} else if (method instanceof Basic.RecoverOk) {
335364
for (Consumer callback: _consumers.values()) {
336365
callback.handleRecoverOk();

src/com/rabbitmq/client/impl/DefaultExceptionHandler.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,10 @@ public void handleAckListenerException(Channel channel, Throwable exception) {
4545
handleChannelKiller(channel, exception, "AckListener.handleAck");
4646
}
4747

48+
public void handleNackListenerException(Channel channel, Throwable exception) {
49+
handleChannelKiller(channel, exception, "NackListener.handleNack");
50+
}
51+
4852
public void handleConsumerException(Channel channel, Throwable exception,
4953
Consumer consumer, String consumerTag,
5054
String methodName)

src/com/rabbitmq/client/impl/ExceptionHandler.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,16 @@ public interface ExceptionHandler {
6363
*/
6464
void handleAckListenerException(Channel channel, Throwable exception);
6565

66+
/**
67+
* Perform any required exception processing for the situation
68+
* when the driver thread for the connection has called an
69+
* NackListener's handleNack method, and that method has
70+
* thrown an exeption.
71+
* @param channel the ChannelN that held the NackListener
72+
* @param exception the exception thrown by NackListener.handleNack
73+
*/
74+
void handleNackListenerException(Channel channel, Throwable exception);
75+
6676
/**
6777
* Perform any required exception processing for the situation
6878
* when the driver thread for the connection has called a method

0 commit comments

Comments
 (0)