3131
3232package com .rabbitmq .tools ;
3333
34- import java .io .DataInputStream ;
35- import java .io .DataOutputStream ;
36- import java .io .EOFException ;
37- import java .io .IOException ;
34+ import java .io .*;
3835import java .net .ServerSocket ;
3936import java .net .Socket ;
4037import java .util .HashMap ;
38+ import java .util .concurrent .*;
4139
4240import com .rabbitmq .client .AMQP ;
4341import com .rabbitmq .client .impl .AMQCommand ;
4644import com .rabbitmq .client .impl .Frame ;
4745import com .rabbitmq .utility .BlockingCell ;
4846
47+
4948/**
5049 * AMQP Protocol Analyzer program. Listens on a configurable port and when a
5150 * connection arrives, makes an outbound connection to a configurable host and
@@ -64,6 +63,64 @@ public class Tracer implements Runnable {
6463 public static final boolean SUPPRESS_COMMAND_BODIES =
6564 Boolean .parseBoolean (System .getProperty ("com.rabbitmq.tools.Tracer.SUPPRESS_COMMAND_BODIES" ));
6665
66+ public static final boolean SILENT_MODE =
67+ Boolean .parseBoolean (System .getProperty ("com.rabbitmq.tools.Tracer.SILENT_MODE" ));
68+
69+ final static int LOG_QUEUE_SIZE = 1024 * 1024 ;
70+ final static int BUFFER_SIZE = 10 * 1024 * 1024 ;
71+ final static int MAX_TIME_BETWEEN_FLUSHES = 1000 ;
72+ final static Object FLUSH = new Object ();
73+
74+ private static class AsyncLogger extends Thread {
75+ final PrintStream ps ;
76+ final BlockingQueue <Object > queue = new ArrayBlockingQueue <Object >(LOG_QUEUE_SIZE , true );
77+ AsyncLogger (PrintStream ps ){
78+ this .ps = new PrintStream (new BufferedOutputStream (ps , BUFFER_SIZE ), false );
79+ start ();
80+
81+ new Thread (){
82+ @ Override public void run (){
83+ while (true ){
84+ try {
85+ Thread .sleep (MAX_TIME_BETWEEN_FLUSHES );
86+ queue .add (FLUSH );
87+ } catch (InterruptedException e ) { }
88+ }
89+
90+ }
91+ }.start ();
92+ }
93+
94+ void printMessage (Object message ){
95+ if (message instanceof Throwable ){
96+ ((Throwable )message ).printStackTrace (ps );
97+ } else if (message instanceof String ){
98+ ps .println (message );
99+ } else {
100+ throw new RuntimeException ("Unrecognised object " + message );
101+ }
102+ }
103+
104+ @ Override public void run (){
105+ try {
106+ while (true ){
107+ Object message = queue .take ();
108+ if (message == FLUSH ) ps .flush ();
109+ else printMessage (message );
110+ }
111+ } catch (InterruptedException interrupt ){
112+ }
113+ }
114+
115+ void log (Object message ){
116+ try {
117+ queue .put (message );
118+ } catch (InterruptedException ex ){
119+ throw new RuntimeException (ex );
120+ }
121+ }
122+ }
123+
67124 public static void main (String [] args ) {
68125 int listenPort = args .length > 0 ? Integer .parseInt (args [0 ]) : 5673 ;
69126 String connectHost = args .length > 1 ? args [1 ] : "localhost" ;
@@ -85,9 +142,10 @@ public static void main(String[] args) {
85142 try {
86143 ServerSocket server = new ServerSocket (listenPort );
87144 int counter = 0 ;
145+ AsyncLogger logger = new AsyncLogger (System .out );
88146 while (true ) {
89147 Socket conn = server .accept ();
90- new Tracer (conn , counter ++, connectHost , connectPort );
148+ new Tracer (conn , counter ++, connectHost , connectPort , logger );
91149 }
92150 } catch (IOException ioe ) {
93151 ioe .printStackTrace ();
@@ -109,7 +167,9 @@ public static void main(String[] args) {
109167
110168 public DataOutputStream oos ;
111169
112- public Tracer (Socket sock , int id , String host , int port ) throws IOException {
170+ public AsyncLogger logger ;
171+
172+ public Tracer (Socket sock , int id , String host , int port , AsyncLogger logger ) throws IOException {
113173 this .inSock = sock ;
114174 this .outSock = new Socket (host , port );
115175 this .id = id ;
@@ -118,6 +178,7 @@ public Tracer(Socket sock, int id, String host, int port) throws IOException {
118178 this .ios = new DataOutputStream (inSock .getOutputStream ());
119179 this .ois = new DataInputStream (outSock .getInputStream ());
120180 this .oos = new DataOutputStream (outSock .getOutputStream ());
181+ this .logger = logger ;
121182
122183 new Thread (this ).start ();
123184 }
@@ -135,18 +196,18 @@ public void run() {
135196 new Thread (outHandler ).start ();
136197 Object result = w .uninterruptibleGet ();
137198 if (result instanceof Exception ) {
138- (( Exception ) result ). printStackTrace ( );
199+ logger . log ( result );
139200 }
140201 } catch (EOFException eofe ) {
141- eofe . printStackTrace ( );
202+ logger . log ( eofe );
142203 } catch (IOException ioe ) {
143- ioe . printStackTrace ( );
204+ logger . log ( ioe );
144205 } finally {
145206 try {
146207 inSock .close ();
147208 outSock .close ();
148209 } catch (IOException ioe2 ) {
149- ioe2 . printStackTrace ( );
210+ logger . log ( ioe2 );
150211 }
151212 }
152213 }
@@ -174,7 +235,7 @@ public Frame readFrame() throws IOException {
174235 }
175236
176237 public void report (int channel , Object object ) {
177- System . out . println ("" + System .currentTimeMillis () + ": conn#" + id + " ch#" + channel + (inBound ? " -> " : " <- " ) + object );
238+ logger . log ("" + System .currentTimeMillis () + ": conn#" + id + " ch#" + channel + (inBound ? " -> " : " <- " ) + object );
178239 }
179240
180241 public void reportFrame (Frame f )
@@ -202,7 +263,13 @@ public void reportFrame(Frame f)
202263
203264 public void doFrame () throws IOException {
204265 Frame f = readFrame ();
266+
205267 if (f != null ) {
268+
269+ if (SILENT_MODE ){
270+ f .writeTo (o );
271+ return ;
272+ }
206273 if (f .type == AMQP .FRAME_HEARTBEAT ) {
207274 if ((inBound && !WITHHOLD_INBOUND_HEARTBEATS ) ||
208275 (!inBound && !WITHHOLD_OUTBOUND_HEARTBEATS ))
0 commit comments