Skip to content
This repository was archived by the owner on Apr 29, 2022. It is now read-only.

Commit 8feb43c

Browse files
Merge pull request #40 from AntonyVorontsov/feature/testing
Feature/testing
2 parents 1fdab17 + d205475 commit 8feb43c

19 files changed

+474
-72
lines changed

docs/message-consumption.md

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -323,18 +323,19 @@ public class CustomBatchMessageHandler : BatchMessageHandler
323323
readonly ILogger<CustomBatchMessageHandler> _logger;
324324

325325
public CustomBatchMessageHandler(
326+
IRabbitMqConnectionFactory rabbitMqConnectionFactory,
326327
IEnumerable<BatchConsumerConnectionOptions> batchConsumerConnectionOptions,
327328
ILogger<CustomBatchMessageHandler> logger)
328-
: base(batchConsumerConnectionOptions, logger)
329+
: base(rabbitMqConnectionFactory, batchConsumerConnectionOptions, logger)
329330
{
330331
_logger = logger;
331332
}
332333

333-
protected override ushort PrefetchCount { get; set; } = 50;
334+
public override ushort PrefetchCount { get; set; } = 50;
334335

335-
protected override string QueueName { get; set; } = "another.queue.name";
336+
public override string QueueName { get; set; } = "another.queue.name";
336337

