Skip to content

Commit 43fc3dd

Browse files
author
Emile Joubert
committed
Merged bug22940 into default
2 parents bb1b3a5 + 5701769 commit 43fc3dd

File tree

8 files changed

+265
-140
lines changed

8 files changed

+265
-140
lines changed

RabbitMQDotNetClient.sln

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "RabbitMQ.Client.Examples.Sh
6060
EndProject
6161
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "RabbitMQ.Client.Examples.PerfTest", "projects\examples\client\PerfTest\RabbitMQ.Client.Examples.PerfTest.csproj", "{6ED176D6-B789-4673-8300-CB671962FE00}"
6262
EndProject
63+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "RabbitMQ.Client.Examples.Subscriber", "projects\examples\client\Subscriber\RabbitMQ.Client.Examples.Subscriber.csproj", "{F6DF1899-A038-4DBF-86D0-0DE64F2422EC}"
64+
EndProject
6365
Global
6466
GlobalSection(SolutionConfigurationPlatforms) = preSolution
6567
Debug|Any CPU = Debug|Any CPU
@@ -192,6 +194,12 @@ Global
192194
{6ED176D6-B789-4673-8300-CB671962FE00}.DebugNoTest|Any CPU.Build.0 = Debug|Any CPU
193195
{6ED176D6-B789-4673-8300-CB671962FE00}.Release|Any CPU.ActiveCfg = Release|Any CPU
194196
{6ED176D6-B789-4673-8300-CB671962FE00}.Release|Any CPU.Build.0 = Release|Any CPU
197+
{F6DF1899-A038-4DBF-86D0-0DE64F2422EC}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
198+
{F6DF1899-A038-4DBF-86D0-0DE64F2422EC}.Debug|Any CPU.Build.0 = Debug|Any CPU
199+
{F6DF1899-A038-4DBF-86D0-0DE64F2422EC}.DebugNoTest|Any CPU.ActiveCfg = Debug|Any CPU
200+
{F6DF1899-A038-4DBF-86D0-0DE64F2422EC}.DebugNoTest|Any CPU.Build.0 = Debug|Any CPU
201+
{F6DF1899-A038-4DBF-86D0-0DE64F2422EC}.Release|Any CPU.ActiveCfg = Release|Any CPU
202+
{F6DF1899-A038-4DBF-86D0-0DE64F2422EC}.Release|Any CPU.Build.0 = Release|Any CPU
195203
EndGlobalSection
196204
GlobalSection(SolutionProperties) = preSolution
197205
HideSolutionNode = FALSE
@@ -219,6 +227,7 @@ Global
219227
{44D14FF0-1015-4AAA-ABCA-BF611879816C} = {78D13AD2-B1AC-442C-95C8-958D643FC40B}
220228
{8BCE15DB-C92F-4FA4-AE57-9CA73DE91EB2} = {78D13AD2-B1AC-442C-95C8-958D643FC40B}
221229
{6ED176D6-B789-4673-8300-CB671962FE00} = {78D13AD2-B1AC-442C-95C8-958D643FC40B}
230+
{F6DF1899-A038-4DBF-86D0-0DE64F2422EC} = {78D13AD2-B1AC-442C-95C8-958D643FC40B}
222231
{20E34D9F-EE72-4B55-B6FE-1D0DBE5B74CD} = {EA42A7EF-7CE6-4EDA-98EA-6675C7EF7F69}
223232
{39CA1299-1F9E-452E-AA00-4BF874944B1E} = {EA42A7EF-7CE6-4EDA-98EA-6675C7EF7F69}
224233
{201B37E1-9BFC-4A01-9760-30C5BB19CFE3} = {20E34D9F-EE72-4B55-B6FE-1D0DBE5B74CD}

docs/wikipages/data.MessagingPatterns.txt

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -382,22 +382,22 @@ on class [code RabbitMQ.Client.IModel].
382382

