Skip to content

Commit f11b369

Browse files
author
Emile Joubert
committed
Simplify configurability of sockets
1 parent ab732e4 commit f11b369

File tree

9 files changed

+69
-93
lines changed

9 files changed

+69
-93
lines changed

projects/client/RabbitMQ.Client/src/client/api/ConnectionFactory.cs

Lines changed: 11 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,10 @@ public String Uri
196196
set { SetUri(new Uri(value, UriKind.Absolute)); }
197197
}
198198

199+
public delegate TcpClient ObtainSocket(AddressFamily addressFamily);
200+
201+
public ObtainSocket SocketFactory = DefaultSocketFactory;
202+
199203
///<summary>Construct a fresh instance, with all fields set to
200204
///their respective defaults.</summary>
201205
public ConnectionFactory() { }
@@ -219,40 +223,9 @@ protected virtual IConnection FollowRedirectChain
219223

220224
try {
221225
IProtocol p = candidate.Protocol;
222-
TcpClient socket = null;
223-
IFrameHandler fh = null;
224-
if (Socket.OSSupportsIPv6)
225-
{
226-
try
227-
{
228-
socket = new TcpClient(AddressFamily.InterNetworkV6);
229-
ConfigureSocket(socket);
230-
fh = p.CreateFrameHandler(socket, candidate, RequestedConnectionTimeout);
231-
}
232-
// Don't attempt to use an IPv4 socket, as timeout was exceeded
233-
catch (TimeoutException)
234-
{
235-
throw;
236-
}
237-
// Socket error, do try an IPv4 socket.
238-
catch (SocketException)
239-
{
240-
socket = null;
241-
fh = null;
242-
}
243-
// IPv4 address was used for endpoint hostname, try IPv4 socket
244-
catch (ArgumentException)
245-
{
246-
socket = null;
247-
fh = null;
248-
}
249-
}
250-
if (socket == null)
251-
{
252-
socket = new TcpClient(AddressFamily.InterNetwork);
253-
ConfigureSocket(socket);
254-
fh = p.CreateFrameHandler(socket, candidate, RequestedConnectionTimeout);
255-
}
226+
IFrameHandler fh = p.CreateFrameHandler(candidate,
227+
SocketFactory,
228+
RequestedConnectionTimeout);
256229

257230
// At this point, we may be able to create
258231
// and fully open a successful connection,
@@ -380,20 +353,11 @@ public AuthMechanismFactory AuthMechanismFactory(string[] mechs) {
380353
return null;
381354
}
382355

383-
/// <summary>
384-
/// Provides a hook to insert custom configuration of the sockets
385-
/// used to connect to an AMQP server before they connect.
386-
///
387-
/// The default behaviour of this method is to disable Nagle's
388-
/// algorithm to get more consistently low latency. However it
389-
/// may be overridden freely and there is no requirement to retain
390-
/// this behaviour.
391-
/// </summary>
392-
/// <param name="socket">The socket that is to be used for the Connection</param>
393-
protected virtual void ConfigureSocket(TcpClient socket)
356+
public static TcpClient DefaultSocketFactory(AddressFamily addressFamily)
394357
{
395-
// disable Nagle's algorithm, for more consistently low latency
396-
socket.NoDelay = true;
358+
TcpClient tcpClient = new TcpClient(addressFamily);
359+
tcpClient.NoDelay = true;
360+
return tcpClient;
397361
}
398362

399363
private void SetUri(Uri uri)

projects/client/RabbitMQ.Client/src/client/api/IProtocol.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,6 @@
4040

4141
using RabbitMQ.Client.Impl;
4242

43-
using System.Net.Sockets;
44-
4543
namespace RabbitMQ.Client
4644
{
4745
///<summary>Object describing various overarching parameters
@@ -62,7 +60,9 @@ public interface IProtocol
6260
int DefaultPort { get; }
6361

6462
///<summary>Construct a frame handler for a given endpoint.</summary>
65-
IFrameHandler CreateFrameHandler(TcpClient socket, AmqpTcpEndpoint endpoint, int timeout);
63+
IFrameHandler CreateFrameHandler(AmqpTcpEndpoint endpoint,
64+
ConnectionFactory.ObtainSocket socketFactory,
65+
int timeout);
6666
///<summary>Construct a connection from a given set of
6767
///parameters and a frame handler. The "insist" parameter is
6868
///passed on to the AMQP connection.open method.</summary>

projects/client/RabbitMQ.Client/src/client/impl/AbstractProtocolBase.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@
4343
using RabbitMQ.Util;
4444

4545
using System.Collections;
46-
using System.Net.Sockets;
4746

4847
namespace RabbitMQ.Client.Impl {
4948
public abstract class AbstractProtocolBase: IProtocol {
@@ -55,7 +54,9 @@ public abstract class AbstractProtocolBase: IProtocol {
5554

5655
public IDictionary Capabilities = new Hashtable();
5756

58-
public abstract IFrameHandler CreateFrameHandler(TcpClient socket, AmqpTcpEndpoint endpoint, int timeout);
57+
public abstract IFrameHandler CreateFrameHandler(AmqpTcpEndpoint endpoint,
58+
ConnectionFactory.ObtainSocket socketFactory,
59+
int timeout);
5960
public abstract IConnection CreateConnection(ConnectionFactory factory,
6061
bool insist,
6162
IFrameHandler frameHandler);

projects/client/RabbitMQ.Client/src/client/impl/Command.cs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -56,28 +56,28 @@ namespace RabbitMQ.Client.Impl {
5656
public class Command {
5757
private static readonly byte[] m_emptyByteArray = new byte[0];
5858

59-
// EmptyContentBodyFrameSize, 8 = 1 + 2 + 4 + 1
59+
// EmptyFrameSize, 8 = 1 + 2 + 4 + 1
6060
// - 1 byte of frame type
6161
// - 2 bytes of channel number
6262
// - 4 bytes of frame payload length
6363
// - 1 byte of payload trailer FrameEnd byte
64-
public const int EmptyContentBodyFrameSize = 8;
64+
public const int EmptyFrameSize = 8;
6565

6666
static Command() {
67-
CheckEmptyContentBodyFrameSize();
67+
CheckEmptyFrameSize();
6868
}
6969

70-
public static void CheckEmptyContentBodyFrameSize() {
70+
public static void CheckEmptyFrameSize() {
7171
Frame f = new Frame(CommonFraming.Constants.FrameBody, 0, m_emptyByteArray);
7272
MemoryStream stream = new MemoryStream();
7373
NetworkBinaryWriter writer = new NetworkBinaryWriter(stream);
7474
f.WriteTo(writer);
7575
long actualLength = stream.Length;
7676

77-
if (EmptyContentBodyFrameSize != actualLength) {
78-
string message =
79-
string.Format("EmptyContentBodyFrameSize is incorrect - defined as {0} where the computed value is in fact {1}.",
80-
EmptyContentBodyFrameSize,
77+
if (EmptyFrameSize != actualLength) {
78+
string message =
79+
string.Format("EmptyFrameSize is incorrect - defined as {0} where the computed value is in fact {1}.",
80+
EmptyFrameSize,
8181
actualLength);
8282
throw new ProtocolViolationException(message);
8383
}
@@ -159,7 +159,7 @@ public void Transmit(int channelNumber, ConnectionBase connection) {
159159
int frameMax = (int) Math.Min(int.MaxValue, connection.FrameMax);
160160
int bodyPayloadMax = (frameMax == 0)
161161
? body.Length
162-
: frameMax - EmptyContentBodyFrameSize;
162+
: frameMax - EmptyFrameSize;
163163
for (int offset = 0; offset < body.Length; offset += bodyPayloadMax) {
164164
int remaining = body.Length - offset;
165165

projects/client/RabbitMQ.Client/src/client/impl/SocketFrameHandler_0_9.cs

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -62,11 +62,29 @@ public class SocketFrameHandler_0_9 : IFrameHandler
6262
private bool m_closed = false;
6363
private Object m_semaphore = new object();
6464

65-
public SocketFrameHandler_0_9(TcpClient socket, AmqpTcpEndpoint endpoint, int timeout)
65+
public SocketFrameHandler_0_9(AmqpTcpEndpoint endpoint,
66+
ConnectionFactory.ObtainSocket socketFactory,
67+
int timeout)
6668
{
6769
m_endpoint = endpoint;
68-
m_socket = socket;
69-
Connect(m_socket, timeout);
70+
m_socket = null;
71+
if (Socket.OSSupportsIPv6)
72+
{
73+
try
74+
{
75+
m_socket = socketFactory(AddressFamily.InterNetworkV6);
76+
Connect(m_socket, endpoint, timeout);
77+
}
78+
catch (ArgumentException)
79+
{
80+
m_socket = null;
81+
}
82+
}
83+
if (m_socket == null)
84+
{
85+
m_socket = socketFactory(AddressFamily.InterNetwork);
86+
Connect(m_socket, endpoint, timeout);
87+
}
7088

7189
Stream netstream = m_socket.GetStream();
7290
if (endpoint.Ssl.Enabled)
@@ -85,18 +103,18 @@ public SocketFrameHandler_0_9(TcpClient socket, AmqpTcpEndpoint endpoint, int ti
85103
m_writer = new NetworkBinaryWriter(new BufferedStream(netstream));
86104
}
87105

88-
private void Connect(TcpClient socket, int timeout)
106+
private void Connect(TcpClient socket, AmqpTcpEndpoint endpoint, int timeout)
89107
{
90108
IAsyncResult ar = null;
91109
try
92110
{
93-
ar = socket.BeginConnect(m_endpoint.HostName, m_endpoint.Port, null, null);
111+
ar = socket.BeginConnect(endpoint.HostName, endpoint.Port, null, null);
94112
if (!ar.AsyncWaitHandle.WaitOne(timeout, false))
95113
{
96-
m_socket.Close();
114+
socket.Close();
97115
throw new TimeoutException();
98116
}
99-
m_socket.EndConnect(ar);
117+
socket.EndConnect(ar);
100118
}
101119
finally
102120
{

projects/client/RabbitMQ.Client/src/client/impl/v0_8/ProtocolBase.cs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -42,16 +42,14 @@
4242
using RabbitMQ.Client.Impl;
4343
using RabbitMQ.Util;
4444

45-
using System.Net.Sockets;
46-
4745
namespace RabbitMQ.Client.Framing.Impl.v0_8 {
4846
public abstract class ProtocolBase: AbstractProtocolBase {
4947

50-
public override IFrameHandler CreateFrameHandler(TcpClient socket,
51-
AmqpTcpEndpoint endpoint,
48+
public override IFrameHandler CreateFrameHandler(AmqpTcpEndpoint endpoint,
49+
ConnectionFactory.ObtainSocket socketFactory,
5250
int timeout)
5351
{
54-
return new SocketFrameHandler_0_9(socket, endpoint, timeout);
52+
return new SocketFrameHandler_0_9(endpoint, socketFactory, timeout);
5553
}
5654

5755
public override IModel CreateModel(ISession session) {
@@ -86,11 +84,11 @@ public override void CreateChannelClose(ushort reasonCode,
8684
{
8785
request = new Command(new RabbitMQ.Client.Framing.Impl.v0_8.ChannelClose(reasonCode,
8886
reasonText,
89-
0, 0));
87+
0, 0));
9088
replyClassId = RabbitMQ.Client.Framing.Impl.v0_8.ChannelCloseOk.ClassId;
91-
replyMethodId = RabbitMQ.Client.Framing.Impl.v0_8.ChannelCloseOk.MethodId;
89+
replyMethodId = RabbitMQ.Client.Framing.Impl.v0_8.ChannelCloseOk.MethodId;
9290
}
93-
91+
9492
public override bool CanSendWhileClosed(Command cmd)
9593
{
9694
return cmd.m_method is RabbitMQ.Client.Framing.Impl.v0_8.ChannelCloseOk;

projects/client/RabbitMQ.Client/src/client/impl/v0_8qpid/ProtocolBase.cs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -42,16 +42,14 @@
4242
using RabbitMQ.Client.Impl;
4343
using RabbitMQ.Util;
4444

45-
using System.Net.Sockets;
46-
4745
namespace RabbitMQ.Client.Framing.Impl.v0_8qpid {
4846
public abstract class ProtocolBase: AbstractProtocolBase {
4947

50-
public override IFrameHandler CreateFrameHandler(TcpClient socket,
51-
AmqpTcpEndpoint endpoint,
48+
public override IFrameHandler CreateFrameHandler(AmqpTcpEndpoint endpoint,
49+
ConnectionFactory.ObtainSocket socketFactory,
5250
int timeout)
5351
{
54-
return new SocketFrameHandler_0_9(socket, endpoint, timeout);
52+
return new SocketFrameHandler_0_9(endpoint, socketFactory, timeout);
5553
}
5654

5755
public override IModel CreateModel(ISession session) {
@@ -86,11 +84,11 @@ public override void CreateChannelClose(ushort reasonCode,
8684
{
8785
request = new Command(new RabbitMQ.Client.Framing.Impl.v0_8qpid.ChannelClose(reasonCode,
8886
reasonText,
89-
0, 0));
87+
0, 0));
9088
replyClassId = RabbitMQ.Client.Framing.Impl.v0_8qpid.ChannelCloseOk.ClassId;
91-
replyMethodId = RabbitMQ.Client.Framing.Impl.v0_8qpid.ChannelCloseOk.MethodId;
89+
replyMethodId = RabbitMQ.Client.Framing.Impl.v0_8qpid.ChannelCloseOk.MethodId;
9290
}
93-
91+
9492
public override bool CanSendWhileClosed(Command cmd)
9593
{
9694
return cmd.m_method is RabbitMQ.Client.Framing.Impl.v0_8qpid.ChannelCloseOk;

projects/client/RabbitMQ.Client/src/client/impl/v0_9/ProtocolBase.cs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,16 +42,14 @@
4242
using RabbitMQ.Client.Impl;
4343
using RabbitMQ.Util;
4444

45-
using System.Net.Sockets;
46-
4745
namespace RabbitMQ.Client.Framing.Impl.v0_9 {
4846
public abstract class ProtocolBase: AbstractProtocolBase {
4947

50-
public override IFrameHandler CreateFrameHandler(TcpClient socket,
51-
AmqpTcpEndpoint endpoint,
48+
public override IFrameHandler CreateFrameHandler(AmqpTcpEndpoint endpoint,
49+
ConnectionFactory.ObtainSocket socketFactory,
5250
int timeout)
5351
{
54-
return new SocketFrameHandler_0_9(socket, endpoint, timeout);
52+
return new SocketFrameHandler_0_9(endpoint, socketFactory, timeout);
5553
}
5654

5755
public override IModel CreateModel(ISession session) {

projects/client/RabbitMQ.Client/src/client/impl/v0_9_1/ProtocolBase.cs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@
4343
using RabbitMQ.Util;
4444

4545
using System.Collections;
46-
using System.Net.Sockets;
4746

4847
namespace RabbitMQ.Client.Framing.Impl.v0_9_1 {
4948
public abstract class ProtocolBase: AbstractProtocolBase {
@@ -55,11 +54,11 @@ public ProtocolBase() {
5554
Capabilities["consumer_cancel_notify"] = true;
5655
}
5756

58-
public override IFrameHandler CreateFrameHandler(TcpClient socket,
59-
AmqpTcpEndpoint endpoint,
57+
public override IFrameHandler CreateFrameHandler(AmqpTcpEndpoint endpoint,
58+
ConnectionFactory.ObtainSocket socketFactory,
6059
int timeout)
6160
{
62-
return new SocketFrameHandler_0_9(socket, endpoint, timeout);
61+
return new SocketFrameHandler_0_9(endpoint, socketFactory, timeout);
6362
}
6463

6564
public override IModel CreateModel(ISession session) {

0 commit comments

Comments
 (0)