Skip to content

Commit 5ebd32a

Browse files
Merge pull request #173 from rabbitmq/rabbitmq-dotnet-client-13
Deprecate QueueingBasicConsumer and reduce its use by other components
2 parents 52d55e6 + ae1ceea commit 5ebd32a

File tree

4 files changed

+98
-36
lines changed

4 files changed

+98
-36
lines changed

projects/client/RabbitMQ.Client/src/client/api/DefaultBasicConsumer.cs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -50,11 +50,8 @@ namespace RabbitMQ.Client
5050
/// </summary>
5151
/// <remarks>
5252
/// Note that the "Handle*" methods run in the connection's thread!
53-
/// Consider using <see cref="QueueingBasicConsumer"/>, which uses a
54-
/// <see cref="SharedQueue"/> instance to safely pass received messages across
55-
/// to user threads, or RabbitMQ.Client.MessagePatterns.Subscription,
56-
/// which manages resource declaration and binding in addition to providing a
57-
/// thread-safe interface.
53+
/// Consider using <see cref="EventingBasicConsumer"/>, which exposes
54+
/// events that can be subscribed to consumer messages.
5855
/// </remarks>
5956
public class DefaultBasicConsumer : IBasicConsumer
6057
{

projects/client/RabbitMQ.Client/src/client/api/QueueingBasicConsumer.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ namespace RabbitMQ.Client
9090
/// }
9191
/// </code></example>
9292
/// </remarks>
93+
[Obsolete("Deprecated. Use EventingBasicConsumer or a different consumer interface implementation instead")]
9394
public class QueueingBasicConsumer : DefaultBasicConsumer, IQueueingBasicConsumer
9495
{
9596
/// <summary>

projects/client/RabbitMQ.Client/src/client/messagepatterns/Subscription.cs

100644100755
Lines changed: 80 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,11 @@
4040

4141
using System;
4242
using System.Collections;
43+
using System.Collections.Concurrent;
4344
using System.IO;
4445

45-
#if NETFX_CORE || NET4 // For Windows 8 Store, but could be .NET 4.0 and greater
46+
using System.Threading;
4647
using System.Threading.Tasks;
47-
#endif
4848

4949
using RabbitMQ.Client.Events;
5050
using RabbitMQ.Client.Exceptions;
@@ -59,7 +59,7 @@ namespace RabbitMQ.Client.MessagePatterns
5959
///</para>
6060
///<para>
6161
/// Once created, the Subscription consumes from a queue (using a
62-
/// QueueingBasicConsumer). Received deliveries can be retrieved
62+
/// EventingBasicConsumer). Received deliveries can be retrieved
6363
/// by calling Next(), or by using the Subscription as an
6464
/// IEnumerator in, for example, a foreach loop.
6565
///</para>
@@ -76,8 +76,16 @@ namespace RabbitMQ.Client.MessagePatterns
7676
public class Subscription : ISubscription
7777
{
7878
protected readonly object m_eventLock = new object();
79-
protected volatile QueueingBasicConsumer m_consumer;
79+
protected volatile EventingBasicConsumer m_consumer;
80+
private BlockingCollection<BasicDeliverEventArgs> m_queue =
81+
new BlockingCollection<BasicDeliverEventArgs>(new ConcurrentQueue<BasicDeliverEventArgs>());
82+
83+
private CancellationTokenSource m_queueCts = new CancellationTokenSource();
8084

85+
#if NETFX_CORE || NET4
86+
private ConcurrentQueue<TaskCompletionSource<BasicDeliverEventArgs>> m_waiting =
87+
new ConcurrentQueue<TaskCompletionSource<BasicDeliverEventArgs>>();
88+
#endif
8189
///<summary>Creates a new Subscription in "noAck" mode,
8290
///consuming from a named queue.</summary>
8391
public Subscription(IModel model, string queueName)
@@ -92,7 +100,12 @@ public Subscription(IModel model, string queueName, bool noAck)
92100
Model = model;
93101
QueueName = queueName;
94102
NoAck = noAck;
95-
m_consumer = new QueueingBasicConsumer(Model);
103+
m_consumer = new EventingBasicConsumer(Model);
104+
#if NETFX_CORE || NET4
105+
m_consumer.Received += (sender, args) => QueueAdd(args);
106+
#else
107+
m_consumer.Received += (sender, args) => m_queue.Add(args);
108+
#endif
96109
ConsumerTag = Model.BasicConsume(QueueName, NoAck, m_consumer);
97110
m_consumer.ConsumerCancelled += HandleConsumerCancelled;
98111
LatestEvent = null;
@@ -105,8 +118,9 @@ public Subscription(IModel model, string queueName, bool noAck, string consumerT
105118
Model = model;
106119
QueueName = queueName;
107120
NoAck = noAck;
108-
m_consumer = new QueueingBasicConsumer(Model);
121+
m_consumer = new EventingBasicConsumer(Model);
109122
m_consumer.ConsumerCancelled += HandleConsumerCancelled;
123+
m_consumer.Received += (sender, args) => m_queue.Add(args);
110124
ConsumerTag = Model.BasicConsume(QueueName, NoAck, consumerTag, m_consumer);
111125
LatestEvent = null;
112126
}
@@ -228,6 +242,17 @@ public void Close()
228242

229243
ConsumerTag = null;
230244
}
245+
246+
m_queueCts.Cancel(true);
247+
m_queue.Dispose();
248+
m_queue = null;
249+
#if NETFX_CORE || NET4
250+
var exn = new EndOfStreamException("Subscription closed");
251+
foreach (var tsc in m_waiting)
252+
{
253+
tsc.TrySetException(exn);
254+
}
255+
#endif
231256
}
232257
catch (OperationInterruptedException)
233258
{
@@ -301,7 +326,7 @@ public BasicDeliverEventArgs Next()
301326
// Alias the pointer as otherwise it may change out
302327
// from under us by the operation of Close() from
303328
// another thread.
304-
QueueingBasicConsumer consumer = m_consumer;
329+
EventingBasicConsumer consumer = m_consumer;
305330
try
306331
{
307332
if (consumer == null || Model.IsClosed)
@@ -310,7 +335,7 @@ public BasicDeliverEventArgs Next()
310335
}
311336
else
312337
{
313-
BasicDeliverEventArgs bdea = consumer.Queue.Dequeue();
338+
BasicDeliverEventArgs bdea = m_queue.Take(m_queueCts.Token);
314339
MutateLatestEvent(bdea);
315340
}
316341
}
@@ -322,31 +347,49 @@ public BasicDeliverEventArgs Next()
322347
}
323348

