Skip to content

Commit ab732e4

Browse files
committed
Connection timeouts and socket options
ConnectionFactory now passes a connection timeout value and TcpClient object down to the SocketFrameHandler_0_9 class. This allows setting a timeout value, and configuring socket options prior to a connection attempt. The ConnectionFactory can be subclassed, and the virtual ConfigureSocket hook function should be overridden.
1 parent 3906fdd commit ab732e4

File tree

9 files changed

+126
-42
lines changed

9 files changed

+126
-42
lines changed

projects/client/RabbitMQ.Client/src/client/api/ConnectionFactory.cs

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,10 @@ public class ConnectionFactory
118118
/// seconds, with zero meaning none (value: 0)</summary>
119119
public const ushort DefaultHeartbeat = 0; // PLEASE KEEP THIS MATCHING THE DOC ABOVE
120120

121+
/// <summary> Default value for connection attempt timeout,
122+
/// in milliseconds</summary>
123+
public const int DefaultConnectionTimeout = 30 * 1000;
124+
121125
///<summary> Default SASL auth mechanisms to use.</summary>
122126
public static AuthMechanismFactory[] DefaultAuthMechanisms =
123127
new AuthMechanismFactory[] { new PlainMechanismFactory() };
@@ -140,6 +144,9 @@ public class ConnectionFactory
140144
/// <summary>Heartbeat setting to request (in seconds)</summary>
141145
public ushort RequestedHeartbeat = DefaultHeartbeat;
142146

147+
/// <summary>Timeout setting for connection attempts (in milliseconds)</summary>
148+
public int RequestedConnectionTimeout = DefaultConnectionTimeout;
149+
143150
/// <summary>Dictionary of client properties to be sent to the
144151
/// server</summary>
145152
public IDictionary ClientProperties = ConnectionBase.DefaultClientProperties();
@@ -212,7 +219,41 @@ protected virtual IConnection FollowRedirectChain
212219

