Skip to content
This repository was archived by the owner on Apr 29, 2022. It is now read-only.

Commit 0c33b84

Browse files
author
Anton Vorontsov
committed
Added timer for batch message handlers.
1 parent e941538 commit 0c33b84

File tree

7 files changed

+215
-14
lines changed

7 files changed

+215
-14
lines changed

src/RabbitMQ.Client.Core.DependencyInjection/BatchMessageHandlers/BaseBatchMessageHandler.cs

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,18 @@ public abstract class BaseBatchMessageHandler : IHostedService, IDisposable
4343
/// Prefetch count value (batch size).
4444
/// </summary>
4545
public abstract ushort PrefetchCount { get; set; }
46+
47+
/// <summary>
48+
/// The TimeSpan period through which messages will be processing.
49+
/// </summary>
50+
public virtual TimeSpan? MessageHandlingPeriod { get; set; }
4651

4752
readonly IRabbitMqConnectionFactory _rabbitMqConnectionFactory;
4853
readonly RabbitMqClientOptions _clientOptions;
4954
readonly ILogger<BaseBatchMessageHandler> _logger;
5055

56+
readonly ConcurrentBag<BasicDeliverEventArgs> _messages = new ConcurrentBag<BasicDeliverEventArgs>();
57+
Timer _timer;
5158
bool _disposed = false;
5259

5360
protected BaseBatchMessageHandler(
@@ -73,27 +80,42 @@ public Task StartAsync(CancellationToken cancellationToken)
7380
Connection = _rabbitMqConnectionFactory.CreateRabbitMqConnection(_clientOptions);
7481
Channel = Connection.CreateModel();
7582
Channel.BasicQos(PrefetchSize, PrefetchCount, false);
83+
84+
if (MessageHandlingPeriod != null)
85+
{
86+
_timer = new Timer(async _ => await ProcessBatchOfMessages(cancellationToken), null, MessageHandlingPeriod.Value, MessageHandlingPeriod.Value);
87+
}
7688

77-
var messages = new ConcurrentBag<BasicDeliverEventArgs>();
7889
var consumer = _rabbitMqConnectionFactory.CreateConsumer(Channel);
7990
consumer.Received += async (sender, eventArgs) =>
8091
{
81-
messages.Add(eventArgs);
82-
if (messages.Count < PrefetchCount)
92+
_messages.Add(eventArgs);
93+
if (_messages.Count < PrefetchCount)
8394
{
8495
return;
8596
}
8697

87-
var byteMessages = messages.Select(x => x.Body).ToList();
88-
await HandleMessages(byteMessages, cancellationToken).ConfigureAwait(false);
89-
var latestDeliveryTag = messages.Max(x => x.DeliveryTag);
90-
messages.Clear();
91-
Channel.BasicAck(latestDeliveryTag, true);
98+
await ProcessBatchOfMessages(cancellationToken);
9299
};
100+
93101
Channel.BasicConsume(queue: QueueName, autoAck: false, consumer: consumer);
94102
return Task.CompletedTask;
95103
}
96104

