Skip to content

Commit 731d813

Browse files
Synchronise Subscription methods that mutate m_lastEvent
1 parent 79d3e92 commit 731d813

File tree

2 files changed

+181
-57
lines changed

2 files changed

+181
-57
lines changed

projects/client/RabbitMQ.Client/src/client/messagepatterns/Subscription.cs

Lines changed: 85 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ public class Subscription: IEnumerable, IEnumerator, IDisposable {
8080
protected bool m_noAck;
8181

8282
protected readonly object m_consumerLock = new object();
83+
protected readonly object m_eventLock = new object();
8384
protected volatile QueueingBasicConsumer m_consumer;
8485
protected string m_consumerTag;
8586

@@ -169,8 +170,11 @@ public void Close()
169170
///null.</summary>
170171
public void Ack()
171172
{
172-
if (m_latestEvent != null) {
173-
Ack(m_latestEvent);
173+
lock(m_eventLock)
174+
{
175+
if (m_latestEvent != null) {
176+
Ack(m_latestEvent);
177+
}
174178
}
175179
}
176180

@@ -185,16 +189,19 @@ public void Ack()
185189
///</remarks>
186190
public void Ack(BasicDeliverEventArgs evt)
187191
{
188-
if (evt == null) {
189-
return;
190-
}
192+
lock(m_eventLock)
193+
{
194+
if (evt == null) {
195+
return;
196+
}
191197

192-
if (!m_noAck) {
193-
m_model.BasicAck(evt.DeliveryTag, false);
194-
}
198+
if (!m_noAck) {
199+
m_model.BasicAck(evt.DeliveryTag, false);
200+
}
195201

196-
if (evt == m_latestEvent) {
197-
m_latestEvent = null;
202+
if (evt == m_latestEvent) {
203+
m_latestEvent = null;
204+
}
198205
}
199206
}
200207

@@ -203,8 +210,11 @@ public void Ack(BasicDeliverEventArgs evt)
203210
///null.</summary>
204211
public void Nack(bool requeue)
205212
{
206-
if (m_latestEvent != null) {
207-
Nack(m_latestEvent, false, requeue);
213+
lock(m_eventLock)
214+
{
215+
if (m_latestEvent != null) {
216+
Nack(m_latestEvent, false, requeue);
217+
}
208218
}
209219
}
210220

@@ -214,8 +224,11 @@ public void Nack(bool requeue)
214224
///null.</summary>
215225
public void Nack(bool multiple, bool requeue)
216226
{
217-
if (m_latestEvent != null) {
218-
Nack(m_latestEvent, multiple, requeue);
227+
lock(m_eventLock)
228+
{
229+
if (m_latestEvent != null) {
230+
Nack(m_latestEvent, multiple, requeue);
231+
}
219232
}
220233
}
221234

@@ -232,16 +245,19 @@ public void Nack(BasicDeliverEventArgs evt,
232245
bool multiple,
233246
bool requeue)
234247
{
235-
if (evt == null) {
236-
return;
237-
}
248+
lock(m_eventLock)
249+
{
250+
if (evt == null) {
251+
return;
252+
}
238253

239-
if (!m_noAck) {
240-
m_model.BasicNack(evt.DeliveryTag, multiple, requeue);
241-
}
254+
if (!m_noAck) {
255+
m_model.BasicNack(evt.DeliveryTag, multiple, requeue);
256+
}
242257

243-
if (evt == m_latestEvent) {
244-
m_latestEvent = null;
258+
if (evt == m_latestEvent) {
259+
m_latestEvent = null;
260+
}
245261
}
246262
}
247263

@@ -265,21 +281,24 @@ public void Nack(BasicDeliverEventArgs evt,
265281
///</remarks>
266282
public BasicDeliverEventArgs Next()
267283
{
268-
try {
269-
// Alias the pointer as otherwise it may change out
270-
// from under us by the operation of Close() from
271-
// another thread.
272-
QueueingBasicConsumer consumer = m_consumer;
273-
if (consumer == null || m_model.IsClosed) {
274-
// Closed!
284+
lock(m_eventLock)
285+
{
286+
try {
287+
// Alias the pointer as otherwise it may change out
288+
// from under us by the operation of Close() from
289+
// another thread.
290+
QueueingBasicConsumer consumer = m_consumer;
291+
if (consumer == null || m_model.IsClosed) {
292+
// Closed!
293+
m_latestEvent = null;
294+
} else {
295+
m_latestEvent = (BasicDeliverEventArgs) consumer.Queue.Dequeue();
296+
}
297+
} catch (EndOfStreamException) {
275298
m_latestEvent = null;
276-
} else {
277-
m_latestEvent = (BasicDeliverEventArgs) consumer.Queue.Dequeue();
278299
}
279-
} catch (EndOfStreamException) {
280-
m_latestEvent = null;
300+
return m_latestEvent;
281301
}
282-
return m_latestEvent;
283302
}
284303

285304
///<summary>Retrieves the next incoming delivery in our
@@ -328,29 +347,32 @@ public BasicDeliverEventArgs Next()
328347
///</remarks>
329348
public bool Next(int millisecondsTimeout, out BasicDeliverEventArgs result)
330349
{
331-
try {
332-
// Alias the pointer as otherwise it may change out
333-
// from under us by the operation of Close() from
334-
// another thread.
335-
QueueingBasicConsumer consumer = m_consumer;
336-
if (consumer == null || m_model.IsClosed) {
337-
// Closed!
338-
m_latestEvent = null;
339-
result = null;
340-
return false;
341-
} else {
342-
BasicDeliverEventArgs qValue;
343-
if (!consumer.Queue.Dequeue(millisecondsTimeout, out qValue)) {
350+
lock(m_eventLock)
351+
{
352+
try {
353+
// Alias the pointer as otherwise it may change out
354+
// from under us by the operation of Close() from
355+
// another thread.
356+
QueueingBasicConsumer consumer = m_consumer;
357+
if (consumer == null || m_model.IsClosed) {
358+
// Closed!
359+
m_latestEvent = null;
344360
result = null;
345361
return false;
362+
} else {
363+
BasicDeliverEventArgs qValue;
364+
if (!consumer.Queue.Dequeue(millisecondsTimeout, out qValue)) {
365+
result = null;
366+
return false;
367+
}
368+
m_latestEvent = qValue;
346369
}
347-
m_latestEvent = qValue;
370+
} catch (EndOfStreamException) {
371+
m_latestEvent = null;
348372
}
349-
} catch (EndOfStreamException) {
350-
m_latestEvent = null;
373+
result = m_latestEvent;
374+
return true;
351375
}
352-
result = m_latestEvent;
353-
return true;
354376
}
355377

356378
///<summary>Implementation of the IEnumerable interface, for
@@ -376,10 +398,14 @@ IEnumerator IEnumerable.GetEnumerator()
376398
///</remarks>
377399
object IEnumerator.Current {
378400
get {
379-
if (m_latestEvent == null) {
380-
throw new InvalidOperationException();
401+
lock(m_eventLock)
402+
{
403+
404+
if (m_latestEvent == null) {
405+
throw new InvalidOperationException();
406+
}
407+
return m_latestEvent;
381408
}
382-
return m_latestEvent;
383409
}
384410
}
385411

@@ -394,7 +420,10 @@ object IEnumerator.Current {
394420
///</remarks>
395421
bool IEnumerator.MoveNext()
396422
{
397-
return Next() != null;
423+
lock(m_eventLock)
424+
{
425+
return Next() != null;
426+
}
398427
}
399428

400429
///<summary>Dummy implementation of the IEnumerator interface,

projects/client/Unit/src/unit/TestMessagePatternsSubscription.cs

Lines changed: 96 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,8 @@
4343
using System;
4444
using System.Text;
4545
using System.Threading;
46-
using System.Diagnostics;
4746

47+
using RabbitMQ.Client.Exceptions;
4848
using RabbitMQ.Client.Events;
4949
using RabbitMQ.Client.MessagePatterns;
5050

@@ -105,5 +105,100 @@ public void TestSubscriptionNack()
105105
QueueDeclareOk ok = Model.QueueDeclarePassive(q);
106106
Assert.AreEqual(0, ok.MessageCount);
107107
}
108+
109+
private abstract class SubscriptionDrainer
110+
{
111+
protected Subscription m_subscription;
112+
113+
public SubscriptionDrainer(Subscription sub)
114+
{
115+
m_subscription = sub;
116+
}
117+
118+
public void Drain()
119+
{
120+
bool shouldStop = false;
121+
#pragma warning disable 0168
122+
try
123+
{
124+
while(!shouldStop)
125+
{
126+
BasicDeliverEventArgs ea = m_subscription.Next();
127+
if(ea != null)
128+
{
129+
Assert.That(ea, Is.TypeOf(typeof(BasicDeliverEventArgs)));
130+
PostProcess();
131+
} else
132+
{
133+
shouldStop = true;
134+
}
135+
}
136+
} catch (AlreadyClosedException ace)
137+
{
138+
shouldStop = true;
139+
}
140+
#pragma warning restore
141+
}
142+
143+
protected abstract void PostProcess();
144+
}
145+
146+
private class AckingDrainer : SubscriptionDrainer
147+
{
148+
public AckingDrainer(Subscription sub) : base(sub) {}
149+
150+
override protected void PostProcess()
151+
{
152+
m_subscription.Ack();
153+
}
154+
}
155+
156+
private class NackingDrainer : SubscriptionDrainer
157+
{
158+
public NackingDrainer(Subscription sub) : base(sub) {}
159+
160+
override protected void PostProcess()
161+
{
162+
m_subscription.Nack(false, false);
163+
}
164+
}
165+
166+
[Test]
167+
public void TestConcurrentIterationAndAck()
168+
{
169+
string q = Model.QueueDeclare();
170+
Subscription sub = new Subscription(Model, q, false);
171+
172+
PreparedQueue(q);
173+
for (int i = 0; i < 10; i++)
174+
{
175+
SubscriptionDrainer drainer = new AckingDrainer(sub);
176+
Thread t = new Thread(drainer.Drain);
177+
t.Start();
178+
}
179+
}
180+
181+
[Test]
182+
public void TestConcurrentIterationAndNack()
183+
{
184+
string q = Model.QueueDeclare();
185+
Subscription sub = new Subscription(Model, q, false);
186+
187+
PreparedQueue(q);
188+
for (int i = 0; i < 10; i++)
189+
{
190+
SubscriptionDrainer drainer = new NackingDrainer(sub);
191+
Thread t = new Thread(drainer.Drain);
192+
t.Start();
193+
}
194+
}
195+
196+
private void PreparedQueue(string q)
197+
{
198+
for (int i = 0; i < 1024; i++)
199+
{
200+
Model.BasicPublish("", q, null, enc.GetBytes("a message"));
201+
}
202+
}
108203
}
109204
}

0 commit comments

Comments
 (0)