Skip to content

Commit b9bdfa1

Browse files
Merge pull request #129 from rabbitmq/rabbitmq-dotnet-client-80
Introduce continuation timeouts
2 parents 5551ef3 + e423d06 commit b9bdfa1

File tree

10 files changed

+184
-64
lines changed

10 files changed

+184
-64
lines changed

projects/client/RabbitMQ.Client/src/client/api/ConnectionFactory.cs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,29 @@ public class ConnectionFactory : ConnectionFactoryBase, IConnectionFactory
172172
/// </summary>
173173
public TimeSpan NetworkRecoveryInterval = TimeSpan.FromSeconds(5);
174174

175+
private TimeSpan m_handshakeContinuationTimeout = TimeSpan.FromSeconds(10);
176+
private TimeSpan m_continuationTimeout = TimeSpan.FromSeconds(20);
177+
178+
/// <summary>
179+
/// Amount of time protocol handshake operations are allowed to take before
180+
/// timing out.
181+
/// </summary>
182+
public TimeSpan HandshakeContinuationTimeout
183+
{
184+
get { return m_handshakeContinuationTimeout; }
185+
set { m_handshakeContinuationTimeout = value; }
186+
}
187+
188+
/// <summary>
189+
/// Amount of time protocol operations (e.g. <code>queue.declare</code>) are allowed to take before
190+
/// timing out.
191+
/// </summary>
192+
public TimeSpan ContinuationTimeout
193+
{
194+
get { return m_continuationTimeout; }
195+
set { m_continuationTimeout = value; }
196+
}
197+
175198
/// <summary>
176199
/// The port to connect on. <see cref="AmqpTcpEndpoint.UseDefaultPort"/>
177200
/// indicates the default for the protocol should be used.

projects/client/RabbitMQ.Client/src/client/api/IConnectionFactory.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,5 +110,17 @@ public interface IConnectionFactory
110110
/// What task scheduler should consumer dispatcher use.
111111
/// </summary>
112112
TaskScheduler TaskScheduler { get; set; }
113+
114+
/// <summary>
115+
/// Amount of time protocol handshake operations are allowed to take before
116+
/// timing out.
117+
/// </summary>
118+
TimeSpan HandshakeContinuationTimeout { get; set; }
119+
120+
/// <summary>
121+
/// Amount of time protocol operations (e.g. <code>queue.declare</code>) are allowed to take before
122+
/// timing out.
123+
/// </summary>
124+
TimeSpan ContinuationTimeout { get; set; }
113125
}
114126
}

projects/client/RabbitMQ.Client/src/client/api/IModel.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -611,5 +611,11 @@ void QueueDeclareNoWait(string queue, bool durable, bool exclusive,
611611
/// </remarks>
612612
[AmqpMethodDoNotImplement(null)]
613613
void WaitForConfirmsOrDie(TimeSpan timeout);
614+
615+
/// <summary>
616+
/// Amount of time protocol operations (e.g. <code>queue.declare</code>) are allowed to take before
617+
/// timing out.
618+
/// </summary>
619+
TimeSpan ContinuationTimeout { get; set; }
614620
}
615621
}

projects/client/RabbitMQ.Client/src/client/impl/AutorecoveringModel.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,12 @@ public IConsumerDispatcher ConsumerDispatcher
7777
get { return m_delegate.ConsumerDispatcher; }
7878
}
7979

80+
public TimeSpan ContinuationTimeout
81+
{
82+
get { return m_delegate.ContinuationTimeout; }
83+
set { m_delegate.ContinuationTimeout = value; }
84+
}
85+
8086
public AutorecoveringModel(AutorecoveringConnection conn, RecoveryAwareModel _delegate)
8187
{
8288
m_connection = conn;

projects/client/RabbitMQ.Client/src/client/impl/Connection.cs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -70,10 +70,6 @@ public class Connection : IConnection
7070
///<summary>Heartbeat frame for transmission. Reusable across connections.</summary>
7171
public readonly Frame m_heartbeatFrame = new Frame(Constants.FrameHeartbeat, 0, new byte[0]);
7272

73-
///<summary>Timeout used while waiting for AMQP handshaking to
74-
///complete (milliseconds)</summary>
75-
public const int HandshakeTimeout = 10000;
76-
7773
public ManualResetEvent m_appContinuation = new ManualResetEvent(false);
7874
public EventHandler<CallbackExceptionEventArgs> m_callbackException;
7975

@@ -1204,6 +1200,7 @@ public IModel CreateModel()
12041200
EnsureIsOpen();
12051201
ISession session = CreateSession();
12061202
var model = (IFullModel)Protocol.CreateModel(session, this.ConsumerWorkService);
1203+
model.ContinuationTimeout = m_factory.ContinuationTimeout;
12071204
model._Private_ChannelOpen("");
12081205
return model;
12091206
}
@@ -1253,7 +1250,8 @@ protected void StartAndTune()
12531250
m_inConnectionNegotiation = true;
12541251
var connectionStartCell = new BlockingCell();
12551252
m_model0.m_connectionStartCell = connectionStartCell;
1256-
m_frameHandler.Timeout = HandshakeTimeout;
1253+
m_model0.HandshakeContinuationTimeout = m_factory.HandshakeContinuationTimeout;
1254+
m_frameHandler.Timeout = m_factory.HandshakeContinuationTimeout.Milliseconds;
12571255
m_frameHandler.SendHeader();
12581256

