Skip to content
This repository was archived by the owner on Apr 29, 2022. It is now read-only.

Commit f15618b

Browse files
author
Anton Vorontsov
committed
Fixed version. Added base implementation of base batch message handler. Changed namespaces.
1 parent 62f4609 commit f15618b

27 files changed

+484
-73
lines changed

RabbitMQ.Client.Core.DependencyInjection.sln

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "RabbitMQ.Client.Core.Depend
4545
EndProject
4646
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Examples.AdvancedConfiguration", "examples\Examples.AdvancedConfiguration\Examples.AdvancedConfiguration.csproj", "{0C713913-8D54-49E7-AADD-45497216E2EF}"
4747
EndProject
48+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Examples.BatchMessageHandler", "examples\Examples.BatchMessageHandler\Examples.BatchMessageHandler.csproj", "{BC9BE326-AEFE-42F2-B592-80126188514D}"
49+
EndProject
4850
Global
4951
GlobalSection(SolutionConfigurationPlatforms) = preSolution
5052
Debug|Any CPU = Debug|Any CPU
@@ -75,6 +77,10 @@ Global
7577
{0C713913-8D54-49E7-AADD-45497216E2EF}.Debug|Any CPU.Build.0 = Debug|Any CPU
7678
{0C713913-8D54-49E7-AADD-45497216E2EF}.Release|Any CPU.ActiveCfg = Release|Any CPU
7779
{0C713913-8D54-49E7-AADD-45497216E2EF}.Release|Any CPU.Build.0 = Release|Any CPU
80+
{BC9BE326-AEFE-42F2-B592-80126188514D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
81+
{BC9BE326-AEFE-42F2-B592-80126188514D}.Debug|Any CPU.Build.0 = Debug|Any CPU
82+
{BC9BE326-AEFE-42F2-B592-80126188514D}.Release|Any CPU.ActiveCfg = Release|Any CPU
83+
{BC9BE326-AEFE-42F2-B592-80126188514D}.Release|Any CPU.Build.0 = Release|Any CPU
7884
EndGlobalSection
7985
GlobalSection(SolutionProperties) = preSolution
8086
HideSolutionNode = FALSE
@@ -87,6 +93,7 @@ Global
8793
{93D59B0E-856C-4260-B50E-57FE5C0F5073} = {BC4CDBDE-4AEE-44B3-B00B-380EB1AC5E62}
8894
{85907F19-00B0-4BA6-9B7C-0452A174903D} = {9D0289B2-C566-46CC-A53A-471BCBA0F277}
8995
{0C713913-8D54-49E7-AADD-45497216E2EF} = {04EFD8F7-5120-4072-9728-64203F6F4873}
96+
{BC9BE326-AEFE-42F2-B592-80126188514D} = {04EFD8F7-5120-4072-9728-64203F6F4873}
9097
EndGlobalSection
9198
GlobalSection(ExtensibilityGlobals) = postSolution
9299
SolutionGuid = {0EBBD182-65B2-47F9-ABBE-64B5B8C9652F}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Threading.Tasks;
4+
using Microsoft.Extensions.Logging;
5+
using RabbitMQ.Client.Core.DependencyInjection.BatchMessageHandlers;
6+
using RabbitMQ.Client.Core.DependencyInjection.Models;
7+
8+
namespace Examples.BatchMessageHandler
9+
{
10+
public class AnotherCustomBatchMessageHandler : BaseBatchMessageHandler
11+
{
12+
readonly ILogger<AnotherCustomBatchMessageHandler> _logger;
13+
14+
public AnotherCustomBatchMessageHandler(
15+
IEnumerable<BatchConsumerConnectionOptions> batchConsumerConnectionOptions,
16+
ILogger<AnotherCustomBatchMessageHandler> logger)
17+
: base(batchConsumerConnectionOptions, logger)
18+
{
19+
_logger = logger;
20+
}
21+
22+
protected override TimeSpan Period { get; set; } = TimeSpan.FromSeconds(10);
23+
24+
protected override ushort PrefetchCount { get; set; } = 5;
25+
26+
protected override string QueueName { get; set; } = "another.queue.name";
27+
28+
protected override Task HandleMessages()
29+
{
30+
_logger.LogInformation($"Handling from {typeof(AnotherCustomBatchMessageHandler)} messages");
31+
return Task.CompletedTask;
32+
}
33+
}
34+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Threading.Tasks;
4+
using Microsoft.Extensions.Logging;
5+
using RabbitMQ.Client.Core.DependencyInjection.BatchMessageHandlers;
6+
using RabbitMQ.Client.Core.DependencyInjection.Models;
7+
8+
namespace Examples.BatchMessageHandler
9+
{
10+
public class CustomBatchMessageHandler : BaseBatchMessageHandler
11+
{
12+
readonly ILogger<CustomBatchMessageHandler> _logger;
13+
14+
public CustomBatchMessageHandler(
15+
IEnumerable<BatchConsumerConnectionOptions> batchConsumerConnectionOptions,
16+
ILogger<CustomBatchMessageHandler> logger)
17+
: base(batchConsumerConnectionOptions, logger)
18+
{
19+
_logger = logger;
20+
}
21+
22+
protected override TimeSpan Period { get; set; } = TimeSpan.FromMinutes(10);
23+
24+
protected override ushort PrefetchCount { get; set; } = 5;
25+
26+
protected override string QueueName { get; set; } = "queue.name";
27+
28+
protected override Task HandleMessages()
29+
{
30+
_logger.LogInformation("Handling messages");
31+
return Task.CompletedTask;
32+
}
33+
}
34+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<OutputType>Exe</OutputType>
5+
<TargetFramework>netcoreapp3.1</TargetFramework>
6+
</PropertyGroup>
7+
8+
<ItemGroup>
9+
<PackageReference Include="Microsoft.Extensions.Configuration" Version="3.1.3" />
10+
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="3.1.3" />
11+
<PackageReference Include="Microsoft.Extensions.Hosting" Version="3.1.3" />
12+
<PackageReference Include="Microsoft.Extensions.Logging" Version="3.1.3" />
13+
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="3.1.3" />
14+
</ItemGroup>
15+
16+
<ItemGroup>
17+
<ProjectReference Include="..\..\src\RabbitMQ.Client.Core.DependencyInjection\RabbitMQ.Client.Core.DependencyInjection.csproj" />
18+
</ItemGroup>
19+
20+
<ItemGroup>
21+
<None Update="appsettings.json">
22+
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
23+
</None>
24+
</ItemGroup>
25+
26+
</Project>
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
using System.Threading.Tasks;
2+
using Microsoft.Extensions.Configuration;
3+
using Microsoft.Extensions.Hosting;
4+
using Microsoft.Extensions.Logging;
5+
using RabbitMQ.Client.Core.DependencyInjection;
6+
using RabbitMQ.Client.Core.DependencyInjection.Configuration;
7+
8+
namespace Examples.BatchMessageHandler
9+
{
10+
public static class Program
11+
{
12+
public static async Task Main()
13+
{
14+
var builder = new HostBuilder()
15+
.ConfigureAppConfiguration((hostingContext, config) =>
16+
{
17+
config.AddJsonFile("appsettings.json", optional: false);
18+
})
19+
.ConfigureServices((hostContext, services) =>
20+
{
21+
// Let's configure two different BatchMessageHandlers with different methods.
22+
// First - configuring an appsettings.json section.
23+
services.AddBatchMessageHandler<CustomBatchMessageHandler>(hostContext.Configuration.GetSection("RabbitMq"));
24+
25+
// Second one - passing configuration instance.
26+
// var rabbitMqConfiguration = new RabbitMqClientOptions
27+
// {
28+
// HostName = "127.0.0.1",
29+
// Port = 5672,
30+
// UserName = "guest",
31+
// Password = "guest"
32+
// };
33+
// services.AddBatchMessageHandler<AnotherCustomBatchMessageHandler>(rabbitMqConfiguration);
34+
})
35+
.ConfigureLogging((hostingContext, logging) =>
36+
{
37+
logging.AddConsole();
38+
});
39+
await builder.RunConsoleAsync();
40+
}
41+
}
42+
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
{
2+
"RabbitMq": {
3+
"HostName": "127.0.0.1",
4+
"Port": "5672",
5+
"UserName": "guest",
6+
"Password": "guest"
7+
}
8+
}

src/RabbitMQ.Client.Core.DependencyInjection/AsyncMessageHandlerDependencyInjectionExtensions.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
using Microsoft.Extensions.DependencyInjection;
22
using System.Collections.Generic;
33
using System.Linq;
4-
using RabbitMQ.Client.Core.DependencyInjection.Extensions;
4+
using RabbitMQ.Client.Core.DependencyInjection.InternalExtensions;
55
using RabbitMQ.Client.Core.DependencyInjection.MessageHandlers;
66

77
namespace RabbitMQ.Client.Core.DependencyInjection

src/RabbitMQ.Client.Core.DependencyInjection/AsyncNonCyclicMessageHandlerDependencyInjectionExtensions.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
using Microsoft.Extensions.DependencyInjection;
22
using System.Collections.Generic;
33
using System.Linq;
4-
using RabbitMQ.Client.Core.DependencyInjection.Extensions;
4+
using RabbitMQ.Client.Core.DependencyInjection.InternalExtensions;
55
using RabbitMQ.Client.Core.DependencyInjection.MessageHandlers;
66

77
namespace RabbitMQ.Client.Core.DependencyInjection
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
using System.Linq;
2+
using Microsoft.Extensions.Configuration;
3+
using Microsoft.Extensions.DependencyInjection;
4+
using RabbitMQ.Client.Core.DependencyInjection.BatchMessageHandlers;
5+
using RabbitMQ.Client.Core.DependencyInjection.Configuration;
6+
using RabbitMQ.Client.Core.DependencyInjection.Exceptions;
7+
using RabbitMQ.Client.Core.DependencyInjection.InternalExtensions;
8+
using RabbitMQ.Client.Core.DependencyInjection.Models;
9+
10+
namespace RabbitMQ.Client.Core.DependencyInjection
11+
{
12+
/// <summary>
13+
/// DI extensions for batch message handlers.
14+
/// </summary>
15+
public static class BatchMessageHandlerDependencyInjectionExtensions
16+
{
17+
/// <summary>
18+
/// Add batch message handler.
19+
/// </summary>
20+
/// <typeparam name="TBatchMessageHandler">Batch message handler type.</typeparam>
21+
/// <param name="services">Service collection.</param>
22+
/// <param name="configuration">RabbitMq configuration section.</param>
23+
/// <returns>Service collection.</returns>
24+
public static IServiceCollection AddBatchMessageHandler<TBatchMessageHandler>(this IServiceCollection services, IConfiguration configuration)
25+
where TBatchMessageHandler : BaseBatchMessageHandler
26+
{
27+
services.CheckIfBatchMessageHandlerAlreadyConfigured<TBatchMessageHandler>();
28+
var configurationInstance = RabbitMqClientOptionsDependencyInjectionExtensions.GetRabbitMqClientOptionsInstance(configuration);
29+
services.ConfigureBatchConsumerConnectionOptions<TBatchMessageHandler>(configurationInstance);
30+
services.AddHostedService<TBatchMessageHandler>();
31+
return services;
32+
}
33+
34+
/// <summary>
35+
/// Add batch message handler.
36+
/// </summary>
37+
/// <typeparam name="TBatchMessageHandler">Batch message handler type.</typeparam>
38+
/// <param name="services">Service collection.</param>
39+
/// <param name="configuration">RabbitMq configuration <see cref="RabbitMqClientOptions"/>.</param>
40+
/// <returns>Service collection.</returns>
41+
public static IServiceCollection AddBatchMessageHandler<TBatchMessageHandler>(this IServiceCollection services, RabbitMqClientOptions configuration)
42+
where TBatchMessageHandler : BaseBatchMessageHandler
43+
{
44+
services.CheckIfBatchMessageHandlerAlreadyConfigured<TBatchMessageHandler>();
45+
services.ConfigureBatchConsumerConnectionOptions<TBatchMessageHandler>(configuration);
46+
services.AddHostedService<TBatchMessageHandler>();
47+
return services;
48+
}
49+
50+
static IServiceCollection ConfigureBatchConsumerConnectionOptions<TBatchMessageHandler>(this IServiceCollection services, RabbitMqClientOptions clientOptions)
51+
where TBatchMessageHandler : BaseBatchMessageHandler
52+
{
53+
var options = new BatchConsumerConnectionOptions
54+
{
55+
Type = typeof(TBatchMessageHandler),
56+
ClientOptions = clientOptions
57+
};
58+
var serviceDescriptor = new ServiceDescriptor(typeof(BatchConsumerConnectionOptions), options);
59+
services.Add(serviceDescriptor);
60+
return services;
61+
}
62+
63+
static IServiceCollection CheckIfBatchMessageHandlerAlreadyConfigured<TBatchMessageHandler>(this IServiceCollection services)
64+
{
65+
var descriptor = services.FirstOrDefault(x => x.ServiceType == typeof(TBatchMessageHandler));
66+
if (descriptor != null)
67+
{
68+
throw new BatchMessageHandlerAlreadyConfiguredException(typeof(TBatchMessageHandler), $"A batch message handler of type {typeof(TBatchMessageHandler)} has already been configured.");
69+
}
70+
return services;
71+
}
72+
}
73+
}
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Linq;
4+
using System.Text;
5+
using System.Threading;
6+
using System.Threading.Tasks;
7+
using Microsoft.Extensions.Hosting;
8+
using Microsoft.Extensions.Logging;
9+
using RabbitMQ.Client.Core.DependencyInjection.Configuration;
10+
using RabbitMQ.Client.Core.DependencyInjection.Exceptions;
11+
using RabbitMQ.Client.Core.DependencyInjection.InternalExtensions;
12+
using RabbitMQ.Client.Core.DependencyInjection.Models;
13+
using RabbitMQ.Client.Events;
14+
15+
namespace RabbitMQ.Client.Core.DependencyInjection.BatchMessageHandlers
16+
{
17+
public abstract class BaseBatchMessageHandler : IHostedService, IDisposable
18+
{
19+
protected virtual TimeSpan DueTo { get; set; } = TimeSpan.Zero;
20+
21+
protected abstract TimeSpan Period { get; set; }
22+
23+
protected virtual uint PrefetchSize { get; set; } = 0;
24+
25+
protected abstract string QueueName { get; set; }
26+
27+
protected abstract ushort PrefetchCount { get; set; }
28+
29+
readonly RabbitMqClientOptions _clientOptions;
30+
readonly ILogger<BaseBatchMessageHandler> _logger;
31+
Timer _timer;
32+
33+
protected BaseBatchMessageHandler(
34+
IEnumerable<BatchConsumerConnectionOptions> batchConsumerConnectionOptions,
35+
ILogger<BaseBatchMessageHandler> logger)
36+
{
37+
var optionsContainer = batchConsumerConnectionOptions.FirstOrDefault(x => x.Type == GetType());
38+
if (optionsContainer is null)
39+
{
40+
throw new ArgumentNullException($"Client connection options for {nameof(BaseBatchMessageHandler)} has not been found.", nameof(batchConsumerConnectionOptions));
41+
}
42+
43+
_clientOptions = optionsContainer.ClientOptions ?? throw new ArgumentNullException($"Consumer client options is null for {nameof(BaseBatchMessageHandler)}.", nameof(optionsContainer.ClientOptions));
44+
_logger = logger;
45+
}
46+
47+
public Task StartAsync(CancellationToken cancellationToken)
48+
{
49+
ValidateProperties();
50+
_logger.LogInformation("BatchMessageHandler has been started.");
51+
_timer = new Timer(
52+
async state => await StartPeriodicJob(cancellationToken),
53+
null,
54+
DueTo,
55+
Period);
56+
return Task.CompletedTask;
57+
}
58+
59+
void ValidateProperties()
60+
{
61+
if (string.IsNullOrEmpty(QueueName))
62+
{
63+
throw new BatchMessageHandlerInvalidPropertyValueException("Queue name could not be empty.", nameof(QueueName));
64+
}
65+
66+
if (PrefetchCount < 1)
67+
{
68+
throw new BatchMessageHandlerInvalidPropertyValueException("PrefetchCount value should be more than one.", nameof(PrefetchCount));
69+
}
70+
}
71+
72+
async Task StartPeriodicJob(CancellationToken cancellationToken)
73+
{
74+
while (!cancellationToken.IsCancellationRequested)
75+
{
76+
using var connection = RabbitMqFactoryExtensions.CreateRabbitMqConnection(_clientOptions);
77+
using var channel = connection.CreateModel();
78+
79+
channel.BasicQos(PrefetchSize, PrefetchCount, false);
80+
81+
var consumer = new AsyncEventingBasicConsumer(channel);
82+
consumer.Received += async (sender, eventArgs) => await HandleMessageReceivingEvent(eventArgs);
83+
channel.BasicConsume(queue: QueueName, autoAck: false, consumer: consumer);
84+
}
85+
}
86+
87+
async Task HandleMessageReceivingEvent(BasicDeliverEventArgs eventArgs)
88+
{
89+
var message = Encoding.UTF8.GetString(eventArgs.Body.ToArray());
90+
_logger.LogInformation($"A new message was received with deliveryTag {eventArgs.DeliveryTag}.");
91+
_logger.LogInformation(message);
92+
}
93+
94+
protected abstract Task HandleMessages();
95+
96+
public Task StopAsync(CancellationToken cancellationToken)
97+
{
98+
_logger.LogInformation("BatchMessageHandler has been stopped.");
99+
_timer?.Change(Timeout.Infinite, 0);
100+
return Task.CompletedTask;
101+
}
102+
103+
public void Dispose()
104+
{
105+
_timer?.Dispose();
106+
}
107+
}
108+
}

0 commit comments

Comments
 (0)