Skip to content

Commit 4b8dd2f

Browse files
Merge pull request #141 from rabbitmq/rabbitmq-dotnet-client-140
Introduce ConnectionFactory.SocketReadTimeout and SocketWriteTimeout
2 parents db5b3a1 + 5c79158 commit 4b8dd2f

File tree

7 files changed

+76
-27
lines changed

7 files changed

+76
-27
lines changed

projects/client/RabbitMQ.Client.WinRT/src/client/impl/SocketFrameHandler.cs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -71,12 +71,14 @@ public class SocketFrameHandler : IFrameHandler
7171

7272
public SocketFrameHandler(AmqpTcpEndpoint endpoint,
7373
Func<StreamSocket> socketFactory,
74-
int timeout)
74+
int connectionTimeout,
75+
int _readTimeout,
76+
int _writeTimeout)
7577
{
7678
Endpoint = endpoint;
7779

7880
m_socket = socketFactory();
79-
Connect(m_socket, endpoint, timeout);
81+
Connect(m_socket, endpoint, connectionTimeout);
8082

8183
if (endpoint.Ssl.Enabled)
8284
{
@@ -131,14 +133,23 @@ public int RemotePort
131133
}
132134
}
133135

134-
public int Timeout
136+
public int ReadTimeout
135137
{
136138
set
137139
{
138140
// Ignored, timeouts over streams on WinRT
139141
// are much trickier to get right.
140142
//
141-
// Heartbeat implementation is in Connection.
143+
// See heartbeats implementation is in Connection.
144+
}
145+
}
146+
147+
public int WriteTimeout
148+
{
149+
set
150+
{
151+
// Ignored, timeouts over streams on WinRT
152+
// are much trickier to get right.
142153
}
143154
}
144155

projects/client/RabbitMQ.Client/src/client/api/ConnectionFactory.cs

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ public TimeSpan ContinuationTimeout
202202
public int Port = AmqpTcpEndpoint.UseDefaultPort;
203203

204204
/// <summary>
205-
/// The AMQP protocol to be used. Currently 0-9-1.
205+
/// Protocol used, only AMQP 0-9-1 is supported in modern versions.
206206
/// </summary>
207207
public IProtocol Protocol = Protocols.DefaultProtocol;
208208

@@ -211,6 +211,16 @@ public TimeSpan ContinuationTimeout
211211
/// </summary>
212212
public int RequestedConnectionTimeout = DefaultConnectionTimeout;
213213

214+
/// <summary>
215+
/// Timeout setting for socket read operations (in milliseconds).
216+
/// </summary>
217+
public int SocketReadTimeout = DefaultConnectionTimeout;
218+
219+
/// <summary>
220+
/// Timeout setting for socket write operations (in milliseconds).
221+
/// </summary>
222+
public int SocketWriteTimeout = DefaultConnectionTimeout;
223+
214224
/// <summary>
215225
/// Ssl options setting.
216226
/// </summary>
@@ -380,12 +390,25 @@ public IConnection CreateConnection(IList<string> hostnames)
380390

381391
public IFrameHandler CreateFrameHandler()
382392
{
383-
return Protocols.DefaultProtocol.CreateFrameHandler(Endpoint, SocketFactory, RequestedConnectionTimeout);
393+
var fh = Protocols.DefaultProtocol.CreateFrameHandler(Endpoint, SocketFactory,
394+
RequestedConnectionTimeout, SocketReadTimeout, SocketWriteTimeout);
395+
return ConfigureFrameHandler(fh);
384396
}
385397

386398
public IFrameHandler CreateFrameHandler(AmqpTcpEndpoint endpoint)
387399
{
388-
return Protocols.DefaultProtocol.CreateFrameHandler(endpoint, SocketFactory, RequestedConnectionTimeout);
400+
var fh = Protocols.DefaultProtocol.CreateFrameHandler(endpoint, SocketFactory,
401+
RequestedConnectionTimeout, SocketReadTimeout, SocketWriteTimeout);
402+
return ConfigureFrameHandler(fh);
403+
}
404+
405+
private IFrameHandler ConfigureFrameHandler(IFrameHandler fh)
406+
{
407+
// make sure socket timeouts are higher than heartbeat
408+
fh.ReadTimeout = Math.Max(SocketReadTimeout, RequestedHeartbeat * 1000);
409+
fh.WriteTimeout = Math.Max(SocketWriteTimeout, RequestedHeartbeat * 1000);
410+
// TODO: add user-provided configurator, like in the Java client
411+
return fh;
389412
}
390413

391414
private void SetUri(Uri uri)

projects/client/RabbitMQ.Client/src/client/api/IProtocol.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ public interface IProtocol
9999
/// Construct a frame handler for a given endpoint.
100100
/// </summary>
101101
/// <param name="socketFactory">Socket factory method.</param>
102-
/// <param name="timeout">Timeout in milliseconds.</param>
102+
/// <param name="connectionTimeout">Timeout in milliseconds.</param>
103103
/// <param name="endpoint">Represents a TCP-addressable AMQP peer: a host name and port number.</param>
104104
IFrameHandler CreateFrameHandler(
105105
AmqpTcpEndpoint endpoint,
@@ -108,7 +108,9 @@ IFrameHandler CreateFrameHandler(
108108
#else
109109
Func<StreamSocket> socketFactory,
110110
#endif
111-
int timeout);
111+
int connectionTimeout,
112+
int readTimeout,
113+
int writeTimeout);
112114
/// <summary>
113115
/// Construct a protocol model atop a given session.
114116
/// </summary>