12591257
var connectionStart = (ConnectionStartDetails)

projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,9 @@ public abstract class ModelBase : IFullModel, IRecoverable
6363
///sequence. See <see cref="Connection.Open"/> </summary>
6464
public BlockingCell m_connectionStartCell = null;
6565

66+
private TimeSpan m_handshakeContinuationTimeout = TimeSpan.FromSeconds(10);
67+
private TimeSpan m_continuationTimeout = TimeSpan.FromSeconds(20);
68+
6669
public RpcContinuationQueue m_continuationQueue = new RpcContinuationQueue();
6770
public ManualResetEvent m_flowControlBlock = new ManualResetEvent(true);
6871

@@ -106,6 +109,18 @@ protected void Initialise(ISession session)
106109
Session.SessionShutdown += OnSessionShutdown;
107110
}
108111

112+
public TimeSpan HandshakeContinuationTimeout
113+
{
114+
get { return m_handshakeContinuationTimeout; }
115+
set { m_handshakeContinuationTimeout = value; }
116+
}
117+
118+
public TimeSpan ContinuationTimeout
119+
{
120+
get { return m_continuationTimeout; }
121+
set { m_continuationTimeout = value; }
122+
}
123+
109124
public event EventHandler<BasicAckEventArgs> BasicAcks
110125
{
111126
add
@@ -300,8 +315,8 @@ public void Close(ShutdownEventArgs reason, bool abort)
300315
if (SetCloseReason(reason))
301316
{
302317
_Private_ChannelClose(reason.ReplyCode, reason.ReplyText, 0, 0);
303-
}
304-
k.Wait();
318+
}
319+
k.Wait(TimeSpan.FromMilliseconds(10000));
305320
ConsumerDispatcher.Shutdown(this);
306321
}
307322
catch (AlreadyClosedException)
@@ -343,7 +358,7 @@ public string ConnectionOpen(string virtualHost,
343358
// which is a much more suitable exception before connection
344359
// negotiation finishes
345360
}
346-
k.GetReply();
361+
k.GetReply(HandshakeContinuationTimeout);
347362
return k.m_knownHosts;
348363
}
349364

@@ -361,7 +376,7 @@ public ConnectionSecureOrTune ConnectionSecureOk(byte[] response)
361376
// which is a much more suitable exception before connection
362377
// negotiation finishes
363378
}
364-
k.GetReply();
379+
k.GetReply(HandshakeContinuationTimeout);
365380
return k.m_result;
366381
}
367382

@@ -383,7 +398,7 @@ public ConnectionSecureOrTune ConnectionStartOk(IDictionary<string, object> clie
383398
// which is a much more suitable exception before connection
384399
// negotiation finishes
385400
}
386-
k.GetReply();
401+
k.GetReply(HandshakeContinuationTimeout);
387402
return k.m_result;
388403
}
389404

@@ -434,7 +449,7 @@ public MethodBase ModelRpc(MethodBase method, ContentHeaderBase header, byte[] b
434449
{
435450
var k = new SimpleBlockingRpcContinuation();
436451
TransmitAndEnqueue(new Command(method, header, body), k);
437-
return k.GetReply().Method;
452+
return k.GetReply(this.ContinuationTimeout).Method;
438453
}
439454

440455
public void ModelSend(MethodBase method, ContentHeaderBase header, byte[] body)
@@ -1127,7 +1142,7 @@ public void BasicCancel(string consumerTag)
11271142
Enqueue(k);
11281143

11291144
_Private_BasicCancel(consumerTag, false);
1130-
k.GetReply();
1145+
k.GetReply(this.ContinuationTimeout);
11311146
lock (m_consumers)
11321147
{
11331148
m_consumers.Remove(consumerTag);
@@ -1175,7 +1190,7 @@ public string BasicConsume(string queue,
11751190
// the RPC response, but a response is still expected.
11761191
_Private_BasicConsume(queue, consumerTag, noLocal, noAck, exclusive,
11771192
/*nowait:*/ false, arguments);
1178-
k.GetReply();
1193+
k.GetReply(this.ContinuationTimeout);
11791194
string actualConsumerTag = k.m_consumerTag;
11801195

11811196
return actualConsumerTag;
@@ -1187,7 +1202,7 @@ public BasicGetResult BasicGet(string queue,
11871202
var k = new BasicGetRpcContinuation();
11881203
Enqueue(k);
11891204
_Private_BasicGet(queue, noAck);
1190-
k.GetReply();
1205+
k.GetReply(this.ContinuationTimeout);
11911206
return k.m_result;
11921207
}
11931208

@@ -1255,7 +1270,7 @@ public void BasicRecover(bool requeue)
12551270

12561271
Enqueue(k);
12571272
_Private_BasicRecover(requeue);
1258-
k.GetReply();
1273+
k.GetReply(this.ContinuationTimeout);
12591274
}
12601275

12611276
public abstract void BasicRecoverAsync(bool requeue);
@@ -1586,7 +1601,7 @@ private QueueDeclareOk QueueDeclare(string queue, bool passive, bool durable, bo
15861601
var k = new QueueDeclareRpcContinuation();
15871602
Enqueue(k);
15881603
_Private_QueueDeclare(queue, passive, durable, exclusive, autoDelete, false, arguments);
1589-
k.GetReply();
1604+
k.GetReply(this.ContinuationTimeout);
15901605
return k.m_result;
15911606
}
15921607

projects/client/RabbitMQ.Client/src/client/impl/ShutdownContinuation.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,5 +76,10 @@ public virtual ShutdownEventArgs Wait()
7676
{
7777
return (ShutdownEventArgs)m_cell.Value;
7878
}
79+
80+
public ShutdownEventArgs Wait(TimeSpan timeout)
81+
{
82+
return (ShutdownEventArgs)m_cell.GetValue(timeout);
83+
}
7984
}
8085
}

