4343import com .rabbitmq .client .impl .AMQImpl ;
4444import com .rabbitmq .client .impl .Frame ;
4545import com .rabbitmq .utility .BlockingCell ;
46+ import com .rabbitmq .utility .Utility ;
4647
4748
4849/**
5253 * printed to stdout.
5354 */
5455public class Tracer implements Runnable {
56+ private static boolean property (String property ) {
57+ return Boolean .parseBoolean (System .getProperty (
58+ "com.rabbitmq.tools.Tracer." + property ));
59+ }
60+
5561 public static final boolean WITHHOLD_INBOUND_HEARTBEATS =
56- Boolean . parseBoolean ( System . getProperty ( "com.rabbitmq.tools.Tracer. WITHHOLD_INBOUND_HEARTBEATS") );
62+ property ( " WITHHOLD_INBOUND_HEARTBEATS" );
5763 public static final boolean WITHHOLD_OUTBOUND_HEARTBEATS =
58- Boolean . parseBoolean ( System . getProperty ( "com.rabbitmq.tools.Tracer. WITHHOLD_OUTBOUND_HEARTBEATS") );
64+ property ( " WITHHOLD_OUTBOUND_HEARTBEATS" );
5965 public static final boolean NO_ASSEMBLE_FRAMES =
60- Boolean . parseBoolean ( System . getProperty ( "com.rabbitmq.tools.Tracer. NO_ASSEMBLE_FRAMES") );
66+ property ( " NO_ASSEMBLE_FRAMES" );
6167 public static final boolean NO_DECODE_FRAMES =
62- Boolean . parseBoolean ( System . getProperty ( "com.rabbitmq.tools.Tracer. NO_DECODE_FRAMES") );
68+ property ( " NO_DECODE_FRAMES" );
6369 public static final boolean SUPPRESS_COMMAND_BODIES =
64- Boolean .parseBoolean (System .getProperty ("com.rabbitmq.tools.Tracer.SUPPRESS_COMMAND_BODIES" ));
65-
70+ property ("SUPPRESS_COMMAND_BODIES" );
6671 public static final boolean SILENT_MODE =
67- Boolean . parseBoolean ( System . getProperty ( "com.rabbitmq.tools.Tracer. SILENT_MODE") );
72+ property ( " SILENT_MODE" );
6873
6974 final static int LOG_QUEUE_SIZE = 1024 * 1024 ;
7075 final static int BUFFER_SIZE = 10 * 1024 * 1024 ;
7176 final static int MAX_TIME_BETWEEN_FLUSHES = 1000 ;
7277 final static Object FLUSH = new Object ();
7378
74- private static class AsyncLogger extends Thread {
79+ private static class AsyncLogger extends Thread {
7580 final PrintStream ps ;
7681 final BlockingQueue <Object > queue = new ArrayBlockingQueue <Object >(LOG_QUEUE_SIZE , true );
77- AsyncLogger (PrintStream ps ){
82+ AsyncLogger (PrintStream ps ) {
7883 this .ps = new PrintStream (new BufferedOutputStream (ps , BUFFER_SIZE ), false );
7984 start ();
8085
81- new Thread (){
82- @ Override public void run (){
83- while (true ){
86+ new Thread () {
87+ @ Override public void run () {
88+ while (true ) {
8489 try {
8590 Thread .sleep (MAX_TIME_BETWEEN_FLUSHES );
8691 queue .add (FLUSH );
@@ -91,31 +96,21 @@ private static class AsyncLogger extends Thread{
9196 }.start ();
9297 }
9398
94- void printMessage (Object message ){
95- if (message instanceof Throwable ){
96- ((Throwable )message ).printStackTrace (ps );
97- } else if (message instanceof String ){
98- ps .println (message );
99- } else {
100- throw new RuntimeException ("Unrecognised object " + message );
101- }
102- }
103-
104- @ Override public void run (){
99+ @ Override public void run () {
105100 try {
106- while (true ){
101+ while (true ) {
107102 Object message = queue .take ();
108103 if (message == FLUSH ) ps .flush ();
109- else printMessage (message );
104+ else ps . println (message );
110105 }
111- } catch (InterruptedException interrupt ){
106+ } catch (InterruptedException interrupt ) {
112107 }
113108 }
114109
115- void log (Object message ){
110+ void log (String message ) {
116111 try {
117112 queue .put (message );
118- } catch (InterruptedException ex ){
113+ } catch (InterruptedException ex ) {
119114 throw new RuntimeException (ex );
120115 }
121116 }
@@ -196,22 +191,31 @@ public void run() {
196191 new Thread (outHandler ).start ();
197192 Object result = w .uninterruptibleGet ();
198193 if (result instanceof Exception ) {
199- logger . log ( result );
194+ logException (( Exception ) result );
200195 }
201196 } catch (EOFException eofe ) {
202- logger . log ( eofe );
197+ logException (( Exception ) eofe );
203198 } catch (IOException ioe ) {
204- logger . log ( ioe );
199+ logException (( Exception ) ioe );
205200 } finally {
206201 try {
207202 inSock .close ();
208203 outSock .close ();
209204 } catch (IOException ioe2 ) {
210- logger . log ( ioe2 );
205+ logException (( Exception ) ioe2 );
211206 }
212207 }
213208 }
214209
210+ public void log (String message ) {
211+ logger .log ("" + System .currentTimeMillis () + ": conn#"
212+ + id + " " + message );
213+ }
214+
215+ public void logException (Exception e ) {
216+ log ("uncaught " + Utility .makeStackTrace (e ));
217+ }
218+
215219 public class DirectionHandler implements Runnable {
216220 public BlockingCell <Object > waitCell ;
217221
@@ -235,7 +239,9 @@ public Frame readFrame() throws IOException {
235239 }
236240
237241 public void report (int channel , Object object ) {
238- logger .log ("" + System .currentTimeMillis () + ": conn#" + id + " ch#" + channel + (inBound ? " -> " : " <- " ) + object );
242+ Tracer .this .log ("ch#" + channel
243+ + (inBound ? " -> " : " <- " )
244+ + object );
239245 }
240246
241247 public void reportFrame (Frame f )
@@ -261,12 +267,13 @@ public void reportFrame(Frame f)
261267 }
262268 }
263269
270+
264271 public void doFrame () throws IOException {
265272 Frame f = readFrame ();
266273
267274 if (f != null ) {
268275
269- if (SILENT_MODE ){
276+ if (SILENT_MODE ) {
270277 f .writeTo (o );
271278 return ;
272279 }
@@ -289,14 +296,14 @@ public void doFrame() throws IOException {
289296 }
290297 } else {
291298 AMQCommand .Assembler c = assemblers .get (f .channel );
292- if (c == null ){
299+ if (c == null ) {
293300 c = AMQCommand .newAssembler ();
294301 assemblers .put (f .channel , c );
295302 }
296303 AMQCommand cmd = c .handleFrame (f );
297304 if (cmd != null ) {
298305 report (f .channel , cmd .toString (SUPPRESS_COMMAND_BODIES ));
299- assemblers .remove (f .channel );
306+ assemblers .remove (f .channel );
300307 }
301308 }
302309 }
0 commit comments