Skip to content

Commit 491170c

Browse files
committed
Permit clean shutdown of Subscriptions, by (a) returning null instead
of throwing InvalidOperationException from Next(), and (b) by locking around the decision whether or not to cancel the subscription's consumer and making m_consumer and m_shouldDelete volatile.
1 parent 8a0fd6b commit 491170c

File tree

1 file changed

+23
-14
lines changed

1 file changed

+23
-14
lines changed

src/client/messagepatterns/Subscription.cs

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -98,10 +98,10 @@ public class Subscription: IEnumerable, IEnumerator, IDisposable {
9898
public IModel Model { get { return m_model; } }
9999

100100
protected string m_queueName;
101-
protected QueueingBasicConsumer m_consumer;
101+
protected volatile QueueingBasicConsumer m_consumer;
102102
protected string m_consumerTag;
103103
protected bool m_noAck;
104-
protected bool m_shouldDelete;
104+
protected volatile bool m_shouldDelete;
105105

106106
///<summary>Retrieve the queue name we have subscribed to. May
107107
///be a server-generated name, depending on how the
@@ -219,9 +219,18 @@ public Subscription(IModel model, string queueName, bool noAck)
219219
public void Close()
220220
{
221221
try {
222-
if (m_consumer != null) {
222+
bool shouldCancelConsumer = false;
223+
lock (this) {
224+
if (m_consumer != null) {
225+
shouldCancelConsumer = true;
226+
m_consumer = null;
227+
}
228+
}
229+
if (shouldCancelConsumer) {
223230
m_model.BasicCancel(m_consumerTag);
231+
m_consumerTag = null;
224232
}
233+
225234
if (m_shouldDelete) {
226235
m_shouldDelete = false;
227236
// We set m_shouldDelete false before attempting
@@ -232,8 +241,6 @@ public void Close()
232241
} catch (OperationInterruptedException) {
233242
// We don't mind, here.
234243
}
235-
m_consumer = null;
236-
m_consumerTag = null;
237244
}
238245

239246
///<summary>Causes the queue to which we have subscribed to be
@@ -319,9 +326,10 @@ public BasicDeliverEventArgs Next()
319326
try {
320327
if (m_consumer == null) {
321328
// Closed!
322-
throw new InvalidOperationException();
329+
m_latestEvent = null;
330+
} else {
331+
m_latestEvent = (BasicDeliverEventArgs) m_consumer.Queue.Dequeue();
323332
}
324-
m_latestEvent = (BasicDeliverEventArgs) m_consumer.Queue.Dequeue();
325333
} catch (EndOfStreamException) {
326334
m_latestEvent = null;
327335
}
@@ -377,14 +385,15 @@ public bool Next(int millisecondsTimeout, out BasicDeliverEventArgs result)
377385
try {
378386
if (m_consumer == null) {
379387
// Closed!
380-
throw new InvalidOperationException();
381-
}
382-
object qValue;
383-
if (!m_consumer.Queue.Dequeue(millisecondsTimeout, out qValue)) {
384-
result = null;
385-
return false;
388+
m_latestEvent = null;
389+
} else {
390+
object qValue;
391+
if (!m_consumer.Queue.Dequeue(millisecondsTimeout, out qValue)) {
392+
result = null;
393+
return false;
394+
}
395+
m_latestEvent = (BasicDeliverEventArgs) qValue;
386396
}
387-
m_latestEvent = (BasicDeliverEventArgs) qValue;
388397
} catch (EndOfStreamException) {
389398
m_latestEvent = null;
390399
}

0 commit comments

Comments
 (0)