Skip to content
This repository was archived by the owner on Apr 29, 2022. It is now read-only.

Commit 77fe9fd

Browse files
author
Anton Vorontsov
committed
Added batch message handler unit tests.
1 parent 2426d86 commit 77fe9fd

File tree

15 files changed

+200
-59
lines changed

15 files changed

+200
-59
lines changed

docs/message-consumption.md

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -323,18 +323,19 @@ public class CustomBatchMessageHandler : BatchMessageHandler
323323
readonly ILogger<CustomBatchMessageHandler> _logger;
324324

325325
public CustomBatchMessageHandler(
326+
IRabbitMqConnectionFactory rabbitMqConnectionFactory,
326327
IEnumerable<BatchConsumerConnectionOptions> batchConsumerConnectionOptions,
327-
ILogger<CustomBatchMessageHandler> logger)
328-
: base(batchConsumerConnectionOptions, logger)
328+
ILogger<AnotherCustomBatchMessageHandler> logger)
329+
: base(rabbitMqConnectionFactory, batchConsumerConnectionOptions, logger)
329330
{
330331
_logger = logger;
331332
}
332333

333-
protected override ushort PrefetchCount { get; set; } = 50;
334+
public override ushort PrefetchCount { get; set; } = 50;
334335

335-
protected override string QueueName { get; set; } = "another.queue.name";
336+
public override string QueueName { get; set; } = "another.queue.name";
336337

337-
protected override Task HandleMessage(IEnumerable<string> messages, CancellationToken cancellationToken)
338+
public override Task HandleMessages(IEnumerable<string> messages, CancellationToken cancellationToken)
338339
{
339340
_logger.LogInformation("Handling a batch of messages.");
340341
foreach (var message in messages)
@@ -361,11 +362,11 @@ public class CustomBatchMessageHandler : BaseBatchMessageHandler
361362
_logger = logger;
362363
}
363364

364-
protected override ushort PrefetchCount { get; set; } = 3;
365+
public override ushort PrefetchCount { get; set; } = 3;
365366

366-
protected override string QueueName { get; set; } = "queue.name";
367+
public override string QueueName { get; set; } = "queue.name";
367368

