1+ package com .rabbitmq .examples ;
2+
3+ import com .rabbitmq .client .AMQP ;
4+ import com .rabbitmq .client .AMQP .BasicProperties ;
5+ import com .rabbitmq .client .Channel ;
6+ import com .rabbitmq .client .Connection ;
7+ import com .rabbitmq .client .ConnectionFactory ;
8+ import com .rabbitmq .client .MessageProperties ;
9+ import com .rabbitmq .client .QueueingConsumer ;
10+ import java .io .IOException ;
11+ import java .io .PrintWriter ;
12+ import java .util .ArrayList ;
13+ import java .util .HashMap ;
14+ import java .util .List ;
15+ import java .util .Map ;
16+ import org .apache .commons .cli .CommandLine ;
17+ import org .apache .commons .cli .CommandLineParser ;
18+ import org .apache .commons .cli .GnuParser ;
19+ import org .apache .commons .cli .Option ;
20+ import org .apache .commons .cli .Options ;
21+ import org .apache .commons .cli .ParseException ;
22+
23+ public class StressPersister {
24+ public static void main (String [] args ) {
25+ try {
26+ StressPersister sp = new StressPersister ();
27+ sp .configure (args );
28+ sp .run ();
29+ System .exit (0 );
30+ } catch (Exception e ) {
31+ e .printStackTrace ();
32+ System .exit (1 );
33+ }
34+ }
35+
36+ private static String strArg (CommandLine cmd , char opt , String def ) {
37+ return cmd .getOptionValue (opt , def );
38+ }
39+
40+ private static int intArg (CommandLine cmd , char opt , int def ) {
41+ return Integer .parseInt (cmd .getOptionValue (opt , Integer .toString (def )));
42+ }
43+
44+ private static int sizeArg (CommandLine cmd , char opt , int def ) {
45+ String arg = cmd .getOptionValue (opt , Integer .toString (def ));
46+ int multiplier = 1 ;
47+ boolean strip = false ;
48+ switch (Character .toLowerCase (arg .charAt (arg .length () - 1 ))) {
49+ case 'b' : multiplier = 1 ; strip = true ; break ;
50+ case 'k' : multiplier = 1024 ; strip = true ; break ;
51+ case 'm' : multiplier = 1048576 ; strip = true ; break ;
52+ default : break ;
53+ }
54+ if (strip ) {
55+ arg = arg .substring (0 , arg .length () - 1 );
56+ }
57+ return multiplier * Integer .parseInt (arg );
58+ }
59+
60+ public String hostName ;
61+ public int portNumber ;
62+
63+ public String commentText ;
64+ public int backlogSize ;
65+ public int bodySize ;
66+ public int repeatCount ;
67+ public int sampleGranularity ;
68+
69+ public ConnectionFactory connectionFactory ;
70+ public long topStartTime ;
71+ public PrintWriter logOut ;
72+
73+ public void configure (String [] args ) throws ParseException {
74+ Options options = new Options ();
75+ options .addOption (new Option ("h" , "host" , true , "broker host" ));
76+ options .addOption (new Option ("p" , "port" , true , "broker port" ));
77+ options .addOption (new Option ("C" , "comment" , true , "comment text" ));
78+ options .addOption (new Option ("b" , "backlog" , true , "backlog size" ));
79+ options .addOption (new Option ("B" , "bodysize" , true , "body size" ));
80+ options .addOption (new Option ("c" , "count" , true , "plateau repeat count" ));
81+ options .addOption (new Option ("s" , "sampleevery" , true , "sample granularity" ));
82+ CommandLineParser parser = new GnuParser ();
83+ CommandLine cmd = parser .parse (options , args );
84+
85+ hostName = strArg (cmd , 'h' , "localhost" );
86+ portNumber = intArg (cmd , 'p' , AMQP .PROTOCOL .PORT );
87+
88+ commentText = strArg (cmd , 'C' , "" );
89+ if ("" .equals (commentText )) {
90+ throw new IllegalArgumentException ("Comment text must be nonempty" );
91+ }
92+
93+ backlogSize = intArg (cmd , 'b' , 5000 );
94+ bodySize = sizeArg (cmd , 'B' , 16384 );
95+ repeatCount = intArg (cmd , 'c' , backlogSize * 5 );
96+ sampleGranularity = intArg (cmd , 's' , Math .max (5 , repeatCount / 250 ));
97+
98+ connectionFactory = new ConnectionFactory ();
99+ }
100+
101+ public Connection newConnection () throws IOException {
102+ return connectionFactory .newConnection (hostName , portNumber );
103+ }
104+
105+ public void run () throws IOException , InterruptedException {
106+ topStartTime = System .currentTimeMillis ();
107+ String logFileName = String .format ("stress-persister-b%08d-B%010d-c%08d-s%06d-%s.out" ,
108+ backlogSize , bodySize , repeatCount , sampleGranularity , commentText );
109+ logOut = new PrintWriter (logFileName );
110+ System .out .println (logFileName );
111+ trace ("Logging to " + logFileName );
112+ publishOneInOneOutReceive (backlogSize , bodySize , repeatCount , sampleGranularity );
113+ logOut .close ();
114+ }
115+
116+ public void trace (String message ) {
117+ long now = System .currentTimeMillis ();
118+ long delta = now - topStartTime ;
119+ String s = String .format ("# %010d ms: %s" , delta , message );
120+ System .out .println (s );
121+ logOut .println (s );
122+ logOut .flush ();
123+ }
124+
125+ public void redeclare (String q , Channel chan ) throws IOException {
126+ trace ("Redeclaring queue " + q );
127+ chan .queueDeclare (q , true );
128+ // ^^ synchronous operation to get some kind
129+ // of indication back from the server that it's caught up with us
130+ }
131+
132+ public void publishOneInOneOutReceive (int backlogSize , int bodySize , int repeatCount , int sampleGranularity ) throws IOException , InterruptedException {
133+ String q = "test" ;
134+ BasicProperties props = MessageProperties .MINIMAL_PERSISTENT_BASIC ;
135+ Connection conn = newConnection ();
136+ Channel chan = conn .createChannel ();
137+ byte [] body = new byte [bodySize ];
138+ List <Long > plateauSampleTimes = new ArrayList <Long >(repeatCount );
139+ List <Double > plateauSampleDeltas = new ArrayList <Double >(repeatCount );
140+
141+ trace ("Declaring and purging queue " + q );
142+ chan .queueDeclare (q , true );
143+ chan .queuePurge (q );
144+ chan .basicQos (1 );
145+
146+ trace ("Building backlog out to " + backlogSize + " messages, each " + bodySize + " bytes long" );
147+ for (int i = 0 ; i < backlogSize ; i ++) {
148+ chan .basicPublish ("" , q , props , body );
149+ }
150+
151+ redeclare (q , chan );
152+
153+ trace ("Beginning plateau of " + repeatCount + " repeats, sampling every " + sampleGranularity + " messages" );
154+
155+ QueueingConsumer consumer = new QueueingConsumer (chan );
156+ chan .basicConsume (q , consumer );
157+
158+ long startTime = System .currentTimeMillis ();
159+ for (int i = 0 ; i < repeatCount ; i ++) {
160+ if (((i % sampleGranularity ) == 0 ) && (i > 0 )) {
161+ long now = System .currentTimeMillis ();
162+ double delta = 1000 * (now - startTime ) / (double ) sampleGranularity ;
163+ plateauSampleTimes .add (now );
164+ plateauSampleDeltas .add (delta );
165+ System .out .print (String .format ("# %3d%%; %012d --> %g microseconds/roundtrip \r " ,
166+ (100 * i / repeatCount ),
167+ now ,
168+ delta ));
169+ startTime = System .currentTimeMillis ();
170+ }
171+ chan .basicPublish ("" , q , props , body );
172+ QueueingConsumer .Delivery d = consumer .nextDelivery ();
173+ chan .basicAck (d .getEnvelope ().getDeliveryTag (), false );
174+ }
175+ System .out .println ();
176+
177+ trace ("Switching QOS to unlimited" );
178+ chan .basicQos (0 );
179+
180+ trace ("Draining backlog" );
181+ for (int i = 0 ; i < backlogSize ; i ++) {
182+ QueueingConsumer .Delivery d = consumer .nextDelivery ();
183+ chan .basicAck (d .getEnvelope ().getDeliveryTag (), false );
184+ }
185+
186+ redeclare (q , chan );
187+
188+ trace ("Closing connection" );
189+ chan .close ();
190+ conn .close ();
191+
192+ trace ("Sample results (timestamp in milliseconds since epoch; microseconds/roundtrip)" );
193+ System .out .println ("(See log file for results; final sample was " +
194+ plateauSampleDeltas .get (plateauSampleDeltas .size () - 1 ) + ")" );
195+ for (int i = 0 ; i < plateauSampleTimes .size (); i ++) {
196+ String s = String .format ("%d %d" ,
197+ plateauSampleTimes .get (i ),
198+ plateauSampleDeltas .get (i ).longValue ());
199+ logOut .println (s );
200+ }
201+ logOut .flush ();
202+ }
203+ }
0 commit comments