Skip to content
This repository was archived by the owner on Apr 29, 2022. It is now read-only.

Commit 67da53b

Browse files
author
Anton Vorontsov
committed
Added integration test for re-queuing messages.
1 parent 17a0003 commit 67da53b

File tree

3 files changed

+71
-5
lines changed

3 files changed

+71
-5
lines changed

src/RabbitMQ.Client.Core.DependencyInjection/Services/MessageHandlingService.cs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,17 +45,15 @@ public async Task HandleMessageReceivingEvent(BasicDeliverEventArgs eventArgs, I
4545

4646
var matchingRoutes = GetMatchingRoutePatterns(eventArgs.Exchange, eventArgs.RoutingKey);
4747
await ProcessMessage(eventArgs.Exchange, message, queueService, matchingRoutes).ConfigureAwait(false);
48+
queueService.ConsumingChannel.BasicAck(eventArgs.DeliveryTag, false);
4849
_logger.LogInformation($"Message processing finished successfully. Acknowledge has been sent with deliveryTag {eventArgs.DeliveryTag}.");
4950
}
5051
catch (Exception exception)
5152
{
53+
queueService.ConsumingChannel.BasicAck(eventArgs.DeliveryTag, false);
5254
_logger.LogError(new EventId(), exception, $"An error occurred while processing received message with the delivery tag {eventArgs.DeliveryTag}.");
5355
await HandleFailedMessageProcessing(eventArgs, queueService).ConfigureAwait(false);
5456
}
55-
finally
56-
{
57-
queueService.ConsumingChannel.BasicAck(eventArgs.DeliveryTag, false);
58-
}
5957
}
6058

6159
IEnumerable<string> GetMatchingRoutePatterns(string exchange, string routingKey)

tests/RabbitMQ.Client.Core.DependencyInjection.Tests/IntegrationTests/QueueServiceTests.cs

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ public class QueueServiceTests
1919
const string DefaultExchangeName = "exchange.name";
2020
const string FirstRoutingKey = "first.routing.key";
2121
const string SecondRoutingKey = "second.routing.key";
22+
const int RequeueAttempts = 4;
2223

2324
[Fact]
2425
public async Task ShouldProperlyPublishAndConsumeMessages()
@@ -47,7 +48,7 @@ public async Task ShouldProperlyPublishAndConsumeMessages()
4748
.AddAsyncMessageHandlerTransient<StubAsyncMessageHandler>(SecondRoutingKey)
4849
.AddAsyncNonCyclicMessageHandlerTransient<StubAsyncNonCyclicMessageHandler>(SecondRoutingKey);
4950

50-
var serviceProvider = serviceCollection.BuildServiceProvider();
51+
await using var serviceProvider = serviceCollection.BuildServiceProvider();
5152
var queueService = serviceProvider.GetRequiredService<IQueueService>();
5253
queueService.StartConsuming();
5354

@@ -66,6 +67,50 @@ public async Task ShouldProperlyPublishAndConsumeMessages()
6667
resetEvent.WaitOne(_globalTestsTimeout);
6768
callerMock.Verify(x => x.CallAsync(It.IsAny<string>()), Times.Exactly(2));
6869
}
70+
71+
[Fact]
72+
public async Task ShouldProperlyRequeueMessages()
73+
{
74+
var connectionFactoryMock = new Mock<RabbitMqConnectionFactory> { CallBase = true }
75+
.As<IRabbitMqConnectionFactory>();
76+
77+
AsyncEventingBasicConsumer consumer = null;
78+
connectionFactoryMock.Setup(x => x.CreateConsumer(It.IsAny<IModel>()))
79+
.Returns<IModel>(channel =>
80+
{
81+
consumer = new AsyncEventingBasicConsumer(channel);
82+
return consumer;
83+
});
84+
85+
var callerMock = new Mock<IStubCaller>();
86+
87+
var serviceCollection = new ServiceCollection();
88+
serviceCollection
89+
.AddSingleton(connectionFactoryMock.Object)
90+
.AddSingleton(callerMock.Object)
91+
.AddRabbitMqClient(GetClientOptions())
92+
.AddConsumptionExchange(DefaultExchangeName, GetExchangeOptions())
93+
.AddMessageHandlerTransient<StubExceptionMessageHandler>(FirstRoutingKey);
94+
95+
await using var serviceProvider = serviceCollection.BuildServiceProvider();
96+
var queueService = serviceProvider.GetRequiredService<IQueueService>();
97+
queueService.StartConsuming();
98+
99+
using var resetEvent = new AutoResetEvent(false);
100+
consumer.Received += (sender, @event) =>
101+
{
102+
resetEvent.Set();
103+
return Task.CompletedTask;
104+
};
105+
106+
await queueService.SendAsync(new { Message = "message" }, DefaultExchangeName, FirstRoutingKey);
107+
108+
for (var i = 1; i <= RequeueAttempts + 1; i++)
109+
{
110+
resetEvent.WaitOne(_globalTestsTimeout);
111+
}
112+
callerMock.Verify(x => x.Call(It.IsAny<string>()), Times.Exactly(RequeueAttempts + 1));
113+
}
69114

70115
static RabbitMqClientOptions GetClientOptions() =>
71116
new RabbitMqClientOptions
@@ -82,6 +127,8 @@ static RabbitMqExchangeOptions GetExchangeOptions() =>
82127
{
83128
Type = "direct",
84129
DeadLetterExchange = "exchange.dlx",
130+
RequeueAttempts = RequeueAttempts,
131+
RequeueTimeoutMilliseconds = 50,
85132
Queues = new List<RabbitMqQueueOptions>
86133
{
87134
new RabbitMqQueueOptions
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
using System;
2+
using RabbitMQ.Client.Core.DependencyInjection.MessageHandlers;
3+
4+
namespace RabbitMQ.Client.Core.DependencyInjection.Tests.Stubs
5+
{
6+
public class StubExceptionMessageHandler : IMessageHandler
7+
{
8+
readonly IStubCaller _caller;
9+
10+
public StubExceptionMessageHandler(IStubCaller caller)
11+
{
12+
_caller = caller;
13+
}
14+
15+
public void Handle(string message, string routingKey)
16+
{
17+
_caller.Call($"{message}:{routingKey}");
18+
throw new Exception();
19+
}
20+
}
21+
}

0 commit comments

Comments
 (0)