337-
protected override Task HandleMessage(IEnumerable<string> messages, CancellationToken cancellationToken)
338+
public override Task HandleMessages(IEnumerable<string> messages, CancellationToken cancellationToken)
338339
{
339340
_logger.LogInformation("Handling a batch of messages.");
340341
foreach (var message in messages)
@@ -354,18 +355,19 @@ public class CustomBatchMessageHandler : BaseBatchMessageHandler
354355
readonly ILogger<CustomBatchMessageHandler> _logger;
355356

356357
public CustomBatchMessageHandler(
358+
IRabbitMqConnectionFactory rabbitMqConnectionFactory,
357359
IEnumerable<BatchConsumerConnectionOptions> batchConsumerConnectionOptions,
358360
ILogger<CustomBatchMessageHandler> logger)
359-
: base(batchConsumerConnectionOptions, logger)
361+
: base(rabbitMqConnectionFactory, batchConsumerConnectionOptions, logger)
360362
{
361363
_logger = logger;
362364
}
363365

364-
protected override ushort PrefetchCount { get; set; } = 3;
366+
public override ushort PrefetchCount { get; set; } = 3;
365367

366-
protected override string QueueName { get; set; } = "queue.name";
368+
public override string QueueName { get; set; } = "queue.name";
367369

368-
protected override Task HandleMessages(IEnumerable<ReadOnlyMemory<byte>> messages, CancellationToken cancellationToken)
370+
public override Task HandleMessages(IEnumerable<ReadOnlyMemory<byte>> messages, CancellationToken cancellationToken)
369371
{
370372
_logger.LogInformation("Handling a batch of messages.");
371373
foreach (var message in messages)

examples/Examples.BatchMessageHandler/AnotherCustomBatchMessageHandler.cs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
using System.Threading.Tasks;
44
using Microsoft.Extensions.Logging;
55
using RabbitMQ.Client.Core.DependencyInjection.Models;
6+
using RabbitMQ.Client.Core.DependencyInjection.Services;
67

78
namespace Examples.BatchMessageHandler
89
{
@@ -11,18 +12,19 @@ public class AnotherCustomBatchMessageHandler : RabbitMQ.Client.Core.DependencyI
1112
readonly ILogger<AnotherCustomBatchMessageHandler> _logger;
1213

1314
public AnotherCustomBatchMessageHandler(
15+
IRabbitMqConnectionFactory rabbitMqConnectionFactory,
1416
IEnumerable<BatchConsumerConnectionOptions> batchConsumerConnectionOptions,
1517
ILogger<AnotherCustomBatchMessageHandler> logger)
16-
: base(batchConsumerConnectionOptions, logger)
18+
: base(rabbitMqConnectionFactory, batchConsumerConnectionOptions, logger)
1719
{
1820
_logger = logger;
1921
}
2022

21-
protected override ushort PrefetchCount { get; set; } = 5;
23+
public override ushort PrefetchCount { get; set; } = 5;
2224

23-
protected override string QueueName { get; set; } = "another.queue.name";
25+
public override string QueueName { get; set; } = "another.queue.name";
2426

25-
protected override Task HandleMessage(IEnumerable<string> messages, CancellationToken cancellationToken)
27+
public override Task HandleMessages(IEnumerable<string> messages, CancellationToken cancellationToken)
2628
{
2729
_logger.LogInformation("Handling a batch of messages.");
2830
foreach (var message in messages)

examples/Examples.BatchMessageHandler/CustomBatchMessageHandler.cs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
using Microsoft.Extensions.Logging;
77
using RabbitMQ.Client.Core.DependencyInjection.BatchMessageHandlers;
88
using RabbitMQ.Client.Core.DependencyInjection.Models;
9+
using RabbitMQ.Client.Core.DependencyInjection.Services;
910

1011
namespace Examples.BatchMessageHandler
1112
{
@@ -14,18 +15,19 @@ public class CustomBatchMessageHandler : BaseBatchMessageHandler
1415
readonly ILogger<CustomBatchMessageHandler> _logger;
1516

1617
public CustomBatchMessageHandler(
18+
IRabbitMqConnectionFactory rabbitMqConnectionFactory,
1719
IEnumerable<BatchConsumerConnectionOptions> batchConsumerConnectionOptions,
1820
ILogger<CustomBatchMessageHandler> logger)
19-
: base(batchConsumerConnectionOptions, logger)
21+
: base(rabbitMqConnectionFactory, batchConsumerConnectionOptions, logger)
2022
{
2123
_logger = logger;
2224
}
2325

24-
protected override ushort PrefetchCount { get; set; } = 3;
26+
public override ushort PrefetchCount { get; set; } = 3;
2527

26-
protected override string QueueName { get; set; } = "queue.name";
28+
public override string QueueName { get; set; } = "queue.name";
2729

28-
protected override Task HandleMessages(IEnumerable<ReadOnlyMemory<byte>> messages, CancellationToken cancellationToken)
30+
public override Task HandleMessages(IEnumerable<ReadOnlyMemory<byte>> messages, CancellationToken cancellationToken)
2931
{
3032
_logger.LogInformation("Handling a batch of messages.");
3133
foreach (var message in messages)

readme.md

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -249,18 +249,19 @@ public class CustomBatchMessageHandler : BatchMessageHandler
249249
readonly ILogger<CustomBatchMessageHandler> _logger;
250250

251251
public CustomBatchMessageHandler(
252+
IRabbitMqConnectionFactory rabbitMqConnectionFactory,
252253
IEnumerable<BatchConsumerConnectionOptions> batchConsumerConnectionOptions,
253254
ILogger<CustomBatchMessageHandler> logger)
254-
: base(batchConsumerConnectionOptions, logger)
255+
: base(rabbitMqConnectionFactory, batchConsumerConnectionOptions, logger)
255256
{
256257
_logger = logger;
257258
}
258259

259-
protected override ushort PrefetchCount { get; set; } = 50;
260+
public override ushort PrefetchCount { get; set; } = 50;
260261

261-
protected override string QueueName { get; set; } = "another.queue.name";
262+
public override string QueueName { get; set; } = "another.queue.name";
262263

263-
protected override Task HandleMessage(IEnumerable<string> messages, CancellationToken cancellationToken)
264+
public override Task HandleMessages(IEnumerable<string> messages, CancellationToken cancellationToken)
264265
{
265266
_logger.LogInformation("Handling a batch of messages.");
266267
foreach (var message in messages)

src/RabbitMQ.Client.Core.DependencyInjection/BatchMessageHandlers/BaseBatchMessageHandler.cs

Lines changed: 31 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@
88
using Microsoft.Extensions.Logging;
99
using RabbitMQ.Client.Core.DependencyInjection.Configuration;
1010
using RabbitMQ.Client.Core.DependencyInjection.Exceptions;
11-
using RabbitMQ.Client.Core.DependencyInjection.InternalExtensions;
1211
using RabbitMQ.Client.Core.DependencyInjection.Models;
12+
using RabbitMQ.Client.Core.DependencyInjection.Services;
1313
using RabbitMQ.Client.Events;
1414

1515
namespace RabbitMQ.Client.Core.DependencyInjection.BatchMessageHandlers
@@ -22,32 +22,36 @@ public abstract class BaseBatchMessageHandler : IHostedService, IDisposable
2222
/// <summary>
2323
/// A connection which is in use by batch message handler.
2424
/// </summary>
25-
protected IConnection Connection { get; private set; }
25+
public IConnection Connection { get; private set; }
2626

2727
/// <summary>
2828
/// A channel that has been created using the connection.
2929
/// </summary>
30-
protected IModel Channel { get; private set; }
30+
public IModel Channel { get; private set; }
3131

3232
/// <summary>
3333
/// Prefetch size value that can be overridden.
3434
/// </summary>
35-
protected virtual uint PrefetchSize { get; set; } = 0;
35+
public virtual uint PrefetchSize { get; set; } = 0;
3636

3737
/// <summary>
3838
/// Queue name which will be read by that batch message handler.
3939
/// </summary>
40-
protected abstract string QueueName { get; set; }
40+
public abstract string QueueName { get; set; }
4141

4242
/// <summary>
4343
/// Prefetch count value (batch size).
4444
/// </summary>
45-
protected abstract ushort PrefetchCount { get; set; }
45+
public abstract ushort PrefetchCount { get; set; }
4646

47+
readonly IRabbitMqConnectionFactory _rabbitMqConnectionFactory;
4748
readonly RabbitMqClientOptions _clientOptions;
4849
readonly ILogger<BaseBatchMessageHandler> _logger;
50+
51+
bool _disposed = false;
4952

5053
protected BaseBatchMessageHandler(
54+
IRabbitMqConnectionFactory rabbitMqConnectionFactory,
5155
IEnumerable<BatchConsumerConnectionOptions> batchConsumerConnectionOptions,
5256
ILogger<BaseBatchMessageHandler> logger)
5357
{
@@ -58,19 +62,20 @@ protected BaseBatchMessageHandler(
5862
}
5963

6064
_clientOptions = optionsContainer.ClientOptions ?? throw new ArgumentNullException($"Consumer client options is null for {nameof(BaseBatchMessageHandler)}.", nameof(optionsContainer.ClientOptions));
65+
_rabbitMqConnectionFactory = rabbitMqConnectionFactory;
6166
_logger = logger;
6267
}
6368

6469
public Task StartAsync(CancellationToken cancellationToken)
6570
{
6671
ValidateProperties();
6772
_logger.LogInformation($"Batch message handler {GetType()} has been started.");
68-
Connection = RabbitMqFactoryExtensions.CreateRabbitMqConnection(_clientOptions);
73+
Connection = _rabbitMqConnectionFactory.CreateRabbitMqConnection(_clientOptions);
6974
Channel = Connection.CreateModel();
7075
Channel.BasicQos(PrefetchSize, PrefetchCount, false);
7176

7277
var messages = new ConcurrentBag<BasicDeliverEventArgs>();
73-
var consumer = new AsyncEventingBasicConsumer(Channel);
78+
var consumer = _rabbitMqConnectionFactory.CreateConsumer(Channel);
7479
consumer.Received += async (sender, eventArgs) =>
7580
{
7681
messages.Add(eventArgs);
@@ -108,7 +113,7 @@ void ValidateProperties()
108113
/// <param name="messages">A collection of messages as bytes.</param>
109114
/// <param name="cancellationToken">Cancellation token.</param>
110115
/// <returns></returns>
111-
protected abstract Task HandleMessages(IEnumerable<ReadOnlyMemory<byte>> messages, CancellationToken cancellationToken);
116+
public abstract Task HandleMessages(IEnumerable<ReadOnlyMemory<byte>> messages, CancellationToken cancellationToken);
112117

113118
public Task StopAsync(CancellationToken cancellationToken)
114119
{
@@ -118,14 +123,29 @@ public Task StopAsync(CancellationToken cancellationToken)
118123

119124
protected virtual void Dispose(bool disposing)
120125
{
121-
Connection?.Dispose();
122-
Channel?.Dispose();
126+
if (_disposed)
127+
{
128+
return;
129+
}
130+
131+
if (disposing)
132+
{
133+
Connection?.Dispose();
134+
Channel?.Dispose();
135+
}
136+
137+
_disposed = true;
123138
}
124139

125140
public void Dispose()
126141
{
127142
Dispose(true);
128143
GC.SuppressFinalize(this);
129144
}
145+
146+
~BaseBatchMessageHandler()
147+
{
148+
Dispose(false);
149+
}
130150
}
131151
}

src/RabbitMQ.Client.Core.DependencyInjection/BatchMessageHandlers/BatchMessageHandler.cs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
using System.Threading.Tasks;
77
using Microsoft.Extensions.Logging;
88
using RabbitMQ.Client.Core.DependencyInjection.Models;
9+
using RabbitMQ.Client.Core.DependencyInjection.Services;
910

1011
namespace RabbitMQ.Client.Core.DependencyInjection.BatchMessageHandlers
1112
{
@@ -15,9 +16,10 @@ namespace RabbitMQ.Client.Core.DependencyInjection.BatchMessageHandlers
1516
public abstract class BatchMessageHandler : BaseBatchMessageHandler
1617
{
1718
protected BatchMessageHandler(
19+
IRabbitMqConnectionFactory rabbitMqConnectionFactory,
1820
IEnumerable<BatchConsumerConnectionOptions> batchConsumerConnectionOptions,
1921
ILogger<BatchMessageHandler> logger)
20-
: base(batchConsumerConnectionOptions, logger)
22+
: base(rabbitMqConnectionFactory, batchConsumerConnectionOptions, logger)
2123
{
2224
}
2325

@@ -27,10 +29,10 @@ protected BatchMessageHandler(
2729
/// <param name="messages">A collection of messages as bytes.</param>
2830
/// <param name="cancellationToken">Cancellation token.</param>
2931
/// <returns></returns>
30-
protected override async Task HandleMessages(IEnumerable<ReadOnlyMemory<byte>> messages, CancellationToken cancellationToken)
32+
public override async Task HandleMessages(IEnumerable<ReadOnlyMemory<byte>> messages, CancellationToken cancellationToken)
3133
{
3234
var decodedMessages = messages.Select(x => Encoding.UTF8.GetString(x.ToArray()));
33-
await HandleMessage(decodedMessages, cancellationToken).ConfigureAwait(false);
35+
await HandleMessages(decodedMessages, cancellationToken).ConfigureAwait(false);
3436
}
3537

3638
/// <summary>
@@ -39,6 +41,6 @@ protected override async Task HandleMessages(IEnumerable<ReadOnlyMemory<byte>> m
3941
/// <param name="messages">A collection of messages.</param>
4042
/// <param name="cancellationToken">Cancellation token.</param>
4143
/// <returns></returns>
42-
protected abstract Task HandleMessage(IEnumerable<string> messages, CancellationToken cancellationToken);
44+
public abstract Task HandleMessages(IEnumerable<string> messages, CancellationToken cancellationToken);
4345
}
4446
}

src/RabbitMQ.Client.Core.DependencyInjection/RabbitMqClientDependencyInjectionExtensions.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,7 @@ static IServiceCollection AddRabbitMqClientInfrastructure(this IServiceCollectio
231231
{
232232
services.AddOptions();
233233
services.AddLogging(options => options.AddConsole());
234+
services.TryAddSingleton<IRabbitMqConnectionFactory, RabbitMqConnectionFactory>();
234235
services.TryAddSingleton<IMessageHandlerContainerBuilder, MessageHandlerContainerBuilder>();
235236
services.TryAddSingleton<IMessageHandlingService, MessageHandlingService>();
236237
return services;
@@ -240,6 +241,7 @@ static IServiceCollection ResolveSingletonQueueService(this IServiceCollection s
240241
{
241242
services.TryAddSingleton<IQueueService>(provider => new QueueService(
242243
guid,
244+
provider.GetService<IRabbitMqConnectionFactory>(),
243245
provider.GetServices<RabbitMqConnectionOptionsContainer>(),
244246
provider.GetService<IMessageHandlingService>(),
245247
provider.GetServices<RabbitMqExchange>(),
@@ -251,6 +253,7 @@ static IServiceCollection ResolveTransientQueueService(this IServiceCollection s
251253
{
252254
services.TryAddTransient<IQueueService>(provider => new QueueService(
253255
guid,
256+
provider.GetService<IRabbitMqConnectionFactory>(),
254257
provider.GetServices<RabbitMqConnectionOptionsContainer>(),
255258
provider.GetService<IMessageHandlingService>(),
256259
provider.GetServices<RabbitMqExchange>(),
@@ -262,6 +265,7 @@ static IServiceCollection ResolveSingletonProducingService(this IServiceCollecti
262265
{
263266
services.TryAddSingleton<IProducingService>(provider => new QueueService(
264267
guid,
268+
provider.GetService<IRabbitMqConnectionFactory>(),
265269
provider.GetServices<RabbitMqConnectionOptionsContainer>(),
266270
provider.GetService<IMessageHandlingService>(),
267271
provider.GetServices<RabbitMqExchange>(),
@@ -273,6 +277,7 @@ static IServiceCollection ResolveTransientProducingService(this IServiceCollecti
273277
{
274278
services.TryAddTransient<IProducingService>(provider => new QueueService(
275279
guid,
280+
provider.GetService<IRabbitMqConnectionFactory>(),
276281
provider.GetServices<RabbitMqConnectionOptionsContainer>(),
277282
provider.GetService<IMessageHandlingService>(),
278283
provider.GetServices<RabbitMqExchange>(),
@@ -284,6 +289,7 @@ static IServiceCollection ResolveSingletonConsumingService(this IServiceCollecti
284289
{
285290
services.TryAddSingleton<IConsumingService>(provider => new QueueService(
286291
guid,
292+
provider.GetService<IRabbitMqConnectionFactory>(),
287293
provider.GetServices<RabbitMqConnectionOptionsContainer>(),
288294
provider.GetService<IMessageHandlingService>(),
289295
provider.GetServices<RabbitMqExchange>(),
@@ -295,6 +301,7 @@ static IServiceCollection ResolveTransientConsumingService(this IServiceCollecti
295301
{
296302
services.TryAddTransient<IConsumingService>(provider => new QueueService(
297303
guid,
304+
provider.GetService<IRabbitMqConnectionFactory>(),
298305
provider.GetServices<RabbitMqConnectionOptionsContainer>(),
299306
provider.GetService<IMessageHandlingService>(),
300307
provider.GetServices<RabbitMqExchange>(),
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
using RabbitMQ.Client.Core.DependencyInjection.Configuration;
2+
using RabbitMQ.Client.Events;
3+
4+
namespace RabbitMQ.Client.Core.DependencyInjection.Services
5+
{
6+
/// <summary>
7+
/// Interface of the service that is responsible for creating RabbitMQ connections depending on options <see cref="RabbitMqClientOptions"/>.
8+
/// </summary>
9+
public interface IRabbitMqConnectionFactory
10+
{
11+
/// <summary>
12+
/// Create a RabbitMQ connection.
13+
/// </summary>
14+
/// <param name="options">An instance of options <see cref="RabbitMqClientOptions"/>.</param>
15+
/// <returns>An instance of connection <see cref="IConnection"/>.</returns>
16+
/// <remarks>If options parameter is null the method return null too.</remarks>
17+
IConnection CreateRabbitMqConnection(RabbitMqClientOptions options);
18+
19+
/// <summary>
20+
/// Create a consumer depending on the connection channel.
21+
/// </summary>
22+
/// <param name="channel">Connection channel.</param>
23+
/// <returns>A consumer instance <see cref="AsyncEventingBasicConsumer"/>.</returns>
24+
AsyncEventingBasicConsumer CreateConsumer(IModel channel);
25+
}
26+
}

0 commit comments

Comments
 (0)