Skip to content
This repository was archived by the owner on Apr 29, 2022. It is now read-only.

Commit 5ca6036

Browse files
author
Anton Vorontsov
committed
Added StopConsuming method for consumers.
1 parent 7601e13 commit 5ca6036

File tree

2 files changed

+35
-6
lines changed

2 files changed

+35
-6
lines changed

src/RabbitMQ.Client.Core.DependencyInjection/IConsumingService.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,5 +19,10 @@ public interface IConsumingService
1919
/// Start consuming (getting messages).
2020
/// </summary>
2121
void StartConsuming();
22+
23+
/// <summary>
24+
/// Stop consuming (getting messages).
25+
/// </summary>
26+
void StopConsuming();
2227
}
2328
}

src/RabbitMQ.Client.Core.DependencyInjection/QueueService.cs

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ internal sealed class QueueService : IQueueService, IDisposable
3333
readonly IEnumerable<RabbitMqExchange> _exchanges;
3434
readonly ILogger<QueueService> _logger;
3535

36+
IEnumerable<string> _consumerTags;
3637
bool _consumingStarted;
3738
readonly object _lock = new object();
3839

@@ -123,16 +124,34 @@ public void StartConsuming()
123124
return;
124125
}
125126

126-
_consumer.Received += async (sender, eventArgs) => await _messageHandlingService.HandleMessageReceivingEvent(eventArgs, this);
127+
_consumer.Received += ConsumerOnReceived;
127128
_consumingStarted = true;
128129

129130
var consumptionExchanges = _exchanges.Where(x => x.IsConsuming);
130-
foreach (var exchange in consumptionExchanges)
131+
_consumerTags = consumptionExchanges.SelectMany(
132+
exchange => exchange.Options.Queues.Select(
133+
queue => ConsumingChannel.BasicConsume(queue: queue.Name, autoAck: false, consumer: _consumer)))
134+
.Distinct()
135+
.ToList();
136+
}
137+
138+
public void StopConsuming()
139+
{
140+
if (ConsumingChannel is null)
131141
{
132-
foreach (var queue in exchange.Options.Queues)
133-
{
134-
ConsumingChannel.BasicConsume(queue: queue.Name, autoAck: false, consumer: _consumer);
135-
}
142+
throw new ConsumingChannelIsNullException($"Consuming channel is null. Configure {nameof(IConsumingService)} or full functional {nameof(IQueueService)} for consuming messages.");
143+
}
144+
145+
if (!_consumingStarted)
146+
{
147+
return;
148+
}
149+
150+
_consumer.Received -= ConsumerOnReceived;
151+
_consumingStarted = false;
152+
foreach (var tag in _consumerTags)
153+
{
154+
ConsumingChannel.BasicCancel(tag);
136155
}
137156
}
138157

@@ -465,5 +484,10 @@ static Dictionary<string, object> CreateArguments(string exchangeName, string ro
465484
{ "x-message-ttl", secondsDelay * 1000 },
466485
{ "x-expires", secondsDelay * 1000 + QueueExpirationTime }
467486
};
487+
488+
async Task ConsumerOnReceived(object sender, BasicDeliverEventArgs eventArgs)
489+
{
490+
await _messageHandlingService.HandleMessageReceivingEvent(eventArgs, this);
491+
}
468492
}
469493
}

0 commit comments

Comments
 (0)