Skip to content

Commit 38776ce

Browse files
committed
Mutex in Close() and read m_consumer only once elsewhere.
1 parent 491170c commit 38776ce

File tree

1 file changed

+30
-14
lines changed

1 file changed

+30
-14
lines changed

src/client/messagepatterns/Subscription.cs

Lines changed: 30 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -98,9 +98,11 @@ public class Subscription: IEnumerable, IEnumerator, IDisposable {
9898
public IModel Model { get { return m_model; } }
9999

100100
protected string m_queueName;
101+
protected bool m_noAck;
102+
103+
protected readonly object m_consumerLock = new object();
101104
protected volatile QueueingBasicConsumer m_consumer;
102105
protected string m_consumerTag;
103-
protected bool m_noAck;
104106
protected volatile bool m_shouldDelete;
105107

106108
///<summary>Retrieve the queue name we have subscribed to. May
@@ -127,9 +129,10 @@ public class Subscription: IEnumerable, IEnumerator, IDisposable {
127129
protected BasicDeliverEventArgs m_latestEvent;
128130

129131
///<summary>Returns the most recent value returned by Next(),
130-
///or null when either no values have been retrieved yet, or
131-
///the most recent value has already been Ack()ed. See also
132-
///the documentation for Ack().</summary>
132+
///or null when either no values have been retrieved yet, the
133+
///end of the subscription has been reached, or the most
134+
///recent value has already been Ack()ed. See also the
135+
///documentation for Ack().</summary>
133136
public BasicDeliverEventArgs LatestEvent { get { return m_latestEvent; } }
134137

135138
///<summary>Creates a new Subscription in "noAck" mode,
@@ -220,22 +223,27 @@ public void Close()
220223
{
221224
try {
222225
bool shouldCancelConsumer = false;
223-
lock (this) {
226+
bool shouldDelete = false;
227+
228+
lock (m_consumerLock) {
224229
if (m_consumer != null) {
225230
shouldCancelConsumer = true;
226231
m_consumer = null;
227232
}
233+
234+
shouldDelete = m_shouldDelete;
235+
// We set m_shouldDelete false before attempting
236+
// the delete, because trying twice is worse than
237+
// trying once and failing.
238+
m_shouldDelete = false;
228239
}
240+
229241
if (shouldCancelConsumer) {
230242
m_model.BasicCancel(m_consumerTag);
231243
m_consumerTag = null;
232244
}
233245

234-
if (m_shouldDelete) {
235-
m_shouldDelete = false;
236-
// We set m_shouldDelete false before attempting
237-
// the delete, because trying twice is worse than
238-
// trying once and failing.
246+
if (shouldDelete) {
239247
m_model.QueueDelete(m_queueName, false, false, false);
240248
}
241249
} catch (OperationInterruptedException) {
@@ -324,11 +332,15 @@ public void Ack(BasicDeliverEventArgs evt)
324332
public BasicDeliverEventArgs Next()
325333
{
326334
try {
327-
if (m_consumer == null) {
335+
// Alias the pointer as otherwise it may change out
336+
// from under us by the operation of Close() from
337+
// another thread.
338+
QueueingBasicConsumer consumer = m_consumer;
339+
if (consumer == null) {
328340
// Closed!
329341
m_latestEvent = null;
330342
} else {
331-
m_latestEvent = (BasicDeliverEventArgs) m_consumer.Queue.Dequeue();
343+
m_latestEvent = (BasicDeliverEventArgs) consumer.Queue.Dequeue();
332344
}
333345
} catch (EndOfStreamException) {
334346
m_latestEvent = null;
@@ -383,12 +395,16 @@ public BasicDeliverEventArgs Next()
383395
public bool Next(int millisecondsTimeout, out BasicDeliverEventArgs result)
384396
{
385397
try {
386-
if (m_consumer == null) {
398+
// Alias the pointer as otherwise it may change out
399+
// from under us by the operation of Close() from
400+
// another thread.
401+
QueueingBasicConsumer consumer = m_consumer;
402+
if (consumer == null) {
387403
// Closed!
388404
m_latestEvent = null;
389405
} else {
390406
object qValue;
391-
if (!m_consumer.Queue.Dequeue(millisecondsTimeout, out qValue)) {
407+
if (!consumer.Queue.Dequeue(millisecondsTimeout, out qValue)) {
392408
result = null;
393409
return false;
394410
}

0 commit comments

Comments
 (0)