383383
The class [code RabbitMQ.Client.MessagePatterns.Subscription]
384384
implements most of the boilerplate of receiving messages (including,
385-
in particular, broadcast events) for you, including queue and exchange
386-
declaration and queue binding, as well as consumer declaration and
387-
management. For example,
385+
in particular, broadcast events) for you, including consumer
386+
declaration and management, but excluding queue and exchange
387+
declaration and queue binding. For example,
388388

389389
@code java
390390
// "IModel ch" in scope.
391-
Subscription sub = new Subscription(ch, "price", "topic", "STOCK.IBM.#");
391+
Subscription sub = new Subscription(ch, "STOCK.IBM.#");
392392
foreach (BasicDeliverEventArgs e in sub) {
393393
// handle the message contained in e ...
394394
// ... and finally acknowledge it
395395
sub.Ack(e);
396396
}
397397

398-
will declare the relevant exchange, declare a temporary queue, bind
399-
the two together, and start a consumer on the queue using [code
400-
IModel.BasicConsume].
398+
will start a consumer on the queue using [code IModel.BasicConsume].
399+
It is assumed that the queue and any bindings have been previously
400+
declared.
401401

402402
[code Subscription.Ack()] should be called for each received event,
403403
whether or not auto-acknowledgement mode is used, because [code

projects/client/RabbitMQ.Client/src/client/messagepatterns/SimpleRpcClient.cs

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -87,20 +87,16 @@ namespace RabbitMQ.Client.MessagePatterns {
8787
/// SimpleRpcClient client =
8888
/// new SimpleRpcClient(ch, queueName);
8989
/// client.TimeoutMilliseconds = 5000; // optional
90-
///
90+
///
9191
/// /// ... make use of the various Call() overloads
9292
/// }
9393
/// }
9494
///</code></example>
9595
///<para>
96-
/// Instances of this class do not themselves declare any
97-
/// resources (exchanges, queues or bindings). The Subscription we
98-
/// use for receiving RPC replies declares its own resources
99-
/// (usually a single queue), but if we are sending to an exchange
100-
/// other than one of the AMQP-standard mandated predefined
101-
/// exchanges, it is the user's responsibility to ensure that the
102-
/// exchange concerned exists (using IModel.ExchangeDeclare)
103-
/// before invoking Call() or Cast().
96+
/// Instances of this class declare a queue, so it is the user's
97+
/// responsibility to ensure that the exchange concerned exists
98+
/// (using IModel.ExchangeDeclare) before invoking Call() or
99+
/// Cast().
104100
///</para>
105101
///<para>
106102
/// This class implements only a few basic RPC message formats -
@@ -235,7 +231,8 @@ public void Close()
235231
protected virtual void EnsureSubscription()
236232
{
237233
if (m_subscription == null) {
238-
m_subscription = new Subscription(m_model);
234+
string queueName = m_model.QueueDeclare();
235+
m_subscription = new Subscription(m_model, queueName);
239236
}
240237
}
241238

projects/client/RabbitMQ.Client/src/client/messagepatterns/Subscription.cs

Lines changed: 5 additions & 116 deletions
Original file line numberDiff line numberDiff line change
@@ -77,11 +77,6 @@ namespace RabbitMQ.Client.MessagePatterns {
7777
/// IEnumerator in, for example, a foreach loop.
7878
///</para>
7979
///<para>
80-
/// See the documentation for Bind() and for the various overloads
81-
/// of the constructor for the various styles of binding and
82-
/// subscription that are available.
83-
///</para>
84-
///<para>
8580
/// Note that if the "noAck" option is enabled (which it is by
8681
/// default), then received deliveries are automatically acked
8782
/// within the server before they are even transmitted across the
@@ -103,11 +98,8 @@ public class Subscription: IEnumerable, IEnumerator, IDisposable {
10398
protected readonly object m_consumerLock = new object();
10499
protected volatile QueueingBasicConsumer m_consumer;
105100
protected string m_consumerTag;
106-
protected volatile bool m_shouldDelete;
107101

108-
///<summary>Retrieve the queue name we have subscribed to. May
109-
///be a server-generated name, depending on how the
110-
///Subscription was constructed.</summary>
102+
///<summary>Retrieve the queue name we have subscribed to.</summary>
111103
public string QueueName { get { return m_queueName; } }
112104
///<summary>Retrieve the IBasicConsumer that is receiving the
113105
///messages from the server for us. Normally, you will not
@@ -136,147 +128,44 @@ public class Subscription: IEnumerable, IEnumerator, IDisposable {
136128
public BasicDeliverEventArgs LatestEvent { get { return m_latestEvent; } }
137129

138130
///<summary>Creates a new Subscription in "noAck" mode,
139-
///consuming from a fresh, exclusive, autodelete, anonymous
140-
///queue. The name of the queue can be retrieved using the
141-
///QueueName property of the Subscription. After creating the
142-
///queue, the queue is bound to the named exchange, using
143-
///Bind() with the given routingKey bind parameter.</summary>
144-
public Subscription(IModel model, string exchangeName,
145-
string exchangeType, string routingKey)
146-
: this(model)
147-
{
148-
Bind(exchangeName, exchangeType, routingKey);
149-
}
150-
151-
///<summary>Creates a new Subscription in "noAck" mode,
152-
///consuming from a fresh, exclusive, autodelete, anonymous
153-
///queue. The name of the queue can be retrieved using the
154-
///QueueName property of the Subscription.</summary>
155-
public Subscription(IModel model)
156-
: this(model, null) {}
157-
158-
///<summary>Creates a new Subscription in "noAck" mode,
159-
///consuming from a named queue. If the queueName parameter is
160-
///null or the empty-string, creates a fresh, exclusive,
161-
///autodelete, anonymous queue; otherwise, the queue is
162-
///declared using IModel.QueueDeclare() before
163-
///IModel.BasicConsume() is called. After declaring the queue
164-
///and starting the consumer, the queue is bound to the named
165-
///exchange, using Bind() with the given routingKey bind
166-
///parameter.</summary>
167-
public Subscription(IModel model, string queueName, string exchangeName,
168-
string exchangeType, string routingKey)
169-
: this(model, queueName)
170-
{
171-
Bind(exchangeName, exchangeType, routingKey);
172-
}
173-
174-
///<summary>Creates a new Subscription in "noAck" mode,
175-
///consuming from a named queue. If the queueName parameter is
176-
///null or the empty-string, creates a fresh, exclusive,
177-
///autodelete, anonymous queue; otherwise, the queue is
178-
///declared using IModel.QueueDeclare() before
179-
///IModel.BasicConsume() is called.</summary>
131+
///consuming from a named queue.</summary>
180132
public Subscription(IModel model, string queueName)
181133
: this(model, queueName, true) {}
182134

183135
///<summary>Creates a new Subscription, with full control over
184-
///both "noAck" mode and the name of the queue (which, if null
185-
///or the empty-string, will be a fresh, exclusive,
186-
///autodelete, anonymous queue, as for the other constructor
187-
///overloads). After declaring the queue and starting the
188-
///consumer, the queue is bound to the named exchange, using
189-
///Bind() with the given routingKey bind parameter.</summary>
190-
public Subscription(IModel model, string queueName, bool noAck,
191-
string exchangeName, string exchangeType, string routingKey)
192-
: this(model, queueName, noAck)
193-
{
194-
Bind(exchangeName, exchangeType, routingKey);
195-
}
196-
197-
///<summary>Creates a new Subscription, with full control over
198-
///both "noAck" mode and the name of the queue (which, if null
199-
///or the empty-string, will be a fresh, exclusive,
200-
///autodelete, anonymous queue, as for the other constructor
201-
///overloads).</summary>
136+
///both "noAck" mode and the name of the queue.</summary>
202137
public Subscription(IModel model, string queueName, bool noAck)
203138
{
204139
m_model = model;
205-
if (queueName == null || queueName.Equals("")) {
206-
m_queueName = m_model.QueueDeclare();
207-
m_shouldDelete = true;
208-
} else {
209-
m_queueName = m_model.QueueDeclare(queueName);
210-
m_shouldDelete = false;
211-
}
140+
m_queueName = queueName;
212141
m_consumer = new QueueingBasicConsumer(m_model);
213142
m_consumerTag = m_model.BasicConsume(m_queueName, m_noAck, null, m_consumer);
214143
m_latestEvent = null;
215144
}
216145

217146
///<summary>Closes this Subscription, cancelling the consumer
218-
///record in the server. If an anonymous, exclusive,
219-
///autodelete queue (i.e., one with a server-generated name)
220-
///was created during construction of the Subscription, this
221-
///method also deletes the created queue (which is an
222-
///optimisation: autodelete queues will be deleted when the
223-
///IModel closes in any case).</summary>
147+
///record in the server.</summary>
224148
public void Close()
225149
{
226150
try {
227151
bool shouldCancelConsumer = false;
228-
bool shouldDelete = false;
229152

230153
lock (m_consumerLock) {
231154
if (m_consumer != null) {
232155
shouldCancelConsumer = true;
233156
m_consumer = null;
234157
}
235-
236-
shouldDelete = m_shouldDelete;
237-
// We set m_shouldDelete false before attempting
238-
// the delete, because trying twice is worse than
239-
// trying once and failing.
240-
m_shouldDelete = false;
241158
}
242159

243160
if (shouldCancelConsumer) {
244161
m_model.BasicCancel(m_consumerTag);
245162
m_consumerTag = null;
246163
}
247-
248-
if (shouldDelete) {
249-
m_model.QueueDelete(m_queueName, false, false, false);
250-
}
251164
} catch (OperationInterruptedException) {
252165
// We don't mind, here.
253166
}
254167
}
255168

256-
///<summary>Causes the queue to which we have subscribed to be
257-
///bound to an exchange. Uses IModel.ExchangeDeclare and
258-
///IModel.QueueBind to (a) ensure the exchange exists, and (b)
259-
///link the exchange to our queue.</summary>
260-
///<remarks>
261-
///<para>
262-
/// This method is called by some of the overloads of the
263-
/// Subscription constructor.
264-
///</para>
265-
///<para>
266-
/// Calling Bind() multiple times to bind to multiple
267-
/// exchanges, or to bind to a single exchange more than once
268-
/// with a different routingKey, is perfectly
269-
/// acceptable. Calling Bind() twice with exactly the same
270-
/// arguments is permitted and idempotent. For details, see
271-
/// the AMQP specification.
272-
///</para>
273-
///</remarks>
274-
public void Bind(string exchangeName, string exchangeType, string routingKey)
275-
{
276-
m_model.ExchangeDeclare(exchangeName, exchangeType);
277-
m_model.QueueBind(m_queueName, exchangeName, routingKey, false, null);
278-
}
279-
280169
///<summary>If LatestEvent is non-null, passes it to
281170
///Ack(BasicDeliverEventArgs). Causes LatestEvent to become
282171
///null.</summary>

projects/examples/client/LogTail/src/examples/LogTail.cs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -81,16 +81,16 @@ public static int Main(string[] args) {
8181
string routingKey = args[3];
8282

8383
ConnectionFactory cf = new ConnectionFactory();
84-
cf.Address = serverAddress;
85-
84+
cf.Address = serverAddress;
85+
8686
using (IConnection conn = cf.CreateConnection())
8787
{
8888
using (IModel ch = conn.CreateModel()) {
89-
Subscription sub;
90-
if (exchange == "") {
91-
sub = new Subscription(ch, routingKey);
92-
} else {
93-
sub = new Subscription(ch, exchange, exchangeType, routingKey);
89+
ch.QueueDeclare(routingKey);
90+
Subscription sub = new Subscription(ch, routingKey);
91+
if (exchange != "") {
92+
ch.ExchangeDeclare(exchange, exchangeType);
93+
ch.QueueBind(routingKey, exchange, routingKey, false, null);
9494
}
9595

9696
Console.WriteLine("Consumer tag: " + sub.ConsumerTag);
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
<?xml version="1.0" encoding="utf-8"?>
2+
<Project ToolsVersion="3.5" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
3+
<!-- Warning! This file contains important customizations. Using Visual Studio to edit project's properties might break things. -->
4+
<!-- Props file -->
5+
<Import Project="$(MSBuildProjectDirectory)\..\..\..\..\Local.props" />
6+
<!-- Visual Studio generated -->
7+
<PropertyGroup>
8+
<Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
9+
<Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
10+
<ProductVersion>9.0.21022</ProductVersion>
11+
<SchemaVersion>2.0</SchemaVersion>
12+
<ProjectGuid>{F6DF1899-A038-4DBF-86D0-0DE64F2422EC}</ProjectGuid>
13+
<OutputType>Exe</OutputType>
14+
<AppDesignerFolder>properties</AppDesignerFolder>
15+
<RootNamespace>RabbitMQ.Client.Examples</RootNamespace>
16+
<AssemblyName>Subscriber</AssemblyName>
17+
<TargetFrameworkVersion>$(PropTargetFramework)</TargetFrameworkVersion>
18+
<FileAlignment>512</FileAlignment>
19+
</PropertyGroup>
20+
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
21+
<DebugSymbols>true</DebugSymbols>
22+
<DebugType>full</DebugType>
23+
<Optimize>false</Optimize>
24+
<OutputPath>build\bin\</OutputPath>
25+
<DefineConstants>DEBUG;TRACE</DefineConstants>
26+
<ErrorReport>prompt</ErrorReport>
27+
<WarningLevel>4</WarningLevel>
28+
</PropertyGroup>
29+
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
30+
<DebugType>pdbonly</DebugType>
31+
<Optimize>true</Optimize>
32+
<OutputPath>build\bin\</OutputPath>
33+
<DefineConstants>TRACE</DefineConstants>
34+
<ErrorReport>prompt</ErrorReport>
35+
<WarningLevel>4</WarningLevel>
36+
</PropertyGroup>
37+
<ItemGroup>
38+
<Reference Include="System" />
39+
<Reference Include="System.Data" />
40+
<Reference Include="System.Xml" />
41+
</ItemGroup>
42+
<ItemGroup>
43+
<Compile Include="properties\AssemblyInfo.cs" />
44+
</ItemGroup>
45+
<ItemGroup>
46+
<ProjectReference Include="..\..\..\client\RabbitMQ.Client\RabbitMQ.Client.csproj">
47+
<Project>{71713FDD-D5EC-40B2-A924-76F80AD57E12}</Project>
48+
<Name>RabbitMQ.Client</Name>
49+
</ProjectReference>
50+
</ItemGroup>
51+
<ItemGroup>
52+
<Compile Include="src\examples\Subscriber.cs" />
53+
</ItemGroup>
54+
<!-- Mono compatibility workarounds -->
55+
<PropertyGroup Condition=" '$(PropUsingMono)' == 'true'">
56+
<_DisabledWarnings>$(NoWarn)</_DisabledWarnings>
57+
</PropertyGroup>
58+
<!-- Microsoft CSharp targets -->
59+
<Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
60+
</Project>
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
using System.Reflection;
2+
using System.Runtime.CompilerServices;
3+
using System.Runtime.InteropServices;
4+
5+
// General Information about an assembly is controlled through the following
6+
// set of attributes. Change these attribute values to modify the information
7+
// associated with an assembly.
8+
[assembly: AssemblyTitle("Subscriber")]
9+
[assembly: AssemblyDescription("SubscriberExample")]
10+
[assembly: AssemblyConfiguration("")]
11+
[assembly: AssemblyCompany("LShift Ltd.")]
12+
[assembly: AssemblyProduct("RabbitMQ.Client")]
13+
[assembly: AssemblyCopyright("Copyright © 2007-2010 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd.")]
14+
[assembly: AssemblyTrademark("")]
15+
[assembly: AssemblyCulture("")]
16+
17+
// Setting ComVisible to false makes the types in this assembly not visible
18+
// to COM components. If you need to access a type in this assembly from
19+
// COM, set the ComVisible attribute to true on that type.
20+
[assembly: ComVisible(false)]
21+
22+
// The following GUID is for the ID of the typelib if this project is exposed to COM
23+
[assembly: Guid("12187560-4AE6-40af-A04E-43553FCA6C91")]

0 commit comments

Comments
 (0)