@@ -76,6 +76,7 @@ public static void main(String[] args) {
7676 int consumerTxSize = intArg (cmd , 'n' , 0 );
7777 long confirm = intArg (cmd , 'c' , -1 );
7878 boolean autoAck = cmd .hasOption ('a' );
79+ int multiAckEvery = intArg (cmd , 'A' , 0 );
7980 int prefetchCount = intArg (cmd , 'q' , 0 );
8081 int minMsgSize = intArg (cmd , 's' , 0 );
8182 int timeLimit = intArg (cmd , 'z' , 0 );
@@ -120,7 +121,7 @@ public static void main(String[] args) {
120121 Thread t =
121122 new Thread (new Consumer (channel , id , qName ,
122123 consumerTxSize , autoAck ,
123- stats , timeLimit ));
124+ multiAckEvery , stats , timeLimit ));
124125 consumerThreads [i ] = t ;
125126 }
126127 Thread [] producerThreads = new Thread [producerCount ];
@@ -183,27 +184,28 @@ private static void usage(Options options) {
183184
184185 private static Options getOptions () {
185186 Options options = new Options ();
186- options .addOption (new Option ("?" , "help" , false ,"show usage" ));
187- options .addOption (new Option ("h" , "uri" , true , "AMQP URI" ));
188- options .addOption (new Option ("t" , "type" , true , "exchange type" ));
189- options .addOption (new Option ("e" , "exchange" , true , "exchange name" ));
190- options .addOption (new Option ("u" , "queue" , true , "queue name" ));
191- options .addOption (new Option ("i" , "interval" , true , "sampling interval" ));
192- options .addOption (new Option ("r" , "rate" , true , "rate limit" ));
193- options .addOption (new Option ("x" , "producers" , true , "producer count" ));
194- options .addOption (new Option ("y" , "consumers" , true , "consumer count" ));
195- options .addOption (new Option ("m" , "ptxsize" , true , "producer tx size" ));
196- options .addOption (new Option ("n" , "ctxsize" , true , "consumer tx size" ));
197- options .addOption (new Option ("c" , "confirm" , true , "max unconfirmed publishes" ));
198- options .addOption (new Option ("a" , "autoack" , false ,"auto ack" ));
199- options .addOption (new Option ("q" , "qos" , true , "qos prefetch count" ));
200- options .addOption (new Option ("s" , "size" , true , "message size" ));
201- options .addOption (new Option ("z" , "time" , true , "time limit" ));
202- Option flag = new Option ("f" , "flag" , true , "message flag" );
187+ options .addOption (new Option ("?" , "help" , false ,"show usage" ));
188+ options .addOption (new Option ("h" , "uri" , true , "AMQP URI" ));
189+ options .addOption (new Option ("t" , "type" , true , "exchange type" ));
190+ options .addOption (new Option ("e" , "exchange" , true , "exchange name" ));
191+ options .addOption (new Option ("u" , "queue" , true , "queue name" ));
192+ options .addOption (new Option ("i" , "interval" , true , "sampling interval" ));
193+ options .addOption (new Option ("r" , "rate" , true , "rate limit" ));
194+ options .addOption (new Option ("x" , "producers" , true , "producer count" ));
195+ options .addOption (new Option ("y" , "consumers" , true , "consumer count" ));
196+ options .addOption (new Option ("m" , "ptxsize" , true , "producer tx size" ));
197+ options .addOption (new Option ("n" , "ctxsize" , true , "consumer tx size" ));
198+ options .addOption (new Option ("c" , "confirm" , true , "max unconfirmed publishes" ));
199+ options .addOption (new Option ("a" , "autoack" , false ,"auto ack" ));
200+ options .addOption (new Option ("A" , "multiAckEvery" , true , "multi ack every" ));
201+ options .addOption (new Option ("q" , "qos" , true , "qos prefetch count" ));
202+ options .addOption (new Option ("s" , "size" , true , "message size" ));
203+ options .addOption (new Option ("z" , "time" , true , "time limit" ));
204+ Option flag = new Option ("f" , "flag" , true , "message flag" );
203205 flag .setArgs (Option .UNLIMITED_VALUES );
204206 options .addOption (flag );
205- options .addOption (new Option ("M" , "framemax" , true , "frame max" ));
206- options .addOption (new Option ("b" , "heartbeat" , true , "heartbeat interval" ));
207+ options .addOption (new Option ("M" , "framemax" , true , "frame max" ));
208+ options .addOption (new Option ("b" , "heartbeat" , true , "heartbeat interval" ));
207209 return options ;
208210 }
209211
@@ -410,20 +412,22 @@ public static class Consumer implements Runnable {
410412 private String queueName ;
411413 private int txSize ;
412414 private boolean autoAck ;
415+ private int multiAckEvery ;
413416 private Stats stats ;
414417 private long timeLimit ;
415418
416419 public Consumer (Channel channel , String id ,
417420 String queueName , int txSize , boolean autoAck ,
418- Stats stats , int timeLimit ) {
419-
420- this .channel = channel ;
421- this .id = id ;
422- this .queueName = queueName ;
423- this .txSize = txSize ;
424- this .autoAck = autoAck ;
425- this .stats = stats ;
426- this .timeLimit = 1000L * timeLimit ;
421+ int multiAckEvery , Stats stats , int timeLimit ) {
422+
423+ this .channel = channel ;
424+ this .id = id ;
425+ this .queueName = queueName ;
426+ this .txSize = txSize ;
427+ this .autoAck = autoAck ;
428+ this .multiAckEvery = multiAckEvery ;
429+ this .stats = stats ;
430+ this .timeLimit = 1000L * timeLimit ;
427431 }
428432
429433 public void run () {
@@ -462,7 +466,11 @@ public void run() {
462466 Envelope envelope = delivery .getEnvelope ();
463467
464468 if (!autoAck ) {
465- channel .basicAck (envelope .getDeliveryTag (), false );
469+ if (multiAckEvery == 0 ) {
470+ channel .basicAck (envelope .getDeliveryTag (), false );
471+ } else if (totalMsgCount % multiAckEvery == 0 ) {
472+ channel .basicAck (envelope .getDeliveryTag (), true );
473+ }
466474 }
467475
468476 if (txSize != 0 && totalMsgCount % txSize == 0 ) {
0 commit comments