projects/client/RabbitMQ.Client/src/client/impl/SimpleBlockingRpcContinuation.cs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,31 @@ public virtual Command GetReply()
6969
}
7070
}
7171

72+
public virtual Command GetReply(TimeSpan timeout)
73+
{
74+
var result = (Either)m_cell.GetValue(timeout);
75+
switch (result.Alternative)
76+
{
77+
case EitherAlternative.Left:
78+
return (Command)result.Value;
79+
case EitherAlternative.Right:
80+
throw new OperationInterruptedException((ShutdownEventArgs)result.Value);
81+
default:
82+
ReportInvalidInvariant(result);
83+
return null;
84+
}
85+
}
86+
87+
private static void ReportInvalidInvariant(Either result)
88+
{
89+
string error = "Illegal EitherAlternative " + result.Alternative;
90+
#if !(NETFX_CORE)
91+
Trace.Fail(error);
92+
#else
93+
MetroEventSource.Log.Error(error);
94+
#endif
95+
}
96+
7297
public virtual void HandleCommand(Command cmd)
7398
{
7499
m_cell.Value = Either.Left(cmd);

projects/client/RabbitMQ.Client/src/util/BlockingCell.cs

Lines changed: 33 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -108,40 +108,56 @@ public static int validatedTimeout(int timeout)
108108
/// If a value is present in the cell at the time the call is
109109
/// made, the call will return immediately. Otherwise, the
110110
/// calling thread blocks until either a value appears, or
111-
/// millisecondsTimeout milliseconds have elapsed.
111+
/// operation times out.
112112
///</para>
113113
///<para>
114-
/// Returns true in the case that the value was available
115-
/// before the timeout, in which case the out parameter
116-
/// "result" is set to the value itself.
114+
/// If no value was available before the timeout, an exception
115+
/// is thrown.
117116
///</para>
117+
///</remarks>
118+
public object GetValue(TimeSpan timeout)
119+
{
120+
lock (_lock)
121+
{
122+
if (!m_valueSet)
123+
{
124+
Monitor.Wait(_lock, timeout);
125+
if (!m_valueSet)
126+
{
127+
throw new TimeoutException();
128+
}
129+
}
130+
return m_value;
131+
}
132+
}
133+
134+
///<summary>Retrieve the cell's value, waiting for the given
135+
///timeout if no value is immediately available.</summary>
136+
///<remarks>
118137
///<para>
119-
/// If no value was available before the timeout, returns
120-
/// false, and sets "result" to null.
138+
/// If a value is present in the cell at the time the call is
139+
/// made, the call will return immediately. Otherwise, the
140+
/// calling thread blocks until either a value appears, or
141+
/// operation times out.
121142
///</para>
122143
///<para>
123-
/// A timeout of -1 (i.e. System.Threading.Timeout.Infinite)
124-
/// will be interpreted as a command to wait for an
125-
/// indefinitely long period of time for the cell's value to
126-
/// become available. See the MSDN documentation for
127-
/// System.Threading.Monitor.Wait(object,int).
144+
/// If no value was available before the timeout, an exception
145+
/// is thrown.
128146
///</para>
129147
///</remarks>
130-
public bool GetValue(int millisecondsTimeout, out object result)
148+
public object GetValue(int timeout)
131149
{
132150
lock (_lock)
133151
{
134152
if (!m_valueSet)
135153
{
136-
Monitor.Wait(_lock, validatedTimeout(millisecondsTimeout));
154+
Monitor.Wait(_lock, validatedTimeout(timeout));
137155
if (!m_valueSet)
138156
{
139-
result = null;
140-
return false;
157+
throw new TimeoutException();
141158
}
142159
}
143-
result = m_value;
144-
return true;
160+
return m_value;
145161
}
146162
}
147163
}

0 commit comments

Comments
 (0)