Skip to content

Commit 08ba300

Browse files
committed
Allow WaitForShutdown to be cancelled
1 parent 51743c0 commit 08ba300

File tree

3 files changed

+6
-10
lines changed

3 files changed

+6
-10
lines changed

projects/RabbitMQ.Client/ConsumerDispatching/ConsumerDispatcherChannelBase.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ public void Quiesce()
164164
}
165165
}
166166

167-
public async Task WaitForShutdownAsync()
167+
public async Task WaitForShutdownAsync(CancellationToken cancellationToken)
168168
{
169169
if (_disposed)
170170
{
@@ -183,7 +183,7 @@ public async Task WaitForShutdownAsync()
183183
*
184184
* await _reader.Completion.ConfigureAwait(false);
185185
*/
186-
await _worker
186+
await _worker.WaitAsync(cancellationToken)
187187
.ConfigureAwait(false);
188188
}
189189
catch (AggregateException aex)

projects/RabbitMQ.Client/ConsumerDispatching/IConsumerDispatcher.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,6 @@ ValueTask HandleBasicDeliverAsync(string consumerTag,
6464
void Quiesce();
6565

6666
Task ShutdownAsync(ShutdownEventArgs reason);
67-
Task WaitForShutdownAsync();
67+
Task WaitForShutdownAsync(CancellationToken cancellationToken);
6868
}
6969
}

projects/RabbitMQ.Client/Impl/Channel.cs

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -208,14 +208,10 @@ public Task CloseAsync(ushort replyCode, string replyText, bool abort,
208208
public async Task CloseAsync(ShutdownEventArgs args, bool abort)
209209
{
210210
CancellationToken cancellationToken = args.CancellationToken;
211-
if (IsOpen)
212-
{
213-
// Note: we really do need to try and close this channel!
214-
cancellationToken = CancellationToken.None;
215-
}
216211

217212
bool enqueued = false;
218-
var k = new ChannelCloseAsyncRpcContinuation(ContinuationTimeout, cancellationToken);
213+
// We should really try to clsoe the connection and therefore we don't allow this to be canceled by the user
214+
var k = new ChannelCloseAsyncRpcContinuation(ContinuationTimeout, IsOpen ? CancellationToken.None : cancellationToken);
219215

220216
await _rpcSemaphore.WaitAsync(k.CancellationToken)
221217
.ConfigureAwait(false);
@@ -235,7 +231,7 @@ await ModelSendAsync(in method, k.CancellationToken)
235231

236232
AssertResultIsTrue(await k);
237233

238-
await ConsumerDispatcher.WaitForShutdownAsync()
234+
await ConsumerDispatcher.WaitForShutdownAsync(cancellationToken)
239235
.ConfigureAwait(false);
240236
}
241237
catch (AlreadyClosedException)

0 commit comments

Comments
 (0)