Skip to content
This repository was archived by the owner on Apr 29, 2022. It is now read-only.

Commit 9669798

Browse files
author
Anton Vorontsov
committed
Review changes, more locks.
1 parent f7791d7 commit 9669798

File tree

1 file changed

+9
-6
lines changed

1 file changed

+9
-6
lines changed

src/RabbitMQ.Client.Core.DependencyInjection/BatchMessageHandlers/BaseBatchMessageHandler.cs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public abstract class BaseBatchMessageHandler : IHostedService, IDisposable
5555

5656
readonly ConcurrentBag<BasicDeliverEventArgs> _messages = new ConcurrentBag<BasicDeliverEventArgs>();
5757
Timer _timer;
58-
object _lock = new object();
58+
readonly object _lock = new object();
5959
bool _disposed = false;
6060

6161
protected BaseBatchMessageHandler(
@@ -84,19 +84,22 @@ public Task StartAsync(CancellationToken cancellationToken)
8484

8585
if (MessageHandlingPeriod != null)
8686
{
87-
_timer = new Timer(async _ => await ProcessBatchOfMessages(cancellationToken), null, MessageHandlingPeriod.Value, MessageHandlingPeriod.Value);
87+
_timer = new Timer(async _ => await ProcessBatchOfMessages(cancellationToken).ConfigureAwait(false), null, MessageHandlingPeriod.Value, MessageHandlingPeriod.Value);
8888
}
8989

9090
var consumer = _rabbitMqConnectionFactory.CreateConsumer(Channel);
9191
consumer.Received += async (sender, eventArgs) =>
9292
{
93-
_messages.Add(eventArgs);
94-
if (_messages.Count < PrefetchCount)
93+
lock (_lock)
9594
{
96-
return;
95+
_messages.Add(eventArgs);
96+
if (_messages.Count < PrefetchCount)
97+
{
98+
return;
99+
}
97100
}
98101

99-
await ProcessBatchOfMessages(cancellationToken);
102+
await ProcessBatchOfMessages(cancellationToken).ConfigureAwait(false);
100103
};
101104

102105
Channel.BasicConsume(queue: QueueName, autoAck: false, consumer: consumer);

0 commit comments

Comments
 (0)