4646import java .io .IOException ;
4747
4848public class ConfirmDontLoseMessages {
49- final static int MSG_COUNT = 10000 ;
49+ static int msgCount = 10000 ;
5050 final static String QUEUE_NAME = "confirm-test" ;
5151 static ConnectionFactory connectionFactory ;
5252
5353 public static void main (String [] args )
5454 throws IOException , InterruptedException
5555 {
56+ if (args .length > 0 ) {
57+ msgCount = Integer .parseInt (args [0 ]);
58+ }
59+
5660 connectionFactory = new ConnectionFactory ();
5761
58- // Publish MSG_COUNT messages and wait for confirms.
62+ // Publish msgCount messages and wait for confirms.
5963 (new Thread (new Consumer ())).start ();
60- // Consume MSG_COUNT messages.
64+ // Consume msgCount messages.
6165 (new Thread (new Publisher ())).start ();
6266 }
6367
@@ -72,23 +76,22 @@ public void run() {
7276 // Setup
7377 Connection conn = connectionFactory .newConnection ();
7478 Channel ch = conn .createChannel ();
75- ch .queueDeclare (QUEUE_NAME , true , false , true , null );
76- ch .confirmSelect ();
79+ ch .queueDeclare (QUEUE_NAME , true , false , false , null );
7780 ch .setAckListener (new AckListener () {
7881 public void handleAck (long seqNo ,
7982 boolean multiple ) {
8083 if (multiple ) {
81- for (long i = ackSet .first (); i <= seqNo ; ++i )
82- ackSet .remove (i );
84+ ackSet .headSet (seqNo +1 ).clear ();
8385 } else {
84- ackSet .remove (seqNo );
86+ ackSet .remove (seqNo );
8587 }
8688 }
8789 });
90+ ch .confirmSelect ();
8891
8992 // Publish
90- for (long i = 0 ; i < MSG_COUNT ; ++i ) {
91- ackSet .add (i );
93+ for (long i = 0 ; i < msgCount ; ++i ) {
94+ ackSet .add (ch . getNextPublishSeqNo () );
9295 ch .basicPublish ("" , QUEUE_NAME ,
9396 MessageProperties .PERSISTENT_BASIC ,
9497 "nop" .getBytes ());
@@ -99,6 +102,7 @@ public void handleAck(long seqNo,
99102 Thread .sleep (10 );
100103
101104 // Cleanup
105+ ch .queueDelete (QUEUE_NAME );
102106 ch .close ();
103107 conn .close ();
104108
@@ -118,16 +122,16 @@ public void run() {
118122 // Setup
119123 Connection conn = connectionFactory .newConnection ();
120124 Channel ch = conn .createChannel ();
121- ch .queueDeclare (QUEUE_NAME , true , false , true , null );
125+ ch .queueDeclare (QUEUE_NAME , true , false , false , null );
122126
123127 // Consume
124128 QueueingConsumer qc = new QueueingConsumer (ch );
125129 ch .basicConsume (QUEUE_NAME , true , qc );
126- for (int i = 0 ; i < MSG_COUNT ; ++i ) {
130+ for (int i = 0 ; i < msgCount ; ++i ) {
127131 qc .nextDelivery ();
128132 }
129133
130- // Consume
134+ // Cleanup
131135 ch .close ();
132136 conn .close ();
133137 } catch (Throwable e ) {
0 commit comments