1414// Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
1515//
1616
17-
1817package com .rabbitmq .client .impl ;
1918
2019import java .io .EOFException ;
@@ -258,10 +257,13 @@ public AMQConnection(String username,
258257 * Connection.Start/.StartOk, Connection.Tune/.TuneOk, and then
259258 * calls Connection.Open and waits for the OpenOk. Sets heart-beat
260259 * and frame max values after tuning has taken place.
261- * @throws IOException if an error is encountered;
260+ * @throws IOException if an error is encountered
261+ * either before, or during, protocol negotiation;
262262 * sub-classes {@link ProtocolVersionMismatchException} and
263263 * {@link PossibleAuthenticationFailureException} will be thrown in the
264- * corresponding circumstances.
264+ * corresponding circumstances. If an exception is thrown, connection
265+ * resources allocated can all be garbage collected when the connection
266+ * object is no longer referenced.
265267 */
266268 public void start ()
267269 throws IOException
@@ -278,17 +280,22 @@ public void start()
278280 // initiator) is to wait for a connection.start method to
279281 // arrive.
280282 _channel0 .enqueueRpc (connStartBlocker );
281- // The following two lines are akin to AMQChannel's
282- // transmit() method for this pseudo-RPC.
283- _frameHandler .setTimeout (HANDSHAKE_TIMEOUT );
284- _frameHandler .sendHeader ();
283+ try {
284+ // The following two lines are akin to AMQChannel's
285+ // transmit() method for this pseudo-RPC.
286+ _frameHandler .setTimeout (HANDSHAKE_TIMEOUT );
287+ _frameHandler .sendHeader ();
288+ } catch (IOException ioe ) {
289+ _frameHandler .close ();
290+ throw ioe ;
291+ }
285292
286293 // start the main loop going
287- Thread ml = new MainLoop ();
288- ml .setName ("AMQP Connection " + getHostAddress () + ":" + getPort ());
289- ml .start ();
294+ new MainLoop ("AMQP Connection " + getHostAddress () + ":" + getPort ()).start ();
295+ // after this point clear-up of MainLoop is triggered by closing the frameHandler.
290296
291297 AMQP .Connection .Start connStart = null ;
298+ AMQP .Connection .Tune connTune = null ;
292299 try {
293300 connStart =
294301 (AMQP .Connection .Start ) connStartBlocker .getReply ().getMethod ();
@@ -300,70 +307,84 @@ public void start()
300307 connStart .getVersionMinor ());
301308
302309 if (!Version .checkVersion (clientVersion , serverVersion )) {
303- _frameHandler .close (); //this will cause mainLoop to terminate
304310 throw new ProtocolVersionMismatchException (clientVersion ,
305311 serverVersion );
306312 }
313+
314+ String [] mechanisms = connStart .getMechanisms ().toString ().split (" " );
315+ SaslMechanism sm = this .saslConfig .getSaslMechanism (mechanisms );
316+ if (sm == null ) {
317+ throw new IOException ("No compatible authentication mechanism found - " +
318+ "server offered [" + connStart .getMechanisms () + "]" );
319+ }
320+
321+ LongString challenge = null ;
322+ LongString response = sm .handleChallenge (null , this .username , this .password );
323+
324+ do {
325+ Method method = (challenge == null )
326+ ? new AMQP .Connection .StartOk .Builder ()
327+ .clientProperties (_clientProperties )
328+ .mechanism (sm .getName ())
329+ .response (response )
330+ .build ()
331+ : new AMQP .Connection .SecureOk .Builder ().response (response ).build ();
332+
333+ try {
334+ Method serverResponse = _channel0 .rpc (method ).getMethod ();
335+ if (serverResponse instanceof AMQP .Connection .Tune ) {
336+ connTune = (AMQP .Connection .Tune ) serverResponse ;
337+ } else {
338+ challenge = ((AMQP .Connection .Secure ) serverResponse ).getChallenge ();
339+ response = sm .handleChallenge (challenge , this .username , this .password );
340+ }
341+ } catch (ShutdownSignalException e ) {
342+ throw new PossibleAuthenticationFailureException (e );
343+ }
344+ } while (connTune == null );
307345 } catch (ShutdownSignalException sse ) {
346+ _frameHandler .close ();
308347 throw AMQChannel .wrap (sse );
348+ } catch (IOException ioe ) {
349+ _frameHandler .close ();
350+ throw ioe ;
309351 }
310352
311- String [] mechanisms = connStart .getMechanisms ().toString ().split (" " );
312- SaslMechanism sm = this .saslConfig .getSaslMechanism (mechanisms );
313- if (sm == null ) {
314- throw new IOException ("No compatible authentication mechanism found - " +
315- "server offered [" + connStart .getMechanisms () + "]" );
353+ try {
354+ int channelMax =
355+ negotiatedMaxValue (this .requestedChannelMax ,
356+ connTune .getChannelMax ());
357+ _channelManager = new ChannelManager (this ._workService , channelMax );
358+
359+ int frameMax =
360+ negotiatedMaxValue (this .requestedFrameMax ,
361+ connTune .getFrameMax ());
362+ this ._frameMax = frameMax ;
363+
364+ int heartbeat =
365+ negotiatedMaxValue (this .requestedHeartbeat ,
366+ connTune .getHeartbeat ());
367+
368+ setHeartbeat (heartbeat );
369+
370+ _channel0 .transmit (new AMQP .Connection .TuneOk .Builder ()
371+ .channelMax (channelMax )
372+ .frameMax (frameMax )
373+ .heartbeat (heartbeat )
374+ .build ());
375+ _channel0 .exnWrappingRpc (new AMQP .Connection .Open .Builder ()
376+ .virtualHost (_virtualHost )
377+ .build ());
378+ } catch (IOException ioe ) {
379+ _heartbeatSender .shutdown ();
380+ _frameHandler .close ();
381+ throw ioe ;
382+ } catch (ShutdownSignalException sse ) {
383+ _heartbeatSender .shutdown ();
384+ _frameHandler .close ();
385+ throw AMQChannel .wrap (sse );
316386 }
317387
318- LongString challenge = null ;
319- LongString response = sm .handleChallenge (null , this .username , this .password );
320-
321- AMQP .Connection .Tune connTune = null ;
322- do {
323- Method method = (challenge == null )
324- ? new AMQP .Connection .StartOk .Builder ()
325- .clientProperties (_clientProperties )
326- .mechanism (sm .getName ())
327- .response (response )
328- .build ()
329- : new AMQP .Connection .SecureOk .Builder ().response (response ).build ();
330-
331- try {
332- Method serverResponse = _channel0 .rpc (method ).getMethod ();
333- if (serverResponse instanceof AMQP .Connection .Tune ) {
334- connTune = (AMQP .Connection .Tune ) serverResponse ;
335- } else {
336- challenge = ((AMQP .Connection .Secure ) serverResponse ).getChallenge ();
337- response = sm .handleChallenge (challenge , this .username , this .password );
338- }
339- } catch (ShutdownSignalException e ) {
340- throw new PossibleAuthenticationFailureException (e );
341- }
342- } while (connTune == null );
343-
344- int channelMax =
345- negotiatedMaxValue (this .requestedChannelMax ,
346- connTune .getChannelMax ());
347- _channelManager = new ChannelManager (this ._workService , channelMax );
348-
349- int frameMax =
350- negotiatedMaxValue (this .requestedFrameMax ,
351- connTune .getFrameMax ());
352- this ._frameMax = frameMax ;
353-
354- int heartbeat =
355- negotiatedMaxValue (this .requestedHeartbeat ,
356- connTune .getHeartbeat ());
357- setHeartbeat (heartbeat );
358-
359- _channel0 .transmit (new AMQP .Connection .TuneOk .Builder ()
360- .channelMax (channelMax )
361- .frameMax (frameMax )
362- .heartbeat (heartbeat )
363- .build ());
364- _channel0 .exnWrappingRpc (new AMQP .Connection .Open .Builder ()
365- .virtualHost (_virtualHost )
366- .build ());
367388 return ;
368389 }
369390
@@ -452,6 +473,13 @@ private static final int negotiatedMaxValue(int clientValue, int serverValue) {
452473
453474 private class MainLoop extends Thread {
454475
476+ /**
477+ * @param name of thread
478+ */
479+ MainLoop (String name ) {
480+ super (name );
481+ }
482+
455483 /**
456484 * Channel reader thread main loop. Reads a frame, and if it is
457485 * not a heartbeat frame, dispatches it to the channel it refers to.
@@ -582,7 +610,7 @@ public void handleConnectionClose(Command closeCommand) {
582610 }
583611
584612 private class SocketCloseWait extends Thread {
585- private ShutdownSignalException cause ;
613+ private final ShutdownSignalException cause ;
586614
587615 public SocketCloseWait (ShutdownSignalException sse ) {
588616 cause = sse ;
@@ -614,7 +642,7 @@ public ShutdownSignalException shutdown(Object reason,
614642 reason , this );
615643 sse .initCause (cause );
616644 if (!setShutdownCauseIfOpen (sse )) {
617- if (initiatedByApplication )
645+ if (initiatedByApplication )
618646 throw new AlreadyClosedException ("Attempt to use closed connection" , this );
619647 }
620648
0 commit comments