Skip to content

Commit 1cfd717

Browse files
committed
merge bug21465 into default
2 parents a704c32 + ce91ecd commit 1cfd717

File tree

3 files changed

+90
-29
lines changed

3 files changed

+90
-29
lines changed

src/client/api/QueueingBasicConsumer.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,8 @@ namespace RabbitMQ.Client
7676
/// When the consumer is closed, through BasicCancel or through
7777
/// the shutdown of the underlying IModel or IConnection, the
7878
/// SharedQueue's Close() method is called, which causes any
79-
/// threads blocked on the queue's Enqueue() or Dequeue()
80-
/// operations to throw EndOfStreamException (see the comment for
79+
/// Enqueue() operations, and Dequeue() operations when the queue
80+
/// is empty, to throw EndOfStreamException (see the comment for
8181
/// SharedQueue.Close()).
8282
///</para>
8383
///<para>

src/unit/TestSharedQueue.cs

Lines changed: 74 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -65,24 +65,52 @@
6565

6666
[TestFixture]
6767
public class TestSharedQueue {
68+
69+
public delegate void Thunk();
70+
6871
public class DelayedEnqueuer
6972
{
7073
public SharedQueue m_q;
7174
public int m_delayMs;
7275
public object m_v;
73-
public void Run()
76+
public DelayedEnqueuer(SharedQueue q, int delayMs, object v) {
77+
m_q = q;
78+
m_delayMs = delayMs;
79+
m_v = v;
80+
}
81+
public void EnqueueValue()
7482
{
7583
Thread.Sleep(m_delayMs);
7684
m_q.Enqueue(m_v);
7785
}
86+
public void Dequeue()
87+
{
88+
m_q.Dequeue();
89+
}
90+
public void DequeueNoWaitZero()
91+
{
92+
m_q.DequeueNoWait(0);
93+
}
94+
public void DequeueAfterOneIntoV()
95+
{
96+
m_q.Dequeue(1, out m_v);
97+
}
98+
public void BackgroundEofExpectingDequeue()
99+
{
100+
ExpectEof(new Thunk(this.Dequeue));
101+
}
78102
}
79103

80104
public static void EnqueueAfter(int delayMs, SharedQueue q, object v) {
81-
DelayedEnqueuer de = new DelayedEnqueuer();
82-
de.m_q = q;
83-
de.m_delayMs = delayMs;
84-
de.m_v = v;
85-
new Thread(new ThreadStart(de.Run)).Start();
105+
DelayedEnqueuer de = new DelayedEnqueuer(q, delayMs, v);
106+
new Thread(new ThreadStart(de.EnqueueValue)).Start();
107+
}
108+
109+
public static void ExpectEof(Thunk thunk) {
110+
try {
111+
thunk();
112+
Assert.Fail("expected System.IO.EndOfStreamException");
113+
} catch (System.IO.EndOfStreamException) {}
86114
}
87115

88116
public DateTime m_startTime;
@@ -248,4 +276,44 @@ public void TestDoublePoll() {
248276
Assert.IsTrue(r);
249277
Assert.AreEqual(123, v);
250278
}
279+
280+
[Test]
281+
public void TestCloseWhenEmpty() {
282+
DelayedEnqueuer de = new DelayedEnqueuer(new SharedQueue(), 0, 1);
283+
de.m_q.Close();
284+
ExpectEof(new Thunk(de.EnqueueValue));
285+
ExpectEof(new Thunk(de.Dequeue));
286+
ExpectEof(new Thunk(de.DequeueNoWaitZero));
287+
ExpectEof(new Thunk(de.DequeueAfterOneIntoV));
288+
}
289+
290+
[Test]
291+
public void TestCloseWhenFull() {
292+
SharedQueue q = new SharedQueue();
293+
object v;
294+
q.Enqueue(1);
295+
q.Enqueue(2);
296+
q.Enqueue(3);
297+
q.Close();
298+
DelayedEnqueuer de = new DelayedEnqueuer(q, 0, 4);
299+
ExpectEof(new Thunk(de.EnqueueValue));
300+
Assert.AreEqual(1, q.Dequeue());
301+
Assert.AreEqual(2, q.DequeueNoWait(0));
302+
bool r = q.Dequeue(1, out v);
303+
Assert.IsTrue(r);
304+
Assert.AreEqual(3, v);
305+
ExpectEof(new Thunk(de.Dequeue));
306+
}
307+
308+
[Test]
309+
public void TestCloseWhenWaiting() {
310+
SharedQueue q = new SharedQueue();
311+
DelayedEnqueuer de = new DelayedEnqueuer(q, 0, null);
312+
Thread t =
313+
new Thread(new ThreadStart(de.BackgroundEofExpectingDequeue));
314+
t.Start();
315+
Thread.Sleep(10);
316+
q.Close();
317+
t.Join();
318+
}
251319
}

