@@ -68,6 +68,7 @@ public static void main(String[] args) {
6868 int portNumber = intArg (cmd , 'p' , AMQP .PROTOCOL .PORT );
6969 String exchangeType = strArg (cmd , 't' , "direct" );
7070 String exchangeName = strArg (cmd , 'e' , exchangeType );
71+ String queueName = strArg (cmd , 'u' , "" );
7172 int samplingInterval = intArg (cmd , 'i' , 1 );
7273 int rateLimit = intArg (cmd , 'r' , 0 );
7374 int producerCount = intArg (cmd , 'x' , 1 );
@@ -83,6 +84,9 @@ public static void main(String[] args) {
8384 int frameMax = intArg (cmd , 'M' , 0 );
8485 int heartbeat = intArg (cmd , 'b' , 0 );
8586
87+ boolean exclusive = "" .equals (queueName );
88+ boolean autoDelete = !exclusive ;
89+
8690 //setup
8791 String id = UUID .randomUUID ().toString ();
8892 Stats stats = new Stats (1000L * samplingInterval );
@@ -101,13 +105,15 @@ public static void main(String[] args) {
101105 Channel channel = conn .createChannel ();
102106 if (consumerTxSize > 0 ) channel .txSelect ();
103107 channel .exchangeDeclare (exchangeName , exchangeType );
104- String queueName =
105- channel .queueDeclare ("" , flags .contains ("persistent" ),
106- true , false , null ).getQueue ();
108+ String qName =
109+ channel .queueDeclare (queueName ,
110+ flags .contains ("persistent" ),
111+ exclusive , autoDelete ,
112+ null ).getQueue ();
107113 QueueingConsumer consumer = new QueueingConsumer (channel );
108114 if (prefetchCount > 0 ) channel .basicQos (prefetchCount );
109- channel .basicConsume (queueName , autoAck , consumer );
110- channel .queueBind (queueName , exchangeName , id );
115+ channel .basicConsume (qName , autoAck , consumer );
116+ channel .queueBind (qName , exchangeName , id );
111117 Thread t =
112118 new Thread (new Consumer (consumer , id ,
113119 consumerTxSize , autoAck ,
@@ -174,6 +180,7 @@ private static Options getOptions() {
174180 options .addOption (new Option ("p" , "port" , true , "broker port" ));
175181 options .addOption (new Option ("t" , "type" , true , "exchange type" ));
176182 options .addOption (new Option ("e" , "exchange" , true , "exchange name" ));
183+ options .addOption (new Option ("u" , "queue" , true , "queue name" ));
177184 options .addOption (new Option ("i" , "interval" , true , "sampling interval" ));
178185 options .addOption (new Option ("r" , "rate" , true , "rate limit" ));
179186 options .addOption (new Option ("x" , "producers" , true , "producer count" ));
0 commit comments