Skip to content
This repository was archived by the owner on Apr 29, 2022. It is now read-only.

Commit 5ea4e3d

Browse files
author
Anton Vorontsov
committed
Fixed concurrency in getting messages while processing a batch.
1 parent 0c33b84 commit 5ea4e3d

File tree

3 files changed

+27
-8
lines changed

3 files changed

+27
-8
lines changed

src/RabbitMQ.Client.Core.DependencyInjection/BatchMessageHandlers/BaseBatchMessageHandler.cs

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ public abstract class BaseBatchMessageHandler : IHostedService, IDisposable
5555

5656
readonly ConcurrentBag<BasicDeliverEventArgs> _messages = new ConcurrentBag<BasicDeliverEventArgs>();
5757
Timer _timer;
58+
object _lock = new object();
5859
bool _disposed = false;
5960

6061
protected BaseBatchMessageHandler(
@@ -104,18 +105,34 @@ public Task StartAsync(CancellationToken cancellationToken)
104105

105106
async Task ProcessBatchOfMessages(CancellationToken cancellationToken)
106107
{
107-
if (!_messages.Any())
108+
109+
var messages = GetMessages();
110+
if (!messages.Any())
108111
{
109112
return;
110113
}
111114

112-
var byteMessages = _messages.Select(x => x.Body).ToList();
115+
var byteMessages = messages.Select(x => x.Body).ToList();
113116
await HandleMessages(byteMessages, cancellationToken).ConfigureAwait(false);
114-
var latestDeliveryTag = _messages.Max(x => x.DeliveryTag);
115-
_messages.Clear();
117+
var latestDeliveryTag = messages.Max(x => x.DeliveryTag);
116118
Channel.BasicAck(latestDeliveryTag, true);
117119
}
118120

121+
IList<BasicDeliverEventArgs> GetMessages()
122+
{
123+
lock (_lock)
124+
{
125+
if (!_messages.Any())
126+
{
127+
return new List<BasicDeliverEventArgs>();
128+
}
129+
130+
var messages = _messages.ToList();
131+
_messages.Clear();
132+
return messages;
133+
}
134+
}
135+
119136
void ValidateProperties()
120137
{
121138
if (string.IsNullOrEmpty(QueueName))

tests/RabbitMQ.Client.Core.DependencyInjection.Tests/UnitTests/BaseBatchMessageHandlerTests.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -123,10 +123,11 @@ await consumer.HandleBasicDeliver(
123123
}
124124

125125
waitHandle.WaitOne(_globalTestsTimeout);
126-
callerMock.Verify(x => x.EmptyCall(), Times.Exactly(b + 1));
127-
callerMock.Verify(x => x.Call(It.IsAny<ReadOnlyMemory<byte>>()), Times.Exactly(upperBound));
128126
}
129127

128+
callerMock.Verify(x => x.EmptyCall(), Times.Exactly(numberOfSmallBatches));
129+
callerMock.Verify(x => x.Call(It.IsAny<ReadOnlyMemory<byte>>()), Times.Exactly(numberOfMessages));
130+
130131
await messageHandler.StopAsync(CancellationToken.None);
131132
}
132133

tests/RabbitMQ.Client.Core.DependencyInjection.Tests/UnitTests/BatchMessageHandlerTests.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -123,10 +123,11 @@ await consumer.HandleBasicDeliver(
123123
}
124124

125125
waitHandle.WaitOne(_globalTestsTimeout);
126-
callerMock.Verify(x => x.EmptyCall(), Times.Exactly(b + 1));
127-
callerMock.Verify(x => x.Call(It.IsAny<string>()), Times.Exactly(upperBound));
128126
}
129127

128+
callerMock.Verify(x => x.EmptyCall(), Times.Exactly(numberOfSmallBatches));
129+
callerMock.Verify(x => x.Call(It.IsAny<string>()), Times.Exactly(numberOfMessages));
130+
130131
await messageHandler.StopAsync(CancellationToken.None);
131132
}
132133

0 commit comments

Comments
 (0)