368-
protected override Task HandleMessages(IEnumerable<ReadOnlyMemory<byte>> messages, CancellationToken cancellationToken)
369+
public override Task HandleMessages(IEnumerable<ReadOnlyMemory<byte>> messages, CancellationToken cancellationToken)
369370
{
370371
_logger.LogInformation("Handling a batch of messages.");
371372
foreach (var message in messages)

examples/Examples.BatchMessageHandler/AnotherCustomBatchMessageHandler.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,11 @@ public AnotherCustomBatchMessageHandler(
2020
_logger = logger;
2121
}
2222

23-
protected override ushort PrefetchCount { get; set; } = 5;
23+
public override ushort PrefetchCount { get; set; } = 5;
2424

25-
protected override string QueueName { get; set; } = "another.queue.name";
25+
public override string QueueName { get; set; } = "another.queue.name";
2626

27-
protected override Task HandleMessage(IEnumerable<string> messages, CancellationToken cancellationToken)
27+
public override Task HandleMessages(IEnumerable<string> messages, CancellationToken cancellationToken)
2828
{
2929
_logger.LogInformation("Handling a batch of messages.");
3030
foreach (var message in messages)

examples/Examples.BatchMessageHandler/CustomBatchMessageHandler.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,11 @@ public CustomBatchMessageHandler(
2323
_logger = logger;
2424
}
2525

26-
protected override ushort PrefetchCount { get; set; } = 3;
26+
public override ushort PrefetchCount { get; set; } = 3;
2727

28-
protected override string QueueName { get; set; } = "queue.name";
28+
public override string QueueName { get; set; } = "queue.name";
2929

30-
protected override Task HandleMessages(IEnumerable<ReadOnlyMemory<byte>> messages, CancellationToken cancellationToken)
30+
public override Task HandleMessages(IEnumerable<ReadOnlyMemory<byte>> messages, CancellationToken cancellationToken)
3131
{
3232
_logger.LogInformation("Handling a batch of messages.");
3333
foreach (var message in messages)

readme.md

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -249,18 +249,19 @@ public class CustomBatchMessageHandler : BatchMessageHandler
249249
readonly ILogger<CustomBatchMessageHandler> _logger;
250250

251251
public CustomBatchMessageHandler(
252+
IRabbitMqConnectionFactory rabbitMqConnectionFactory,
252253
IEnumerable<BatchConsumerConnectionOptions> batchConsumerConnectionOptions,
253-
ILogger<CustomBatchMessageHandler> logger)
254-
: base(batchConsumerConnectionOptions, logger)
254+
ILogger<AnotherCustomBatchMessageHandler> logger)
255+
: base(rabbitMqConnectionFactory, batchConsumerConnectionOptions, logger)
255256
{
256257
_logger = logger;
257258
}
258259

259-
protected override ushort PrefetchCount { get; set; } = 50;
260+
public override ushort PrefetchCount { get; set; } = 50;
260261

261-
protected override string QueueName { get; set; } = "another.queue.name";
262+
public override string QueueName { get; set; } = "another.queue.name";
262263

263-
protected override Task HandleMessage(IEnumerable<string> messages, CancellationToken cancellationToken)
264+
public override Task HandleMessages(IEnumerable<string> messages, CancellationToken cancellationToken)
264265
{
265266
_logger.LogInformation("Handling a batch of messages.");
266267
foreach (var message in messages)

src/RabbitMQ.Client.Core.DependencyInjection/BatchMessageHandlers/BaseBatchMessageHandler.cs

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,31 +22,33 @@ public abstract class BaseBatchMessageHandler : IHostedService, IDisposable
2222
/// <summary>
2323
/// A connection which is in use by batch message handler.
2424
/// </summary>
25-
protected IConnection Connection { get; private set; }
25+
public IConnection Connection { get; private set; }
2626

2727
/// <summary>
2828
/// A channel that has been created using the connection.
2929
/// </summary>
30-
protected IModel Channel { get; private set; }
30+
public IModel Channel { get; private set; }
3131

3232
/// <summary>
3333
/// Prefetch size value that can be overridden.
3434
/// </summary>
35-
protected virtual uint PrefetchSize { get; set; } = 0;
35+
public virtual uint PrefetchSize { get; set; } = 0;
3636

3737
/// <summary>
3838
/// Queue name which will be read by that batch message handler.
3939
/// </summary>
40-
protected abstract string QueueName { get; set; }
40+
public abstract string QueueName { get; set; }
4141

4242
/// <summary>
4343
/// Prefetch count value (batch size).
4444
/// </summary>
45-
protected abstract ushort PrefetchCount { get; set; }
45+
public abstract ushort PrefetchCount { get; set; }
4646

4747
readonly IRabbitMqConnectionFactory _rabbitMqConnectionFactory;
4848
readonly RabbitMqClientOptions _clientOptions;
4949
readonly ILogger<BaseBatchMessageHandler> _logger;
50+
51+
bool _disposed = false;
5052

5153
protected BaseBatchMessageHandler(
5254
IRabbitMqConnectionFactory rabbitMqConnectionFactory,
@@ -73,7 +75,7 @@ public Task StartAsync(CancellationToken cancellationToken)
7375
Channel.BasicQos(PrefetchSize, PrefetchCount, false);
7476

7577
var messages = new ConcurrentBag<BasicDeliverEventArgs>();
76-
var consumer = new AsyncEventingBasicConsumer(Channel);
78+
var consumer = _rabbitMqConnectionFactory.CreateConsumer(Channel);
7779
consumer.Received += async (sender, eventArgs) =>
7880
{
7981
messages.Add(eventArgs);
@@ -111,7 +113,7 @@ void ValidateProperties()
111113
/// <param name="messages">A collection of messages as bytes.</param>
112114
/// <param name="cancellationToken">Cancellation token.</param>
113115
/// <returns></returns>
114-
protected abstract Task HandleMessages(IEnumerable<ReadOnlyMemory<byte>> messages, CancellationToken cancellationToken);
116+
public abstract Task HandleMessages(IEnumerable<ReadOnlyMemory<byte>> messages, CancellationToken cancellationToken);
115117

116118
public Task StopAsync(CancellationToken cancellationToken)
117119
{
@@ -121,14 +123,29 @@ public Task StopAsync(CancellationToken cancellationToken)
121123

122124
protected virtual void Dispose(bool disposing)
123125
{
124-
Connection?.Dispose();
125-
Channel?.Dispose();
126+
if (_disposed)
127+
{
128+
return;
129+
}
130+
131+
if (disposing)
132+
{
133+
Connection?.Dispose();
134+
Channel?.Dispose();
135+
}
136+
137+
_disposed = true;
126138
}
127139

128140
public void Dispose()
129141
{
130142
Dispose(true);
131143
GC.SuppressFinalize(this);
132144
}
145+
146+
~BaseBatchMessageHandler()
147+
{
148+
Dispose(false);
149+
}
133150
}
134151
}

src/RabbitMQ.Client.Core.DependencyInjection/BatchMessageHandlers/BatchMessageHandler.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,10 @@ protected BatchMessageHandler(
2929
/// <param name="messages">A collection of messages as bytes.</param>
3030
/// <param name="cancellationToken">Cancellation token.</param>
3131
/// <returns></returns>
32-
protected override async Task HandleMessages(IEnumerable<ReadOnlyMemory<byte>> messages, CancellationToken cancellationToken)
32+
public override async Task HandleMessages(IEnumerable<ReadOnlyMemory<byte>> messages, CancellationToken cancellationToken)
3333
{
3434
var decodedMessages = messages.Select(x => Encoding.UTF8.GetString(x.ToArray()));
35-
await HandleMessage(decodedMessages, cancellationToken).ConfigureAwait(false);
35+
await HandleMessages(decodedMessages, cancellationToken).ConfigureAwait(false);
3636
}
3737

3838
/// <summary>
@@ -41,6 +41,6 @@ protected override async Task HandleMessages(IEnumerable<ReadOnlyMemory<byte>> m
4141
/// <param name="messages">A collection of messages.</param>
4242
/// <param name="cancellationToken">Cancellation token.</param>
4343
/// <returns></returns>
44-
protected abstract Task HandleMessage(IEnumerable<string> messages, CancellationToken cancellationToken);
44+
public abstract Task HandleMessages(IEnumerable<string> messages, CancellationToken cancellationToken);
4545
}
4646
}

src/RabbitMQ.Client.Core.DependencyInjection/Services/IRabbitMqConnectionFactory.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using RabbitMQ.Client.Core.DependencyInjection.Configuration;
2+
using RabbitMQ.Client.Events;
23

34
namespace RabbitMQ.Client.Core.DependencyInjection.Services
45
{
@@ -14,5 +15,12 @@ public interface IRabbitMqConnectionFactory
1415
/// <returns>An instance of connection <see cref="IConnection"/>.</returns>
1516
/// <remarks>If options parameter is null the method return null too.</remarks>
1617
IConnection CreateRabbitMqConnection(RabbitMqClientOptions options);
18+
19+
/// <summary>
20+
/// Create a consumer depending on the connection channel.
21+
/// </summary>
22+
/// <param name="channel">Connection channel.</param>
23+
/// <returns>A consumer instance <see cref="AsyncEventingBasicConsumer"/>.</returns>
24+
AsyncEventingBasicConsumer CreateConsumer(IModel channel);
1725
}
1826
}

src/RabbitMQ.Client.Core.DependencyInjection/Services/QueueService.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -327,8 +327,8 @@ void ConfigureConnectionInfrastructure(RabbitMqConnectionOptionsContainer option
327327
ConsumingChannel = ConsumingConnection.CreateModel();
328328
ConsumingChannel.CallbackException += HandleChannelCallbackException;
329329
ConsumingChannel.BasicRecoverOk += HandleChannelBasicRecoverOk;
330-
331-
_consumer = new AsyncEventingBasicConsumer(ConsumingChannel);
330+
331+
_consumer = _rabbitMqConnectionFactory.CreateConsumer(ConsumingChannel);
332332
}
333333
}
334334

@@ -444,7 +444,7 @@ void ValidateArguments(string exchangeName, string routingKey)
444444
string GetDeadLetterExchange(string exchangeName)
445445
{
446446
var exchange = _exchanges.FirstOrDefault(x => x.Name == exchangeName);
447-
if (string.IsNullOrEmpty(exchange.Options.DeadLetterExchange))
447+
if (string.IsNullOrEmpty(exchange?.Options?.DeadLetterExchange))
448448
{
449449
throw new ArgumentException($"Exchange {nameof(exchangeName)} has not been configured with a dead letter exchange.", nameof(exchangeName));
450450
}

src/RabbitMQ.Client.Core.DependencyInjection/Services/RabbitMqConnectionFactory.cs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System.Linq;
22
using RabbitMQ.Client.Core.DependencyInjection.Configuration;
3+
using RabbitMQ.Client.Events;
34

45
namespace RabbitMQ.Client.Core.DependencyInjection.Services
56
{
@@ -45,6 +46,14 @@ public IConnection CreateRabbitMqConnection(RabbitMqClientOptions options)
4546
: CreateNamedConnection(options, factory);
4647
}
4748

49+
50+
/// <summary>
51+
/// Create a consumer depending on the connection channel.
52+
/// </summary>
53+
/// <param name="channel">Connection channel.</param>
54+
/// <returns>A consumer instance <see cref="AsyncEventingBasicConsumer"/>.</returns>
55+
public AsyncEventingBasicConsumer CreateConsumer(IModel channel) => new AsyncEventingBasicConsumer(channel);
56+
4857
static IConnection CreateNamedConnection(RabbitMqClientOptions options, ConnectionFactory factory)
4958
{
5059
if (options.HostNames?.Any() == true)
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Threading;
4+
using System.Threading.Tasks;
5+
using Microsoft.Extensions.Logging;
6+
using Moq;
7+
using RabbitMQ.Client.Core.DependencyInjection.BatchMessageHandlers;
8+
using RabbitMQ.Client.Core.DependencyInjection.Configuration;
9+
using RabbitMQ.Client.Core.DependencyInjection.Models;
10+
using RabbitMQ.Client.Core.DependencyInjection.Services;
11+
using RabbitMQ.Client.Core.DependencyInjection.Tests.UnitTests.Stubs;
12+
using RabbitMQ.Client.Events;
13+
using Xunit;
14+
15+
namespace RabbitMQ.Client.Core.DependencyInjection.Tests.UnitTests
16+
{
17+
public class BaseBatchMessageHandlerTests
18+
{
19+
[Theory]
20+
[InlineData(1, 10)]
21+
[InlineData(5, 47)]
22+
[InlineData(10, 185)]
23+
[InlineData(16, 200)]
24+
[InlineData(20, 310)]
25+
[InlineData(25, 400)]
26+
public async Task ShouldProperlyHandlerMessagesByBatches(ushort prefetchCount, int numberOfMessages)
27+
{
28+
const string queueName = "queue.name";
29+
30+
var channelMock = new Mock<IModel>();
31+
var connectionMock = new Mock<IConnection>();
32+
connectionMock.Setup(x => x.CreateModel())
33+
.Returns(channelMock.Object);
34+
35+
var connectionFactoryMock = new Mock<IRabbitMqConnectionFactory>();
36+
connectionFactoryMock.Setup(x => x.CreateRabbitMqConnection(It.IsAny<RabbitMqClientOptions>()))
37+
.Returns(connectionMock.Object);
38+
39+
var consumer = new AsyncEventingBasicConsumer(channelMock.Object);
40+
connectionFactoryMock.Setup(x => x.CreateConsumer(It.IsAny<IModel>()))
41+
.Returns(consumer);
42+
43+
var callerMock = new Mock<IStubCaller>();
44+
45+
var messageHandler = CreateBatchMessageHandler(queueName, prefetchCount, connectionFactoryMock.Object, callerMock.Object);
46+
await messageHandler.StartAsync(CancellationToken.None);
47+
48+
for (var i = 0; i < numberOfMessages; i++)
49+
{
50+
await consumer.HandleBasicDeliver(
51+
"1",
52+
(ulong)numberOfMessages,
53+
false,
54+
"exchange",
55+
"routing,key",
56+
null,
57+
new ReadOnlyMemory<byte>());
58+
}
59+
60+
var numberOfBatches = numberOfMessages / prefetchCount;
61+
callerMock.Verify(x => x.EmptyCall(), Times.Exactly(numberOfBatches));
62+
63+
var processedMessages = numberOfBatches * prefetchCount;
64+
callerMock.Verify(x => x.Call(It.IsAny<ReadOnlyMemory<byte>>()), Times.Exactly(processedMessages));
65+
66+
await messageHandler.StopAsync(CancellationToken.None);
67+
}
68+
69+
BaseBatchMessageHandler CreateBatchMessageHandler(
70+
string queueName,
71+
ushort prefetchCount,
72+
IRabbitMqConnectionFactory connectionFactory,
73+
IStubCaller caller)
74+
{
75+
var connectionOptions = new BatchConsumerConnectionOptions
76+
{
77+
Type = typeof(StubBaseBatchMessageHandler),
78+
ClientOptions = new RabbitMqClientOptions()
79+
};
80+
var loggerMock = new Mock<ILogger<StubBaseBatchMessageHandler>>();
81+
return new StubBaseBatchMessageHandler(
82+
caller,
83+
connectionFactory,
84+
new List<BatchConsumerConnectionOptions> { connectionOptions },
85+
loggerMock.Object)
86+
{
87+
QueueName = queueName,
88+
PrefetchCount = prefetchCount
89+
};
90+
}
91+
}
92+
}

0 commit comments

Comments
 (0)