3939import org .apache .pulsar .common .naming .NamespaceName ;
4040import org .apache .pulsar .common .naming .TopicDomain ;
4141import org .apache .pulsar .common .naming .TopicName ;
42- import org .apache .qpid .server .QpidException ;
4342import org .apache .qpid .server .bytebuffer .QpidByteBuffer ;
4443import org .apache .qpid .server .common .ServerPropertyNames ;
4544import org .apache .qpid .server .protocol .ProtocolVersion ;
@@ -74,13 +73,15 @@ public class ProxyConnection extends ChannelInboundHandlerAdapter implements
7473 private LookupHandler lookupHandler ;
7574 private AMQShortString virtualHost ;
7675 private String vhost ;
76+ private String remoteAddress ;
7777
7878 private List <Object > connectMsgList = new ArrayList <>();
7979
8080 private enum State {
8181 Init ,
8282 RedirectLookup ,
8383 RedirectToBroker ,
84+ Closing ,
8485 Closed
8586 }
8687
@@ -99,12 +100,16 @@ public ProxyConnection(ProxyService proxyService) throws PulsarClientException {
99100 public void channelActive (ChannelHandlerContext cnx ) throws Exception {
100101 super .channelActive (cnx );
101102 this .cnx = cnx ;
103+ this .remoteAddress = cnx .channel ().remoteAddress ().toString ();
104+ log .info ("[{}] New proxy connection established" , remoteAddress );
102105 }
103106
104107 @ Override
105108 public void channelInactive (ChannelHandlerContext ctx ) throws Exception {
106109 super .channelInactive (ctx );
107- this .close ();
110+ resetProxyHandler ();
111+ log .info ("[{}] ProxyConnection closed." , remoteAddress );
112+ this .state = State .Closed ;
108113 }
109114
110115 @ Override
@@ -136,7 +141,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
136141 try {
137142 brokerDecoder .decodeBuffer (QpidByteBuffer .wrap (buffer .nioBuffer ()));
138143 } catch (Throwable e ) {
139- log .error ("error while handle command:" , e );
144+ log .error ("Closing the proxy connection, error while handle command:" , e );
140145 close ();
141146 }
142147
@@ -180,7 +185,7 @@ public void receiveProtocolHeader(ProtocolInitiation protocolInitiation) {
180185 "PLAIN token" .getBytes (US_ASCII ),
181186 "en_US" .getBytes (US_ASCII ));
182187 writeFrame (responseBody .generateFrame (0 ));
183- } catch (QpidException e ) {
188+ } catch (Exception e ) {
184189 log .error ("Received unsupported protocol initiation for protocol version: {} " , getProtocolVersion (), e );
185190 }
186191 }
@@ -243,7 +248,8 @@ public void receiveConnectionOpen(AMQShortString virtualHost, AMQShortString cap
243248 public void handleConnect (AtomicInteger retryTimes ) {
244249 log .info ("handle connect residue retryTimes: {}" , retryTimes );
245250 if (retryTimes .get () == 0 ) {
246- log .warn ("Handle connect retryTimes is 0." );
251+ log .warn ("[{}] Closing the proxy connection, retry times for handling connect is exhausted." ,
252+ remoteAddress );
247253 close ();
248254 return ;
249255 }
@@ -265,8 +271,7 @@ public void handleConnect(AtomicInteger retryTimes) {
265271 handleConnectComplete (pair .getLeft (), pair .getRight (), retryTimes );
266272 });
267273 } catch (Exception e ) {
268- log .error ("Lookup broker failed." , e );
269- resetProxyHandler ();
274+ log .error ("[{}] Closing the proxy connection, lookup broker failed." , remoteAddress , e );
270275 close ();
271276 }
272277 }
@@ -363,22 +368,26 @@ public synchronized void writeFrame(AMQDataBlock frame) {
363368 if (log .isDebugEnabled ()) {
364369 log .debug ("send: " + frame );
365370 }
366- cnx .writeAndFlush (frame );
371+ cnx .writeAndFlush (frame )
372+ .addListener (future -> {
373+ if (!future .isSuccess ()) {
374+ log .error ("[{}] ProxyConnection failed to write frame." , remoteAddress , future .cause ());
375+ }
376+ });
367377 }
368378
369- public void close () {
370- log .info ("ProxyConnection close." );
371- if (log .isDebugEnabled ()) {
372- log .debug ("ProxyConnection close." );
373- }
379+ @ Override
380+ public void exceptionCaught (ChannelHandlerContext ctx , Throwable cause ) throws Exception {
381+ super .exceptionCaught (ctx , cause );
382+ log .error ("[{}] Closing the proxy connection, exception caught: " , remoteAddress , cause );
383+ close ();
384+ }
374385
375- if (proxyHandler != null ) {
376- resetProxyHandler ();
377- }
378- if (cnx != null ) {
379- cnx .close ();
386+ public void close () {
387+ if (state != State .Closed ) {
388+ state = State .Closing ;
380389 }
381- state = State . Closed ;
390+ cnx . close () ;
382391 }
383392
384393}
0 commit comments