Skip to content

Commit d5cf8ab

Browse files
committed
Refactor RabbitMQ client to use async methods
Updated RabbitMQ client implementation to use asynchronous methods and interfaces, improving performance and scalability. Replaced `Send` with `SendAsync` in `IRabbitMQClient`. Updated `RabbitMQClient`, `RabbitMqExchangeInitializer`, `RabbitMqBackgroundService`, and `Extensions` classes to use async methods. Renamed `IRabbitMqSerializer` to `IRabbitMQSerializer` for consistency. Updated `ConcurrentDictionary` to use `IChannel` instead of `IModel`. Upgraded `RabbitMQ.Client` package to 7.0.0. Improved null safety with nullable reference types. Made minor logging and formatting improvements. Ensured consistent camel casing for RabbitMQ-related names.
1 parent 78a7892 commit d5cf8ab

File tree

20 files changed

+302
-257
lines changed

20 files changed

+302
-257
lines changed

src/Genocs.MessageBrokers.RabbitMQ/Clients/RabbitMqClient.cs

Lines changed: 48 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -6,22 +6,25 @@ namespace Genocs.MessageBrokers.RabbitMQ.Clients;
66

77
internal sealed class RabbitMQClient : IRabbitMQClient
88
{
9-
private readonly object _lockObject = new();
109
private const string EmptyContext = "{}";
1110
private readonly IConnection _connection;
1211
private readonly IContextProvider _contextProvider;
13-
private readonly IRabbitMqSerializer _serializer;
12+
private readonly IRabbitMQSerializer _serializer;
1413
private readonly ILogger<RabbitMQClient> _logger;
1514
private readonly bool _contextEnabled;
1615
private readonly bool _loggerEnabled;
1716
private readonly string _spanContextHeader;
1817
private readonly bool _persistMessages;
19-
private int _channelsCount;
20-
private readonly ConcurrentDictionary<int, IModel> _channels = new();
18+
private readonly ConcurrentDictionary<int, IChannel> _channels = new();
2119
private readonly int _maxChannels;
20+
private int _channelsCount;
2221

23-
public RabbitMQClient(ProducerConnection connection, IContextProvider contextProvider, IRabbitMqSerializer serializer,
24-
RabbitMQOptions options, ILogger<RabbitMQClient> logger)
22+
public RabbitMQClient(
23+
ProducerConnection connection,
24+
IContextProvider contextProvider,
25+
IRabbitMQSerializer serializer,
26+
RabbitMQOptions options,
27+
ILogger<RabbitMQClient> logger)
2528
{
2629
_connection = connection.Connection;
2730
_contextProvider = contextProvider;
@@ -34,29 +37,34 @@ public RabbitMQClient(ProducerConnection connection, IContextProvider contextPro
3437
_maxChannels = options.MaxProducerChannels <= 0 ? 1000 : options.MaxProducerChannels;
3538
}
3639

37-
public void Send(object message, IConventions conventions, string messageId = null, string correlationId = null,
38-
string spanContext = null, object messageContext = null, IDictionary<string, object> headers = null)
40+
public async Task SendAsync(
41+
object message,
42+
IConventions conventions,
43+
string? messageId = null,
44+
string? correlationId = null,
45+
string? spanContext = null,
46+
object? messageContext = null,
47+
IDictionary<string, object>? headers = null)
3948
{
40-
var threadId = Thread.CurrentThread.ManagedThreadId;
49+
int threadId = Thread.CurrentThread.ManagedThreadId;
4150
if (!_channels.TryGetValue(threadId, out var channel))
4251
{
43-
lock (_lockObject)
52+
if (_channelsCount >= _maxChannels)
4453
{
45-
if (_channelsCount >= _maxChannels)
46-
{
47-
throw new InvalidOperationException($"Cannot create RabbitMQ producer channel for thread: {threadId} " +
48-
$"(reached the limit of {_maxChannels} channels). " +
49-
"Modify `MaxProducerChannels` setting to allow more channels.");
50-
}
54+
throw new InvalidOperationException($"Cannot create RabbitMQ producer channel for thread: {threadId} " +
55+
$"(reached the limit of {_maxChannels} channels). " +
56+
"Modify `MaxProducerChannels` setting to allow more channels.");
5157

52-
channel = _connection.CreateModel();
53-
_channels.TryAdd(threadId, channel);
54-
_channelsCount++;
55-
if (_loggerEnabled)
56-
{
57-
_logger.LogTrace($"Created a channel for thread: {threadId}, total channels: {_channelsCount}/{_maxChannels}");
58-
}
5958
}
59+
60+
channel = await _connection.CreateChannelAsync();
61+
_channels.TryAdd(threadId, channel);
62+
_channelsCount++;
63+
if (_loggerEnabled)
64+
{
65+
_logger.LogTrace($"Created a channel for thread: {threadId}, total channels: {_channelsCount}/{_maxChannels}");
66+
}
67+
6068
}
6169
else
6270
{
@@ -67,16 +75,23 @@ public void Send(object message, IConventions conventions, string messageId = nu
6775
}
6876

6977
var body = _serializer.Serialize(message);
70-
var properties = channel.CreateBasicProperties();
78+
79+
BasicProperties properties = new BasicProperties();
80+
81+
// var properties = channel.BasicProperties();
7182
properties.Persistent = _persistMessages;
83+
7284
properties.MessageId = string.IsNullOrWhiteSpace(messageId)
7385
? Guid.NewGuid().ToString("N")
7486
: messageId;
87+
7588
properties.CorrelationId = string.IsNullOrWhiteSpace(correlationId)
7689
? Guid.NewGuid().ToString("N")
7790
: correlationId;
91+
7892
properties.Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds());
79-
properties.Headers = new Dictionary<string, object>();
93+
94+
properties.Headers = new Dictionary<string, object?>();
8095

8196
if (_contextEnabled)
8297
{
@@ -108,11 +123,17 @@ public void Send(object message, IConventions conventions, string messageId = nu
108123
$"[id: '{properties.MessageId}', correlation id: '{properties.CorrelationId}']");
109124
}
110125

111-
channel.BasicPublish(conventions.Exchange, conventions.RoutingKey, properties, body.ToArray());
126+
await channel.BasicPublishAsync(conventions.Exchange, conventions.RoutingKey, true, properties, body.ToArray());
112127
}
113128

114-
private void IncludeMessageContext(object context, IBasicProperties properties)
129+
private void IncludeMessageContext(object? context, IBasicProperties properties)
115130
{
131+
if (context is null)
132+
return;
133+
134+
if (properties.Headers is null)
135+
return;
136+
116137
if (context is not null)
117138
{
118139
properties.Headers.Add(_contextProvider.HeaderName, _serializer.Serialize(context).ToArray());

src/Genocs.MessageBrokers.RabbitMQ/Contexts/ContextProvider.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,25 +2,25 @@ namespace Genocs.MessageBrokers.RabbitMQ.Contexts;
22

33
internal sealed class ContextProvider : IContextProvider
44
{
5-
private readonly IRabbitMqSerializer _serializer;
5+
private readonly IRabbitMQSerializer _serializer;
66
public string HeaderName { get; }
77

8-
public ContextProvider(IRabbitMqSerializer serializer, RabbitMQOptions options)
8+
public ContextProvider(IRabbitMQSerializer serializer, RabbitMQOptions options)
99
{
1010
_serializer = serializer;
1111
HeaderName = string.IsNullOrWhiteSpace(options.Context?.Header)
1212
? "message_context"
1313
: options.Context.Header;
1414
}
1515

16-
public object Get(IDictionary<string, object> headers)
16+
public object? Get(IDictionary<string, object>? headers)
1717
{
1818
if (headers is null)
1919
{
2020
return null;
2121
}
2222

23-
if (!headers.TryGetValue(HeaderName, out var context))
23+
if (!headers.TryGetValue(HeaderName, out object? context))
2424
{
2525
return null;
2626
}

src/Genocs.MessageBrokers.RabbitMQ/Extensions.cs

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,12 @@ public static class Extensions
3535
/// <param name="serializer">The serializer.</param>
3636
/// <returns></returns>
3737
/// <exception cref="ArgumentException">Raised when configuration is incorrect.</exception>
38-
public static IGenocsBuilder AddRabbitMq(
38+
public static async Task<IGenocsBuilder> AddRabbitMQAsync(
3939
this IGenocsBuilder builder,
4040
string sectionName = SectionName,
4141
Func<IRabbitMqPluginsRegistry, IRabbitMqPluginsRegistry>? plugins = null,
4242
Action<ConnectionFactory>? connectionFactoryConfigurator = null,
43-
IRabbitMqSerializer? serializer = null)
43+
IRabbitMQSerializer? serializer = null)
4444
{
4545
if (string.IsNullOrWhiteSpace(sectionName))
4646
{
@@ -51,7 +51,7 @@ public static IGenocsBuilder AddRabbitMq(
5151
builder.Services.AddSingleton(options);
5252
if (!builder.TryRegister(RegistryName))
5353
{
54-
return builder;
54+
return await Task.FromResult(builder);
5555
}
5656

5757
if (options.HostNames is null || !options.HostNames.Any())
@@ -72,8 +72,8 @@ public static IGenocsBuilder AddRabbitMq(
7272
builder.Services.AddSingleton<IConventionsProvider, ConventionsProvider>();
7373
builder.Services.AddSingleton<IConventionsRegistry, ConventionsRegistry>();
7474
builder.Services.AddSingleton<IRabbitMQClient, RabbitMQClient>();
75-
builder.Services.AddSingleton<IBusPublisher, RabbitMqPublisher>();
76-
builder.Services.AddSingleton<IBusSubscriber, RabbitMqSubscriber>();
75+
builder.Services.AddSingleton<IBusPublisher, RabbitMQPublisher>();
76+
builder.Services.AddSingleton<IBusSubscriber, RabbitMQSubscriber>();
7777
builder.Services.AddSingleton<MessageSubscribersChannel>();
7878
builder.Services.AddTransient<RabbitMqExchangeInitializer>();
7979
builder.Services.AddHostedService<RabbitMqBackgroundService>();
@@ -85,7 +85,7 @@ public static IGenocsBuilder AddRabbitMq(
8585
}
8686
else
8787
{
88-
builder.Services.AddSingleton<IRabbitMqSerializer, SystemTextJsonJsonRabbitMqSerializer>();
88+
builder.Services.AddSingleton<IRabbitMQSerializer, SystemTextJsonJsonRabbitMQSerializer>();
8989
}
9090

9191
var pluginsRegistry = new RabbitMqPluginsRegistry();
@@ -105,7 +105,6 @@ public static IGenocsBuilder AddRabbitMq(
105105
SocketWriteTimeout = options.SocketWriteTimeout,
106106
RequestedChannelMax = options.RequestedChannelMax,
107107
RequestedFrameMax = options.RequestedFrameMax,
108-
DispatchConsumersAsync = true,
109108
ContinuationTimeout = options.ContinuationTimeout,
110109
HandshakeContinuationTimeout = options.HandshakeContinuationTimeout,
111110
NetworkRecoveryInterval = options.NetworkRecoveryInterval,
@@ -117,8 +116,8 @@ public static IGenocsBuilder AddRabbitMq(
117116
connectionFactoryConfigurator?.Invoke(connectionFactory);
118117

119118
logger.LogDebug($"Connecting to RabbitMQ: '{string.Join(", ", options.HostNames)}'...");
120-
var consumerConnection = connectionFactory.CreateConnection(options.HostNames.ToList(), $"{options.ConnectionName}_consumer");
121-
var producerConnection = connectionFactory.CreateConnection(options.HostNames.ToList(), $"{options.ConnectionName}_producer");
119+
var consumerConnection = await connectionFactory.CreateConnectionAsync(options.HostNames.ToList(), $"{options.ConnectionName}_consumer");
120+
var producerConnection = await connectionFactory.CreateConnectionAsync(options.HostNames.ToList(), $"{options.ConnectionName}_producer");
122121
logger.LogDebug($"Connected to RabbitMQ: '{string.Join(", ", options.HostNames)}'.");
123122
builder.Services.AddSingleton(new ConsumerConnection(consumerConnection));
124123
builder.Services.AddSingleton(new ProducerConnection(producerConnection));
@@ -211,6 +210,6 @@ public static IGenocsBuilder AddExceptionToFailedMessageMapper<T>(this IGenocsBu
211210
return builder;
212211
}
213212

214-
public static IBusSubscriber UseRabbitMq(this IApplicationBuilder app)
215-
=> new RabbitMqSubscriber(app.ApplicationServices.GetRequiredService<MessageSubscribersChannel>());
213+
public static IBusSubscriber UseRabbitMQ(this IApplicationBuilder app)
214+
=> new RabbitMQSubscriber(app.ApplicationServices.GetRequiredService<MessageSubscribersChannel>());
216215
}

src/Genocs.MessageBrokers.RabbitMQ/Genocs.MessageBrokers.RabbitMQ.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
<ItemGroup>
2929
<PackageReference Include="Newtonsoft.Json" Version="13.0.3" />
3030
<PackageReference Include="Polly" Version="8.5.0" />
31-
<PackageReference Include="RabbitMQ.Client" Version="6.8.1" />
31+
<PackageReference Include="RabbitMQ.Client" Version="7.0.0" />
3232
</ItemGroup>
3333

3434

src/Genocs.MessageBrokers.RabbitMQ/IRabbitMqClient.cs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,12 @@ namespace Genocs.MessageBrokers.RabbitMQ;
22

33
public interface IRabbitMQClient
44
{
5-
void Send(
6-
object? message,
7-
IConventions conventions,
8-
string? messageId = null,
9-
string? correlationId = null,
10-
string? spanContext = null,
11-
object? messageContext = null,
12-
IDictionary<string, object>? headers = null);
5+
Task SendAsync(
6+
object message,
7+
IConventions conventions,
8+
string? messageId = null,
9+
string? correlationId = null,
10+
string? spanContext = null,
11+
object? messageContext = null,
12+
IDictionary<string, object>? headers = null);
1313
}

src/Genocs.MessageBrokers.RabbitMQ/IRabbitMqSerializer.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
namespace Genocs.MessageBrokers.RabbitMQ;
22

3-
public interface IRabbitMqSerializer
3+
public interface IRabbitMQSerializer
44
{
55
ReadOnlySpan<byte> Serialize(object value);
66
object? Deserialize(ReadOnlySpan<byte> value, Type type);

src/Genocs.MessageBrokers.RabbitMQ/Initializers/RabbitMqExchangeInitializer.cs

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1+
using System.Reflection;
12
using Genocs.Common.Types;
23
using Microsoft.Extensions.Logging;
34
using RabbitMQ.Client;
4-
using System.Reflection;
55

66
namespace Genocs.MessageBrokers.RabbitMQ.Initializers;
77

@@ -13,16 +13,18 @@ public class RabbitMqExchangeInitializer : IInitializer
1313
private readonly ILogger<RabbitMqExchangeInitializer> _logger;
1414
private readonly bool _loggerEnabled;
1515

16-
public RabbitMqExchangeInitializer(ProducerConnection connection, RabbitMQOptions options,
17-
ILogger<RabbitMqExchangeInitializer> logger)
16+
public RabbitMqExchangeInitializer(
17+
ProducerConnection connection,
18+
RabbitMQOptions options,
19+
ILogger<RabbitMqExchangeInitializer> logger)
1820
{
1921
_connection = connection.Connection;
2022
_options = options;
2123
_logger = logger;
2224
_loggerEnabled = _options.Logger?.Enabled == true;
2325
}
2426

25-
public Task InitializeAsync()
27+
public async Task InitializeAsync()
2628
{
2729
var exchanges = AppDomain.CurrentDomain
2830
.GetAssemblies()
@@ -32,41 +34,41 @@ public Task InitializeAsync()
3234
.Distinct()
3335
.ToList();
3436

35-
using var channel = _connection.CreateModel();
37+
using var channel = await _connection.CreateChannelAsync();
3638
if (_options.Exchange?.Declare == true)
3739
{
3840
Log(_options.Exchange.Name, _options.Exchange.Type);
3941

40-
channel.ExchangeDeclare(
41-
_options.Exchange.Name,
42-
_options.Exchange.Type,
43-
_options.Exchange.Durable,
44-
_options.Exchange.AutoDelete);
42+
await channel.ExchangeDeclareAsync(
43+
_options.Exchange.Name,
44+
_options.Exchange.Type,
45+
_options.Exchange.Durable,
46+
_options.Exchange.AutoDelete);
4547

4648
if (_options.DeadLetter?.Enabled is true && _options.DeadLetter?.Declare is true)
4749
{
48-
channel.ExchangeDeclare(
49-
$"{_options.DeadLetter.Prefix}{_options.Exchange.Name}{_options.DeadLetter.Suffix}",
50-
ExchangeType.Direct,
51-
_options.Exchange.Durable,
52-
_options.Exchange.AutoDelete);
50+
await channel.ExchangeDeclareAsync(
51+
$"{_options.DeadLetter.Prefix}{_options.Exchange.Name}{_options.DeadLetter.Suffix}",
52+
ExchangeType.Direct,
53+
_options.Exchange.Durable,
54+
_options.Exchange.AutoDelete);
5355
}
5456
}
5557

5658
foreach (string? exchange in exchanges)
5759
{
60+
if (string.IsNullOrWhiteSpace(exchange)) continue;
61+
5862
if (exchange.Equals(_options.Exchange?.Name, StringComparison.InvariantCultureIgnoreCase))
5963
{
6064
continue;
6165
}
6266

6367
Log(exchange, DefaultType);
64-
channel.ExchangeDeclare(exchange, DefaultType, true);
68+
await channel.ExchangeDeclareAsync(exchange, DefaultType, true);
6569
}
6670

67-
channel.Close();
68-
69-
return Task.CompletedTask;
71+
await channel.CloseAsync();
7072
}
7173

7274
private void Log(string exchange, string type)

0 commit comments

Comments
 (0)