Skip to content
This repository was archived by the owner on Apr 29, 2022. It is now read-only.

Commit 2b0cc7a

Browse files
author
Anton Vorontsov
committed
Changed consumer to AsyncEventingBasicConsumer.
1 parent 2ee9015 commit 2b0cc7a

File tree

1 file changed

+6
-8
lines changed

1 file changed

+6
-8
lines changed

src/RabbitMQ.Client.Core.DependencyInjection/QueueService.cs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,8 @@ internal class QueueService : IQueueService, IDisposable
2828
readonly IMessageHandlingService _messageHandlingService;
2929
readonly IEnumerable<RabbitMqExchange> _exchanges;
3030
readonly ILogger<QueueService> _logger;
31-
readonly EventingBasicConsumer _consumer;
32-
33-
EventHandler<BasicDeliverEventArgs> _receivedMessage;
31+
readonly AsyncEventingBasicConsumer _consumer;
32+
3433
bool _consumingStarted;
3534
readonly object _lock = new object();
3635

@@ -61,7 +60,7 @@ public QueueService(
6160
_channel.CallbackException += HandleChannelCallbackException;
6261
_channel.BasicRecoverOk += HandleChannelBasicRecoverOk;
6362

64-
_consumer = new EventingBasicConsumer(_channel);
63+
_consumer = new AsyncEventingBasicConsumer(_channel);
6564
StartClient();
6665
}
6766

@@ -100,7 +99,7 @@ public void StartConsuming()
10099
return;
101100
}
102101

103-
_consumer.Received += _receivedMessage;
102+
_consumer.Received += async (sender, eventArgs) => await _messageHandlingService.HandleMessageReceivingEvent(eventArgs, this);
104103
_consumingStarted = true;
105104

106105
var consumptionExchanges = _exchanges.Where(x => x.IsConsuming);
@@ -264,8 +263,6 @@ void HandleChannelCallbackException(object sender, CallbackExceptionEventArgs @e
264263

265264
void StartClient()
266265
{
267-
_receivedMessage = async (sender, eventArgs) => await _messageHandlingService.HandleMessageReceivingEvent(eventArgs, this);
268-
269266
var deadLetterExchanges = _exchanges
270267
.Where(x => !string.IsNullOrEmpty(x.Options.DeadLetterExchange))
271268
.Select(x => x.Options.DeadLetterExchange)
@@ -401,7 +398,8 @@ static IConnection CreateRabbitMqConnection(RabbitMqClientOptions options)
401398
AutomaticRecoveryEnabled = options.AutomaticRecoveryEnabled,
402399
TopologyRecoveryEnabled = options.TopologyRecoveryEnabled,
403400
RequestedConnectionTimeout = options.RequestedConnectionTimeout,
404-
RequestedHeartbeat = options.RequestedHeartbeat
401+
RequestedHeartbeat = options.RequestedHeartbeat,
402+
DispatchConsumersAsync = true
405403
};
406404

407405
if (options.TcpEndpoints?.Any() == true)

0 commit comments

Comments
 (0)