2121import java .util .Collections ;
2222import java .util .HashMap ;
2323import java .util .Map ;
24+ import java .util .SortedSet ;
25+ import java .util .TreeSet ;
2426import java .util .concurrent .TimeoutException ;
2527
2628import com .rabbitmq .client .AMQP ;
5355 * <pre>
5456 * {@link Connection} conn = ...;
5557 * {@link ChannelN} ch1 = conn.{@link Connection#createChannel createChannel}();
56- * ch1.{@link ChannelN#open open}();
5758 * </pre>
5859 */
5960public class ChannelN extends AMQChannel implements com .rabbitmq .client .Channel {
@@ -99,6 +100,15 @@ public class ChannelN extends AMQChannel implements com.rabbitmq.client.Channel
99100 */
100101 public volatile Consumer defaultConsumer = null ;
101102
103+ /** Set of currently unconfirmed messages (i.e. messages that have
104+ * not been ack'd or nack'd by the server yet. */
105+ protected volatile SortedSet <Long > unconfirmedSet =
106+ Collections .synchronizedSortedSet (new TreeSet <Long >());
107+
108+ /** Whether any nacks have been received since the last
109+ * waitForConfirms(). */
110+ protected volatile boolean onlyAcksReceived = true ;
111+
102112 /**
103113 * Construct a new channel on the given connection with the given
104114 * channel number. Usually not called directly - call
@@ -154,6 +164,35 @@ public ConfirmListener getConfirmListener() {
154164 return confirmListener ;
155165 }
156166
167+ /** {@inheritDoc} */
168+ public boolean waitForConfirms ()
169+ throws InterruptedException
170+ {
171+ synchronized (unconfirmedSet ) {
172+ while (true ) {
173+ if (getCloseReason () != null ) {
174+ throw Utility .fixStackTrace (getCloseReason ());
175+ }
176+ if (unconfirmedSet .isEmpty ()) {
177+ boolean aux = onlyAcksReceived ;
178+ onlyAcksReceived = true ;
179+ return aux ;
180+ }
181+ unconfirmedSet .wait ();
182+ }
183+ }
184+ }
185+
186+ /** {@inheritDoc} */
187+ public void waitForConfirmsOrDie ()
188+ throws IOException , InterruptedException
189+ {
190+ if (!waitForConfirms ()) {
191+ close (AMQP .REPLY_SUCCESS , "NACKS RECEIVED" , true , null , false );
192+ throw new IOException ("nacks received" );
193+ }
194+ }
195+
157196 /**
158197 * Sets the current {@link ConfirmListener}.
159198 * A null argument is interpreted to mean "do not use a confirm listener".
@@ -208,6 +247,9 @@ public void broadcastShutdownSignal(ShutdownSignalException signal) {
208247 {
209248 super .processShutdownSignal (signal , ignoreClosed , notifyRpc );
210249 broadcastShutdownSignal (signal );
250+ synchronized (unconfirmedSet ) {
251+ unconfirmedSet .notify ();
252+ }
211253 }
212254
213255 public void releaseChannelNumber () {
@@ -310,25 +352,23 @@ public void releaseChannelNumber() {
310352 return true ;
311353 } else if (method instanceof Basic .Ack ) {
312354 Basic .Ack ack = (Basic .Ack ) method ;
313- ConfirmListener l = getConfirmListener ();
314- if (l != null ) {
315- try {
316- l .handleAck (ack .getDeliveryTag (), ack .getMultiple ());
317- } catch (Throwable ex ) {
318- _connection .getExceptionHandler ().handleConfirmListenerException (this , ex );
319- }
355+ try {
356+ if (confirmListener != null )
357+ confirmListener .handleAck (ack .getDeliveryTag (), ack .getMultiple ());
358+ } catch (Throwable ex ) {
359+ _connection .getExceptionHandler ().handleConfirmListenerException (this , ex );
320360 }
361+ handleAckNack (ack .getDeliveryTag (), ack .getMultiple (), false );
321362 return true ;
322363 } else if (method instanceof Basic .Nack ) {
323364 Basic .Nack nack = (Basic .Nack ) method ;
324- ConfirmListener l = getConfirmListener ();
325- if (l != null ) {
326- try {
327- l .handleNack (nack .getDeliveryTag (), nack .getMultiple ());
328- } catch (Throwable ex ) {
329- _connection .getExceptionHandler ().handleConfirmListenerException (this , ex );
330- }
365+ try {
366+ if (confirmListener != null )
367+ confirmListener .handleNack (nack .getDeliveryTag (), nack .getMultiple ());
368+ } catch (Throwable ex ) {
369+ _connection .getExceptionHandler ().handleConfirmListenerException (this , ex );
331370 }
371+ handleAckNack (nack .getDeliveryTag (), nack .getMultiple (), true );
332372 return true ;
333373 } else if (method instanceof Basic .RecoverOk ) {
334374 for (Consumer callback : _consumers .values ()) {
@@ -513,7 +553,10 @@ public void basicPublish(String exchange, String routingKey,
513553 BasicProperties props , byte [] body )
514554 throws IOException
515555 {
516- if (nextPublishSeqNo > 0 ) nextPublishSeqNo ++;
556+ if (nextPublishSeqNo > 0 ) {
557+ unconfirmedSet .add (getNextPublishSeqNo ());
558+ nextPublishSeqNo ++;
559+ }
517560 BasicProperties useProps = props ;
518561 if (props == null ) {
519562 useProps = MessageProperties .MINIMAL_BASIC ;
@@ -919,4 +962,16 @@ public com.rabbitmq.client.Method rpc(com.rabbitmq.client.Method method) throws
919962 return exnWrappingRpc ((com .rabbitmq .client .impl .Method )method ).getMethod ();
920963 }
921964
922- }
965+ protected void handleAckNack (long seqNo , boolean multiple , boolean nack ) {
966+ if (multiple ) {
967+ unconfirmedSet .headSet (seqNo + 1 ).clear ();
968+ } else {
969+ unconfirmedSet .remove (seqNo );
970+ }
971+ synchronized (unconfirmedSet ) {
972+ onlyAcksReceived = onlyAcksReceived && !nack ;
973+ if (unconfirmedSet .isEmpty ())
974+ unconfirmedSet .notify ();
975+ }
976+ }
977+ }
0 commit comments