213220
try {
214221
IProtocol p = candidate.Protocol;
215-
IFrameHandler fh = p.CreateFrameHandler(candidate);
222+
TcpClient socket = null;
223+
IFrameHandler fh = null;
224+
if (Socket.OSSupportsIPv6)
225+
{
226+
try
227+
{
228+
socket = new TcpClient(AddressFamily.InterNetworkV6);
229+
ConfigureSocket(socket);
230+
fh = p.CreateFrameHandler(socket, candidate, RequestedConnectionTimeout);
231+
}
232+
// Don't attempt to use an IPv4 socket, as timeout was exceeded
233+
catch (TimeoutException)
234+
{
235+
throw;
236+
}
237+
// Socket error, do try an IPv4 socket.
238+
catch (SocketException)
239+
{
240+
socket = null;
241+
fh = null;
242+
}
243+
// IPv4 address was used for endpoint hostname, try IPv4 socket
244+
catch (ArgumentException)
245+
{
246+
socket = null;
247+
fh = null;
248+
}
249+
}
250+
if (socket == null)
251+
{
252+
socket = new TcpClient(AddressFamily.InterNetwork);
253+
ConfigureSocket(socket);
254+
fh = p.CreateFrameHandler(socket, candidate, RequestedConnectionTimeout);
255+
}
256+
216257
// At this point, we may be able to create
217258
// and fully open a successful connection,
218259
// in which case we're done, and the
@@ -339,6 +380,21 @@ public AuthMechanismFactory AuthMechanismFactory(string[] mechs) {
339380
return null;
340381
}
341382

383+
/// <summary>
384+
/// Provides a hook to insert custom configuration of the sockets
385+
/// used to connect to an AMQP server before they connect.
386+
///
387+
/// The default behaviour of this method is to disable Nagle's
388+
/// algorithm to get more consistently low latency. However it
389+
/// may be overridden freely and there is no requirement to retain
390+
/// this behaviour.
391+
/// </summary>
392+
/// <param name="socket">The socket that is to be used for the Connection</param>
393+
protected virtual void ConfigureSocket(TcpClient socket)
394+
{
395+
// disable Nagle's algorithm, for more consistently low latency
396+
socket.NoDelay = true;
397+
}
342398

343399
private void SetUri(Uri uri)
344400
{

projects/client/RabbitMQ.Client/src/client/api/IProtocol.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@
4040

4141
using RabbitMQ.Client.Impl;
4242

43+
using System.Net.Sockets;
44+
4345
namespace RabbitMQ.Client
4446
{
4547
///<summary>Object describing various overarching parameters
@@ -60,7 +62,7 @@ public interface IProtocol
6062
int DefaultPort { get; }
6163

6264
///<summary>Construct a frame handler for a given endpoint.</summary>
63-
IFrameHandler CreateFrameHandler(AmqpTcpEndpoint endpoint);
65+
IFrameHandler CreateFrameHandler(TcpClient socket, AmqpTcpEndpoint endpoint, int timeout);
6466
///<summary>Construct a connection from a given set of
6567
///parameters and a frame handler. The "insist" parameter is
6668
///passed on to the AMQP connection.open method.</summary>

projects/client/RabbitMQ.Client/src/client/impl/AbstractProtocolBase.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
using RabbitMQ.Util;
4444

4545
using System.Collections;
46+
using System.Net.Sockets;
4647

4748
namespace RabbitMQ.Client.Impl {
4849
public abstract class AbstractProtocolBase: IProtocol {
@@ -54,7 +55,7 @@ public abstract class AbstractProtocolBase: IProtocol {
5455

5556
public IDictionary Capabilities = new Hashtable();
5657

57-
public abstract IFrameHandler CreateFrameHandler(AmqpTcpEndpoint endpoint);
58+
public abstract IFrameHandler CreateFrameHandler(TcpClient socket, AmqpTcpEndpoint endpoint, int timeout);
5859
public abstract IConnection CreateConnection(ConnectionFactory factory,
5960
bool insist,
6061
IFrameHandler frameHandler);

projects/client/RabbitMQ.Client/src/client/impl/Command.cs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -56,28 +56,28 @@ namespace RabbitMQ.Client.Impl {
5656
public class Command {
5757
private static readonly byte[] m_emptyByteArray = new byte[0];
5858

59-
// EmptyFrameSize, 8 = 1 + 2 + 4 + 1
59+
// EmptyContentBodyFrameSize, 8 = 1 + 2 + 4 + 1
6060
// - 1 byte of frame type
6161
// - 2 bytes of channel number
6262
// - 4 bytes of frame payload length
6363
// - 1 byte of payload trailer FrameEnd byte
64-
public const int EmptyFrameSize = 8;
64+
public const int EmptyContentBodyFrameSize = 8;
6565

6666
static Command() {
67-
CheckEmptyFrameSize();
67+
CheckEmptyContentBodyFrameSize();
6868
}
6969

70-
public static void CheckEmptyFrameSize() {
70+
public static void CheckEmptyContentBodyFrameSize() {
7171
Frame f = new Frame(CommonFraming.Constants.FrameBody, 0, m_emptyByteArray);
7272
MemoryStream stream = new MemoryStream();
7373
NetworkBinaryWriter writer = new NetworkBinaryWriter(stream);
7474
f.WriteTo(writer);
7575
long actualLength = stream.Length;
7676

77-
if (EmptyFrameSize != actualLength) {
78-
string message =
79-
string.Format("EmptyFrameSize is incorrect - defined as {0} where the computed value is in fact {1}.",
80-
EmptyFrameSize,
77+
if (EmptyContentBodyFrameSize != actualLength) {
78+
string message =
79+
string.Format("EmptyContentBodyFrameSize is incorrect - defined as {0} where the computed value is in fact {1}.",
80+
EmptyContentBodyFrameSize,
8181
actualLength);
8282
throw new ProtocolViolationException(message);
8383
}
@@ -159,7 +159,7 @@ public void Transmit(int channelNumber, ConnectionBase connection) {
159159
int frameMax = (int) Math.Min(int.MaxValue, connection.FrameMax);
160160
int bodyPayloadMax = (frameMax == 0)
161161
? body.Length
162-
: frameMax - EmptyFrameSize;
162+
: frameMax - EmptyContentBodyFrameSize;
163163
for (int offset = 0; offset < body.Length; offset += bodyPayloadMax) {
164164
int remaining = body.Length - offset;
165165

projects/client/RabbitMQ.Client/src/client/impl/SocketFrameHandler_0_9.cs

Lines changed: 28 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -62,35 +62,21 @@ public class SocketFrameHandler_0_9 : IFrameHandler
6262
private bool m_closed = false;
6363
private Object m_semaphore = new object();
6464

65-
public SocketFrameHandler_0_9(AmqpTcpEndpoint endpoint)
65+
public SocketFrameHandler_0_9(TcpClient socket, AmqpTcpEndpoint endpoint, int timeout)
6666
{
6767
m_endpoint = endpoint;
68-
m_socket = null;
69-
if (Socket.OSSupportsIPv6)
68+
m_socket = socket;
69+
Connect(m_socket, timeout);
70+
71+
Stream netstream = m_socket.GetStream();
72+
if (endpoint.Ssl.Enabled)
7073
{
7174
try
7275
{
73-
m_socket = new TcpClient(AddressFamily.InterNetworkV6);
74-
m_socket.Connect(endpoint.HostName, endpoint.Port);
76+
netstream = SslHelper.TcpUpgrade(netstream, endpoint.Ssl);
7577
}
76-
catch(SocketException)
78+
catch (Exception)
7779
{
78-
m_socket = null;
79-
}
80-
}
81-
if (m_socket == null)
82-
{
83-
m_socket = new TcpClient(AddressFamily.InterNetwork);
84-
m_socket.Connect(endpoint.HostName, endpoint.Port);
85-
}
86-
// disable Nagle's algorithm, for more consistently low latency
87-
m_socket.NoDelay = true;
88-
89-
Stream netstream = m_socket.GetStream();
90-
if (endpoint.Ssl.Enabled) {
91-
try {
92-
netstream = SslHelper.TcpUpgrade(netstream, endpoint.Ssl);
93-
} catch (Exception) {
9480
Close();
9581
throw;
9682
}
@@ -99,6 +85,26 @@ public SocketFrameHandler_0_9(AmqpTcpEndpoint endpoint)
9985
m_writer = new NetworkBinaryWriter(new BufferedStream(netstream));
10086
}
10187

88+
private void Connect(TcpClient socket, int timeout)
89+
{
90+
IAsyncResult ar = null;
91+
try
92+
{
93+
ar = socket.BeginConnect(m_endpoint.HostName, m_endpoint.Port, null, null);
94+
if (!ar.AsyncWaitHandle.WaitOne(timeout, false))
95+
{
96+
m_socket.Close();
97+
throw new TimeoutException();
98+
}
99+
m_socket.EndConnect(ar);
100+
}
101+
finally
102+
{
103+
if (ar != null)
104+
ar.AsyncWaitHandle.Close();
105+
}
106+
}
107+
102108
public AmqpTcpEndpoint Endpoint
103109
{
104110
get

projects/client/RabbitMQ.Client/src/client/impl/v0_8/ProtocolBase.cs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,16 @@
4242
using RabbitMQ.Client.Impl;
4343
using RabbitMQ.Util;
4444

45+
using System.Net.Sockets;
46+
4547
namespace RabbitMQ.Client.Framing.Impl.v0_8 {
4648
public abstract class ProtocolBase: AbstractProtocolBase {
4749

48-
public override IFrameHandler CreateFrameHandler(AmqpTcpEndpoint endpoint) {
49-
return new SocketFrameHandler_0_9(endpoint);
50+
public override IFrameHandler CreateFrameHandler(TcpClient socket,
51+
AmqpTcpEndpoint endpoint,
52+
int timeout)
53+
{
54+
return new SocketFrameHandler_0_9(socket, endpoint, timeout);
5055
}
5156

5257
public override IModel CreateModel(ISession session) {

projects/client/RabbitMQ.Client/src/client/impl/v0_8qpid/ProtocolBase.cs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,16 @@
4242
using RabbitMQ.Client.Impl;
4343
using RabbitMQ.Util;
4444

45+
using System.Net.Sockets;
46+
4547
namespace RabbitMQ.Client.Framing.Impl.v0_8qpid {
4648
public abstract class ProtocolBase: AbstractProtocolBase {
4749

48-
public override IFrameHandler CreateFrameHandler(AmqpTcpEndpoint endpoint) {
49-
return new SocketFrameHandler_0_9(endpoint);
50+
public override IFrameHandler CreateFrameHandler(TcpClient socket,
51+
AmqpTcpEndpoint endpoint,
52+
int timeout)
53+
{
54+
return new SocketFrameHandler_0_9(socket, endpoint, timeout);
5055
}
5156

5257
public override IModel CreateModel(ISession session) {

projects/client/RabbitMQ.Client/src/client/impl/v0_9/ProtocolBase.cs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,16 @@
4242
using RabbitMQ.Client.Impl;
4343
using RabbitMQ.Util;
4444

45+
using System.Net.Sockets;
46+
4547
namespace RabbitMQ.Client.Framing.Impl.v0_9 {
4648
public abstract class ProtocolBase: AbstractProtocolBase {
4749

48-
public override IFrameHandler CreateFrameHandler(AmqpTcpEndpoint endpoint) {
49-
return new SocketFrameHandler_0_9(endpoint);
50+
public override IFrameHandler CreateFrameHandler(TcpClient socket,
51+
AmqpTcpEndpoint endpoint,
52+
int timeout)
53+
{
54+
return new SocketFrameHandler_0_9(socket, endpoint, timeout);
5055
}
5156

5257
public override IModel CreateModel(ISession session) {

projects/client/RabbitMQ.Client/src/client/impl/v0_9_1/ProtocolBase.cs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
using RabbitMQ.Util;
4444

4545
using System.Collections;
46+
using System.Net.Sockets;
4647

4748
namespace RabbitMQ.Client.Framing.Impl.v0_9_1 {
4849
public abstract class ProtocolBase: AbstractProtocolBase {
@@ -54,8 +55,11 @@ public ProtocolBase() {
5455
Capabilities["consumer_cancel_notify"] = true;
5556
}
5657

57-
public override IFrameHandler CreateFrameHandler(AmqpTcpEndpoint endpoint) {
58-
return new SocketFrameHandler_0_9(endpoint);
58+
public override IFrameHandler CreateFrameHandler(TcpClient socket,
59+
AmqpTcpEndpoint endpoint,
60+
int timeout)
61+
{
62+
return new SocketFrameHandler_0_9(socket, endpoint, timeout);
5963
}
6064

6165
public override IModel CreateModel(ISession session) {

0 commit comments

Comments
 (0)