Skip to content

Commit 29be40f

Browse files
committed
Support for QueueDeclareFull, which gives number of messages and consumers
1 parent 94cafa8 commit 29be40f

File tree

4 files changed

+88
-18
lines changed

4 files changed

+88
-18
lines changed

projects/client/ApigenBootstrap/RabbitMQ.Client.ApigenBootstrap.csproj

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,9 @@
6666
<Compile Include="..\RabbitMQ.Client\src\client\api\PublicationAddress.cs">
6767
<Link>src\client\api\PublicationAddress.cs</Link>
6868
</Compile>
69+
<Compile Include="..\RabbitMQ.Client\src\client\api\QueueDeclareResult.cs">
70+
<Link>src\client\api\QueueDeclareResult.cs</Link>
71+
</Compile>
6972
<Compile Include="..\RabbitMQ.Client\src\client\api\ShutdownEventArgs.cs">
7073
<Link>src\client\api\ShutdownEventArgs.cs</Link>
7174
</Compile>

projects/client/RabbitMQ.Client/src/client/api/IModel.cs

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,11 @@ void ExchangeUnbind(string destination,
258258
string QueueDeclare(string queue, bool durable, bool exclusive,
259259
bool autoDelete, IDictionary arguments);
260260

261+
///<summary>(Spec method) Declare a queue.</summary>
262+
[AmqpMethodDoNotImplement(null)]
263+
QueueDeclareResult QueueDeclareFull(string queue, bool passive, bool durable, bool exclusive,
264+
bool autoDelete, bool nowait, IDictionary arguments);
265+
261266
///<summary>(Spec method) Bind a queue to an exchange.</summary>
262267
[AmqpMethodDoNotImplement(null)]
263268
void QueueBind(string queue,
@@ -665,15 +670,21 @@ void _Private_ExchangeUnbind(string destination,
665670
///<summary>Used to send a Queue.Declare method. Called by the
666671
///public declare method.</summary>
667672
[AmqpMethodMapping(null, "queue", "declare")]
668-
[return: AmqpFieldMapping(null, "queue")]
669-
string _Private_QueueDeclare(string queue,
670-
bool passive,
671-
bool durable,
672-
bool exclusive,
673-
bool autoDelete,
674-
[AmqpNowaitArgument(null)]
675-
bool nowait,
676-
IDictionary arguments);
673+
[AmqpForceOneWay]
674+
void _Private_QueueDeclare(string queue,
675+
bool passive,
676+
bool durable,
677+
bool exclusive,
678+
bool autoDelete,
679+
[AmqpNowaitArgument(null)]
680+
bool nowait,
681+
IDictionary arguments);
682+
683+
///<summary>Handle incoming Queue.DeclareOk methods. Routes the
684+
///information to a waiting Queue.DeclareOk continuation.</summary>
685+
void HandleQueueDeclareOk(string queue,
686+
uint messageCount,
687+
uint consumerCount);
677688

678689
///<summary>Used to send a Queue.Bind method. Called by the
679690
///public bind method.</summary>
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Linq;
4+
using System.Text;
5+
6+
namespace RabbitMQ.Client
7+
{
8+
public class QueueDeclareResult
9+
{
10+
public string Queue { get; private set; }
11+
public uint MessageCount { get; private set; }
12+
public uint ConsumerCount { get; private set; }
13+
14+
public QueueDeclareResult(string queue, uint messageCount, uint consumerCount)
15+
{
16+
this.Queue = queue;
17+
this.MessageCount = messageCount;
18+
this.ConsumerCount = consumerCount;
19+
}
20+
}
21+
}

projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs

Lines changed: 44 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -858,22 +858,46 @@ public string QueueDeclare()
858858

859859
public string QueueDeclarePassive(string queue)
860860
{
861-
return _Private_QueueDeclare(queue, true, false, false, false, false, null);
861+
return QueueDeclareFull(queue, true, false, false, false, false, null).Queue;
862862
}
863863

864864
public string QueueDeclare(string queue, bool durable, bool exclusive,
865865
bool autoDelete, IDictionary arguments)
866866
{
867-
return _Private_QueueDeclare(queue, false, durable, exclusive, autoDelete, false, arguments);
867+
return QueueDeclareFull(queue, false, durable, exclusive, autoDelete, false, arguments).Queue;
868868
}
869869

870-
public abstract string _Private_QueueDeclare(string queue,
871-
bool passive,
872-
bool durable,
873-
bool exclusive,
874-
bool autoDelete,
875-
bool nowait,
876-
IDictionary arguments);
870+
public class QueueDeclareRpcContinuation : SimpleBlockingRpcContinuation
871+
{
872+
public QueueDeclareResult m_result;
873+
public QueueDeclareRpcContinuation() { }
874+
}
875+
public QueueDeclareResult QueueDeclareFull(string queue, bool passive, bool durable, bool exclusive,
876+
bool autoDelete, bool nowait, IDictionary arguments)
877+
{
878+
QueueDeclareRpcContinuation k = new QueueDeclareRpcContinuation();
879+
Enqueue(k);
880+
try
881+
{
882+
_Private_QueueDeclare(queue, false, durable, exclusive, autoDelete, false, arguments);
883+
}
884+
catch (AlreadyClosedException)
885+
{
886+
// Ignored, since the continuation will be told about
887+
// the closure via an OperationInterruptedException because
888+
// of the shutdown event propagation.
889+
}
890+
k.GetReply();
891+
return k.m_result;
892+
}
893+
894+
public abstract void _Private_QueueDeclare(string queue,
895+
bool passive,
896+
bool durable,
897+
bool exclusive,
898+
bool autoDelete,
899+
bool nowait,
900+
IDictionary arguments);
877901

878902
public void QueueBind(string queue,
879903
string exchange,
@@ -1473,6 +1497,17 @@ public abstract void _Private_ConnectionClose(ushort replyCode,
14731497

14741498
public abstract void _Private_ConnectionCloseOk();
14751499

1500+
public void HandleQueueDeclareOk(string queue,
1501+
uint messageCount,
1502+
uint consumerCount)
1503+
{
1504+
QueueDeclareRpcContinuation k = (QueueDeclareRpcContinuation)m_continuationQueue.Next();
1505+
k.m_result = new QueueDeclareResult(queue,
1506+
messageCount,
1507+
consumerCount);
1508+
k.HandleCommand(null); // release the continuation.
1509+
}
1510+
14761511
public override string ToString() {
14771512
return m_session.ToString();
14781513
}

0 commit comments

Comments
 (0)