Skip to content

Commit af21b44

Browse files
Make sure Subscription does not double-cancel its consumer
In a case when the consumer is already cancelled.
1 parent 366a15e commit af21b44

File tree

4 files changed

+94
-5
lines changed

4 files changed

+94
-5
lines changed

projects/client/RabbitMQ.Client/src/client/api/QueueingBasicConsumer.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@
3838
// Copyright (c) 2007-2014 GoPivotal, Inc. All rights reserved.
3939
//---------------------------------------------------------------------------
4040

41+
using System;
42+
4143
using RabbitMQ.Client;
4244
using RabbitMQ.Client.Events;
4345
using RabbitMQ.Util;
@@ -153,8 +155,8 @@ public override void HandleBasicDeliver(string consumerTag,
153155
/// </summary>
154156
public override void OnCancel()
155157
{
156-
Queue.Close();
157158
base.OnCancel();
159+
Queue.Close();
158160
}
159161
}
160162
}

projects/client/RabbitMQ.Client/src/client/messagepatterns/Subscription.cs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ public Subscription(IModel model, string queueName, bool noAck)
8989
NoAck = noAck;
9090
m_consumer = new QueueingBasicConsumer(Model);
9191
ConsumerTag = Model.BasicConsume(QueueName, NoAck, m_consumer);
92+
m_consumer.ConsumerCancelled += HandleConsumerCancelled;
9293
LatestEvent = null;
9394
}
9495

@@ -100,6 +101,7 @@ public Subscription(IModel model, string queueName, bool noAck, string consumerT
100101
QueueName = queueName;
101102
NoAck = noAck;
102103
m_consumer = new QueueingBasicConsumer(Model);
104+
m_consumer.ConsumerCancelled += HandleConsumerCancelled;
103105
ConsumerTag = Model.BasicConsume(QueueName, NoAck, consumerTag, m_consumer);
104106
LatestEvent = null;
105107
}
@@ -206,10 +208,9 @@ public void Close()
206208
try
207209
{
208210
bool shouldCancelConsumer = false;
209-
210211
if (m_consumer != null)
211212
{
212-
shouldCancelConsumer = true;
213+
shouldCancelConsumer = m_consumer.IsRunning;
213214
m_consumer = null;
214215
}
215216

@@ -439,5 +440,14 @@ protected void MutateLatestEvent(BasicDeliverEventArgs value)
439440
LatestEvent = value;
440441
}
441442
}
443+
444+
private void HandleConsumerCancelled(object sender, ConsumerEventArgs e)
445+
{
446+
lock (m_eventLock)
447+
{
448+
m_consumer = null;
449+
MutateLatestEvent(null);
450+
}
451+
}
442452
}
443453
}

projects/client/Unit/RabbitMQ.Client.Unit.csproj

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
<?xml version="1.0" encoding="utf-8"?>
1+
<?xml version="1.0" encoding="utf-8"?>
22
<Project ToolsVersion="4.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
33
<!-- Warning! This file contains important customizations. Using Visual Studio to edit project's properties might break things. -->
44
<!-- Props file -->
@@ -124,6 +124,7 @@
124124
<Compile Include="src\unit\TestSharedQueue.cs" />
125125
<Compile Include="src\unit\TestSsl.cs" />
126126
<Compile Include="src\unit\TestStreamWireFormatting.cs" />
127+
<Compile Include="src\unit\TestSubscription.cs" />
127128
<Compile Include="src\unit\WireFormattingFixture.cs" />
128129
</ItemGroup>
129130
<ItemGroup>
@@ -143,4 +144,4 @@
143144
<Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
144145
<!-- Custom BeforeClean -->
145146
<Target Name="BeforeClean" DependsOnTargets="CleanTestResults" />
146-
</Project>
147+
</Project>
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 1.1.
3+
//
4+
// The APL v2.0:
5+
//
6+
//---------------------------------------------------------------------------
7+
// Copyright (C) 2007-2014 GoPivotal, Inc.
8+
//
9+
// Licensed under the Apache License, Version 2.0 (the "License");
10+
// you may not use this file except in compliance with the License.
11+
// You may obtain a copy of the License at
12+
//
13+
// http://www.apache.org/licenses/LICENSE-2.0
14+
//
15+
// Unless required by applicable law or agreed to in writing, software
16+
// distributed under the License is distributed on an "AS IS" BASIS,
17+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
// See the License for the specific language governing permissions and
19+
// limitations under the License.
20+
//---------------------------------------------------------------------------
21+
//
22+
// The MPL v1.1:
23+
//
24+
//---------------------------------------------------------------------------
25+
// The contents of this file are subject to the Mozilla Public License
26+
// Version 1.1 (the "License"); you may not use this file except in
27+
// compliance with the License. You may obtain a copy of the License
28+
// at http://www.mozilla.org/MPL/
29+
//
30+
// Software distributed under the License is distributed on an "AS IS"
31+
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
32+
// the License for the specific language governing rights and
33+
// limitations under the License.
34+
//
35+
// The Original Code is RabbitMQ.
36+
//
37+
// The Initial Developer of the Original Code is GoPivotal, Inc.
38+
// Copyright (c) 2007-2014 GoPivotal, Inc. All rights reserved.
39+
//---------------------------------------------------------------------------
40+
41+
using System;
42+
using System.Threading;
43+
using NUnit.Framework;
44+
45+
using RabbitMQ.Client.MessagePatterns;
46+
47+
namespace RabbitMQ.Client.Unit
48+
{
49+
[TestFixture]
50+
class TestSubscription : IntegrationFixture
51+
{
52+
[SetUp]
53+
public override void Init()
54+
{
55+
var connFactory = new ConnectionFactory()
56+
{
57+
AutomaticRecoveryEnabled = false
58+
};
59+
Conn = connFactory.CreateConnection();
60+
Model = Conn.CreateModel();
61+
}
62+
63+
[Test, Timeout(5000)]
64+
public void TestConsumerCancellationNotification()
65+
{
66+
var q = Guid.NewGuid().ToString();
67+
this.Model.QueueDeclare(queue: q, durable: false, exclusive: false, autoDelete: false, arguments: null);
68+
var sub = new Subscription(this.Model, q);
69+
this.Model.QueueDelete(q);
70+
sub.Consumer.ConsumerCancelled += (_sender, _args) =>
71+
{
72+
sub.Close();
73+
};
74+
}
75+
}
76+
}

0 commit comments

Comments
 (0)