3939import java .util .Arrays ;
4040import java .util .List ;
4141import java .util .UUID ;
42- import java .net .Socket ;
4342
4443import org .apache .commons .cli .CommandLine ;
4544import org .apache .commons .cli .CommandLineParser ;
6261import com .rabbitmq .client .AMQP .Queue ;
6362import com .rabbitmq .client .QueueingConsumer .Delivery ;
6463
64+
6565public class MulticastMain {
6666
6767 public static void main (String [] args ) {
@@ -70,24 +70,22 @@ public static void main(String[] args) {
7070 try {
7171 CommandLine cmd = parser .parse (options , args );
7272
73- final String hostName = strArg (cmd , 'h' , "localhost" );
74- final int portNumber = intArg (cmd , 'p' , AMQP .PROTOCOL .PORT );
75- final String exchangeType = strArg (cmd , 't' , "direct" );
76- final String exchangeName = strArg (cmd , 'e' , exchangeType );
77- final int samplingInterval = intArg (cmd , 'i' , 1 );
78- final int rateLimit = intArg (cmd , 'r' , 0 );
79- final int producerCount = intArg (cmd , 'x' , 1 );
80- final int consumerCount = intArg (cmd , 'y' , 1 );
81- final int producerTxSize = intArg (cmd , 'm' , 0 );
82- final int consumerTxSize = intArg (cmd , 'n' , 0 );
83- final boolean autoAck = cmd .hasOption ('a' );
84- final int prefetchCount = intArg (cmd , 'q' , 0 );
85- final int minMsgSize = intArg (cmd , 's' , 0 );
86- final int maxRedirects = intArg (cmd , 'd' , 0 );
87- final int timeLimit = intArg (cmd , 'z' , 0 );
88- final int sendBufferSize = intArg (cmd , 'b' , -1 );
89- final int recvBufferSize = intArg (cmd , 'c' , -1 );
90- final List flags = lstArg (cmd , 'f' );
73+ String hostName = strArg (cmd , 'h' , "localhost" );
74+ int portNumber = intArg (cmd , 'p' , AMQP .PROTOCOL .PORT );
75+ String exchangeType = strArg (cmd , 't' , "direct" );
76+ String exchangeName = strArg (cmd , 'e' , exchangeType );
77+ int samplingInterval = intArg (cmd , 'i' , 1 );
78+ int rateLimit = intArg (cmd , 'r' , 0 );
79+ int producerCount = intArg (cmd , 'x' , 1 );
80+ int consumerCount = intArg (cmd , 'y' , 1 );
81+ int producerTxSize = intArg (cmd , 'm' , 0 );
82+ int consumerTxSize = intArg (cmd , 'n' , 0 );
83+ boolean autoAck = cmd .hasOption ('a' );
84+ int prefetchCount = intArg (cmd , 'q' , 0 );
85+ int minMsgSize = intArg (cmd , 's' , 0 );
86+ int maxRedirects = intArg (cmd , 'd' , 0 );
87+ int timeLimit = intArg (cmd , 'z' , 0 );
88+ List flags = lstArg (cmd , 'f' );
9189
9290 //setup
9391 String id = UUID .randomUUID ().toString ();
@@ -100,15 +98,7 @@ public static void main(String[] args) {
10098 Connection [] consumerConnections = new Connection [consumerCount ];
10199 for (int i = 0 ; i < consumerCount ; i ++) {
102100 System .out .println ("starting consumer #" + i );
103- Connection conn = new ConnectionFactory (params ) {
104- public void configureSocket (Socket socket ) throws IOException {
105- super .configureSocket (socket );
106- if (recvBufferSize > 0 )
107- socket .setReceiveBufferSize (recvBufferSize );
108- if (sendBufferSize > 0 )
109- socket .setSendBufferSize (sendBufferSize );
110- }
111- }.newConnection (addresses , maxRedirects );
101+ Connection conn = new ConnectionFactory (params ).newConnection (addresses , maxRedirects );
112102 consumerConnections [i ] = conn ;
113103 Channel channel = conn .createChannel ();
114104 if (consumerTxSize > 0 ) channel .txSelect ();
@@ -119,7 +109,7 @@ public void configureSocket(Socket socket) throws IOException {
119109 if (prefetchCount > 0 ) channel .basicQos (prefetchCount );
120110 channel .basicConsume (queueName , autoAck , consumer );
121111 channel .queueBind (queueName , exchangeName , id );
122- Thread t =
112+ Thread t =
123113 new Thread (new Consumer (consumer , id ,
124114 consumerTxSize , autoAck ,
125115 stats , timeLimit ));
@@ -135,7 +125,7 @@ public void configureSocket(Socket socket) throws IOException {
135125 Channel channel = conn .createChannel ();
136126 if (producerTxSize > 0 ) channel .txSelect ();
137127 channel .exchangeDeclare (exchangeName , exchangeType );
138- Thread t =
128+ Thread t =
139129 new Thread (new Producer (channel , exchangeName , id ,
140130 flags , producerTxSize ,
141131 1000L * samplingInterval ,
@@ -168,24 +158,22 @@ public void configureSocket(Socket socket) throws IOException {
168158
169159 private static Options getOptions () {
170160 Options options = new Options ();
171- options .addOption (new Option ("h" , "host" , true , "broker host" ));
172- options .addOption (new Option ("p" , "port" , true , "broker port" ));
173- options .addOption (new Option ("t" , "type" , true , "exchange type" ));
174- options .addOption (new Option ("e" , "exchange" , true , "exchange name" ));
175- options .addOption (new Option ("i" , "interval" , true , "sampling interval" ));
176- options .addOption (new Option ("r" , "rate" , true , "rate limit" ));
177- options .addOption (new Option ("x" , "producers" , true , "producer count" ));
178- options .addOption (new Option ("y" , "consumers" , true , "consumer count" ));
179- options .addOption (new Option ("m" , "ptxsize" , true , "producer tx size" ));
180- options .addOption (new Option ("n" , "ctxsize" , true , "consumer tx size" ));
181- options .addOption (new Option ("a" , "autoack" , false ,"auto ack" ));
182- options .addOption (new Option ("q" , "qos" , true , "qos prefetch count" ));
183- options .addOption (new Option ("s" , "size" , true , "message size" ));
184- options .addOption (new Option ("d" , "redirects" , true , "max redirects" ));
185- options .addOption (new Option ("z" , "time" , true , "time limit" ));
186- options .addOption (new Option ("b" , "sendbuffer" , true , "send buffer size" ));
187- options .addOption (new Option ("c" , "recvbuffer" , true , "receive buffer size" ));
188- Option flag = new Option ("f" , "flag" , true , "message flag" );
161+ options .addOption (new Option ("h" , "host" , true , "broker host" ));
162+ options .addOption (new Option ("p" , "port" , true , "broker port" ));
163+ options .addOption (new Option ("t" , "type" , true , "exchange type" ));
164+ options .addOption (new Option ("e" , "exchange" , true , "exchange name" ));
165+ options .addOption (new Option ("i" , "interval" , true , "sampling interval" ));
166+ options .addOption (new Option ("r" , "rate" , true , "rate limit" ));
167+ options .addOption (new Option ("x" , "producers" , true , "producer count" ));
168+ options .addOption (new Option ("y" , "consumers" , true , "consumer count" ));
169+ options .addOption (new Option ("m" , "ptxsize" , true , "producer tx size" ));
170+ options .addOption (new Option ("n" , "ctxsize" , true , "consumer tx size" ));
171+ options .addOption (new Option ("a" , "autoack" , false ,"auto ack" ));
172+ options .addOption (new Option ("q" , "qos" , true , "qos prefetch count" ));
173+ options .addOption (new Option ("s" , "size" , true , "message size" ));
174+ options .addOption (new Option ("d" , "redirects" , true , "max redirects" ));
175+ options .addOption (new Option ("z" , "time" , true , "time limit" ));
176+ Option flag = new Option ("f" , "flag" , true , "message flag" );
189177 flag .setArgs (Option .UNLIMITED_VALUES );
190178 options .addOption (flag );
191179 return options ;
@@ -373,7 +361,7 @@ public void run() {
373361 int msgSeq = d .readInt ();
374362 long msgNano = d .readLong ();
375363 long nano = System .nanoTime ();
376-
364+
377365 Envelope envelope = delivery .getEnvelope ();
378366
379367 if (!autoAck ) {
@@ -434,7 +422,7 @@ private void reset(long t) {
434422
435423 public synchronized void collectStats (long now , long latency ) {
436424 msgCount ++;
437-
425+
438426 if (latency > 0 ) {
439427 minLatency = Math .min (minLatency , latency );
440428 maxLatency = Math .max (maxLatency , latency );
@@ -455,9 +443,9 @@ public synchronized void collectStats(long now, long latency) {
455443 "" ));
456444 reset (now );
457445 }
458-
446+
459447 }
460-
448+
461449 }
462450
463451}
0 commit comments