Skip to content

Commit c217b27

Browse files
More fine-grained locking, extra a method that performs synchronised writes
1 parent 642e57b commit c217b27

File tree

1 file changed

+51
-60
lines changed

1 file changed

+51
-60
lines changed

projects/client/RabbitMQ.Client/src/client/messagepatterns/Subscription.cs

Lines changed: 51 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -192,15 +192,12 @@ public void Ack(BasicDeliverEventArgs evt)
192192
return;
193193
}
194194

195-
lock(m_eventLock)
196-
{
197-
if (!m_noAck && m_model.IsOpen) {
198-
m_model.BasicAck(evt.DeliveryTag, false);
199-
}
195+
if (!m_noAck && m_model.IsOpen) {
196+
m_model.BasicAck(evt.DeliveryTag, false);
197+
}
200198

201-
if (evt == m_latestEvent) {
202-
m_latestEvent = null;
203-
}
199+
if (evt == m_latestEvent) {
200+
MutateLatestEvent(null);
204201
}
205202
}
206203

@@ -238,15 +235,12 @@ public void Nack(BasicDeliverEventArgs evt,
238235
return;
239236
}
240237

241-
lock(m_eventLock)
242-
{
243-
if (!m_noAck && m_model.IsOpen) {
244-
m_model.BasicNack(evt.DeliveryTag, multiple, requeue);
245-
}
238+
if (!m_noAck && m_model.IsOpen) {
239+
m_model.BasicNack(evt.DeliveryTag, multiple, requeue);
240+
}
246241

247-
if (evt == m_latestEvent) {
248-
m_latestEvent = null;
249-
}
242+
if (evt == m_latestEvent) {
243+
MutateLatestEvent(null);
250244
}
251245
}
252246

@@ -270,24 +264,21 @@ public void Nack(BasicDeliverEventArgs evt,
270264
///</remarks>
271265
public BasicDeliverEventArgs Next()
272266
{
273-
lock(m_eventLock)
274-
{
275-
try {
276-
// Alias the pointer as otherwise it may change out
277-
// from under us by the operation of Close() from
278-
// another thread.
279-
QueueingBasicConsumer consumer = m_consumer;
280-
if (consumer == null || m_model.IsClosed) {
281-
// Closed!
282-
m_latestEvent = null;
283-
} else {
284-
m_latestEvent = (BasicDeliverEventArgs) consumer.Queue.Dequeue();
285-
}
286-
} catch (EndOfStreamException) {
287-
m_latestEvent = null;
267+
// Alias the pointer as otherwise it may change out
268+
// from under us by the operation of Close() from
269+
// another thread.
270+
QueueingBasicConsumer consumer = m_consumer;
271+
try {
272+
if (consumer == null || m_model.IsClosed) {
273+
MutateLatestEvent(null);
274+
} else {
275+
BasicDeliverEventArgs bdea = (BasicDeliverEventArgs) consumer.Queue.Dequeue();
276+
MutateLatestEvent(bdea);
288277
}
289-
return m_latestEvent;
278+
} catch (EndOfStreamException) {
279+
MutateLatestEvent(null);
290280
}
281+
return m_latestEvent;
291282
}
292283

293284
///<summary>Retrieves the next incoming delivery in our
@@ -336,32 +327,28 @@ public BasicDeliverEventArgs Next()
336327
///</remarks>
337328
public bool Next(int millisecondsTimeout, out BasicDeliverEventArgs result)
338329
{
339-
lock(m_eventLock)
340-
{
341-
try {
342-
// Alias the pointer as otherwise it may change out
343-
// from under us by the operation of Close() from
344-
// another thread.
345-
QueueingBasicConsumer consumer = m_consumer;
346-
if (consumer == null || m_model.IsClosed) {
347-
// Closed!
348-
m_latestEvent = null;
330+
try {
331+
// Alias the pointer as otherwise it may change out
332+
// from under us by the operation of Close() from
333+
// another thread.
334+
QueueingBasicConsumer consumer = m_consumer;
335+
if (consumer == null || m_model.IsClosed) {
336+
MutateLatestEvent(null);
337+
result = null;
338+
return false;
339+
} else {
340+
BasicDeliverEventArgs qValue;
341+
if (!consumer.Queue.Dequeue(millisecondsTimeout, out qValue)) {
349342
result = null;
350343
return false;
351-
} else {
352-
BasicDeliverEventArgs qValue;
353-
if (!consumer.Queue.Dequeue(millisecondsTimeout, out qValue)) {
354-
result = null;
355-
return false;
356-
}
357-
m_latestEvent = qValue;
358344
}
359-
} catch (EndOfStreamException) {
360-
m_latestEvent = null;
345+
MutateLatestEvent(qValue);
361346
}
362-
result = m_latestEvent;
363-
return true;
347+
} catch (EndOfStreamException) {
348+
MutateLatestEvent(null);
364349
}
350+
result = m_latestEvent;
351+
return true;
365352
}
366353

367354
///<summary>Implementation of the IEnumerable interface, for
@@ -387,14 +374,10 @@ IEnumerator IEnumerable.GetEnumerator()
387374
///</remarks>
388375
object IEnumerator.Current {
389376
get {
390-
lock(m_eventLock)
391-
{
392-
393-
if (m_latestEvent == null) {
394-
throw new InvalidOperationException();
395-
}
396-
return m_latestEvent;
377+
if (m_latestEvent == null) {
378+
throw new InvalidOperationException();
397379
}
380+
return m_latestEvent;
398381
}
399382
}
400383

@@ -429,5 +412,13 @@ void IDisposable.Dispose()
429412
{
430413
Close();
431414
}
415+
416+
protected void MutateLatestEvent(BasicDeliverEventArgs value)
417+
{
418+
lock(m_eventLock)
419+
{
420+
m_latestEvent = value;
421+
}
422+
}
432423
}
433424
}

0 commit comments

Comments
 (0)