Skip to content
This repository was archived by the owner on Apr 17, 2025. It is now read-only.

Commit 6da8910

Browse files
committed
Aggiunta implementazione RabbitMQ
1 parent b9667e8 commit 6da8910

File tree

4 files changed

+199
-0
lines changed

4 files changed

+199
-0
lines changed
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
namespace NET6CustomLibrary.RabbitMQ;
2+
3+
internal class MessageManager : IMessageSender, IDisposable
4+
{
5+
private const string MaxPriorityHeader = "x-max-priority";
6+
7+
internal IConnection Connection { get; private set; }
8+
internal IModel Channel { get; private set; }
9+
10+
private readonly MessageManagerSettings messageManagerSettings;
11+
private readonly QueueSettings queueSettings;
12+
13+
public MessageManager(MessageManagerSettings messageManagerSettings, QueueSettings queueSettings)
14+
{
15+
var factory = new ConnectionFactory { Uri = new Uri(messageManagerSettings.ConnectionString) };
16+
Connection = factory.CreateConnection();
17+
18+
Channel = Connection.CreateModel();
19+
20+
if (messageManagerSettings.QueuePrefetchCount > 0)
21+
{
22+
Channel.BasicQos(0, messageManagerSettings.QueuePrefetchCount, false);
23+
}
24+
25+
Channel.ExchangeDeclare(messageManagerSettings.ExchangeName, ExchangeType.Direct, durable: true);
26+
27+
foreach (var queue in queueSettings.Queues)
28+
{
29+
var args = new Dictionary<string, object>
30+
{
31+
[MaxPriorityHeader] = 10
32+
};
33+
34+
Channel.QueueDeclare(queue.Name, durable: true, exclusive: false, autoDelete: false, args);
35+
Channel.QueueBind(queue.Name, messageManagerSettings.ExchangeName, queue.Name, null);
36+
}
37+
38+
this.messageManagerSettings = messageManagerSettings;
39+
this.queueSettings = queueSettings;
40+
}
41+
42+
public Task PublishAsync<T>(T message, int priority = 1) where T : class
43+
{
44+
var sendBytes = Encoding.UTF8.GetBytes(JsonSerializer.Serialize<object>(message, messageManagerSettings.JsonSerializerOptions ?? JsonOptions.Default));
45+
46+
var routingKey = queueSettings.Queues.First(q => q.Type == typeof(T)).Name;
47+
return PublishAsync(sendBytes.AsMemory(), routingKey, priority);
48+
}
49+
50+
private Task PublishAsync(ReadOnlyMemory<byte> body, string routingKey, int priority = 1)
51+
{
52+
var properties = Channel.CreateBasicProperties();
53+
properties.Persistent = true;
54+
properties.Priority = Convert.ToByte(priority);
55+
56+
Channel.BasicPublish(messageManagerSettings.ExchangeName, routingKey, properties, body);
57+
return Task.CompletedTask;
58+
}
59+
60+
public void MarkAsComplete(BasicDeliverEventArgs message) => Channel.BasicAck(message.DeliveryTag, false);
61+
62+
public void MarkAsRejected(BasicDeliverEventArgs message) => Channel.BasicReject(message.DeliveryTag, false);
63+
64+
public void Dispose()
65+
{
66+
try
67+
{
68+
if (Channel.IsOpen)
69+
{
70+
Channel.Close();
71+
}
72+
73+
if (Connection.IsOpen)
74+
{
75+
Connection.Close();
76+
}
77+
}
78+
catch
79+
{
80+
}
81+
82+
GC.SuppressFinalize(this);
83+
}
84+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
namespace NET6CustomLibrary.RabbitMQ;
2+
3+
public class MessageManagerSettings
4+
{
5+
public string ConnectionString { get; set; }
6+
public string ExchangeName { get; set; }
7+
public ushort QueuePrefetchCount { get; set; }
8+
public JsonSerializerOptions JsonSerializerOptions { get; set; } = JsonOptions.Default;
9+
}
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
namespace NET6CustomLibrary.RabbitMQ;
2+
3+
internal class QueueListener<T> : BackgroundService where T : class
4+
{
5+
private readonly MessageManager messageManager;
6+
private readonly MessageManagerSettings messageManagerSettings;
7+
private readonly ILogger logger;
8+
private readonly IServiceProvider serviceProvider;
9+
private readonly string queueName;
10+
11+
public QueueListener(MessageManager messageManager, MessageManagerSettings messageManagerSettings, QueueSettings settings, ILogger<QueueListener<T>> logger, IServiceProvider serviceProvider)
12+
{
13+
this.messageManager = messageManager;
14+
this.messageManagerSettings = messageManagerSettings;
15+
this.logger = logger;
16+
this.serviceProvider = serviceProvider;
17+
18+
queueName = settings.Queues.First(q => q.Type == typeof(T)).Name;
19+
}
20+
21+
public override Task StartAsync(CancellationToken cancellationToken)
22+
{
23+
logger.LogDebug("RabbitMQ Listener for {QueueName} started", queueName);
24+
25+
return base.StartAsync(cancellationToken);
26+
}
27+
28+
public override Task StopAsync(CancellationToken cancellationToken)
29+
{
30+
logger.LogDebug("RabbitMQ Listener for {QueueName} stopped", queueName);
31+
32+
return base.StopAsync(cancellationToken);
33+
}
34+
35+
protected override Task ExecuteAsync(CancellationToken stoppingToken)
36+
{
37+
stoppingToken.ThrowIfCancellationRequested();
38+
39+
var consumer = new EventingBasicConsumer(messageManager.Channel);
40+
consumer.Received += async (_, message) =>
41+
{
42+
try
43+
{
44+
logger.LogDebug("Messaged received: {Request}", Encoding.UTF8.GetString(message.Body.Span));
45+
46+
using var scope = serviceProvider.CreateScope();
47+
48+
var receiver = scope.ServiceProvider.GetRequiredService<IMessageReceiver<T>>();
49+
var response = JsonSerializer.Deserialize<T>(message.Body.Span, messageManagerSettings.JsonSerializerOptions ?? JsonOptions.Default);
50+
await receiver.ReceiveAsync(response, stoppingToken);
51+
52+
messageManager.MarkAsComplete(message);
53+
54+
logger.LogDebug("Message processed");
55+
}
56+
catch (Exception ex)
57+
{
58+
messageManager.MarkAsRejected(message);
59+
logger.LogError(ex, "Unexpected error while processing message");
60+
}
61+
62+
stoppingToken.ThrowIfCancellationRequested();
63+
};
64+
65+
messageManager.Channel.BasicConsume(queueName, autoAck: false, consumer);
66+
67+
return Task.CompletedTask;
68+
}
69+
70+
public override void Dispose()
71+
{
72+
messageManager.Dispose();
73+
base.Dispose();
74+
75+
GC.SuppressFinalize(this);
76+
}
77+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
namespace NET6CustomLibrary.RabbitMQ;
2+
3+
public static class RabbitMQExtensions
4+
{
5+
public static IMessagingBuilder AddRabbitMq(this IServiceCollection services, Action<MessageManagerSettings> messageManagerConfiguration, Action<QueueSettings> queuesConfiguration)
6+
{
7+
services.AddSingleton<MessageManager>();
8+
services.AddSingleton<IMessageSender>(provider => provider.GetService<MessageManager>());
9+
10+
var messageManagerSettings = new MessageManagerSettings();
11+
messageManagerConfiguration.Invoke(messageManagerSettings);
12+
services.AddSingleton(messageManagerSettings);
13+
14+
var queueSettings = new QueueSettings();
15+
queuesConfiguration.Invoke(queueSettings);
16+
services.AddSingleton(queueSettings);
17+
18+
return new DefaultMessagingBuilder(services);
19+
}
20+
21+
public static IMessagingBuilder AddReceiver<TObject, TReceiver>(this IMessagingBuilder builder) where TObject : class
22+
where TReceiver : class, IMessageReceiver<TObject>
23+
{
24+
builder.Services.AddHostedService<QueueListener<TObject>>();
25+
builder.Services.AddTransient<IMessageReceiver<TObject>, TReceiver>();
26+
27+
return builder;
28+
}
29+
}

0 commit comments

Comments
 (0)