@@ -43,6 +43,9 @@ namespace RabbitMQ.Client.Impl
4343{
4444 internal abstract class AsyncRpcContinuation < T > : IRpcContinuation
4545 {
46+ private readonly TimeSpan _continuationTimeout ;
47+ private readonly CancellationToken _rpcCancellationToken ;
48+ private readonly CancellationToken _continuationTimeoutCancellationToken ;
4649 private readonly CancellationTokenSource _continuationTimeoutCancellationTokenSource ;
4750 private readonly CancellationTokenRegistration _continuationTimeoutCancellationTokenRegistration ;
4851 private readonly CancellationTokenSource _linkedCancellationTokenSource ;
@@ -53,37 +56,25 @@ internal abstract class AsyncRpcContinuation<T> : IRpcContinuation
5356
5457 public AsyncRpcContinuation ( TimeSpan continuationTimeout , CancellationToken rpcCancellationToken )
5558 {
59+ _continuationTimeout = continuationTimeout ;
60+ _rpcCancellationToken = rpcCancellationToken ;
61+
5662 /*
5763 * Note: we can't use an ObjectPool for these because the netstandard2.0
5864 * version of CancellationTokenSource can't be reset prior to checking
5965 * in to the ObjectPool
6066 */
6167 _continuationTimeoutCancellationTokenSource = new CancellationTokenSource ( continuationTimeout ) ;
68+ _continuationTimeoutCancellationToken = _continuationTimeoutCancellationTokenSource . Token ;
6269
6370#if NET
64- _continuationTimeoutCancellationTokenRegistration = _continuationTimeoutCancellationTokenSource . Token . UnsafeRegister ( ( object ? state ) =>
65- {
66- var tcs = ( TaskCompletionSource < T > ) state ! ;
67- if ( tcs . TrySetCanceled ( ) )
68- {
69- // Cancellation was successful, does this mean we set a TimeoutException
70- // in the same manner as BlockingCell used to
71- string msg = $ "operation '{ GetType ( ) . FullName } ' timed out after { continuationTimeout } ";
72- tcs . TrySetException ( new TimeoutException ( msg ) ) ;
73- }
74- } , _tcs ) ;
71+ _continuationTimeoutCancellationTokenRegistration =
72+ _continuationTimeoutCancellationToken . UnsafeRegister (
73+ callback : HandleContinuationTimeout , state : _tcs ) ;
7574#else
76- _continuationTimeoutCancellationTokenRegistration = _continuationTimeoutCancellationTokenSource . Token . Register ( ( object state ) =>
77- {
78- var tcs = ( TaskCompletionSource < T > ) state ;
79- if ( tcs . TrySetCanceled ( ) )
80- {
81- // Cancellation was successful, does this mean we set a TimeoutException
82- // in the same manner as BlockingCell used to
83- string msg = $ "operation '{ GetType ( ) . FullName } ' timed out after { continuationTimeout } ";
84- tcs . TrySetException ( new TimeoutException ( msg ) ) ;
85- }
86- } , state : _tcs , useSynchronizationContext : false ) ;
75+ _continuationTimeoutCancellationTokenRegistration =
76+ _continuationTimeoutCancellationToken . Register (
77+ callback : HandleContinuationTimeout , state : _tcs , useSynchronizationContext : false ) ;
8778#endif
8879
8980 _tcsConfiguredTaskAwaitable = _tcs . Task . ConfigureAwait ( false ) ;
@@ -114,9 +105,26 @@ await DoHandleCommandAsync(cmd)
114105 }
115106 catch ( OperationCanceledException )
116107 {
117- if ( CancellationToken . IsCancellationRequested )
108+ if ( _rpcCancellationToken . IsCancellationRequested )
109+ {
110+ #if NET
111+ _tcs . TrySetCanceled ( _rpcCancellationToken ) ;
112+ #else
113+ _tcs . TrySetCanceled ( ) ;
114+ #endif
115+ }
116+ else if ( _continuationTimeoutCancellationToken . IsCancellationRequested )
118117 {
119- _tcs . SetCanceled ( ) ;
118+ #if NET
119+ if ( _tcs . TrySetCanceled ( _continuationTimeoutCancellationToken ) )
120+ #else
121+ if ( _tcs . TrySetCanceled ( ) )
122+ #endif
123+ {
124+ // Cancellation was successful, does this mean we set a TimeoutException
125+ // in the same manner as BlockingCell used to
126+ _tcs . TrySetException ( GetTimeoutException ( ) ) ;
127+ }
120128 }
121129 else
122130 {
@@ -125,13 +133,24 @@ await DoHandleCommandAsync(cmd)
125133 }
126134 }
127135
128- protected abstract Task DoHandleCommandAsync ( IncomingCommand cmd ) ;
129-
130136 public virtual void HandleChannelShutdown ( ShutdownEventArgs reason )
131137 {
132138 _tcs . TrySetException ( new OperationInterruptedException ( reason ) ) ;
133139 }
134140
141+ public void Dispose ( )
142+ {
143+ Dispose ( disposing : true ) ;
144+ GC . SuppressFinalize ( this ) ;
145+ }
146+
147+ protected abstract Task DoHandleCommandAsync ( IncomingCommand cmd ) ;
148+
149+ protected void HandleUnexpectedCommand ( IncomingCommand cmd )
150+ {
151+ _tcs . SetException ( new InvalidOperationException ( $ "Received unexpected command of type { cmd . CommandId } !") ) ;
152+ }
153+
135154 protected virtual void Dispose ( bool disposing )
136155 {
137156 if ( ! _disposedValue )
@@ -147,10 +166,33 @@ protected virtual void Dispose(bool disposing)
147166 }
148167 }
149168
150- public void Dispose ( )
169+ #if NET
170+ private void HandleContinuationTimeout ( object ? state , CancellationToken cancellationToken )
151171 {
152- Dispose ( disposing : true ) ;
153- GC . SuppressFinalize ( this ) ;
172+ var tcs = ( TaskCompletionSource < T > ) state ! ;
173+ if ( tcs . TrySetCanceled ( cancellationToken ) )
174+ {
175+ tcs . TrySetException ( GetTimeoutException ( ) ) ;
176+ }
177+ }
178+ #else
179+ private void HandleContinuationTimeout ( object state )
180+ {
181+ var tcs = ( TaskCompletionSource < T > ) state ;
182+ if ( tcs . TrySetCanceled ( ) )
183+ {
184+ tcs . TrySetException ( GetTimeoutException ( ) ) ;
185+ }
186+ }
187+ #endif
188+
189+ private TimeoutException GetTimeoutException ( )
190+ {
191+ // TODO
192+ // Cancellation was successful, does this mean we set a TimeoutException
193+ // in the same manner as BlockingCell used to
194+ string msg = $ "operation '{ GetType ( ) . FullName } ' timed out after { _continuationTimeout } ";
195+ return new TimeoutException ( msg ) ;
154196 }
155197 }
156198
@@ -180,7 +222,7 @@ protected override Task DoHandleCommandAsync(IncomingCommand cmd)
180222 }
181223 else
182224 {
183- _tcs . SetException ( new InvalidOperationException ( $ "Received unexpected command of type { cmd . CommandId } !" ) ) ;
225+ HandleUnexpectedCommand ( cmd ) ;
184226 }
185227
186228 return Task . CompletedTask ;
@@ -206,7 +248,7 @@ protected override Task DoHandleCommandAsync(IncomingCommand cmd)
206248 }
207249 else
208250 {
209- _tcs . SetException ( new InvalidOperationException ( $ "Received unexpected command of type { cmd . CommandId } !" ) ) ;
251+ HandleUnexpectedCommand ( cmd ) ;
210252 }
211253
212254 return Task . CompletedTask ;
@@ -237,7 +279,7 @@ await _consumerDispatcher.HandleBasicCancelOkAsync(_consumerTag, CancellationTok
237279 }
238280 else
239281 {
240- _tcs . SetException ( new InvalidOperationException ( $ "Received unexpected command of type { cmd . CommandId } !" ) ) ;
282+ HandleUnexpectedCommand ( cmd ) ;
241283 }
242284 }
243285 }
@@ -268,7 +310,7 @@ await _consumerDispatcher.HandleBasicConsumeOkAsync(_consumer, method._consumerT
268310 }
269311 else
270312 {
271- _tcs . SetException ( new InvalidOperationException ( $ "Received unexpected command of type { cmd . CommandId } !" ) ) ;
313+ HandleUnexpectedCommand ( cmd ) ;
272314 }
273315 }
274316 }
@@ -310,7 +352,7 @@ protected override Task DoHandleCommandAsync(IncomingCommand cmd)
310352 }
311353 else
312354 {
313- _tcs . SetException ( new InvalidOperationException ( $ "Received unexpected command of type { cmd . CommandId } !" ) ) ;
355+ HandleUnexpectedCommand ( cmd ) ;
314356 }
315357
316358 return Task . CompletedTask ;
@@ -409,7 +451,7 @@ protected override Task DoHandleCommandAsync(IncomingCommand cmd)
409451 }
410452 else
411453 {
412- _tcs . SetException ( new InvalidOperationException ( $ "Received unexpected command of type { cmd . CommandId } !" ) ) ;
454+ HandleUnexpectedCommand ( cmd ) ;
413455 }
414456
415457 return Task . CompletedTask ;
@@ -448,7 +490,7 @@ protected override Task DoHandleCommandAsync(IncomingCommand cmd)
448490 }
449491 else
450492 {
451- _tcs . SetException ( new InvalidOperationException ( $ "Received unexpected command of type { cmd . CommandId } !" ) ) ;
493+ HandleUnexpectedCommand ( cmd ) ;
452494 }
453495
454496 return Task . CompletedTask ;
@@ -471,7 +513,7 @@ protected override Task DoHandleCommandAsync(IncomingCommand cmd)
471513 }
472514 else
473515 {
474- _tcs . SetException ( new InvalidOperationException ( $ "Received unexpected command of type { cmd . CommandId } !" ) ) ;
516+ HandleUnexpectedCommand ( cmd ) ;
475517 }
476518
477519 return Task . CompletedTask ;
0 commit comments