@@ -76,8 +76,7 @@ public static void main(String[] args) {
7676 int consumerCount = intArg (cmd , 'y' , 1 );
7777 int producerTxSize = intArg (cmd , 'm' , 0 );
7878 int consumerTxSize = intArg (cmd , 'n' , 0 );
79- boolean confirm = cmd .hasOption ('c' );
80- long confirmMax = intArg (cmd , 'k' , 0 );
79+ long confirm = intArg (cmd , 'c' , -1 );
8180 boolean autoAck = cmd .hasOption ('a' );
8281 int prefetchCount = intArg (cmd , 'q' , 0 );
8382 int minMsgSize = intArg (cmd , 's' , 0 );
@@ -86,7 +85,7 @@ public static void main(String[] args) {
8685 int frameMax = intArg (cmd , 'M' , 0 );
8786 int heartbeat = intArg (cmd , 'b' , 0 );
8887
89- if ((producerTxSize > 0 ) && confirm ) {
88+ if ((producerTxSize > 0 ) && confirm >= 0 ) {
9089 throw new ParseException ("Cannot select both producerTxSize" +
9190 " and confirm" );
9291 }
@@ -131,13 +130,13 @@ public static void main(String[] args) {
131130 producerConnections [i ] = conn ;
132131 Channel channel = conn .createChannel ();
133132 if (producerTxSize > 0 ) channel .txSelect ();
134- if (confirm ) channel .confirmSelect ();
133+ if (confirm >= 0 ) channel .confirmSelect ();
135134 channel .exchangeDeclare (exchangeName , exchangeType );
136135 final Producer p = new Producer (channel , exchangeName , id ,
137136 flags , producerTxSize ,
138137 1000L * samplingInterval ,
139138 rateLimit , minMsgSize , timeLimit ,
140- confirm , confirmMax );
139+ confirm );
141140 channel .setReturnListener (p );
142141 channel .setConfirmListener (p );
143142 Thread t = new Thread (p );
@@ -183,9 +182,8 @@ private static Options getOptions() {
183182 options .addOption (new Option ("x" , "producers" , true , "producer count" ));
184183 options .addOption (new Option ("y" , "consumers" , true , "consumer count" ));
185184 options .addOption (new Option ("m" , "ptxsize" , true , "producer tx size" ));
186- options .addOption (new Option ("k" , "confirmMax" , true , "max unconfirmed publishes" ));
187185 options .addOption (new Option ("n" , "ctxsize" , true , "consumer tx size" ));
188- options .addOption (new Option ("c" , "confirm" , false , "confirm mode " ));
186+ options .addOption (new Option ("c" , "confirm" , true , "max unconfirmed publishes " ));
189187 options .addOption (new Option ("a" , "autoack" , false ,"auto ack" ));
190188 options .addOption (new Option ("q" , "qos" , true , "qos prefetch count" ));
191189 options .addOption (new Option ("s" , "size" , true , "message size" ));
@@ -235,17 +233,17 @@ public static class Producer implements Runnable, ReturnListener,
235233 private int msgCount ;
236234 private int returnCount ;
237235
238- private boolean confirm ;
236+ private long confirm ;
237+ private Semaphore confirmPool ;
239238 private long confirmCount ;
240239 private long nackCount ;
241- private Semaphore confirmPool ;
242240 private volatile SortedSet <Long > unconfirmedSet =
243241 Collections .synchronizedSortedSet (new TreeSet <Long >());
244242
245243 public Producer (Channel channel , String exchangeName , String id ,
246244 List flags , int txSize ,
247245 long interval , int rateLimit , int minMsgSize , int timeLimit ,
248- boolean confirm , long confirmMax )
246+ long confirm )
249247 throws IOException {
250248
251249 this .channel = channel ;
@@ -258,11 +256,11 @@ public Producer(Channel channel, String exchangeName, String id,
258256 this .interval = interval ;
259257 this .rateLimit = rateLimit ;
260258 this .timeLimit = 1000L * timeLimit ;
261- if (confirmMax > 0 ) {
262- this .confirmPool = new Semaphore ((int )confirmMax );
263- }
264259 this .message = new byte [minMsgSize ];
265260 this .confirm = confirm ;
261+ if (confirm > 0 ) {
262+ this .confirmPool = new Semaphore ((int )confirm );
263+ }
266264 }
267265
268266 public synchronized void handleReturn (int replyCode ,
@@ -385,7 +383,7 @@ private void delay(long now)
385383 if (mandatory || immediate ) {
386384 System .out .print (", returns: " + returnRate + " ret/s" );
387385 }
388- if (confirm ) {
386+ if (confirm >= 0 ) {
389387 System .out .print (", confirms: " + confirmRate + " c/s" );
390388 if (nackRate > 0 ) {
391389 System .out .print (", nacks: " + nackRate + " n/s" );
0 commit comments