Skip to content

Commit 24faf78

Browse files
author
Tim Watson
committed
merge bug23747 into default
2 parents 1ab0300 + 95d1b4c commit 24faf78

File tree

8 files changed

+89
-28
lines changed

8 files changed

+89
-28
lines changed

projects/client/RabbitMQ.Client/src/client/api/ConnectionFactory.cs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,10 @@ public class ConnectionFactory
118118
/// seconds, with zero meaning none (value: 0)</summary>
119119
public const ushort DefaultHeartbeat = 0; // PLEASE KEEP THIS MATCHING THE DOC ABOVE
120120

121+
/// <summary> Default value for connection attempt timeout,
122+
/// in milliseconds</summary>
123+
public const int DefaultConnectionTimeout = 30 * 1000;
124+
121125
///<summary> Default SASL auth mechanisms to use.</summary>
122126
public static AuthMechanismFactory[] DefaultAuthMechanisms =
123127
new AuthMechanismFactory[] { new PlainMechanismFactory() };
@@ -140,6 +144,9 @@ public class ConnectionFactory
140144
/// <summary>Heartbeat setting to request (in seconds)</summary>
141145
public ushort RequestedHeartbeat = DefaultHeartbeat;
142146

147+
/// <summary>Timeout setting for connection attempts (in milliseconds)</summary>
148+
public int RequestedConnectionTimeout = DefaultConnectionTimeout;
149+
143150
/// <summary>Dictionary of client properties to be sent to the
144151
/// server</summary>
145152
public IDictionary ClientProperties = ConnectionBase.DefaultClientProperties();
@@ -189,6 +196,11 @@ public String Uri
189196
set { SetUri(new Uri(value, UriKind.Absolute)); }
190197
}
191198

199+
public delegate TcpClient ObtainSocket(AddressFamily addressFamily);
200+
201+
///<summary>Set custom socket options by providing a SocketFactory</summary>
202+
public ObtainSocket SocketFactory = DefaultSocketFactory;
203+
192204
///<summary>Construct a fresh instance, with all fields set to
193205
///their respective defaults.</summary>
194206
public ConnectionFactory() { }
@@ -212,7 +224,10 @@ protected virtual IConnection FollowRedirectChain
212224

