Skip to content

Commit 16460c2

Browse files
author
Tim Watson
committed
Merge bug26097 into default
2 parents 0ff47f1 + ef479bc commit 16460c2

File tree

2 files changed

+201
-29
lines changed

2 files changed

+201
-29
lines changed

projects/client/RabbitMQ.Client/src/client/messagepatterns/Subscription.cs

Lines changed: 77 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ public class Subscription: IEnumerable, IEnumerator, IDisposable {
8080
protected bool m_noAck;
8181

8282
protected readonly object m_consumerLock = new object();
83+
protected readonly object m_eventLock = new object();
8384
protected volatile QueueingBasicConsumer m_consumer;
8485
protected string m_consumerTag;
8586

@@ -156,7 +157,11 @@ public void Close()
156157
}
157158

158159
if (shouldCancelConsumer) {
159-
m_model.BasicCancel(m_consumerTag);
160+
if(m_model.IsOpen)
161+
{
162+
m_model.BasicCancel(m_consumerTag);
163+
}
164+
160165
m_consumerTag = null;
161166
}
162167
} catch (OperationInterruptedException) {
@@ -169,34 +174,73 @@ public void Close()
169174
///null.</summary>
170175
public void Ack()
171176
{
172-
if (m_latestEvent != null) {
173-
Ack(m_latestEvent);
174-
}
177+
Ack(m_latestEvent);
175178
}
176179

177180
///<summary>If we are not in "noAck" mode, calls
178-
///IModel.BasicAck with the delivery-tag from the passed in
179-
///event; otherwise, sends nothing to the server. In both
180-
///cases, if the passed-in event is the same as LatestEvent
181-
///(by pointer comparison), sets LatestEvent to
182-
///null.</summary>
181+
///IModel.BasicAck with the delivery-tag from <paramref name="evt"/>;
182+
///otherwise, sends nothing to the server. if <paramref name="evt"/> is the same as LatestEvent
183+
///by pointer comparison, sets LatestEvent to null.
184+
///</summary>
183185
///<remarks>
184-
/// Make sure that this method is only called with events that
185-
/// originated from this Subscription - other usage will have
186-
/// unpredictable results.
186+
///Passing an event that did not originate with this Subscription's
187+
/// channel, will lead to unpredictable behaviour
187188
///</remarks>
188189
public void Ack(BasicDeliverEventArgs evt)
189190
{
190191
if (evt == null) {
191192
return;
192193
}
193194

194-
if (!m_noAck) {
195+
if (!m_noAck && m_model.IsOpen) {
195196
m_model.BasicAck(evt.DeliveryTag, false);
196197
}
197198

198199
if (evt == m_latestEvent) {
199-
m_latestEvent = null;
200+
MutateLatestEvent(null);
201+
}
202+
}
203+
204+
///<summary>If LatestEvent is non-null, passes it to
205+
///Nack(BasicDeliverEventArgs, false, requeue). Causes LatestEvent to become
206+
///null.</summary>
207+
public void Nack(bool requeue)
208+
{
209+
Nack(m_latestEvent, false, requeue);
210+
}
211+
212+
213+
///<summary>If LatestEvent is non-null, passes it to
214+
///Nack(BasicDeliverEventArgs, multiple, requeue). Causes LatestEvent to become
215+
///null.</summary>
216+
public void Nack(bool multiple, bool requeue)
217+
{
218+
Nack(m_latestEvent, multiple, requeue);
219+
}
220+
221+
///<summary>If we are not in "noAck" mode, calls
222+
///IModel.BasicNack with the delivery-tag from <paramref name="evt"/>;
223+
///otherwise, sends nothing to the server. if <paramref name="evt"/> is the same as LatestEvent
224+
///by pointer comparison, sets LatestEvent to null.
225+
///</summary>
226+
///<remarks>
227+
///Passing an event that did not originate with this Subscription's
228+
/// channel, will lead to unpredictable behaviour
229+
///</remarks>
230+
public void Nack(BasicDeliverEventArgs evt,
231+
bool multiple,
232+
bool requeue)
233+
{
234+
if (evt == null) {
235+
return;
236+
}
237+
238+
if (!m_noAck && m_model.IsOpen) {
239+
m_model.BasicNack(evt.DeliveryTag, multiple, requeue);
240+
}
241+
242+
if (evt == m_latestEvent) {
243+
MutateLatestEvent(null);
200244
}
201245
}
202246

@@ -220,19 +264,19 @@ public void Ack(BasicDeliverEventArgs evt)
220264
///</remarks>
221265
public BasicDeliverEventArgs Next()
222266
{
267+
// Alias the pointer as otherwise it may change out
268+
// from under us by the operation of Close() from
269+
// another thread.
270+
QueueingBasicConsumer consumer = m_consumer;
223271
try {
224-
// Alias the pointer as otherwise it may change out
225-
// from under us by the operation of Close() from
226-
// another thread.
227-
QueueingBasicConsumer consumer = m_consumer;
228272
if (consumer == null || m_model.IsClosed) {
229-
// Closed!
230-
m_latestEvent = null;
273+
MutateLatestEvent(null);
231274
} else {
232-
m_latestEvent = (BasicDeliverEventArgs) consumer.Queue.Dequeue();
275+
BasicDeliverEventArgs bdea = (BasicDeliverEventArgs) consumer.Queue.Dequeue();
276+
MutateLatestEvent(bdea);
233277
}
234278
} catch (EndOfStreamException) {
235-
m_latestEvent = null;
279+
MutateLatestEvent(null);
236280
}
237281
return m_latestEvent;
238282
}
@@ -289,8 +333,7 @@ public bool Next(int millisecondsTimeout, out BasicDeliverEventArgs result)
289333
// another thread.
290334
QueueingBasicConsumer consumer = m_consumer;
291335
if (consumer == null || m_model.IsClosed) {
292-
// Closed!
293-
m_latestEvent = null;
336+
MutateLatestEvent(null);
294337
result = null;
295338
return false;
296339
} else {
@@ -299,10 +342,10 @@ public bool Next(int millisecondsTimeout, out BasicDeliverEventArgs result)
299342
result = null;
300343
return false;
301344
}
302-
m_latestEvent = qValue;
345+
MutateLatestEvent(qValue);
303346
}
304347
} catch (EndOfStreamException) {
305-
m_latestEvent = null;
348+
MutateLatestEvent(null);
306349
}
307350
result = m_latestEvent;
308351
return true;
@@ -369,5 +412,13 @@ void IDisposable.Dispose()
369412
{
370413
Close();
371414
}
415+
416+
protected void MutateLatestEvent(BasicDeliverEventArgs value)
417+
{
418+
lock(m_eventLock)
419+
{
420+
m_latestEvent = value;
421+
}
422+
}
372423
}
373424
}

