2121import java .security .NoSuchAlgorithmException ;
2222import java .util .Map ;
2323import java .util .concurrent .ExecutorService ;
24- import java .util .concurrent .Executors ;
2524
2625import java .net .InetSocketAddress ;
2726import java .net .Socket ;
4342 */
4443
4544public class ConnectionFactory implements Cloneable {
46-
45+
4746 /** Default Executor threads */
48- private static final int DEFAULT_NUM_CONSUMER_THREADS = 5 ;
47+ @ Deprecated
48+ public static final int DEFAULT_NUM_CONSUMER_THREADS = 5 ;
4949 /** Default user name */
50- private static final String DEFAULT_USER = "guest" ;
50+ public static final String DEFAULT_USER = "guest" ;
5151 /** Default password */
52- private static final String DEFAULT_PASS = "guest" ;
52+ public static final String DEFAULT_PASS = "guest" ;
5353 /** Default virtual host */
54- private static final String DEFAULT_VHOST = "/" ;
54+ public static final String DEFAULT_VHOST = "/" ;
5555 /** Default maximum channel number;
5656 * zero for unlimited */
57- private static final int DEFAULT_CHANNEL_MAX = 0 ;
57+ public static final int DEFAULT_CHANNEL_MAX = 0 ;
5858 /** Default maximum frame size;
5959 * zero means no limit */
60- private static final int DEFAULT_FRAME_MAX = 0 ;
60+ public static final int DEFAULT_FRAME_MAX = 0 ;
6161 /** Default heart-beat interval;
6262 * zero means no heart-beats */
63- private static final int DEFAULT_HEARTBEAT = 0 ;
63+ public static final int DEFAULT_HEARTBEAT = 0 ;
6464 /** The default host */
65- private static final String DEFAULT_HOST = "localhost" ;
65+ public static final String DEFAULT_HOST = "localhost" ;
6666 /** 'Use the default port' port */
67- private static final int USE_DEFAULT_PORT = -1 ;
67+ public static final int USE_DEFAULT_PORT = -1 ;
6868 /** The default non-ssl port */
69- private static final int DEFAULT_AMQP_PORT = AMQP .PROTOCOL .PORT ;
69+ public static final int DEFAULT_AMQP_PORT = AMQP .PROTOCOL .PORT ;
7070 /** The default ssl port */
71- private static final int DEFAULT_AMQP_OVER_SSL_PORT = 5671 ;
71+ public static final int DEFAULT_AMQP_OVER_SSL_PORT = 5671 ;
7272 /** The default connection timeout;
7373 * zero means wait indefinitely */
74- private static final int DEFAULT_CONNECTION_TIMEOUT = 0 ;
74+ public static final int DEFAULT_CONNECTION_TIMEOUT = 0 ;
75+
7576 /** The default SSL protocol */
7677 private static final String DEFAULT_SSL_PROTOCOL = "SSLv3" ;
7778
78- private int numConsumerThreads = DEFAULT_NUM_CONSUMER_THREADS ;
7979 private String username = DEFAULT_USER ;
8080 private String password = DEFAULT_PASS ;
8181 private String virtualHost = DEFAULT_VHOST ;
@@ -90,13 +90,15 @@ public class ConnectionFactory implements Cloneable {
9090 private SaslConfig saslConfig = DefaultSaslConfig .PLAIN ;
9191
9292 /** @return number of consumer threads in default {@link ExecutorService} */
93+ @ Deprecated
9394 public int getNumConsumerThreads () {
94- return numConsumerThreads ;
95+ return DEFAULT_NUM_CONSUMER_THREADS ;
9596 }
9697
9798 /** @param numConsumerThreads threads in created private executor service */
99+ @ Deprecated
98100 public void setNumConsumerThreads (int numConsumerThreads ) {
99- this . numConsumerThreads = numConsumerThreads ;
101+ throw new IllegalArgumentException ( "setNumConsumerThreads not supported -- create explicit ExecutorService instead." ) ;
100102 }
101103
102104 /** @return the default host to use for connections */
@@ -436,10 +438,22 @@ protected FrameHandler createFrameHandler(Address addr)
436438
437439 String hostName = addr .getHost ();
438440 int portNumber = portOrDefault (addr .getPort ());
439- Socket socket = factory .createSocket ();
440- configureSocket (socket );
441- socket .connect (new InetSocketAddress (hostName , portNumber ), connectionTimeout );
442- return createFrameHandler (socket );
441+ Socket socket = null ;
442+ try {
443+ socket = factory .createSocket ();
444+ configureSocket (socket );
445+ socket .connect (new InetSocketAddress (hostName , portNumber ),
446+ connectionTimeout );
447+ return createFrameHandler (socket );
448+ } catch (IOException ioe ) {
449+ quietTrySocketClose (socket );
450+ throw ioe ;
451+ }
452+ }
453+
454+ private static void quietTrySocketClose (Socket socket ) {
455+ if (socket != null )
456+ try { socket .close (); } catch (Exception _) {/*ignore exceptions*/ }
443457 }
444458
445459 protected FrameHandler createFrameHandler (Socket sock )
@@ -464,21 +478,31 @@ protected void configureSocket(Socket socket) throws IOException{
464478 socket .setTcpNoDelay (true );
465479 }
466480
481+ /**
482+ * Create a new broker connection
483+ * @param addrs an array of known broker addresses (hostname/port pairs) to try in order
484+ * @return an interface to the connection
485+ * @throws IOException if it encounters a problem
486+ */
487+ public Connection newConnection (Address [] addrs ) throws IOException {
488+ return newConnection (null , addrs );
489+ }
490+
467491 /**
468492 * Create a new broker connection
469493 * @param executor thread execution service for consumers on the connection
470494 * @param addrs an array of known broker addresses (hostname/port pairs) to try in order
471495 * @return an interface to the connection
472496 * @throws IOException if it encounters a problem
473497 */
474- private Connection newConnection (ExecutorService executor , Address [] addrs )
498+ public Connection newConnection (ExecutorService executor , Address [] addrs )
475499 throws IOException
476500 {
477501 IOException lastException = null ;
478502 for (Address addr : addrs ) {
479503 try {
480504 FrameHandler frameHandler = createFrameHandler (addr );
481- AMQConnection conn =
505+ AMQConnection conn =
482506 new AMQConnection (username ,
483507 password ,
484508 frameHandler ,
@@ -506,7 +530,7 @@ private Connection newConnection(ExecutorService executor, Address[] addrs)
506530 * @throws IOException if it encounters a problem
507531 */
508532 public Connection newConnection () throws IOException {
509- return newConnection (Executors . newFixedThreadPool ( this . numConsumerThreads ) ,
533+ return newConnection (null ,
510534 new Address [] {new Address (getHost (), getPort ())}
511535 );
512536 }
0 commit comments