Skip to content
This repository was archived by the owner on Apr 29, 2022. It is now read-only.

Commit 6942e0f

Browse files
Merge pull request #30 from AntonyVorontsov/feature/stop-consuming
StopConsuming method for consumers.
2 parents 8feb43c + 26b3440 commit 6942e0f

File tree

4 files changed

+89
-7
lines changed

4 files changed

+89
-7
lines changed

docs/message-consumption.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ You can register `IHostedService` and inject an instance of `IQueueService` into
3333
services.AddSingleton<IHostedService, ConsumingService>();
3434
```
3535

36-
And then simply call `StartConsuming` so a consumer can work in the background.
36+
Then simply call `StartConsuming` so a consumer can work in the background. There is also an option which allows you to stop consuming messages - method `StopConsuming` which you can use any time you want to pause a message consumption for any reason.
3737

3838
```c#
3939
public class ConsumingService : IHostedService
@@ -59,6 +59,7 @@ public class ConsumingService : IHostedService
5959
public Task StopAsync(CancellationToken cancellationToken)
6060
{
6161
_logger.LogInformation("Stopping consuming.");
62+
_queueService.StopConsuming();
6263
return Task.CompletedTask;
6364
}
6465
}

src/RabbitMQ.Client.Core.DependencyInjection/Services/IConsumingService.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,5 +19,10 @@ public interface IConsumingService
1919
/// Start consuming (getting messages).
2020
/// </summary>
2121
void StartConsuming();
22+
23+
/// <summary>
24+
/// Stop consuming (getting messages).
25+
/// </summary>
26+
void StopConsuming();
2227
}
2328
}

src/RabbitMQ.Client.Core.DependencyInjection/Services/QueueService.cs

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ public sealed class QueueService : IQueueService, IDisposable
3333
readonly IEnumerable<RabbitMqExchange> _exchanges;
3434
readonly ILogger<QueueService> _logger;
3535

36+
IEnumerable<string> _consumerTags = new List<string>();
3637
bool _consumingStarted;
3738
readonly object _lock = new object();
3839

@@ -131,16 +132,34 @@ public void StartConsuming()
131132
return;
132133
}
133134

134-
_consumer.Received += async (sender, eventArgs) => await _messageHandlingService.HandleMessageReceivingEvent(eventArgs, this);
135+
_consumer.Received += ConsumerOnReceived;
135136
_consumingStarted = true;
136137

137138
var consumptionExchanges = _exchanges.Where(x => x.IsConsuming);
138-
foreach (var exchange in consumptionExchanges)
139+
_consumerTags = consumptionExchanges.SelectMany(
140+
exchange => exchange.Options.Queues.Select(
141+
queue => ConsumingChannel.BasicConsume(queue: queue.Name, autoAck: false, consumer: _consumer)))
142+
.Distinct()
143+
.ToList();
144+
}
145+
146+
public void StopConsuming()
147+
{
148+
if (ConsumingChannel is null)
139149
{
140-
foreach (var queue in exchange.Options.Queues)
141-
{
142-
ConsumingChannel.BasicConsume(queue: queue.Name, autoAck: false, consumer: _consumer);
143-
}
150+
throw new ConsumingChannelIsNullException($"Consuming channel is null. Configure {nameof(IConsumingService)} or full functional {nameof(IQueueService)} for consuming messages.");
151+
}
152+
153+
if (!_consumingStarted)
154+
{
155+
return;
156+
}
157+
158+
_consumer.Received -= ConsumerOnReceived;
159+
_consumingStarted = false;
160+
foreach (var tag in _consumerTags)
161+
{
162+
ConsumingChannel.BasicCancel(tag);
144163
}
145164
}
146165

@@ -479,5 +498,10 @@ static Dictionary<string, object> CreateArguments(string exchangeName, string ro
479498
{ "x-message-ttl", secondsDelay * 1000 },
480499
{ "x-expires", secondsDelay * 1000 + QueueExpirationTime }
481500
};
501+
502+
async Task ConsumerOnReceived(object sender, BasicDeliverEventArgs eventArgs)
503+
{
504+
await _messageHandlingService.HandleMessageReceivingEvent(eventArgs, this);
505+
}
482506
}
483507
}

tests/RabbitMQ.Client.Core.DependencyInjection.Tests/UnitTests/QueueServiceConsumerTests.cs

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,58 @@ await consumer.HandleBasicDeliver(
6565
messageHandlingServiceMock.Verify(x => x.HandleMessageReceivingEvent(It.IsAny<BasicDeliverEventArgs>(), It.IsAny<IQueueService>()), Times.Exactly(numberOfMessages));
6666
}
6767

68+
[Theory]
69+
[InlineData(1)]
70+
[InlineData(5)]
71+
[InlineData(10)]
72+
[InlineData(15)]
73+
[InlineData(20)]
74+
[InlineData(25)]
75+
public async Task ShouldProperlyStopConsumingMessages(int numberOfMessages)
76+
{
77+
var channelMock = new Mock<IModel>();
78+
var connectionMock = new Mock<IConnection>();
79+
connectionMock.Setup(x => x.CreateModel())
80+
.Returns(channelMock.Object);
81+
82+
var connectionFactoryMock = new Mock<IRabbitMqConnectionFactory>();
83+
connectionFactoryMock.Setup(x => x.CreateRabbitMqConnection(It.IsAny<RabbitMqClientOptions>()))
84+
.Returns(connectionMock.Object);
85+
86+
var consumer = new AsyncEventingBasicConsumer(channelMock.Object);
87+
connectionFactoryMock.Setup(x => x.CreateConsumer(It.IsAny<IModel>()))
88+
.Returns(consumer);
89+
90+
var messageHandlingServiceMock = new Mock<IMessageHandlingService>();
91+
var queueService = CreateService(connectionFactoryMock.Object, messageHandlingServiceMock.Object);
92+
queueService.StartConsuming();
93+
for (var i = 1; i <= numberOfMessages; i++)
94+
{
95+
await consumer.HandleBasicDeliver(
96+
"1",
97+
(ulong)numberOfMessages,
98+
false,
99+
"exchange",
100+
"routing,key",
101+
null,
102+
new ReadOnlyMemory<byte>());
103+
}
104+
105+
messageHandlingServiceMock.Verify(x => x.HandleMessageReceivingEvent(It.IsAny<BasicDeliverEventArgs>(), It.IsAny<IQueueService>()), Times.Exactly(numberOfMessages));
106+
107+
queueService.StopConsuming();
108+
await consumer.HandleBasicDeliver(
109+
"1",
110+
0,
111+
false,
112+
"exchange",
113+
"routing,key",
114+
null,
115+
new ReadOnlyMemory<byte>());
116+
117+
messageHandlingServiceMock.Verify(x => x.HandleMessageReceivingEvent(It.IsAny<BasicDeliverEventArgs>(), It.IsAny<IQueueService>()), Times.Exactly(numberOfMessages));
118+
}
119+
68120
static IConsumingService CreateService(
69121
IRabbitMqConnectionFactory connectionFactory,
70122
IMessageHandlingService messageHandlingService)

0 commit comments

Comments
 (0)