Skip to content

Commit dc6a3f4

Browse files
committed
* Handle continuation cancellation and timeouts correctly
* Refactor repeated code
1 parent a3618c6 commit dc6a3f4

File tree

4 files changed

+151
-128
lines changed

4 files changed

+151
-128
lines changed

projects/RabbitMQ.Client/Impl/AsyncRpcContinuations.cs

Lines changed: 79 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@ namespace RabbitMQ.Client.Impl
4343
{
4444
internal abstract class AsyncRpcContinuation<T> : IRpcContinuation
4545
{
46+
private readonly TimeSpan _continuationTimeout;
47+
private readonly CancellationToken _rpcCancellationToken;
48+
private readonly CancellationToken _continuationTimeoutCancellationToken;
4649
private readonly CancellationTokenSource _continuationTimeoutCancellationTokenSource;
4750
private readonly CancellationTokenRegistration _continuationTimeoutCancellationTokenRegistration;
4851
private readonly CancellationTokenSource _linkedCancellationTokenSource;
@@ -53,37 +56,25 @@ internal abstract class AsyncRpcContinuation<T> : IRpcContinuation
5356

5457
public AsyncRpcContinuation(TimeSpan continuationTimeout, CancellationToken rpcCancellationToken)
5558
{
59+
_continuationTimeout = continuationTimeout;
60+
_rpcCancellationToken = rpcCancellationToken;
61+
5662
/*
5763
* Note: we can't use an ObjectPool for these because the netstandard2.0
5864
* version of CancellationTokenSource can't be reset prior to checking
5965
* in to the ObjectPool
6066
*/
6167
_continuationTimeoutCancellationTokenSource = new CancellationTokenSource(continuationTimeout);
68+
_continuationTimeoutCancellationToken = _continuationTimeoutCancellationTokenSource.Token;
6269

6370
#if NET
64-
_continuationTimeoutCancellationTokenRegistration = _continuationTimeoutCancellationTokenSource.Token.UnsafeRegister((object? state) =>
65-
{
66-
var tcs = (TaskCompletionSource<T>)state!;
67-
if (tcs.TrySetCanceled())
68-
{
69-
// Cancellation was successful, does this mean we set a TimeoutException
70-
// in the same manner as BlockingCell used to
71-
string msg = $"operation '{GetType().FullName}' timed out after {continuationTimeout}";
72-
tcs.TrySetException(new TimeoutException(msg));
73-
}
74-
}, _tcs);
71+
_continuationTimeoutCancellationTokenRegistration =
72+
_continuationTimeoutCancellationToken.UnsafeRegister(
73+
callback: HandleContinuationTimeout, state: _tcs);
7574
#else
76-
_continuationTimeoutCancellationTokenRegistration = _continuationTimeoutCancellationTokenSource.Token.Register((object state) =>
77-
{
78-
var tcs = (TaskCompletionSource<T>)state;
79-
if (tcs.TrySetCanceled())
80-
{
81-
// Cancellation was successful, does this mean we set a TimeoutException
82-
// in the same manner as BlockingCell used to
83-
string msg = $"operation '{GetType().FullName}' timed out after {continuationTimeout}";
84-
tcs.TrySetException(new TimeoutException(msg));
85-
}
86-
}, state: _tcs, useSynchronizationContext: false);
75+
_continuationTimeoutCancellationTokenRegistration =
76+
_continuationTimeoutCancellationToken.Register(
77+
callback: HandleContinuationTimeout, state: _tcs, useSynchronizationContext: false);
8778
#endif
8879

8980
_tcsConfiguredTaskAwaitable = _tcs.Task.ConfigureAwait(false);
@@ -114,9 +105,26 @@ await DoHandleCommandAsync(cmd)
114105
}
115106
catch (OperationCanceledException)
116107
{
117-
if (CancellationToken.IsCancellationRequested)
108+
if (_rpcCancellationToken.IsCancellationRequested)
109+
{
110+
#if NET
111+
_tcs.TrySetCanceled(_rpcCancellationToken);
112+
#else
113+
_tcs.TrySetCanceled();
114+
#endif
115+
}
116+
else if (_continuationTimeoutCancellationToken.IsCancellationRequested)
118117
{
119-
_tcs.SetCanceled();
118+
#if NET
119+
if (_tcs.TrySetCanceled(_continuationTimeoutCancellationToken))
120+
#else
121+
if (_tcs.TrySetCanceled())
122+
#endif
123+
{
124+
// Cancellation was successful, does this mean we set a TimeoutException
125+
// in the same manner as BlockingCell used to
126+
_tcs.TrySetException(GetTimeoutException());
127+
}
120128
}
121129
else
122130
{
@@ -125,13 +133,24 @@ await DoHandleCommandAsync(cmd)
125133
}
126134
}
127135

128-
protected abstract Task DoHandleCommandAsync(IncomingCommand cmd);
129-
130136
public virtual void HandleChannelShutdown(ShutdownEventArgs reason)
131137
{
132138
_tcs.TrySetException(new OperationInterruptedException(reason));
133139
}
134140

141+
public void Dispose()
142+
{
143+
Dispose(disposing: true);
144+
GC.SuppressFinalize(this);
145+
}
146+
147+
protected abstract Task DoHandleCommandAsync(IncomingCommand cmd);
148+
149+
protected void HandleUnexpectedCommand(IncomingCommand cmd)
150+
{
151+
_tcs.SetException(new InvalidOperationException($"Received unexpected command of type {cmd.CommandId}!"));
152+
}
153+
135154
protected virtual void Dispose(bool disposing)
136155
{
137156
if (!_disposedValue)
@@ -147,10 +166,33 @@ protected virtual void Dispose(bool disposing)
147166
}
148167
}
149168

150-
public void Dispose()
169+
#if NET
170+
private void HandleContinuationTimeout(object? state, CancellationToken cancellationToken)
151171
{
152-
Dispose(disposing: true);
153-
GC.SuppressFinalize(this);
172+
var tcs = (TaskCompletionSource<T>)state!;
173+
if (tcs.TrySetCanceled(cancellationToken))
174+
{
175+
tcs.TrySetException(GetTimeoutException());
176+
}
177+
}
178+
#else
179+
private void HandleContinuationTimeout(object state)
180+
{
181+
var tcs = (TaskCompletionSource<T>)state;
182+
if (tcs.TrySetCanceled())
183+
{
184+
tcs.TrySetException(GetTimeoutException());
185+
}
186+
}
187+
#endif
188+
189+
private TimeoutException GetTimeoutException()
190+
{
191+
// TODO
192+
// Cancellation was successful, does this mean we set a TimeoutException
193+
// in the same manner as BlockingCell used to
194+
string msg = $"operation '{GetType().FullName}' timed out after {_continuationTimeout}";
195+
return new TimeoutException(msg);
154196
}
155197
}
156198

