Skip to content

Commit a890428

Browse files
committed
Merge branch 'hotfix-2.0.5'
2 parents 38a43b8 + 23478e5 commit a890428

File tree

1 file changed

+24
-21
lines changed

1 file changed

+24
-21
lines changed

src/NServiceBus.RabbitMQ/RabbitMqDequeueStrategy.cs

Lines changed: 24 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public void Init(Address address, TransactionSettings transactionSettings, Func<
5050
this.endProcessMessage = endProcessMessage;
5151
workQueue = address.Queue;
5252

53-
autoAck = !transactionSettings.IsTransactional;
53+
noAck = !transactionSettings.IsTransactional;
5454

5555
if (purgeOnStartup)
5656
{
@@ -146,10 +146,13 @@ void StartConsumer(string queue)
146146

147147
void ConsumeMessages(object state)
148148
{
149-
try
149+
if (!tracksRunningThreads.Wait(TimeSpan.FromSeconds(1)))
150150
{
151-
tracksRunningThreads.Wait();
151+
return;
152+
}
152153

154+
try
155+
{
153156
var parameters = (ConsumeParams)state;
154157
var connection = connectionManager.GetConsumeConnection();
155158

@@ -159,7 +162,7 @@ void ConsumeMessages(object state)
159162

160163
var consumer = new QueueingBasicConsumer(channel);
161164

162-
channel.BasicConsume(parameters.Queue, autoAck, consumer);
165+
channel.BasicConsume(parameters.Queue, noAck, consumer);
163166

164167
circuitBreaker.Success();
165168

@@ -196,7 +199,7 @@ void ConsumeMessages(object state)
196199
messageProcessedOk = tryProcessMessage(transportMessage);
197200
}
198201

199-
if (!autoAck)
202+
if (!noAck)
200203
{
201204
if (messageProcessedOk)
202205
{
@@ -212,7 +215,7 @@ void ConsumeMessages(object state)
212215
{
213216
exception = ex;
214217

215-
if (!autoAck)
218+
if (!noAck)
216219
{
217220
channel.BasicReject(message.DeliveryTag, true);
218221
}
@@ -224,6 +227,18 @@ void ConsumeMessages(object state)
224227
}
225228
}
226229
}
230+
catch (EndOfStreamException)
231+
{
232+
// If no items are present and the queue is in a closed
233+
// state, or if at any time while waiting the queue
234+
// transitions to a closed state (by a call to Close()), this
235+
// method will throw EndOfStreamException.
236+
237+
// We need to put a delay here otherwise we end-up doing a tight loop that causes
238+
// CPU spikes
239+
Thread.Sleep(1000);
240+
throw;
241+
}
227242
catch (IOException)
228243
{
229244
//Unable to write data to the transport connection: An existing connection was forcibly closed by the remote host.
@@ -237,21 +252,9 @@ void ConsumeMessages(object state)
237252

238253
static BasicDeliverEventArgs DequeueMessage(QueueingBasicConsumer consumer)
239254
{
240-
BasicDeliverEventArgs rawMessage = null;
241-
242-
var messageDequeued = false;
255+
BasicDeliverEventArgs rawMessage;
243256

244-
try
245-
{
246-
messageDequeued = consumer.Queue.Dequeue(1000, out rawMessage);
247-
}
248-
catch (EndOfStreamException)
249-
{
250-
// If no items are present and the queue is in a closed
251-
// state, or if at any time while waiting the queue
252-
// transitions to a closed state (by a call to Close()), this
253-
// method will throw EndOfStreamException.
254-
}
257+
var messageDequeued = consumer.Queue.Dequeue(1000, out rawMessage);
255258

256259
if (!messageDequeued)
257260
{
@@ -273,7 +276,7 @@ void Purge()
273276

274277
RepeatedFailuresOverTimeCircuitBreaker circuitBreaker;
275278

276-
bool autoAck;
279+
bool noAck;
277280
SemaphoreSlim tracksRunningThreads;
278281
Action<TransportMessage, Exception> endProcessMessage;
279282
CancellationTokenSource tokenSource;

0 commit comments

Comments
 (0)