@@ -68,9 +68,10 @@ public static void main(String[] args) {
6868 String hostName = optArg (args , 0 , "localhost" );
6969 int portNumber = optArg (args , 1 , AMQP .PROTOCOL .PORT );
7070 boolean writeStats = optArg (args , 2 , true );
71+ boolean noAck = optArg (args , 3 , true );
7172 final Connection conn = new ConnectionFactory ().newConnection (hostName , portNumber );
7273 System .out .println ("Channel 0 fully open." );
73- new ConsumerMain (conn , writeStats ).run ();
74+ new ConsumerMain (conn , writeStats , noAck ).run ();
7475 } catch (Exception e ) {
7576 System .err .println ("Main thread caught exception: " + e );
7677 e .printStackTrace ();
@@ -90,10 +91,14 @@ public static void sleep(int ms) {
9091
9192 public boolean _writeStats ;
9293
93- public ConsumerMain (Connection connection , boolean writeStats ) {
94+ public boolean _noAck ;
95+
96+ public ConsumerMain (Connection connection , boolean writeStats , boolean noAck ) {
9497 _connection = connection ;
9598 _writeStats = writeStats ;
99+ _noAck = noAck ;
96100 System .out .println ((_writeStats ? "WILL" : "WON'T" ) + " write statistics." );
101+ System .out .println ((_noAck ? "WILL" : "WON'T" ) + " use server-side auto-acking." );
97102 }
98103
99104 public void run () {
@@ -119,8 +124,9 @@ private void runIt() throws IOException {
119124 channel .queueBind (completionQueue , exchangeName , "" );
120125
121126 LatencyExperimentConsumer callback = new LatencyExperimentConsumer (channel , queueName );
127+ callback ._noAck = this ._noAck ;
122128
123- channel .basicConsume (queueName , true , callback );
129+ channel .basicConsume (queueName , _noAck , callback );
124130 channel .basicConsume (completionQueue , true , "completion" , callback );
125131 callback .report (_writeStats );
126132
0 commit comments