@@ -180,7 +222,7 @@ protected override Task DoHandleCommandAsync(IncomingCommand cmd)
180222
}
181223
else
182224
{
183-
_tcs.SetException(new InvalidOperationException($"Received unexpected command of type {cmd.CommandId}!"));
225+
HandleUnexpectedCommand(cmd);
184226
}
185227

186228
return Task.CompletedTask;
@@ -206,7 +248,7 @@ protected override Task DoHandleCommandAsync(IncomingCommand cmd)
206248
}
207249
else
208250
{
209-
_tcs.SetException(new InvalidOperationException($"Received unexpected command of type {cmd.CommandId}!"));
251+
HandleUnexpectedCommand(cmd);
210252
}
211253

212254
return Task.CompletedTask;
@@ -237,7 +279,7 @@ await _consumerDispatcher.HandleBasicCancelOkAsync(_consumerTag, CancellationTok
237279
}
238280
else
239281
{
240-
_tcs.SetException(new InvalidOperationException($"Received unexpected command of type {cmd.CommandId}!"));
282+
HandleUnexpectedCommand(cmd);
241283
}
242284
}
243285
}
@@ -268,7 +310,7 @@ await _consumerDispatcher.HandleBasicConsumeOkAsync(_consumer, method._consumerT
268310
}
269311
else
270312
{
271-
_tcs.SetException(new InvalidOperationException($"Received unexpected command of type {cmd.CommandId}!"));
313+
HandleUnexpectedCommand(cmd);
272314
}
273315
}
274316
}
@@ -310,7 +352,7 @@ protected override Task DoHandleCommandAsync(IncomingCommand cmd)
310352
}
311353
else
312354
{
313-
_tcs.SetException(new InvalidOperationException($"Received unexpected command of type {cmd.CommandId}!"));
355+
HandleUnexpectedCommand(cmd);
314356
}
315357

316358
return Task.CompletedTask;
@@ -409,7 +451,7 @@ protected override Task DoHandleCommandAsync(IncomingCommand cmd)
409451
}
410452
else
411453
{
412-
_tcs.SetException(new InvalidOperationException($"Received unexpected command of type {cmd.CommandId}!"));
454+
HandleUnexpectedCommand(cmd);
413455
}
414456

415457
return Task.CompletedTask;
@@ -448,7 +490,7 @@ protected override Task DoHandleCommandAsync(IncomingCommand cmd)
448490
}
449491
else
450492
{
451-
_tcs.SetException(new InvalidOperationException($"Received unexpected command of type {cmd.CommandId}!"));
493+
HandleUnexpectedCommand(cmd);
452494
}
453495

454496
return Task.CompletedTask;
@@ -471,7 +513,7 @@ protected override Task DoHandleCommandAsync(IncomingCommand cmd)
471513
}
472514
else
473515
{
474-
_tcs.SetException(new InvalidOperationException($"Received unexpected command of type {cmd.CommandId}!"));
516+
HandleUnexpectedCommand(cmd);
475517
}
476518

477519
return Task.CompletedTask;

0 commit comments

Comments
 (0)