105+
async Task ProcessBatchOfMessages(CancellationToken cancellationToken)
106+
{
107+
if (!_messages.Any())
108+
{
109+
return;
110+
}
111+
112+
var byteMessages = _messages.Select(x => x.Body).ToList();
113+
await HandleMessages(byteMessages, cancellationToken).ConfigureAwait(false);
114+
var latestDeliveryTag = _messages.Max(x => x.DeliveryTag);
115+
_messages.Clear();
116+
Channel.BasicAck(latestDeliveryTag, true);
117+
}
118+
97119
void ValidateProperties()
98120
{
99121
if (string.IsNullOrEmpty(QueueName))
@@ -117,6 +139,7 @@ void ValidateProperties()
117139

118140
public Task StopAsync(CancellationToken cancellationToken)
119141
{
142+
_timer?.Change(Timeout.Infinite, 0);
120143
_logger.LogInformation($"Batch message handler {GetType()} has been stopped.");
121144
return Task.CompletedTask;
122145
}
@@ -130,6 +153,7 @@ protected virtual void Dispose(bool disposing)
130153

131154
if (disposing)
132155
{
156+
_timer?.Dispose();
133157
Connection?.Dispose();
134158
Channel?.Dispose();
135159
}

tests/RabbitMQ.Client.Core.DependencyInjection.Tests/Stubs/StubBaseBatchMessageHandler.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ public StubBaseBatchMessageHandler(
2626
public override ushort PrefetchCount { get; set; }
2727

2828
public override string QueueName { get; set; }
29+
30+
public override TimeSpan? MessageHandlingPeriod { get; set; }
2931

3032
public override Task HandleMessages(IEnumerable<ReadOnlyMemory<byte>> messages, CancellationToken cancellationToken)
3133
{

tests/RabbitMQ.Client.Core.DependencyInjection.Tests/Stubs/StubBatchMessageHandler.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
using System;
12
using System.Collections.Generic;
23
using System.Threading;
34
using System.Threading.Tasks;
@@ -25,6 +26,8 @@ public StubBatchMessageHandler(
2526
public override ushort PrefetchCount { get; set; }
2627

2728
public override string QueueName { get; set; }
29+
30+
public override TimeSpan? MessageHandlingPeriod { get; set; }
2831

2932
public override Task HandleMessages(IEnumerable<string> messages, CancellationToken cancellationToken)
3033
{
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
using System;
2+
using System.Threading;
3+
using System.Threading.Tasks;
4+
5+
namespace RabbitMQ.Client.Core.DependencyInjection.Tests.Stubs
6+
{
7+
public class StubCallerDecorator : IStubCaller
8+
{
9+
private readonly IStubCaller Caller;
10+
11+
public StubCallerDecorator(IStubCaller caller)
12+
{
13+
Caller = caller;
14+
}
15+
16+
public EventWaitHandle WaitHandle { get; set; }
17+
18+
public void EmptyCall()
19+
{
20+
WaitHandle.Set();
21+
Caller.EmptyCall();
22+
}
23+
24+
public void Call(ReadOnlyMemory<byte> message)
25+
{
26+
Caller.Call(message);
27+
}
28+
29+
public void Call(string message)
30+
{
31+
Caller.Call(message);
32+
}
33+
34+
public Task CallAsync(string message)
35+
{
36+
return Caller.CallAsync(message);
37+
}
38+
}
39+
}

tests/RabbitMQ.Client.Core.DependencyInjection.Tests/UnitTests/BaseBatchMessageHandlerTests.cs

Lines changed: 69 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ namespace RabbitMQ.Client.Core.DependencyInjection.Tests.UnitTests
1616
{
1717
public class BaseBatchMessageHandlerTests
1818
{
19+
readonly TimeSpan _globalTestsTimeout = TimeSpan.FromSeconds(60);
20+
1921
[Theory]
2022
[InlineData(1, 10)]
2123
[InlineData(5, 47)]
@@ -42,14 +44,14 @@ public async Task ShouldProperlyHandlerMessagesByBatches(ushort prefetchCount, i
4244

4345
var callerMock = new Mock<IStubCaller>();
4446

45-
using var messageHandler = CreateBatchMessageHandler(queueName, prefetchCount, connectionFactoryMock.Object, callerMock.Object);
47+
using var messageHandler = CreateBatchMessageHandler(queueName, prefetchCount, null, connectionFactoryMock.Object, callerMock.Object);
4648
await messageHandler.StartAsync(CancellationToken.None);
4749

4850
for (var i = 0; i < numberOfMessages; i++)
4951
{
5052
await consumer.HandleBasicDeliver(
5153
"1",
52-
(ulong)numberOfMessages,
54+
(ulong)i,
5355
false,
5456
"exchange",
5557
"routing,key",
@@ -65,10 +67,73 @@ await consumer.HandleBasicDeliver(
6567

6668
await messageHandler.StopAsync(CancellationToken.None);
6769
}
70+
71+
[Theory]
72+
[InlineData(1)]
73+
[InlineData(5)]
74+
[InlineData(10)]
75+
[InlineData(16)]
76+
[InlineData(40)]
77+
[InlineData(57)]
78+
public async Task ShouldProperlyHandlerMessagesByTimer(int numberOfMessages)
79+
{
80+
const string queueName = "queue.name";
81+
const ushort prefetchCount = 10;
82+
var handlingPeriod = TimeSpan.FromMilliseconds(100);
83+
84+
var channelMock = new Mock<IModel>();
85+
var connectionMock = new Mock<IConnection>();
86+
connectionMock.Setup(x => x.CreateModel())
87+
.Returns(channelMock.Object);
88+
89+
var connectionFactoryMock = new Mock<IRabbitMqConnectionFactory>();
90+
connectionFactoryMock.Setup(x => x.CreateRabbitMqConnection(It.IsAny<RabbitMqClientOptions>()))
91+
.Returns(connectionMock.Object);
92+
93+
var consumer = new AsyncEventingBasicConsumer(channelMock.Object);
94+
connectionFactoryMock.Setup(x => x.CreateConsumer(It.IsAny<IModel>()))
95+
.Returns(consumer);
96+
97+
using var waitHandle = new AutoResetEvent(false);
98+
var callerMock = new Mock<IStubCaller>();
99+
var caller = new StubCallerDecorator(callerMock.Object)
100+
{
101+
WaitHandle = waitHandle
102+
};
103+
104+
using var messageHandler = CreateBatchMessageHandler(queueName, prefetchCount, handlingPeriod, connectionFactoryMock.Object, caller);
105+
await messageHandler.StartAsync(CancellationToken.None);
106+
107+
const int smallBatchSize = prefetchCount - 1;
108+
var numberOfSmallBatches = (int)Math.Ceiling((double)numberOfMessages / smallBatchSize);
109+
for (var b = 0; b < numberOfSmallBatches; b++)
110+
{
111+
var lowerBound = b * smallBatchSize;
112+
var upperBound = (b + 1) * smallBatchSize > numberOfMessages ? numberOfMessages : (b + 1) * smallBatchSize;
113+
for (var i = lowerBound; i < upperBound; i++)
114+
{
115+
await consumer.HandleBasicDeliver(
116+
"1",
117+
(ulong)i,
118+
false,
119+
"exchange",
120+
"routing,key",
121+
null,
122+
new ReadOnlyMemory<byte>());
123+
}
124+
125+
waitHandle.WaitOne(_globalTestsTimeout);
126+
callerMock.Verify(x => x.EmptyCall(), Times.Exactly(b + 1));
127+
callerMock.Verify(x => x.Call(It.IsAny<ReadOnlyMemory<byte>>()), Times.Exactly(upperBound));
128+
}
129+
130+
await messageHandler.StopAsync(CancellationToken.None);
131+
}
68132

69133
static BaseBatchMessageHandler CreateBatchMessageHandler(
70134
string queueName,
71135
ushort prefetchCount,
136+
TimeSpan? handlingPeriod,
72137
IRabbitMqConnectionFactory connectionFactory,
73138
IStubCaller caller)
74139
{
@@ -85,7 +150,8 @@ static BaseBatchMessageHandler CreateBatchMessageHandler(
85150
loggerMock.Object)
86151
{
87152
QueueName = queueName,
88-
PrefetchCount = prefetchCount
153+
PrefetchCount = prefetchCount,
154+
MessageHandlingPeriod = handlingPeriod
89155
};
90156
}
91157
}

tests/RabbitMQ.Client.Core.DependencyInjection.Tests/UnitTests/BatchMessageHandlerTests.cs

Lines changed: 69 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ namespace RabbitMQ.Client.Core.DependencyInjection.Tests.UnitTests
1616
{
1717
public class BatchMessageHandlerTests
1818
{
19+
readonly TimeSpan _globalTestsTimeout = TimeSpan.FromSeconds(60);
20+
1921
[Theory]
2022
[InlineData(1, 10)]
2123
[InlineData(5, 47)]
@@ -42,14 +44,14 @@ public async Task ShouldProperlyHandlerMessagesByBatches(ushort prefetchCount, i
4244

4345
var callerMock = new Mock<IStubCaller>();
4446

45-
using var messageHandler = CreateBatchMessageHandler(queueName, prefetchCount, connectionFactoryMock.Object, callerMock.Object);
47+
using var messageHandler = CreateBatchMessageHandler(queueName, prefetchCount, null, connectionFactoryMock.Object, callerMock.Object);
4648
await messageHandler.StartAsync(CancellationToken.None);
4749

4850
for (var i = 0; i < numberOfMessages; i++)
4951
{
5052
await consumer.HandleBasicDeliver(
5153
"1",
52-
(ulong)numberOfMessages,
54+
(ulong)i,
5355
false,
5456
"exchange",
5557
"routing,key",
@@ -65,10 +67,73 @@ await consumer.HandleBasicDeliver(
6567

6668
await messageHandler.StopAsync(CancellationToken.None);
6769
}
70+
71+
[Theory]
72+
[InlineData(1)]
73+
[InlineData(5)]
74+
[InlineData(10)]
75+
[InlineData(16)]
76+
[InlineData(40)]
77+
[InlineData(57)]
78+
public async Task ShouldProperlyHandlerMessagesByTimer(int numberOfMessages)
79+
{
80+
const string queueName = "queue.name";
81+
const ushort prefetchCount = 10;
82+
var handlingPeriod = TimeSpan.FromMilliseconds(100);
83+
84+
var channelMock = new Mock<IModel>();
85+
var connectionMock = new Mock<IConnection>();
86+
connectionMock.Setup(x => x.CreateModel())
87+
.Returns(channelMock.Object);
88+
89+
var connectionFactoryMock = new Mock<IRabbitMqConnectionFactory>();
90+
connectionFactoryMock.Setup(x => x.CreateRabbitMqConnection(It.IsAny<RabbitMqClientOptions>()))
91+
.Returns(connectionMock.Object);
92+
93+
var consumer = new AsyncEventingBasicConsumer(channelMock.Object);
94+
connectionFactoryMock.Setup(x => x.CreateConsumer(It.IsAny<IModel>()))
95+
.Returns(consumer);
96+
97+
using var waitHandle = new AutoResetEvent(false);
98+
var callerMock = new Mock<IStubCaller>();
99+
var caller = new StubCallerDecorator(callerMock.Object)
100+
{
101+
WaitHandle = waitHandle
102+
};
103+
104+
using var messageHandler = CreateBatchMessageHandler(queueName, prefetchCount, handlingPeriod, connectionFactoryMock.Object, caller);
105+
await messageHandler.StartAsync(CancellationToken.None);
106+
107+
const int smallBatchSize = prefetchCount - 1;
108+
var numberOfSmallBatches = (int)Math.Ceiling((double)numberOfMessages / smallBatchSize);
109+
for (var b = 0; b < numberOfSmallBatches; b++)
110+
{
111+
var lowerBound = b * smallBatchSize;
112+
var upperBound = (b + 1) * smallBatchSize > numberOfMessages ? numberOfMessages : (b + 1) * smallBatchSize;
113+
for (var i = lowerBound; i < upperBound; i++)
114+
{
115+
await consumer.HandleBasicDeliver(
116+
"1",
117+
(ulong)i,
118+
false,
119+
"exchange",
120+
"routing,key",
121+
null,
122+
new ReadOnlyMemory<byte>());
123+
}
124+
125+
waitHandle.WaitOne(_globalTestsTimeout);
126+
callerMock.Verify(x => x.EmptyCall(), Times.Exactly(b + 1));
127+
callerMock.Verify(x => x.Call(It.IsAny<string>()), Times.Exactly(upperBound));
128+
}
129+
130+
await messageHandler.StopAsync(CancellationToken.None);
131+
}
68132

69133
static BatchMessageHandler CreateBatchMessageHandler(
70134
string queueName,
71135
ushort prefetchCount,
136+
TimeSpan? handlingPeriod,
72137
IRabbitMqConnectionFactory connectionFactory,
73138
IStubCaller caller)
74139
{
@@ -85,7 +150,8 @@ static BatchMessageHandler CreateBatchMessageHandler(
85150
loggerMock.Object)
86151
{
87152
QueueName = queueName,
88-
PrefetchCount = prefetchCount
153+
PrefetchCount = prefetchCount,
154+
MessageHandlingPeriod = handlingPeriod
89155
};
90156
}
91157
}

tests/RabbitMQ.Client.Core.DependencyInjection.Tests/UnitTests/MessageHandlingServiceTests.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ public async Task ShouldProperlyHandleMessageReceivingEvent(HandleMessageReceivi
9999
asyncMessageHandlerMock.Object.GetType(),
100100
nonCyclicMessageHandlerMock.Object.GetType(),
101101
asyncNonCyclicMessageHandlerMock.Object.GetType());
102+
102103
var testingOrderingModels = GetTestingOrderingModels(
103104
testDataModel,
104105
messageHandlerMock,

0 commit comments

Comments
 (0)