1919
2020import com .rabbitmq .client .test .BrokerTestCase ;
2121import com .rabbitmq .client .AMQP ;
22- import com .rabbitmq .client .AckListener ;
22+ import com .rabbitmq .client .ConfirmListener ;
2323import com .rabbitmq .client .Channel ;
2424import com .rabbitmq .client .DefaultConsumer ;
2525import com .rabbitmq .client .GetResponse ;
@@ -35,17 +35,21 @@ public class Confirm extends BrokerTestCase
3535{
3636 final static int NUM_MESSAGES = 1000 ;
3737 private static final String TTL_ARG = "x-message-ttl" ;
38- private SortedSet <Long > ackSet ;
38+ private SortedSet <Long > unconfirmedSet ;
3939
4040 @ Override
4141 protected void setUp () throws IOException {
4242 super .setUp ();
43- ackSet = Collections . synchronizedSortedSet ( new TreeSet < Long >());
44- channel . setAckListener (new AckListener () {
45- public void handleAck ( long seqNo ,
46- boolean multiple ) {
43+ unconfirmedSet =
44+ Collections . synchronizedSortedSet (new TreeSet < Long >());
45+ channel . setConfirmListener ( new ConfirmListener () {
46+ public void handleAck ( long seqNo , boolean multiple ) {
4747 Confirm .this .handleAck (seqNo , multiple );
4848 }
49+
50+ public void handleNack (long seqNo , boolean multiple ) {
51+ Confirm .this .fail ("got a nack" );
52+ }
4953 });
5054 channel .confirmSelect ();
5155 channel .queueDeclare ("confirm-test" , true , true , false , null );
@@ -236,7 +240,7 @@ private void publishN(String exchangeName, String queueName,
236240 throws IOException
237241 {
238242 for (long i = 0 ; i < NUM_MESSAGES ; i ++) {
239- ackSet .add (channel .getNextPublishSeqNo ());
243+ unconfirmedSet .add (channel .getNextPublishSeqNo ());
240244 publish (exchangeName , queueName , persistent , mandatory , immediate );
241245 }
242246 }
@@ -254,13 +258,13 @@ private void publish(String exchangeName, String queueName,
254258 }
255259
256260 private void handleAck (long msgSeqNo , boolean multiple ) {
257- if (!ackSet .contains (msgSeqNo )) {
261+ if (!unconfirmedSet .contains (msgSeqNo )) {
258262 fail ("got duplicate ack: " + msgSeqNo );
259263 }
260264 if (multiple ) {
261- ackSet .headSet (msgSeqNo + 1 ).clear ();
265+ unconfirmedSet .headSet (msgSeqNo + 1 ).clear ();
262266 } else {
263- ackSet .remove (msgSeqNo );
267+ unconfirmedSet .remove (msgSeqNo );
264268 }
265269 }
266270
@@ -278,7 +282,7 @@ private void basicRejectCommon(boolean requeue)
278282 }
279283
280284 private void waitAcks () throws InterruptedException {
281- while (ackSet .size () > 0 )
285+ while (unconfirmedSet .size () > 0 )
282286 Thread .sleep (10 );
283287 }
284288}
0 commit comments