3232import java .io .IOException ;
3333
3434public class ConfirmDontLoseMessages {
35- final static int MSG_COUNT = 10000 ;
35+ static int msgCount = 10000 ;
3636 final static String QUEUE_NAME = "confirm-test" ;
3737 static ConnectionFactory connectionFactory ;
3838
3939 public static void main (String [] args )
4040 throws IOException , InterruptedException
4141 {
42+ if (args .length > 0 ) {
43+ msgCount = Integer .parseInt (args [0 ]);
44+ }
45+
4246 connectionFactory = new ConnectionFactory ();
4347
44- // Publish MSG_COUNT messages and wait for confirms .
48+ // Consume msgCount messages.
4549 (new Thread (new Consumer ())).start ();
46- // Consume MSG_COUNT messages.
50+ // Publish msgCount messages and wait for confirms .
4751 (new Thread (new Publisher ())).start ();
4852 }
4953
@@ -58,8 +62,7 @@ public void run() {
5862 // Setup
5963 Connection conn = connectionFactory .newConnection ();
6064 Channel ch = conn .createChannel ();
61- ch .queueDeclare (QUEUE_NAME , true , false , true , null );
62- ch .confirmSelect ();
65+ ch .queueDeclare (QUEUE_NAME , true , false , false , null );
6366 ch .setConfirmListener (new ConfirmListener () {
6467 public void handleAck (long seqNo , boolean multiple ) {
6568 if (multiple ) {
@@ -84,9 +87,10 @@ public void handleNack(long seqNo, boolean multiple) {
8487 lost );
8588 }
8689 });
90+ ch .confirmSelect ();
8791
8892 // Publish
89- for (long i = 0 ; i < MSG_COUNT ; ++i ) {
93+ for (long i = 0 ; i < msgCount ; ++i ) {
9094 unconfirmedSet .add (ch .getNextPublishSeqNo ());
9195 ch .basicPublish ("" , QUEUE_NAME ,
9296 MessageProperties .PERSISTENT_BASIC ,
@@ -98,6 +102,7 @@ public void handleNack(long seqNo, boolean multiple) {
98102 Thread .sleep (10 );
99103
100104 // Cleanup
105+ ch .queueDelete (QUEUE_NAME );
101106 ch .close ();
102107 conn .close ();
103108
@@ -117,16 +122,16 @@ public void run() {
117122 // Setup
118123 Connection conn = connectionFactory .newConnection ();
119124 Channel ch = conn .createChannel ();
120- ch .queueDeclare (QUEUE_NAME , true , false , true , null );
125+ ch .queueDeclare (QUEUE_NAME , true , false , false , null );
121126
122127 // Consume
123128 QueueingConsumer qc = new QueueingConsumer (ch );
124129 ch .basicConsume (QUEUE_NAME , true , qc );
125- for (int i = 0 ; i < MSG_COUNT ; ++i ) {
130+ for (int i = 0 ; i < msgCount ; ++i ) {
126131 qc .nextDelivery ();
127132 }
128133
129- // Consume
134+ // Cleanup
130135 ch .close ();
131136 conn .close ();
132137 } catch (Throwable e ) {
0 commit comments