projects/client/RabbitMQ.Client/src/client/impl/Connection.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,7 @@ public ushort Heartbeat
263263
// timers fire at slightly below half the interval to avoid race
264264
// conditions
265265
m_heartbeatTimeSpan = TimeSpan.FromMilliseconds((value * 1000) / 4);
266-
m_frameHandler.Timeout = value * 1000 * 2;
266+
m_frameHandler.ReadTimeout = value * 1000 * 2;
267267
}
268268
}
269269

@@ -441,7 +441,7 @@ public void ClosingLoop()
441441
{
442442
try
443443
{
444-
m_frameHandler.Timeout = 0;
444+
m_frameHandler.ReadTimeout = 0;
445445
// Wait for response/socket closure or timeout
446446
while (!m_closed)
447447
{
@@ -1247,7 +1247,7 @@ protected void StartAndTune()
12471247
var connectionStartCell = new BlockingCell();
12481248
m_model0.m_connectionStartCell = connectionStartCell;
12491249
m_model0.HandshakeContinuationTimeout = m_factory.HandshakeContinuationTimeout;
1250-
m_frameHandler.Timeout = m_factory.HandshakeContinuationTimeout.Milliseconds;
1250+
m_frameHandler.ReadTimeout = m_factory.HandshakeContinuationTimeout.Milliseconds;
12511251
m_frameHandler.SendHeader();
12521252

12531253
var connectionStart = (ConnectionStartDetails)

projects/client/RabbitMQ.Client/src/client/impl/IFrameHandler.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,10 @@ public interface IFrameHandler
6161
int RemotePort { get; }
6262

6363
///<summary>Socket read timeout, in milliseconds. Zero signals "infinity".</summary>
64-
int Timeout { set; }
64+
int ReadTimeout { set; }
65+
66+
///<summary>Socket write timeout, in milliseconds. Zero signals "infinity".</summary>
67+
int WriteTimeout { set; }
6568

6669
void Close();
6770

projects/client/RabbitMQ.Client/src/client/impl/ProtocolBase.cs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@
4848
#endif
4949

5050
using RabbitMQ.Client.Impl;
51-
using RabbitMQ.Client;
5251
using RabbitMQ.Util;
5352

5453
namespace RabbitMQ.Client.Framing.Impl
@@ -151,9 +150,12 @@ public IFrameHandler CreateFrameHandler(
151150
#else
152151
Func<StreamSocket> socketFactory,
153152
#endif
154-
int timeout)
153+
int connectionTimeout,
154+
int readTimeout,
155+
int writeTimeout)
155156
{
156-
return new SocketFrameHandler(endpoint, socketFactory, timeout);
157+
return new SocketFrameHandler(endpoint, socketFactory,
158+
connectionTimeout, readTimeout, writeTimeout);
157159
}
158160

159161
public IModel CreateModel(ISession session)

projects/client/RabbitMQ.Client/src/client/impl/SocketFrameHandler.cs

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public class SocketFrameHandler : IFrameHandler
6666

6767
public SocketFrameHandler(AmqpTcpEndpoint endpoint,
6868
Func<AddressFamily, ITcpClient> socketFactory,
69-
int timeout)
69+
int connectionTimeout, int readTimeout, int writeTimeout)
7070
{
7171
Endpoint = endpoint;
7272
m_socket = null;
@@ -75,7 +75,7 @@ public SocketFrameHandler(AmqpTcpEndpoint endpoint,
7575
try
7676
{
7777
m_socket = socketFactory(AddressFamily.InterNetworkV6);
78-
Connect(m_socket, endpoint, timeout);
78+
Connect(m_socket, endpoint, connectionTimeout);
7979
}
8080
catch (ConnectFailureException) // could not connect using IPv6
8181
{
@@ -91,13 +91,12 @@ public SocketFrameHandler(AmqpTcpEndpoint endpoint,
9191
if (m_socket == null)
9292
{
9393
m_socket = socketFactory(AddressFamily.InterNetwork);
94-
Connect(m_socket, endpoint, timeout);
94+
Connect(m_socket, endpoint, connectionTimeout);
9595
}
9696

9797
Stream netstream = m_socket.GetStream();
98-
// make sure the socket timeout is greater than heartbeat timeout
99-
netstream.ReadTimeout = timeout;
100-
netstream.WriteTimeout = timeout;
98+
netstream.ReadTimeout = readTimeout;
99+
netstream.WriteTimeout = writeTimeout;
101100

102101
if (endpoint.Ssl.Enabled)
103102
{
@@ -113,6 +112,8 @@ public SocketFrameHandler(AmqpTcpEndpoint endpoint,
113112
}
114113
m_reader = new NetworkBinaryReader(new BufferedStream(netstream));
115114
m_writer = new NetworkBinaryWriter(new BufferedStream(netstream));
115+
116+
m_writeableStateTimeout = writeTimeout;
116117
}
117118

118119
public AmqpTcpEndpoint Endpoint { get; set; }
@@ -137,17 +138,15 @@ public int RemotePort
137138
get { return ((IPEndPoint)LocalEndPoint).Port; }
138139
}
139140

140-
public int Timeout
141+
public int ReadTimeout
141142
{
142143
set
143144
{
144145
try
145146
{
146147
if (m_socket.Connected)
147-
{
148-
// make sure the socket timeout is greater than heartbeat interval
149-
m_socket.ReceiveTimeout = value * 4;
150-
m_writeableStateTimeout = value * 4;
148+
{
149+
m_socket.ReceiveTimeout = value;
151150
}
152151
}
153152
#pragma warning disable 0168
@@ -159,6 +158,15 @@ public int Timeout
159158
}
160159
}
161160

161+
public int WriteTimeout
162+
{
163+
set
164+
{
165+
m_writeableStateTimeout = value;
166+
m_socket.Client.SendTimeout = value;
167+
}
168+
}
169+
162170
public void Close()
163171
{
164172
lock (_semaphore)

0 commit comments

Comments
 (0)