213225
try {
214226
IProtocol p = candidate.Protocol;
215-
IFrameHandler fh = p.CreateFrameHandler(candidate);
227+
IFrameHandler fh = p.CreateFrameHandler(candidate,
228+
SocketFactory,
229+
RequestedConnectionTimeout);
230+
216231
// At this point, we may be able to create
217232
// and fully open a successful connection,
218233
// in which case we're done, and the
@@ -339,6 +354,12 @@ public AuthMechanismFactory AuthMechanismFactory(string[] mechs) {
339354
return null;
340355
}
341356

357+
public static TcpClient DefaultSocketFactory(AddressFamily addressFamily)
358+
{
359+
TcpClient tcpClient = new TcpClient(addressFamily);
360+
tcpClient.NoDelay = true;
361+
return tcpClient;
362+
}
342363

343364
private void SetUri(Uri uri)
344365
{

projects/client/RabbitMQ.Client/src/client/api/IProtocol.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,9 @@ public interface IProtocol
6060
int DefaultPort { get; }
6161

6262
///<summary>Construct a frame handler for a given endpoint.</summary>
63-
IFrameHandler CreateFrameHandler(AmqpTcpEndpoint endpoint);
63+
IFrameHandler CreateFrameHandler(AmqpTcpEndpoint endpoint,
64+
ConnectionFactory.ObtainSocket socketFactory,
65+
int timeout);
6466
///<summary>Construct a connection from a given set of
6567
///parameters and a frame handler. The "insist" parameter is
6668
///passed on to the AMQP connection.open method.</summary>

projects/client/RabbitMQ.Client/src/client/impl/AbstractProtocolBase.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,9 @@ public abstract class AbstractProtocolBase: IProtocol {
5454

5555
public IDictionary Capabilities = new Hashtable();
5656

57-
public abstract IFrameHandler CreateFrameHandler(AmqpTcpEndpoint endpoint);
57+
public abstract IFrameHandler CreateFrameHandler(AmqpTcpEndpoint endpoint,
58+
ConnectionFactory.ObtainSocket socketFactory,
59+
int timeout);
5860
public abstract IConnection CreateConnection(ConnectionFactory factory,
5961
bool insist,
6062
IFrameHandler frameHandler);

projects/client/RabbitMQ.Client/src/client/impl/SocketFrameHandler_0_9.cs

Lines changed: 35 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -62,35 +62,39 @@ public class SocketFrameHandler_0_9 : IFrameHandler
6262
private bool m_closed = false;
6363
private Object m_semaphore = new object();
6464

65-
public SocketFrameHandler_0_9(AmqpTcpEndpoint endpoint)
65+
public SocketFrameHandler_0_9(AmqpTcpEndpoint endpoint,
66+
ConnectionFactory.ObtainSocket socketFactory,
67+
int timeout)
6668
{
6769
m_endpoint = endpoint;
6870
m_socket = null;
6971
if (Socket.OSSupportsIPv6)
7072
{
7173
try
7274
{
73-
m_socket = new TcpClient(AddressFamily.InterNetworkV6);
74-
m_socket.Connect(endpoint.HostName, endpoint.Port);
75+
m_socket = socketFactory(AddressFamily.InterNetworkV6);
76+
Connect(m_socket, endpoint, timeout);
7577
}
76-
catch(SocketException)
78+
catch (ArgumentException) // could not connect using IPv6
7779
{
7880
m_socket = null;
7981
}
8082
}
8183
if (m_socket == null)
8284
{
83-
m_socket = new TcpClient(AddressFamily.InterNetwork);
84-
m_socket.Connect(endpoint.HostName, endpoint.Port);
85+
m_socket = socketFactory(AddressFamily.InterNetwork);
86+
Connect(m_socket, endpoint, timeout);
8587
}
86-
// disable Nagle's algorithm, for more consistently low latency
87-
m_socket.NoDelay = true;
8888

8989
Stream netstream = m_socket.GetStream();
90-
if (endpoint.Ssl.Enabled) {
91-
try {
90+
if (endpoint.Ssl.Enabled)
91+
{
92+
try
93+
{
9294
netstream = SslHelper.TcpUpgrade(netstream, endpoint.Ssl);
93-
} catch (Exception) {
95+
}
96+
catch (Exception)
97+
{
9498
Close();
9599
throw;
96100
}
@@ -99,6 +103,26 @@ public SocketFrameHandler_0_9(AmqpTcpEndpoint endpoint)
99103
m_writer = new NetworkBinaryWriter(new BufferedStream(netstream));
100104
}
101105

106+
private void Connect(TcpClient socket, AmqpTcpEndpoint endpoint, int timeout)
107+
{
108+
IAsyncResult ar = null;
109+
try
110+
{
111+
ar = socket.BeginConnect(endpoint.HostName, endpoint.Port, null, null);
112+
if (!ar.AsyncWaitHandle.WaitOne(timeout, false))
113+
{
114+
socket.Close();
115+
throw new TimeoutException("Connection to " + endpoint + " timed out");
116+
}
117+
socket.EndConnect(ar);
118+
}
119+
finally
120+
{
121+
if (ar != null)
122+
ar.AsyncWaitHandle.Close();
123+
}
124+
}
125+
102126
public AmqpTcpEndpoint Endpoint
103127
{
104128
get

projects/client/RabbitMQ.Client/src/client/impl/v0_8/ProtocolBase.cs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,11 @@
4545
namespace RabbitMQ.Client.Framing.Impl.v0_8 {
4646
public abstract class ProtocolBase: AbstractProtocolBase {
4747

48-
public override IFrameHandler CreateFrameHandler(AmqpTcpEndpoint endpoint) {
49-
return new SocketFrameHandler_0_9(endpoint);
48+
public override IFrameHandler CreateFrameHandler(AmqpTcpEndpoint endpoint,
49+
ConnectionFactory.ObtainSocket socketFactory,
50+
int timeout)
51+
{
52+
return new SocketFrameHandler_0_9(endpoint, socketFactory, timeout);
5053
}
5154

5255
public override IModel CreateModel(ISession session) {
@@ -81,11 +84,11 @@ public override void CreateChannelClose(ushort reasonCode,
8184
{
8285
request = new Command(new RabbitMQ.Client.Framing.Impl.v0_8.ChannelClose(reasonCode,
8386
reasonText,
84-
0, 0));
87+
0, 0));
8588
replyClassId = RabbitMQ.Client.Framing.Impl.v0_8.ChannelCloseOk.ClassId;
86-
replyMethodId = RabbitMQ.Client.Framing.Impl.v0_8.ChannelCloseOk.MethodId;
89+
replyMethodId = RabbitMQ.Client.Framing.Impl.v0_8.ChannelCloseOk.MethodId;
8790
}
88-
91+
8992
public override bool CanSendWhileClosed(Command cmd)
9093
{
9194
return cmd.m_method is RabbitMQ.Client.Framing.Impl.v0_8.ChannelCloseOk;

projects/client/RabbitMQ.Client/src/client/impl/v0_8qpid/ProtocolBase.cs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,11 @@
4545
namespace RabbitMQ.Client.Framing.Impl.v0_8qpid {
4646
public abstract class ProtocolBase: AbstractProtocolBase {
4747

48-
public override IFrameHandler CreateFrameHandler(AmqpTcpEndpoint endpoint) {
49-
return new SocketFrameHandler_0_9(endpoint);
48+
public override IFrameHandler CreateFrameHandler(AmqpTcpEndpoint endpoint,
49+
ConnectionFactory.ObtainSocket socketFactory,
50+
int timeout)
51+
{
52+
return new SocketFrameHandler_0_9(endpoint, socketFactory, timeout);
5053
}
5154

5255
public override IModel CreateModel(ISession session) {
@@ -81,11 +84,11 @@ public override void CreateChannelClose(ushort reasonCode,
8184
{
8285
request = new Command(new RabbitMQ.Client.Framing.Impl.v0_8qpid.ChannelClose(reasonCode,
8386
reasonText,
84-
0, 0));
87+
0, 0));
8588
replyClassId = RabbitMQ.Client.Framing.Impl.v0_8qpid.ChannelCloseOk.ClassId;
86-
replyMethodId = RabbitMQ.Client.Framing.Impl.v0_8qpid.ChannelCloseOk.MethodId;
89+
replyMethodId = RabbitMQ.Client.Framing.Impl.v0_8qpid.ChannelCloseOk.MethodId;
8790
}
88-
91+
8992
public override bool CanSendWhileClosed(Command cmd)
9093
{
9194
return cmd.m_method is RabbitMQ.Client.Framing.Impl.v0_8qpid.ChannelCloseOk;

projects/client/RabbitMQ.Client/src/client/impl/v0_9/ProtocolBase.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,11 @@
4545
namespace RabbitMQ.Client.Framing.Impl.v0_9 {
4646
public abstract class ProtocolBase: AbstractProtocolBase {
4747

48-
public override IFrameHandler CreateFrameHandler(AmqpTcpEndpoint endpoint) {
49-
return new SocketFrameHandler_0_9(endpoint);
48+
public override IFrameHandler CreateFrameHandler(AmqpTcpEndpoint endpoint,
49+
ConnectionFactory.ObtainSocket socketFactory,
50+
int timeout)
51+
{
52+
return new SocketFrameHandler_0_9(endpoint, socketFactory, timeout);
5053
}
5154

5255
public override IModel CreateModel(ISession session) {

projects/client/RabbitMQ.Client/src/client/impl/v0_9_1/ProtocolBase.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,11 @@ public ProtocolBase() {
5454
Capabilities["consumer_cancel_notify"] = true;
5555
}
5656

57-
public override IFrameHandler CreateFrameHandler(AmqpTcpEndpoint endpoint) {
58-
return new SocketFrameHandler_0_9(endpoint);
57+
public override IFrameHandler CreateFrameHandler(AmqpTcpEndpoint endpoint,
58+
ConnectionFactory.ObtainSocket socketFactory,
59+
int timeout)
60+
{
61+
return new SocketFrameHandler_0_9(endpoint, socketFactory, timeout);
5962
}
6063

6164
public override IModel CreateModel(ISession session) {

0 commit comments

Comments
 (0)