Skip to content
This repository was archived by the owner on Apr 29, 2022. It is now read-only.

Commit 17a0003

Browse files
author
Anton Vorontsov
committed
Added logic for re-queuing messages multiple times.
1 parent 4c1e1fe commit 17a0003

File tree

3 files changed

+35
-22
lines changed

3 files changed

+35
-22
lines changed

src/RabbitMQ.Client.Core.DependencyInjection/Configuration/RabbitMqClientOptions.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ public class RabbitMqClientOptions
2222
/// Collection of RabbitMQ server host names.
2323
/// It can be used when RabbitMQ HA cluster is running, and you want to connect multiple hosts.
2424
/// If HostNames collection is null or empty then HostName will be used to create connection.
25-
/// Otherwise HostNames collection will be used and HostName property value will be ignored.
25+
/// Otherwise, HostNames collection will be used and HostName property value will be ignored.
2626
/// </summary>
2727
/// <remarks>
2828
/// Has the second priority between properties TcpEndpoints, HostNames and HostName.

src/RabbitMQ.Client.Core.DependencyInjection/Services/MessageHandlingService.cs

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,6 @@ namespace RabbitMQ.Client.Core.DependencyInjection.Services
1616
/// </summary>
1717
public class MessageHandlingService : IMessageHandlingService
1818
{
19-
// TODO: Убрать это.
20-
const int ResendTimeout = 60;
21-
2219
readonly IEnumerable<RabbitMqExchange> _exchanges;
2320
readonly IEnumerable<MessageHandlerContainer> _messageHandlerContainers;
2421
readonly ILogger<MessageHandlingService> _logger;
@@ -204,26 +201,47 @@ async Task HandleFailedMessageProcessing(BasicDeliverEventArgs eventArgs, IQueue
204201
_logger.LogWarning($"DeadLetterExchange has not been configured for an exchange \"{eventArgs.Exchange}\". The message won't be re-queued.");
205202
return;
206203
}
204+
205+
if (exchange.Options.RequeueTimeoutMilliseconds < 1)
206+
{
207+
_logger.LogWarning($"The value RequeueTimeoutMilliseconds for an exchange \"{eventArgs.Exchange}\" less than 1 millisecond. Configuration is invalid. The message won't be re-queued.");
208+
return;
209+
}
207210

208-
// TODO: Проверять параметры.
209-
// TODO: добавить методы расширения для delayed сообщений.
210-
// TODO: перепроверить, как будут создаваться очереди для delayed сообщений - переделать на milliseconds в именовании.
211-
// TODO: Закинуть это дальше.
211+
if (exchange.Options.RequeueAttempts < 1)
212+
{
213+
_logger.LogWarning($"The value RequeueAttempts for an exchange \"{eventArgs.Exchange}\" less than 1. Configuration is invalid. The message won't be re-queued.");
214+
return;
215+
}
216+
212217
if (eventArgs.BasicProperties.Headers is null)
213218
{
214219
eventArgs.BasicProperties.Headers = new Dictionary<string, object>();
215220
}
216221

217222
if (!eventArgs.BasicProperties.Headers.ContainsKey("re-queue-attempts"))
218223
{
219-
eventArgs.BasicProperties.Headers.Add("re-queue-attempts", true);
220-
await queueService.SendAsync(eventArgs.Body, eventArgs.BasicProperties, eventArgs.Exchange, eventArgs.RoutingKey, ResendTimeout);
221-
_logger.LogInformation("The failed message has been re-queued.");
224+
eventArgs.BasicProperties.Headers.Add("re-queue-attempts", 1);
225+
await RequeueMessage(eventArgs, queueService, exchange.Options.RequeueTimeoutMilliseconds);
226+
return;
227+
}
228+
229+
var currentAttempt = (int)eventArgs.BasicProperties.Headers["re-queue-attempts"];
230+
if (currentAttempt < exchange.Options.RequeueAttempts)
231+
{
232+
eventArgs.BasicProperties.Headers["re-queue-attempts"] = currentAttempt + 1;
233+
await RequeueMessage(eventArgs, queueService, exchange.Options.RequeueTimeoutMilliseconds);
222234
}
223235
else
224236
{
225-
_logger.LogInformation("The failed message would not be re-queued.");
237+
_logger.LogInformation("The failed message would not be re-queued. Attempts limit exceeded.");
226238
}
227239
}
240+
241+
async Task RequeueMessage(BasicDeliverEventArgs eventArgs, IQueueService queueService, int timeoutMilliseconds)
242+
{
243+
await queueService.SendAsync(eventArgs.Body, eventArgs.BasicProperties, eventArgs.Exchange, eventArgs.RoutingKey, timeoutMilliseconds);
244+
_logger.LogInformation("The failed message has been re-queued.");
245+
}
228246
}
229247
}

tests/RabbitMQ.Client.Core.DependencyInjection.Tests/UnitTests/MessageHandlingServiceTests.cs

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -228,10 +228,10 @@ static IEnumerable<MessageHandlerOrderingModel> GetMessageHandlerOrderingModels(
228228

229229
static IEnumerable<MessageHandlerOrderingContainerTestModel> GetTestingOrderingModels(
230230
HandleMessageReceivingEventTestDataModel testDataModel,
231-
Mock<IMessageHandler> messageHandlerMock,
232-
Mock<IAsyncMessageHandler> asyncMessageHandlerMock,
233-
Mock<INonCyclicMessageHandler> nonCyclicMessageHandlerMock,
234-
Mock<IAsyncNonCyclicMessageHandler> asyncNonCyclicMessageHandlerMock)
231+
IMock<IMessageHandler> messageHandlerMock,
232+
IMock<IAsyncMessageHandler> asyncMessageHandlerMock,
233+
IMock<INonCyclicMessageHandler> nonCyclicMessageHandlerMock,
234+
IMock<IAsyncNonCyclicMessageHandler> asyncNonCyclicMessageHandlerMock)
235235
{
236236
var collection = new List<MessageHandlerOrderingContainerTestModel>
237237
{
@@ -265,13 +265,8 @@ static IEnumerable<MessageHandlerOrderingContainerTestModel> GetTestingOrderingM
265265
var orderedCollection = collection.OrderByDescending(x => x.OrderValue)
266266
.ThenByDescending(x => x.MessageHandler.GetHashCode())
267267
.ToList();
268-
foreach (var item in orderedCollection)
268+
foreach (var item in orderedCollection.Where(item => item.ShouldTrigger))
269269
{
270-
if (!item.ShouldTrigger)
271-
{
272-
continue;
273-
}
274-
275270
item.CallOrder = callOrder++;
276271
}
277272
return orderedCollection;

0 commit comments

Comments
 (0)