Skip to content

Commit 3c18c7d

Browse files
author
Matthias Radestock
committed
permit Dequeue()ing after Close() until queue is empty
EndOfStreamException is now only thrown by Dequeue*() methods when the queue is/gets closed *and* is empty.
1 parent d93c0af commit 3c18c7d

File tree

2 files changed

+15
-16
lines changed

2 files changed

+15
-16
lines changed

src/client/api/QueueingBasicConsumer.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,8 @@ namespace RabbitMQ.Client
7676
/// When the consumer is closed, through BasicCancel or through
7777
/// the shutdown of the underlying IModel or IConnection, the
7878
/// SharedQueue's Close() method is called, which causes any
79-
/// threads blocked on the queue's Enqueue() or Dequeue()
80-
/// operations to throw EndOfStreamException (see the comment for
79+
/// Enqueue() operations, and Dequeue() operations when the queue
80+
/// is empty, to throw EndOfStreamException (see the comment for
8181
/// SharedQueue.Close()).
8282
///</para>
8383
///<para>

src/util/SharedQueue.cs

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -77,10 +77,10 @@ public class SharedQueue: IDisposable {
7777
public SharedQueue() {
7878
}
7979

80-
///<summary>Close the queue. Causes all waiting threads to be
81-
///interrupted (with EndOfStreamException) and all further
82-
///Enqueue() and Dequeue() operations to throw
83-
///EndOfStreamException.</summary>
80+
///<summary>Close the queue. Causes all further Enqueue()
81+
///operations to throw EndOfStreamException, and all pending
82+
///or subsequent Dequeue() operations to throw an
83+
///EndOfStreamException once the queue is empty.</summary>
8484
public void Close() {
8585
lock (m_queue) {
8686
m_isOpen = false;
@@ -121,12 +121,11 @@ public void Enqueue(object o) {
121121
///<remarks>
122122
///Callers of Dequeue() will block if no items are available
123123
///until some other thread calls Enqueue() or the queue is
124-
///closed. If the queue is closed (by a call to Close()), this
125-
///method will throw EndOfStreamException.
124+
///closed. In the latter case this method will throw
125+
///EndOfStreamException.
126126
///</remarks>
127127
public object Dequeue() {
128128
lock (m_queue) {
129-
EnsureIsOpen();
130129
while (m_queue.Count == 0) {
131130
EnsureIsOpen();
132131
Monitor.Wait(m_queue);
@@ -151,15 +150,15 @@ public object Dequeue() {
151150
/// whereas Dequeue() will.
152151
///</para>
153152
///<para>
154-
/// If, at the time of call, the queue is in a closed state
155-
/// (by a call to Close()), this method will throw
156-
/// EndOfStreamException.
153+
/// If at the time of call the queue is empty and in a
154+
/// closed state (following a call to Close()), then this
155+
/// method will throw EndOfStreamException.
157156
///</para>
158157
///</remarks>
159158
public object DequeueNoWait(object defaultValue) {
160159
lock (m_queue) {
161-
EnsureIsOpen();
162160
if (m_queue.Count == 0) {
161+
EnsureIsOpen();
163162
return defaultValue;
164163
} else {
165164
return m_queue.Dequeue();
@@ -197,7 +196,8 @@ public object DequeueNoWait(object defaultValue) {
197196
/// System.Threading.Monitor.Wait(object,int).
198197
///</para>
199198
///<para>
200-
/// If, at any time during the call, the queue is in or
199+
/// If no items are present and the queue is in a closed
200+
/// state, or if at any time while waiting the queue is
201201
/// transitions to a closed state (by a call to Close()), this
202202
/// method will throw EndOfStreamException.
203203
///</para>
@@ -210,8 +210,8 @@ public bool Dequeue(int millisecondsTimeout, out object result) {
210210

211211
DateTime startTime = DateTime.Now;
212212
lock (m_queue) {
213-
EnsureIsOpen();
214213
while (m_queue.Count == 0) {
214+
EnsureIsOpen();
215215
int elapsedTime = (int) ((DateTime.Now - startTime).TotalMilliseconds);
216216
int remainingTime = millisecondsTimeout - elapsedTime;
217217
if (remainingTime <= 0) {
@@ -220,7 +220,6 @@ public bool Dequeue(int millisecondsTimeout, out object result) {
220220
}
221221

222222
Monitor.Wait(m_queue, remainingTime);
223-
EnsureIsOpen();
224223
}
225224

226225
result = m_queue.Dequeue();

0 commit comments

Comments
 (0)