Skip to content

Commit 23478e5

Browse files
author
John Simons
committed
High CPU spikes when channel is closed.
Fixes #63
1 parent c5e3924 commit 23478e5

File tree

1 file changed

+19
-19
lines changed

1 file changed

+19
-19
lines changed

src/NServiceBus.RabbitMQ/RabbitMqDequeueStrategy.cs

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public void Init(Address address, TransactionSettings transactionSettings, Func<
5050
this.endProcessMessage = endProcessMessage;
5151
workQueue = address.Queue;
5252

53-
autoAck = !transactionSettings.IsTransactional;
53+
noAck = !transactionSettings.IsTransactional;
5454

5555
if (purgeOnStartup)
5656
{
@@ -162,7 +162,7 @@ void ConsumeMessages(object state)
162162

163163
var consumer = new QueueingBasicConsumer(channel);
164164

165-
channel.BasicConsume(parameters.Queue, autoAck, consumer);
165+
channel.BasicConsume(parameters.Queue, noAck, consumer);
166166

167167
circuitBreaker.Success();
168168

@@ -199,7 +199,7 @@ void ConsumeMessages(object state)
199199
messageProcessedOk = tryProcessMessage(transportMessage);
200200
}
201201

202-
if (!autoAck)
202+
if (!noAck)
203203
{
204204
if (messageProcessedOk)
205205
{
@@ -215,7 +215,7 @@ void ConsumeMessages(object state)
215215
{
216216
exception = ex;
217217

218-
if (!autoAck)
218+
if (!noAck)
219219
{
220220
channel.BasicReject(message.DeliveryTag, true);
221221
}
@@ -227,6 +227,18 @@ void ConsumeMessages(object state)
227227
}
228228
}
229229
}
230+
catch (EndOfStreamException)
231+
{
232+
// If no items are present and the queue is in a closed
233+
// state, or if at any time while waiting the queue
234+
// transitions to a closed state (by a call to Close()), this
235+
// method will throw EndOfStreamException.
236+
237+
// We need to put a delay here otherwise we end-up doing a tight loop that causes
238+
// CPU spikes
239+
Thread.Sleep(1000);
240+
throw;
241+
}
230242
catch (IOException)
231243
{
232244
//Unable to write data to the transport connection: An existing connection was forcibly closed by the remote host.
@@ -240,21 +252,9 @@ void ConsumeMessages(object state)
240252

241253
static BasicDeliverEventArgs DequeueMessage(QueueingBasicConsumer consumer)
242254
{
243-
BasicDeliverEventArgs rawMessage = null;
255+
BasicDeliverEventArgs rawMessage;
244256

245-
var messageDequeued = false;
246-
247-
try
248-
{
249-
messageDequeued = consumer.Queue.Dequeue(1000, out rawMessage);
250-
}
251-
catch (EndOfStreamException)
252-
{
253-
// If no items are present and the queue is in a closed
254-
// state, or if at any time while waiting the queue
255-
// transitions to a closed state (by a call to Close()), this
256-
// method will throw EndOfStreamException.
257-
}
257+
var messageDequeued = consumer.Queue.Dequeue(1000, out rawMessage);
258258

259259
if (!messageDequeued)
260260
{
@@ -276,7 +276,7 @@ void Purge()
276276

277277
RepeatedFailuresOverTimeCircuitBreaker circuitBreaker;
278278

279-
bool autoAck;
279+
bool noAck;
280280
SemaphoreSlim tracksRunningThreads;
281281
Action<TransportMessage, Exception> endProcessMessage;
282282
CancellationTokenSource tokenSource;

0 commit comments

Comments
 (0)