Skip to content

Commit 0ab9f04

Browse files
committed
* Make quiescing thread safe
* Add warning log when processing work while quiescing
1 parent 3fff3aa commit 0ab9f04

File tree

2 files changed

+26
-8
lines changed

2 files changed

+26
-8
lines changed

projects/RabbitMQ.Client/ConsumerDispatching/AsyncConsumerDispatcher.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,11 @@ protected override async Task ProcessChannelAsync()
2626
{
2727
try
2828
{
29+
if (IsQuiescing)
30+
{
31+
ESLog.Warn($"processing consumer work while quiescing: {work.WorkType}");
32+
}
33+
2934
switch (work.WorkType)
3035
{
3136
case WorkType.Deliver:

projects/RabbitMQ.Client/ConsumerDispatching/ConsumerDispatcherChannelBase.cs

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ internal abstract class ConsumerDispatcherChannelBase : ConsumerDispatcherBase,
4444
private readonly System.Threading.Channels.ChannelWriter<WorkStruct> _writer;
4545
private readonly Task _worker;
4646
private readonly ushort _concurrency;
47-
private bool _quiesce = false;
47+
private long _isQuiescing;
4848
private bool _disposed;
4949

5050
internal ConsumerDispatcherChannelBase(Impl.Channel channel, ushort concurrency)
@@ -79,15 +79,15 @@ internal ConsumerDispatcherChannelBase(Impl.Channel channel, ushort concurrency)
7979
}
8080
}
8181

82-
public bool IsShutdown => _quiesce;
82+
public bool IsShutdown => IsQuiescing;
8383

8484
public ushort Concurrency => _concurrency;
8585

8686
public async ValueTask HandleBasicConsumeOkAsync(IAsyncBasicConsumer consumer, string consumerTag, CancellationToken cancellationToken)
8787
{
8888
cancellationToken.ThrowIfCancellationRequested();
8989

90-
if (false == _disposed && false == _quiesce)
90+
if (false == _disposed && false == IsQuiescing)
9191
{
9292
try
9393
{
@@ -110,7 +110,7 @@ public async ValueTask HandleBasicDeliverAsync(string consumerTag, ulong deliver
110110
{
111111
cancellationToken.ThrowIfCancellationRequested();
112112

113-
if (false == _disposed && false == _quiesce)
113+
if (false == _disposed && false == IsQuiescing)
114114
{
115115
IAsyncBasicConsumer consumer = GetConsumerOrDefault(consumerTag);
116116
var work = WorkStruct.CreateDeliver(consumer, consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, body);
@@ -123,7 +123,7 @@ public async ValueTask HandleBasicCancelOkAsync(string consumerTag, Cancellation
123123
{
124124
cancellationToken.ThrowIfCancellationRequested();
125125

126-
if (false == _disposed && false == _quiesce)
126+
if (false == _disposed && false == IsQuiescing)
127127
{
128128
IAsyncBasicConsumer consumer = GetAndRemoveConsumer(consumerTag);
129129
WorkStruct work = WorkStruct.CreateCancelOk(consumer, consumerTag);
@@ -136,7 +136,7 @@ public async ValueTask HandleBasicCancelAsync(string consumerTag, CancellationTo
136136
{
137137
cancellationToken.ThrowIfCancellationRequested();
138138

139-
if (false == _disposed && false == _quiesce)
139+
if (false == _disposed && false == IsQuiescing)
140140
{
141141
IAsyncBasicConsumer consumer = GetAndRemoveConsumer(consumerTag);
142142
WorkStruct work = WorkStruct.CreateCancel(consumer, consumerTag);
@@ -147,7 +147,7 @@ await _writer.WriteAsync(work, cancellationToken)
147147

148148
public void Quiesce()
149149
{
150-
_quiesce = true;
150+
Interlocked.Exchange(ref _isQuiescing, 1);
151151
}
152152

153153
public async Task WaitForShutdownAsync()
@@ -157,7 +157,7 @@ public async Task WaitForShutdownAsync()
157157
return;
158158
}
159159

160-
if (_quiesce)
160+
if (IsQuiescing)
161161
{
162162
try
163163
{
@@ -193,6 +193,19 @@ await _worker
193193
}
194194
}
195195

196+
protected bool IsQuiescing
197+
{
198+
get
199+
{
200+
if (Interlocked.Read(ref _isQuiescing) == 1)
201+
{
202+
return true;
203+
}
204+
205+
return false;
206+
}
207+
}
208+
196209
protected sealed override void ShutdownConsumer(IAsyncBasicConsumer consumer, ShutdownEventArgs reason)
197210
{
198211
_writer.TryWrite(WorkStruct.CreateShutdown(consumer, reason));

0 commit comments

Comments
 (0)