Skip to content

Commit 8ac481d

Browse files
committed
Merge branch 'hotfix-2.1.2'
2 parents cbbb7d9 + e44a4be commit 8ac481d

File tree

2 files changed

+22
-4
lines changed

2 files changed

+22
-4
lines changed

src/NServiceBus.RabbitMQ.Tests/When_stopping_endpoint.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,10 @@ public void Should__gracefully_shutdown()
2222
var address = Address.Parse(ReceiverQueue);
2323

2424
Parallel.For(0, 2000, i =>
25-
sender.Send(new TransportMessage(), new SendOptions(address)));
25+
sender.Send(new TransportMessage(){Body = new byte[1]}, new SendOptions(address)));
2626

2727
dequeueStrategy.Start(50);
28-
Thread.Sleep(10);
28+
Thread.Sleep(1000);
2929
dequeueStrategy.Stop();
3030
connectionManager.Dispose();
3131
}

src/NServiceBus.RabbitMQ/RabbitMqDequeueStrategy.cs

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ public void Init(Address address, TransactionSettings transactionSettings, Func<
3636

3737
public void Start(int maximumConcurrencyLevel)
3838
{
39+
isStopping = false;
40+
3941
if (receiveOptions.DefaultPrefetchCount > 0)
4042
{
4143
actualPrefetchCount = receiveOptions.DefaultPrefetchCount;
@@ -79,16 +81,26 @@ public void Start(int maximumConcurrencyLevel)
7981
/// </summary>
8082
public void Stop()
8183
{
84+
if (isStopping)
85+
{
86+
return;
87+
}
88+
89+
isStopping = true;
90+
8291
if (tokenSource == null)
8392
{
8493
return;
8594
}
8695

8796
tokenSource.Cancel();
88-
DrainStopSemaphore();
97+
98+
WaitForThreadsToStop();
99+
100+
tokenSource = null;
89101
}
90102

91-
void DrainStopSemaphore()
103+
void WaitForThreadsToStop()
92104
{
93105
for (var index = 0; index < actualConcurrencyLevel; index++)
94106
{
@@ -114,6 +126,7 @@ void StartConsumer(string queue)
114126
{
115127
t.Exception.Handle(ex =>
116128
{
129+
Logger.Error("Failed to receive messages from " + queue,t.Exception);
117130
circuitBreaker.Failure(ex);
118131
return true;
119132
});
@@ -224,6 +237,10 @@ void ConsumeMessages(object state)
224237
{
225238
//Unable to write data to the transport connection: An existing connection was forcibly closed by the remote host.
226239
//This exception is expected because we are shutting down!
240+
if (!isStopping)
241+
{
242+
throw;
243+
}
227244
}
228245
finally
229246
{
@@ -270,6 +287,7 @@ void Purge()
270287
readonly IManageRabbitMqConnections connectionManager;
271288
readonly ReceiveOptions receiveOptions;
272289
ushort actualPrefetchCount;
290+
bool isStopping;
273291

274292

275293
class ConsumeParams

0 commit comments

Comments
 (0)