projects/client/Unit/src/unit/TestMessagePatternsSubscription.cs

Lines changed: 124 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,14 +43,18 @@
4343
using System;
4444
using System.Text;
4545
using System.Threading;
46-
using System.Diagnostics;
46+
using System.Collections.Generic;
47+
using System.Timers;
4748

49+
using RabbitMQ.Client.Exceptions;
4850
using RabbitMQ.Client.Events;
4951
using RabbitMQ.Client.MessagePatterns;
5052

51-
namespace RabbitMQ.Client.Unit {
53+
namespace RabbitMQ.Client.Unit
54+
{
5255
[TestFixture]
53-
public class TestMessagePatternsSubscription : IntegrationFixture {
56+
public class TestMessagePatternsSubscription : IntegrationFixture
57+
{
5458
UTF8Encoding enc = new UTF8Encoding();
5559

5660
[Test]
@@ -75,5 +79,122 @@ public void TestChannelClosureIsObservableOnSubscription()
7579
BasicDeliverEventArgs r3;
7680
Assert.IsFalse(sub.Next(100, out r3));
7781
}
82+
83+
[Test]
84+
public void TestSubscriptionAck()
85+
{
86+
TestSubscriptionAction((s) => s.Ack());
87+
}
88+
89+
[Test]
90+
public void TestSubscriptionNack()
91+
{
92+
TestSubscriptionAction((s) => s.Nack(false, false));
93+
}
94+
95+
[Test]
96+
public void TestConcurrentIterationAndAck()
97+
{
98+
TestConcurrentIterationWithDrainer((s) => s.Ack());
99+
}
100+
101+
[Test]
102+
public void TestConcurrentIterationAndNack()
103+
{
104+
TestConcurrentIterationWithDrainer((s) => s.Nack(false, false));
105+
}
106+
107+
protected void TestConcurrentIterationWithDrainer(SubscriptionAction act)
108+
{
109+
IDictionary<string, object> args = new Dictionary<string, object>
110+
{
111+
{"x-message-ttl", 5000}
112+
};
113+
string q = Model.QueueDeclare("", false, true, false, args);
114+
Subscription sub = new Subscription(Model, q, false);
115+
116+
PreparedQueue(q);
117+
118+
List<Thread> ts = new List<Thread>();
119+
for (int i = 0; i < 50; i++)
120+
{
121+
SubscriptionDrainer drainer = new SubscriptionDrainer(sub, act);
122+
Thread t = new Thread(drainer.Drain);
123+
ts.Add(t);
124+
t.Start();
125+
}
126+
127+
foreach(Thread t in ts)
128+
{
129+
t.Join();
130+
}
131+
}
132+
133+
private void TestSubscriptionAction(SubscriptionAction action)
134+
{
135+
Model.BasicQos(0, 1, false);
136+
string q = Model.QueueDeclare();
137+
Subscription sub = new Subscription(Model, q, false);
138+
139+
Model.BasicPublish("", q, null, enc.GetBytes("a message"));
140+
BasicDeliverEventArgs res = sub.Next();
141+
Assert.IsNotNull(res);
142+
action(sub);
143+
QueueDeclareOk ok = Model.QueueDeclarePassive(q);
144+
Assert.AreEqual(0, ok.MessageCount);
145+
}
146+
147+
protected delegate void SubscriptionAction(Subscription s);
148+
149+
protected class SubscriptionDrainer
150+
{
151+
protected Subscription m_subscription;
152+
private SubscriptionAction PostProcess { get; set; }
153+
154+
public SubscriptionDrainer(Subscription sub, SubscriptionAction op)
155+
{
156+
m_subscription = sub;
157+
PostProcess = op;
158+
}
159+
160+
public void Drain()
161+
{
162+
#pragma warning disable 0168
163+
try
164+
{
165+
for(int i = 0; i < 100; i++)
166+
{
167+
BasicDeliverEventArgs ea = m_subscription.Next();
168+
if(ea != null)
169+
{
170+
Assert.That(ea, Is.TypeOf(typeof(BasicDeliverEventArgs)));
171+
this.PostProcess(m_subscription);
172+
}
173+
else
174+
{
175+
break;
176+
}
177+
}
178+
}
179+
catch (AlreadyClosedException ace)
180+
{
181+
// expected
182+
}
183+
finally
184+
{
185+
m_subscription.Close();
186+
}
187+
#pragma warning restore
188+
189+
}
190+
}
191+
192+
private void PreparedQueue(string q)
193+
{
194+
for (int i = 0; i < 1024; i++)
195+
{
196+
Model.BasicPublish("", q, null, enc.GetBytes("a message"));
197+
}
198+
}
78199
}
79200
}

0 commit comments

Comments
 (0)