324349
#if NETFX_CORE || NET4
325-
public async Task<BasicDeliverEventArgs> NextAsync() {
326-
try {
350+
public Task<BasicDeliverEventArgs> NextAsync()
351+
{
352+
try
353+
{
327354
// Alias the pointer as otherwise it may change out
328355
// from under us by the operation of Close() from
329356
// another thread.
330-
QueueingBasicConsumer consumer = m_consumer;
331-
if (consumer == null) {
357+
var queue = m_queue;
358+
if (queue == null || Model.IsClosed)
359+
{
332360
// Closed!
333361
MutateLatestEvent(null);
334362
}
335-
else {
336-
MutateLatestEvent(await consumer.Queue.DequeueAsync());
363+
else
364+
{
365+
BasicDeliverEventArgs evt = null;
366+
if(queue.TryTake(out evt))
367+
{
368+
MutateLatestEvent(evt);
369+
}
370+
else
371+
{
372+
var tcs = new TaskCompletionSource<BasicDeliverEventArgs>();
373+
m_waiting.Enqueue(tcs);
374+
return tcs.Task;
375+
}
337376
}
338377
}
339-
catch (AggregateException ex) {
378+
catch (AggregateException ex)
379+
{
340380
// since tasks wrap exceptions as AggregateException,
341381
// reach in and check if the EndOfStream exception is what happened
342-
if (ex.InnerException is EndOfStreamException) {
382+
if (ex.InnerException is EndOfStreamException)
383+
{
343384
MutateLatestEvent(null);
344385
}
345386
}
346-
catch (EndOfStreamException) {
387+
catch (EndOfStreamException)
388+
{
347389
MutateLatestEvent(null);
348390
}
349-
return LatestEvent;
391+
392+
return Task.FromResult(LatestEvent);
350393
}
351394
#endif
352395

@@ -401,7 +444,7 @@ public bool Next(int millisecondsTimeout, out BasicDeliverEventArgs result)
401444
// Alias the pointer as otherwise it may change out
402445
// from under us by the operation of Close() from
403446
// another thread.
404-
QueueingBasicConsumer consumer = m_consumer;
447+
var consumer = m_consumer;
405448
if (consumer == null || Model.IsClosed)
406449
{
407450
MutateLatestEvent(null);
@@ -411,7 +454,7 @@ public bool Next(int millisecondsTimeout, out BasicDeliverEventArgs result)
411454
else
412455
{
413456
BasicDeliverEventArgs qValue;
414-
if (!consumer.Queue.Dequeue(millisecondsTimeout, out qValue))
457+
if (!m_queue.TryTake(out qValue, millisecondsTimeout))
415458
{
416459
result = null;
417460
return false;
@@ -483,5 +526,22 @@ private void HandleConsumerCancelled(object sender, ConsumerEventArgs e)
483526
MutateLatestEvent(null);
484527
}
485528
}
529+
530+
#if NETFX_CORE || NET4
531+
private void QueueAdd(BasicDeliverEventArgs args)
532+
{
533+
//NB: as long as there are async awaiters sync callers will never be served
534+
//this is not ideal but consistent with how SharedQueue behaves
535+
TaskCompletionSource<BasicDeliverEventArgs> tsc;
536+
if(m_waiting.TryDequeue(out tsc) && tsc.TrySetResult(args))
537+
{
538+
return;
539+
}
540+
else
541+
{
542+
m_queue.Add(args);
543+
}
544+
}
545+
#endif
486546
}
487547
}

projects/wcf/RabbitMQ.ServiceModel/src/serviceModel/RabbitMQInputChannel.cs

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
//---------------------------------------------------------------------------
4040

4141
using System;
42+
using System.Collections.Concurrent;
4243
using System.Diagnostics;
4344
using System.IO;
4445
using System.ServiceModel;
@@ -55,7 +56,9 @@ internal sealed class RabbitMQInputChannel : RabbitMQInputChannelBase
5556
private RabbitMQTransportBindingElement m_bindingElement;
5657
private MessageEncoder m_encoder;
5758
private IModel m_model;
58-
private QueueingBasicConsumer m_messageQueue;
59+
private EventingBasicConsumer m_consumer;
60+
private BlockingCollection<BasicDeliverEventArgs> m_queue =
61+
new BlockingCollection<BasicDeliverEventArgs>(new ConcurrentQueue<BasicDeliverEventArgs>());
5962

6063
public RabbitMQInputChannel(BindingContext context, IModel model, EndpointAddress address)
6164
: base(context, address)
@@ -67,21 +70,21 @@ public RabbitMQInputChannel(BindingContext context, IModel model, EndpointAddres
6770
m_encoder = encoderElem.CreateMessageEncoderFactory().Encoder;
6871
}
6972
m_model = model;
70-
m_messageQueue = null;
73+
m_consumer = null;
7174
}
7275

7376

74-
public override Message Receive(TimeSpan timeout)
77+
public override Message Receive(TimeSpan timeout) //TODO: timeout isn't used
7578
{
7679
try
7780
{
78-
BasicDeliverEventArgs msg = m_messageQueue.Queue.Dequeue() as BasicDeliverEventArgs;
81+
BasicDeliverEventArgs msg = m_queue.Take();
7982
#if VERBOSE
8083
DebugHelper.Start();
8184
#endif
8285
Message result = m_encoder.ReadMessage(new MemoryStream(msg.Body), (int)m_bindingElement.MaxReceivedMessageSize);
8386
result.Headers.To = base.LocalAddress.Uri;
84-
m_messageQueue.Model.BasicAck(msg.DeliveryTag, false);
87+
m_consumer.Model.BasicAck(msg.DeliveryTag, false);
8588
#if VERBOSE
8689
DebugHelper.Stop(" #### Message.Receive {{\n\tAction={2}, \n\tBytes={1}, \n\tTime={0}ms}}.",
8790
msg.Body.Length,
@@ -91,7 +94,7 @@ public override Message Receive(TimeSpan timeout)
9194
}
9295
catch (EndOfStreamException)
9396
{
94-
if (m_messageQueue== null || m_messageQueue.ShutdownReason != null && m_messageQueue.ShutdownReason.ReplyCode != Constants.ReplySuccess)
97+
if (m_consumer== null || m_consumer.ShutdownReason != null && m_consumer.ShutdownReason.ReplyCode != Constants.ReplySuccess)
9598
{
9699
OnFaulted();
97100
}
@@ -124,9 +127,9 @@ public override void Close(TimeSpan timeout)
124127
#if VERBOSE
125128
DebugHelper.Start();
126129
#endif
127-
if (m_messageQueue != null) {
128-
m_model.BasicCancel(m_messageQueue.ConsumerTag);
129-
m_messageQueue = null;
130+
if (m_consumer != null) {
131+
m_model.BasicCancel(m_consumer.ConsumerTag);
132+
m_consumer = null;
130133
}
131134
#if VERBOSE
132135
DebugHelper.Stop(" ## In.Channel.Close {{\n\tAddress={1}, \n\tTime={0}ms}}.", LocalAddress.Uri.PathAndQuery);
@@ -148,8 +151,9 @@ public override void Open(TimeSpan timeout)
148151
m_model.QueueBind(queue, Exchange, base.LocalAddress.Uri.PathAndQuery, null);
149152

150153
//Listen to the queue
151-
m_messageQueue = new QueueingBasicConsumer(m_model);
152-
m_model.BasicConsume(queue, false, m_messageQueue);
154+
m_consumer = new EventingBasicConsumer(m_model);
155+
m_consumer.Received += (sender, args) => m_queue.Add(args);
156+
m_model.BasicConsume(queue, false, m_consumer);
153157

154158
#if VERBOSE
155159
DebugHelper.Stop(" ## In.Channel.Open {{\n\tAddress={1}, \n\tTime={0}ms}}.", LocalAddress.Uri.PathAndQuery);

0 commit comments

Comments
 (0)