src/util/SharedQueue.cs

Lines changed: 14 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@
6161

6262
namespace RabbitMQ.Util {
6363
///<summary>A thread-safe shared queue implementation.</summary>
64-
public class SharedQueue: IDisposable {
64+
public class SharedQueue {
6565
///<summary>The shared queue.</summary>
6666
///<remarks>
6767
///Subclasses must ensure appropriate locking discipline when
@@ -77,23 +77,17 @@ public class SharedQueue: IDisposable {
7777
public SharedQueue() {
7878
}
7979

80-
///<summary>Close the queue. Causes all waiting threads to be
81-
///interrupted (with EndOfStreamException) and all further
82-
///Enqueue() and Dequeue() operations to throw
83-
///EndOfStreamException.</summary>
80+
///<summary>Close the queue. Causes all further Enqueue()
81+
///operations to throw EndOfStreamException, and all pending
82+
///or subsequent Dequeue() operations to throw an
83+
///EndOfStreamException once the queue is empty.</summary>
8484
public void Close() {
8585
lock (m_queue) {
8686
m_isOpen = false;
8787
Monitor.PulseAll(m_queue);
8888
}
8989
}
9090

91-
///<summary>Implement IDisposable.Dispose. Delegates directly
92-
///to Close().</summary>
93-
public void Dispose() {
94-
Close();
95-
}
96-
9791
///<summary>Call only when the lock on m_queue is held.</summary>
9892
/// <exception cref="EndOfStreamException" />
9993
private void EnsureIsOpen() {
@@ -121,12 +115,11 @@ public void Enqueue(object o) {
121115
///<remarks>
122116
///Callers of Dequeue() will block if no items are available
123117
///until some other thread calls Enqueue() or the queue is
124-
///closed. If the queue is closed (by a call to Close()), this
125-
///method will throw EndOfStreamException.
118+
///closed. In the latter case this method will throw
119+
///EndOfStreamException.
126120
///</remarks>
127121
public object Dequeue() {
128122
lock (m_queue) {
129-
EnsureIsOpen();
130123
while (m_queue.Count == 0) {
131124
EnsureIsOpen();
132125
Monitor.Wait(m_queue);
@@ -151,15 +144,15 @@ public object Dequeue() {
151144
/// whereas Dequeue() will.
152145
///</para>
153146
///<para>
154-
/// If, at the time of call, the queue is in a closed state
155-
/// (by a call to Close()), this method will throw
156-
/// EndOfStreamException.
147+
/// If at the time of call the queue is empty and in a
148+
/// closed state (following a call to Close()), then this
149+
/// method will throw EndOfStreamException.
157150
///</para>
158151
///</remarks>
159152
public object DequeueNoWait(object defaultValue) {
160153
lock (m_queue) {
161-
EnsureIsOpen();
162154
if (m_queue.Count == 0) {
155+
EnsureIsOpen();
163156
return defaultValue;
164157
} else {
165158
return m_queue.Dequeue();
@@ -197,7 +190,8 @@ public object DequeueNoWait(object defaultValue) {
197190
/// System.Threading.Monitor.Wait(object,int).
198191
///</para>
199192
///<para>
200-
/// If, at any time during the call, the queue is in or
193+
/// If no items are present and the queue is in a closed
194+
/// state, or if at any time while waiting the queue
201195
/// transitions to a closed state (by a call to Close()), this
202196
/// method will throw EndOfStreamException.
203197
///</para>
@@ -210,8 +204,8 @@ public bool Dequeue(int millisecondsTimeout, out object result) {
210204

211205
DateTime startTime = DateTime.Now;
212206
lock (m_queue) {
213-
EnsureIsOpen();
214207
while (m_queue.Count == 0) {
208+
EnsureIsOpen();
215209
int elapsedTime = (int) ((DateTime.Now - startTime).TotalMilliseconds);
216210
int remainingTime = millisecondsTimeout - elapsedTime;
217211
if (remainingTime <= 0) {
@@ -220,7 +214,6 @@ public bool Dequeue(int millisecondsTimeout, out object result) {
220214
}
221215

222216
Monitor.Wait(m_queue, remainingTime);
223-
EnsureIsOpen();
224217
}
225218

226219
result = m_queue.Dequeue();

0 commit comments

Comments
 (0)