Skip to content

Commit dc68266

Browse files
author
Matthias Radestock
committed
merge bug20945 into default
2 parents 5bd997a + 362a3d8 commit dc68266

File tree

4 files changed

+278
-24
lines changed

4 files changed

+278
-24
lines changed

default.build

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,8 @@
288288
<property name="example.program" value="DeclareQueue"/><call target="one-example"/>
289289
<property name="example.program" value="AddServer"/><call target="one-example"/>
290290
<property name="example.program" value="AddClient"/><call target="one-example"/>
291+
<property name="example.program" value="ShutdownableServer"/><call target="one-example"/>
292+
<property name="example.program" value="ShutdownableClient"/><call target="one-example"/>
291293
</target>
292294

293295
<target name="build-unit" description="compile unit tests"

src/client/messagepatterns/Subscription.cs

Lines changed: 49 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -98,10 +98,12 @@ public class Subscription: IEnumerable, IEnumerator, IDisposable {
9898
public IModel Model { get { return m_model; } }
9999

100100
protected string m_queueName;
101-
protected QueueingBasicConsumer m_consumer;
102-
protected string m_consumerTag;
103101
protected bool m_noAck;
104-
protected bool m_shouldDelete;
102+
103+
protected readonly object m_consumerLock = new object();
104+
protected volatile QueueingBasicConsumer m_consumer;
105+
protected string m_consumerTag;
106+
protected volatile bool m_shouldDelete;
105107

106108
///<summary>Retrieve the queue name we have subscribed to. May
107109
///be a server-generated name, depending on how the
@@ -127,9 +129,10 @@ public class Subscription: IEnumerable, IEnumerator, IDisposable {
127129
protected BasicDeliverEventArgs m_latestEvent;
128130

129131
///<summary>Returns the most recent value returned by Next(),
130-
///or null when either no values have been retrieved yet, or
131-
///the most recent value has already been Ack()ed. See also
132-
///the documentation for Ack().</summary>
132+
///or null when either no values have been retrieved yet, the
133+
///end of the subscription has been reached, or the most
134+
///recent value has already been Ack()ed. See also the
135+
///documentation for Ack().</summary>
133136
public BasicDeliverEventArgs LatestEvent { get { return m_latestEvent; } }
134137

135138
///<summary>Creates a new Subscription in "noAck" mode,
@@ -219,21 +222,33 @@ public Subscription(IModel model, string queueName, bool noAck)
219222
public void Close()
220223
{
221224
try {
222-
if (m_consumer != null) {
223-
m_model.BasicCancel(m_consumerTag);
224-
}
225-
if (m_shouldDelete) {
226-
m_shouldDelete = false;
225+
bool shouldCancelConsumer = false;
226+
bool shouldDelete = false;
227+
228+
lock (m_consumerLock) {
229+
if (m_consumer != null) {
230+
shouldCancelConsumer = true;
231+
m_consumer = null;
232+
}
233+
234+
shouldDelete = m_shouldDelete;
227235
// We set m_shouldDelete false before attempting
228236
// the delete, because trying twice is worse than
229237
// trying once and failing.
238+
m_shouldDelete = false;
239+
}
240+
241+
if (shouldCancelConsumer) {
242+
m_model.BasicCancel(m_consumerTag);
243+
m_consumerTag = null;
244+
}
245+
246+
if (shouldDelete) {
230247
m_model.QueueDelete(m_queueName, false, false, false);
231248
}
232249
} catch (OperationInterruptedException) {
233250
// We don't mind, here.
234251
}
235-
m_consumer = null;
236-
m_consumerTag = null;
237252
}
238253

239254
///<summary>Causes the queue to which we have subscribed to be
@@ -317,11 +332,16 @@ public void Ack(BasicDeliverEventArgs evt)
317332
public BasicDeliverEventArgs Next()
318333
{
319334
try {
320-
if (m_consumer == null) {
335+
// Alias the pointer as otherwise it may change out
336+
// from under us by the operation of Close() from
337+
// another thread.
338+
QueueingBasicConsumer consumer = m_consumer;
339+
if (consumer == null) {
321340
// Closed!
322-
throw new InvalidOperationException();
341+
m_latestEvent = null;
342+
} else {
343+
m_latestEvent = (BasicDeliverEventArgs) consumer.Queue.Dequeue();
323344
}
324-
m_latestEvent = (BasicDeliverEventArgs) m_consumer.Queue.Dequeue();
325345
} catch (EndOfStreamException) {
326346
m_latestEvent = null;
327347
}
@@ -375,16 +395,21 @@ public BasicDeliverEventArgs Next()
375395
public bool Next(int millisecondsTimeout, out BasicDeliverEventArgs result)
376396
{
377397
try {
378-
if (m_consumer == null) {
398+
// Alias the pointer as otherwise it may change out
399+
// from under us by the operation of Close() from
400+
// another thread.
401+
QueueingBasicConsumer consumer = m_consumer;
402+
if (consumer == null) {
379403
// Closed!
380-
throw new InvalidOperationException();
381-
}
382-
object qValue;
383-
if (!m_consumer.Queue.Dequeue(millisecondsTimeout, out qValue)) {
384-
result = null;
385-
return false;
404+
m_latestEvent = null;
405+
} else {
406+
object qValue;
407+
if (!consumer.Queue.Dequeue(millisecondsTimeout, out qValue)) {
408+
result = null;
409+
return false;
410+
}
411+
m_latestEvent = (BasicDeliverEventArgs) qValue;
386412
}
387-
m_latestEvent = (BasicDeliverEventArgs) qValue;
388413
} catch (EndOfStreamException) {
389414
m_latestEvent = null;
390415
}

src/examples/ShutdownableClient.cs

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 1.1.
3+
//
4+
// The APL v2.0:
5+
//
6+
//---------------------------------------------------------------------------
7+
// Copyright (C) 2007-2009 LShift Ltd., Cohesive Financial
8+
// Technologies LLC., and Rabbit Technologies Ltd.
9+
//
10+
// Licensed under the Apache License, Version 2.0 (the "License");
11+
// you may not use this file except in compliance with the License.
12+
// You may obtain a copy of the License at
13+
//
14+
// http://www.apache.org/licenses/LICENSE-2.0
15+
//
16+
// Unless required by applicable law or agreed to in writing, software
17+
// distributed under the License is distributed on an "AS IS" BASIS,
18+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19+
// See the License for the specific language governing permissions and
20+
// limitations under the License.
21+
//---------------------------------------------------------------------------
22+
//
23+
// The MPL v1.1:
24+
//
25+
//---------------------------------------------------------------------------
26+
// The contents of this file are subject to the Mozilla Public License
27+
// Version 1.1 (the "License"); you may not use this file except in
28+
// compliance with the License. You may obtain a copy of the License at
29+
// http://www.rabbitmq.com/mpl.html
30+
//
31+
// Software distributed under the License is distributed on an "AS IS"
32+
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
33+
// License for the specific language governing rights and limitations
34+
// under the License.
35+
//
36+
// The Original Code is The RabbitMQ .NET Client.
37+
//
38+
// The Initial Developers of the Original Code are LShift Ltd,
39+
// Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
40+
//
41+
// Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
42+
// Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
43+
// are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
44+
// Technologies LLC, and Rabbit Technologies Ltd.
45+
//
46+
// Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
47+
// Ltd. Portions created by Cohesive Financial Technologies LLC are
48+
// Copyright (C) 2007-2009 Cohesive Financial Technologies
49+
// LLC. Portions created by Rabbit Technologies Ltd are Copyright
50+
// (C) 2007-2009 Rabbit Technologies Ltd.
51+
//
52+
// All Rights Reserved.
53+
//
54+
// Contributor(s): ______________________________________.
55+
//
56+
//---------------------------------------------------------------------------
57+
using System;
58+
using System.IO;
59+
using System.Text;
60+
61+
using RabbitMQ.Client;
62+
using RabbitMQ.Client.Content;
63+
using RabbitMQ.Client.MessagePatterns;
64+
65+
namespace RabbitMQ.Client.Examples {
66+
public class ShutdownableClient {
67+
public static int Main(string[] args) {
68+
try {
69+
if (args.Length < 1) {
70+
Console.Error.WriteLine("Usage: ShutdownableClient <hostname>[:<portnumber>] [<secondsdelay>]");
71+
Console.Error.WriteLine("RabbitMQ .NET client version "+typeof(IModel).Assembly.GetName().Version.ToString());
72+
return 1;
73+
}
74+
75+
using (IConnection conn = new ConnectionFactory().CreateConnection(args[0])) {
76+
using (IModel ch = conn.CreateModel()) {
77+
object[] callArgs = new object[1];
78+
if (args.Length > 1) {
79+
callArgs[0] = double.Parse(args[1]);
80+
} else {
81+
callArgs[0] = (double) 0.0;
82+
}
83+
84+
SimpleRpcClient client = new SimpleRpcClient(ch, "ShutdownableServer");
85+
client.TimeoutMilliseconds = 5000;
86+
client.TimedOut += new EventHandler(TimedOutHandler);
87+
client.Disconnected += new EventHandler(DisconnectedHandler);
88+
object[] reply = client.Call(callArgs);
89+
if (reply == null) {
90+
Console.WriteLine("Timeout or disconnection.");
91+
} else {
92+
Console.WriteLine("Reply: {0}", reply[0]);
93+
}
94+
}
95+
}
96+
return 0;
97+
} catch (Exception e) {
98+
Console.Error.WriteLine(e);
99+
return 2;
100+
}
101+
}
102+
103+
public static void TimedOutHandler(object sender, EventArgs e) {
104+
Console.WriteLine("Timed out.");
105+
}
106+
107+
public static void DisconnectedHandler(object sender, EventArgs e) {
108+
Console.WriteLine("Disconnected.");
109+
}
110+
}
111+
}

src/examples/ShutdownableServer.cs

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 1.1.
3+
//
4+
// The APL v2.0:
5+
//
6+
//---------------------------------------------------------------------------
7+
// Copyright (C) 2007-2009 LShift Ltd., Cohesive Financial
8+
// Technologies LLC., and Rabbit Technologies Ltd.
9+
//
10+
// Licensed under the Apache License, Version 2.0 (the "License");
11+
// you may not use this file except in compliance with the License.
12+
// You may obtain a copy of the License at
13+
//
14+
// http://www.apache.org/licenses/LICENSE-2.0
15+
//
16+
// Unless required by applicable law or agreed to in writing, software
17+
// distributed under the License is distributed on an "AS IS" BASIS,
18+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19+
// See the License for the specific language governing permissions and
20+
// limitations under the License.
21+
//---------------------------------------------------------------------------
22+
//
23+
// The MPL v1.1:
24+
//
25+
//---------------------------------------------------------------------------
26+
// The contents of this file are subject to the Mozilla Public License
27+
// Version 1.1 (the "License"); you may not use this file except in
28+
// compliance with the License. You may obtain a copy of the License at
29+
// http://www.rabbitmq.com/mpl.html
30+
//
31+
// Software distributed under the License is distributed on an "AS IS"
32+
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
33+
// License for the specific language governing rights and limitations
34+
// under the License.
35+
//
36+
// The Original Code is The RabbitMQ .NET Client.
37+
//
38+
// The Initial Developers of the Original Code are LShift Ltd,
39+
// Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
40+
//
41+
// Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
42+
// Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
43+
// are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
44+
// Technologies LLC, and Rabbit Technologies Ltd.
45+
//
46+
// Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
47+
// Ltd. Portions created by Cohesive Financial Technologies LLC are
48+
// Copyright (C) 2007-2009 Cohesive Financial Technologies
49+
// LLC. Portions created by Rabbit Technologies Ltd are Copyright
50+
// (C) 2007-2009 Rabbit Technologies Ltd.
51+
//
52+
// All Rights Reserved.
53+
//
54+
// Contributor(s): ______________________________________.
55+
//
56+
//---------------------------------------------------------------------------
57+
using System;
58+
using System.Timers;
59+
60+
using RabbitMQ.Client;
61+
using RabbitMQ.Client.Content;
62+
using RabbitMQ.Client.MessagePatterns;
63+
using RabbitMQ.Util;
64+
65+
namespace RabbitMQ.Client.Examples {
66+
public class ShutdownableServer: SimpleRpcServer {
67+
public static int Main(string[] args) {
68+
try {
69+
if (args.Length < 1) {
70+
Console.Error.WriteLine("Usage: ShutdownableServer <hostname>[:<portnumber>]");
71+
Console.Error.WriteLine("RabbitMQ .NET client version "+typeof(IModel).Assembly.GetName().Version.ToString());
72+
return 1;
73+
}
74+
75+
using (IConnection conn = new ConnectionFactory().CreateConnection(args[0])) {
76+
using (IModel ch = conn.CreateModel()) {
77+
Subscription sub = new Subscription(ch, "ShutdownableServer");
78+
new ShutdownableServer(sub).MainLoop();
79+
Console.Out.WriteLine("Returned from MainLoop.");
80+
}
81+
}
82+
Console.Out.WriteLine("Leaving the program.");
83+
return 0;
84+
} catch (Exception e) {
85+
Console.Error.WriteLine(e);
86+
return 2;
87+
}
88+
}
89+
90+
public ShutdownableServer(Subscription sub): base(sub) {}
91+
92+
public override void HandleStreamMessageCall(IStreamMessageBuilder replyWriter,
93+
bool isRedelivered,
94+
IBasicProperties requestProperties,
95+
object[] args)
96+
{
97+
Console.Out.WriteLine("ShutdownableServer received a {0} request.",
98+
isRedelivered ? "redelivered" : "new");
99+
if ((double) args[0] == 0) {
100+
Console.Out.WriteLine("Shutting down immediately.");
101+
Close();
102+
replyWriter.WriteObject("Shut down immediately");
103+
} else {
104+
Timer t = new Timer((int) (((double) args[0]) * 1000));
105+
t.Elapsed += new ElapsedEventHandler(OnTimeout);
106+
t.Enabled = true;
107+
replyWriter.WriteObject("Will shut down in " + args[0] + " seconds");
108+
}
109+
}
110+
111+
private void OnTimeout(object source, ElapsedEventArgs e) {
112+
Console.Out.WriteLine("Delayed shutdown happening now.");
113+
Close();
114+
}
115+
}
116+
}

0 commit comments

Comments
 (0)