Skip to content

Commit 81b77f2

Browse files
author
Marek Majkowski
committed
bug24281 merged into default (queue-declare-ok)
2 parents aa9f3ed + 7786a40 commit 81b77f2

File tree

5 files changed

+219
-24
lines changed

5 files changed

+219
-24
lines changed

projects/client/ApigenBootstrap/RabbitMQ.Client.ApigenBootstrap.csproj

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,9 @@
6666
<Compile Include="..\RabbitMQ.Client\src\client\api\PublicationAddress.cs">
6767
<Link>src\client\api\PublicationAddress.cs</Link>
6868
</Compile>
69+
<Compile Include="..\RabbitMQ.Client\src\client\api\QueueDeclareOk.cs">
70+
<Link>src\client\api\QueueDeclareOk.cs</Link>
71+
</Compile>
6972
<Compile Include="..\RabbitMQ.Client\src\client\api\ShutdownEventArgs.cs">
7073
<Link>src\client\api\ShutdownEventArgs.cs</Link>
7174
</Compile>

projects/client/RabbitMQ.Client/src/client/api/IModel.cs

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,7 @@ void ExchangeUnbind(string destination,
242242
///name is the return value of this method.
243243
///</remarks>
244244
[AmqpMethodDoNotImplement(null)]
245-
string QueueDeclare();
245+
QueueDeclareOk QueueDeclare();
246246

247247
///<summary>Declare a queue passively.</summary>
248248
///<remarks>
@@ -251,11 +251,11 @@ void ExchangeUnbind(string destination,
251251
///The queue is declared passively; i.e. only check if it exists.
252252
///</remarks>
253253
[AmqpMethodDoNotImplement(null)]
254-
string QueueDeclarePassive(string queue);
254+
QueueDeclareOk QueueDeclarePassive(string queue);
255255

256256
///<summary>(Spec method) Declare a queue.</summary>
257257
[AmqpMethodDoNotImplement(null)]
258-
string QueueDeclare(string queue, bool durable, bool exclusive,
258+
QueueDeclareOk QueueDeclare(string queue, bool durable, bool exclusive,
259259
bool autoDelete, IDictionary arguments);
260260

261261
///<summary>(Spec method) Bind a queue to an exchange.</summary>
@@ -663,15 +663,21 @@ void _Private_ExchangeUnbind(string destination,
663663
///<summary>Used to send a Queue.Declare method. Called by the
664664
///public declare method.</summary>
665665
[AmqpMethodMapping(null, "queue", "declare")]
666-
[return: AmqpFieldMapping(null, "queue")]
667-
string _Private_QueueDeclare(string queue,
668-
bool passive,
669-
bool durable,
670-
bool exclusive,
671-
bool autoDelete,
672-
[AmqpNowaitArgument(null)]
673-
bool nowait,
674-
IDictionary arguments);
666+
[AmqpForceOneWay]
667+
void _Private_QueueDeclare(string queue,
668+
bool passive,
669+
bool durable,
670+
bool exclusive,
671+
bool autoDelete,
672+
[AmqpNowaitArgument(null)]
673+
bool nowait,
674+
IDictionary arguments);
675+
676+
///<summary>Handle incoming Queue.DeclareOk methods. Routes the
677+
///information to a waiting Queue.DeclareOk continuation.</summary>
678+
void HandleQueueDeclareOk(string queue,
679+
uint messageCount,
680+
uint consumerCount);
675681

676682
///<summary>Used to send a Queue.Bind method. Called by the
677683
///public bind method.</summary>
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 1.1.
3+
//
4+
// The APL v2.0:
5+
//
6+
//---------------------------------------------------------------------------
7+
// Copyright (C) 2007-2011 VMware, Inc.
8+
//
9+
// Licensed under the Apache License, Version 2.0 (the "License");
10+
// you may not use this file except in compliance with the License.
11+
// You may obtain a copy of the License at
12+
//
13+
// http://www.apache.org/licenses/LICENSE-2.0
14+
//
15+
// Unless required by applicable law or agreed to in writing, software
16+
// distributed under the License is distributed on an "AS IS" BASIS,
17+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
// See the License for the specific language governing permissions and
19+
// limitations under the License.
20+
//---------------------------------------------------------------------------
21+
//
22+
// The MPL v1.1:
23+
//
24+
//---------------------------------------------------------------------------
25+
// The contents of this file are subject to the Mozilla Public License
26+
// Version 1.1 (the "License"); you may not use this file except in
27+
// compliance with the License. You may obtain a copy of the License
28+
// at http://www.mozilla.org/MPL/
29+
//
30+
// Software distributed under the License is distributed on an "AS IS"
31+
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
32+
// the License for the specific language governing rights and
33+
// limitations under the License.
34+
//
35+
// The Original Code is RabbitMQ.
36+
//
37+
// The Initial Developer of the Original Code is VMware, Inc.
38+
// Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
39+
//---------------------------------------------------------------------------
40+
41+
namespace RabbitMQ.Client
42+
{
43+
public class QueueDeclareOk {
44+
public string QueueName { get; private set; }
45+
public uint MessageCount { get; private set; }
46+
public uint ConsumerCount { get; private set; }
47+
48+
public QueueDeclareOk(string queueName, uint messageCount, uint consumerCount) {
49+
this.QueueName = queueName;
50+
this.MessageCount = messageCount;
51+
this.ConsumerCount = consumerCount;
52+
}
53+
54+
public static implicit operator string(QueueDeclareOk declareOk)
55+
{
56+
return declareOk.QueueName;
57+
}
58+
}
59+
}

projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs

Lines changed: 48 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -851,29 +851,54 @@ public abstract void _Private_ExchangeUnbind(string destination,
851851

852852
//TODO: Mark these as virtual, maybe the model has an optimized way
853853
// of dealing with missing parameters.
854-
public string QueueDeclare()
854+
public QueueDeclareOk QueueDeclare()
855855
{
856856
return QueueDeclare("", false, true, true, null);
857857
}
858858

859-
public string QueueDeclarePassive(string queue)
859+
public QueueDeclareOk QueueDeclarePassive(string queue)
860860
{
861-
return _Private_QueueDeclare(queue, true, false, false, false, false, null);
861+
return QueueDeclare(queue, true, false, false, false, null);
862862
}
863863

864-
public string QueueDeclare(string queue, bool durable, bool exclusive,
864+
public QueueDeclareOk QueueDeclare(string queue, bool durable, bool exclusive,
865865
bool autoDelete, IDictionary arguments)
866866
{
867-
return _Private_QueueDeclare(queue, false, durable, exclusive, autoDelete, false, arguments);
867+
return QueueDeclare(queue, false, durable, exclusive, autoDelete, arguments);
868868
}
869869

870-
public abstract string _Private_QueueDeclare(string queue,
871-
bool passive,
872-
bool durable,
873-
bool exclusive,
874-
bool autoDelete,
875-
bool nowait,
876-
IDictionary arguments);
870+
public class QueueDeclareRpcContinuation : SimpleBlockingRpcContinuation
871+
{
872+
public QueueDeclareOk m_result;
873+
public QueueDeclareRpcContinuation() { }
874+
}
875+
876+
private QueueDeclareOk QueueDeclare(string queue, bool passive, bool durable, bool exclusive,
877+
bool autoDelete, IDictionary arguments)
878+
{
879+
QueueDeclareRpcContinuation k = new QueueDeclareRpcContinuation();
880+
Enqueue(k);
881+
try
882+
{
883+
_Private_QueueDeclare(queue, passive, durable, exclusive, autoDelete, false, arguments);
884+
}
885+
catch (AlreadyClosedException)
886+
{
887+
// Ignored, since the continuation will be told about
888+
// the closure via an OperationInterruptedException because
889+
// of the shutdown event propagation.
890+
}
891+
k.GetReply();
892+
return k.m_result;
893+
}
894+
895+
public abstract void _Private_QueueDeclare(string queue,
896+
bool passive,
897+
bool durable,
898+
bool exclusive,
899+
bool autoDelete,
900+
bool nowait,
901+
IDictionary arguments);
877902

878903
public void QueueBind(string queue,
879904
string exchange,
@@ -1473,6 +1498,17 @@ public abstract void _Private_ConnectionClose(ushort replyCode,
14731498

14741499
public abstract void _Private_ConnectionCloseOk();
14751500

1501+
public void HandleQueueDeclareOk(string queue,
1502+
uint messageCount,
1503+
uint consumerCount)
1504+
{
1505+
QueueDeclareRpcContinuation k = (QueueDeclareRpcContinuation)m_continuationQueue.Next();
1506+
k.m_result = new QueueDeclareOk(queue,
1507+
messageCount,
1508+
consumerCount);
1509+
k.HandleCommand(null); // release the continuation.
1510+
}
1511+
14761512
public override string ToString() {
14771513
return m_session.ToString();
14781514
}
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 1.1.
3+
//
4+
// The APL v2.0:
5+
//
6+
//---------------------------------------------------------------------------
7+
// Copyright (C) 2007-2011 VMware, Inc.
8+
//
9+
// Licensed under the Apache License, Version 2.0 (the "License");
10+
// you may not use this file except in compliance with the License.
11+
// You may obtain a copy of the License at
12+
//
13+
// http://www.apache.org/licenses/LICENSE-2.0
14+
//
15+
// Unless required by applicable law or agreed to in writing, software
16+
// distributed under the License is distributed on an "AS IS" BASIS,
17+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
// See the License for the specific language governing permissions and
19+
// limitations under the License.
20+
//---------------------------------------------------------------------------
21+
//
22+
// The MPL v1.1:
23+
//
24+
//---------------------------------------------------------------------------
25+
// The contents of this file are subject to the Mozilla Public License
26+
// Version 1.1 (the "License"); you may not use this file except in
27+
// compliance with the License. You may obtain a copy of the License
28+
// at http://www.mozilla.org/MPL/
29+
//
30+
// Software distributed under the License is distributed on an "AS IS"
31+
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
32+
// the License for the specific language governing rights and
33+
// limitations under the License.
34+
//
35+
// The Original Code is RabbitMQ.
36+
//
37+
// The Initial Developer of the Original Code is VMware, Inc.
38+
// Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
39+
//---------------------------------------------------------------------------
40+
41+
using System;
42+
using NUnit.Framework;
43+
using RabbitMQ.Client.Exceptions;
44+
45+
namespace RabbitMQ.Client.Unit
46+
{
47+
[TestFixture]
48+
public class TestComplexResults : IntegrationFixture
49+
{
50+
51+
private readonly String QueueName = "declare-ok-test-queue";
52+
53+
[Test]
54+
public void TestQueueDeclareOk()
55+
{
56+
QueueDeclareOk result;
57+
58+
result = QueueDeclare();
59+
Assert.AreEqual(0, result.MessageCount);
60+
Assert.AreEqual(0, result.ConsumerCount);
61+
Assert.AreEqual(QueueName, result.QueueName);
62+
Model.BasicPublish("", result.QueueName, null, new byte[] { });
63+
64+
result = QueueDeclare();
65+
Assert.AreEqual(1, result.MessageCount);
66+
Assert.AreEqual(0, result.ConsumerCount);
67+
Assert.AreEqual(QueueName, result.QueueName);
68+
69+
QueueingBasicConsumer consumer = new QueueingBasicConsumer(Model);
70+
Model.BasicConsume(QueueName, true, consumer);
71+
consumer.Queue.Dequeue();
72+
73+
result = QueueDeclare();
74+
Assert.AreEqual(0, result.MessageCount);
75+
Assert.AreEqual(1, result.ConsumerCount);
76+
Assert.AreEqual(QueueName, result.QueueName);
77+
}
78+
79+
[Test]
80+
public void TestQueueDeclarePassive()
81+
{
82+
Assert.Throws(Is.TypeOf<OperationInterruptedException>(),
83+
delegate { Model.QueueDeclarePassive(QueueName); });
84+
}
85+
86+
private QueueDeclareOk QueueDeclare()
87+
{
88+
return Model.QueueDeclare(QueueName, false, true, true, null);
89+
}
90+
}
91+
}

0 commit comments

Comments
 (0)