Skip to content
This repository was archived by the owner on Apr 29, 2022. It is now read-only.

Commit 6c5152d

Browse files
author
Anton Vorontsov
committed
Moved factory to the extensions directory. Added IProducingService and IConsumingService and DI extension methods for them. Made a IQueueService separation.
1 parent 30bdd77 commit 6c5152d

File tree

13 files changed

+658
-334
lines changed

13 files changed

+658
-334
lines changed

examples/Examples.ConsumerConsole/Program.cs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,22 @@ public static void Main()
2121

2222
var serviceProvider = serviceCollection.BuildServiceProvider();
2323

24-
var queueService = serviceProvider.GetRequiredService<IQueueService>();
25-
queueService.StartConsuming();
24+
var consumingService = serviceProvider.GetRequiredService<IConsumingService>();
25+
consumingService.StartConsuming();
26+
27+
var producingService = serviceProvider.GetRequiredService<IProducingService>();
28+
29+
// This is just an example.
30+
//producingService.SendString("I am sending messages!", "exchange", "routing.key");
2631
}
2732

2833
static void ConfigureServices(IServiceCollection services)
2934
{
3035
var rabbitMqSection = Configuration.GetSection("RabbitMq");
3136
var exchangeSection = Configuration.GetSection("RabbitMqExchange");
3237

33-
services.AddRabbitMqClient(rabbitMqSection)
38+
services.AddRabbitMqConsumingClientSingleton(rabbitMqSection)
39+
.AddRabbitMqProducingClientSingleton(rabbitMqSection)
3440
.AddConsumptionExchange("exchange.name", exchangeSection)
3541
.AddAsyncMessageHandlerSingleton<CustomAsyncMessageHandler>("routing.key");
3642
//.AddAsyncNonCyclicMessageHandlerSingleton<CustomAsyncNonCyclicMessageHandler>("routing.key");
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
namespace RabbitMQ.Client.Core.DependencyInjection.Configuration
2+
{
3+
public class RabbitMqConnectionOptions
4+
{
5+
public RabbitMqClientOptions ProducerOptions { get; set; }
6+
7+
public RabbitMqClientOptions ConsumerOptions { get; set; }
8+
}
9+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
using System;
2+
3+
namespace RabbitMQ.Client.Core.DependencyInjection.Exceptions
4+
{
5+
/// <summary>
6+
/// An exception that is thrown during the process of starting a consumer when the channel is null.
7+
/// </summary>
8+
public class ConsumingChannelIsNullException : Exception
9+
{
10+
public ConsumingChannelIsNullException(string message) : base(message)
11+
{
12+
}
13+
}
14+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
using System;
2+
3+
namespace RabbitMQ.Client.Core.DependencyInjection.Exceptions
4+
{
5+
/// <summary>
6+
/// An exception that is thrown during the publication of a message when the channel is null.
7+
/// </summary>
8+
public class ProducingChannelIsNullException : Exception
9+
{
10+
public ProducingChannelIsNullException(string message) : base(message)
11+
{
12+
}
13+
}
14+
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
using System.Linq;
2+
using RabbitMQ.Client.Core.DependencyInjection.Configuration;
3+
4+
namespace RabbitMQ.Client.Core.DependencyInjection.Extensions
5+
{
6+
/// <summary>
7+
/// Extensions that contain business logic of creating RabbitMQ connections depending on options <see cref="RabbitMqClientOptions"/>.
8+
/// </summary>
9+
internal static class RabbitMqFactoryExtensions
10+
{
11+
/// <summary>
12+
/// Create a RabbitMQ connection.
13+
/// </summary>
14+
/// <param name="options">An instance of options <see cref="RabbitMqClientOptions"/>.</param>
15+
/// <returns>An instance of connection <see cref="IConnection"/>.</returns>
16+
/// <remarks>If options parameter is null the method return null too.</remarks>
17+
internal static IConnection CreateRabbitMqConnection(RabbitMqClientOptions options)
18+
{
19+
if (options is null)
20+
{
21+
return null;
22+
}
23+
24+
var factory = new ConnectionFactory
25+
{
26+
Port = options.Port,
27+
UserName = options.UserName,
28+
Password = options.Password,
29+
VirtualHost = options.VirtualHost,
30+
AutomaticRecoveryEnabled = options.AutomaticRecoveryEnabled,
31+
TopologyRecoveryEnabled = options.TopologyRecoveryEnabled,
32+
RequestedConnectionTimeout = options.RequestedConnectionTimeout,
33+
RequestedHeartbeat = options.RequestedHeartbeat,
34+
DispatchConsumersAsync = true
35+
};
36+
37+
if (options.TcpEndpoints?.Any() == true)
38+
{
39+
var clientEndpoints = options.TcpEndpoints.Select(x => new AmqpTcpEndpoint(x.HostName, x.Port)).ToList();
40+
return factory.CreateConnection(clientEndpoints);
41+
}
42+
43+
return string.IsNullOrEmpty(options.ClientProvidedName)
44+
? CreateConnection(options, factory)
45+
: CreateNamedConnection(options, factory);
46+
}
47+
48+
static IConnection CreateNamedConnection(RabbitMqClientOptions options, ConnectionFactory factory)
49+
{
50+
if (options.HostNames?.Any() == true)
51+
{
52+
return factory.CreateConnection(options.HostNames.ToList(), options.ClientProvidedName);
53+
}
54+
55+
factory.HostName = options.HostName;
56+
return factory.CreateConnection(options.ClientProvidedName);
57+
}
58+
59+
static IConnection CreateConnection(RabbitMqClientOptions options, ConnectionFactory factory)
60+
{
61+
if (options.HostNames?.Any() == true)
62+
{
63+
return factory.CreateConnection(options.HostNames.ToList());
64+
}
65+
66+
factory.HostName = options.HostName;
67+
return factory.CreateConnection();
68+
}
69+
}
70+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
namespace RabbitMQ.Client.Core.DependencyInjection
2+
{
3+
/// <summary>
4+
/// Custom RabbitMQ consuming service interface.
5+
/// </summary>
6+
public interface IConsumingService
7+
{
8+
/// <summary>
9+
/// RabbitMQ consuming connection.
10+
/// </summary>
11+
IConnection ConsumingConnection { get; }
12+
13+
/// <summary>
14+
/// RabbitMQ consuming channel.
15+
/// </summary>
16+
IModel ConsumingChannel { get; }
17+
18+
/// <summary>
19+
/// Start consuming (getting messages).
20+
/// </summary>
21+
void StartConsuming();
22+
}
23+
}
Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
using System.Threading.Tasks;
2+
3+
namespace RabbitMQ.Client.Core.DependencyInjection
4+
{
5+
/// <summary>
6+
/// Custom RabbitMQ producing service interface.
7+
/// </summary>
8+
public interface IProducingService
9+
{
10+
/// <summary>
11+
/// RabbitMQ producing connection.
12+
/// </summary>
13+
IConnection Connection { get; }
14+
15+
/// <summary>
16+
/// RabbitMQ producing channel.
17+
/// </summary>
18+
IModel Channel { get; }
19+
20+
/// <summary>
21+
/// Send a message.
22+
/// </summary>
23+
/// <typeparam name="T">Model class.</typeparam>
24+
/// <param name="object">Object message.</param>
25+
/// <param name="exchangeName">Exchange name.</param>
26+
/// <param name="routingKey">Routing key.</param>
27+
void Send<T>(T @object, string exchangeName, string routingKey) where T : class;
28+
29+
/// <summary>
30+
/// Send a delayed message.
31+
/// </summary>
32+
/// <typeparam name="T">Model class.</typeparam>
33+
/// <param name="object">Object message.</param>
34+
/// <param name="exchangeName">Exchange name.</param>
35+
/// <param name="routingKey">Routing key.</param>
36+
/// <param name="secondsDelay">Delay time.</param>
37+
void Send<T>(T @object, string exchangeName, string routingKey, int secondsDelay) where T : class;
38+
39+
/// <summary>
40+
/// Send a message.
41+
/// </summary>
42+
/// <param name="json">Json message.</param>
43+
/// <param name="exchangeName">Exchange name.</param>
44+
/// <param name="routingKey">Routing key.</param>
45+
void SendJson(string json, string exchangeName, string routingKey);
46+
47+
/// <summary>
48+
/// Send a delayed message.
49+
/// </summary>
50+
/// <param name="json"></param>
51+
/// <param name="exchangeName"></param>
52+
/// <param name="routingKey"></param>
53+
/// <param name="secondsDelay">Delay time.</param>
54+
void SendJson(string json, string exchangeName, string routingKey, int secondsDelay);
55+
56+
/// <summary>
57+
/// Send a message.
58+
/// </summary>
59+
/// <param name="message">Message.</param>
60+
/// <param name="exchangeName">Exchange name.</param>
61+
/// <param name="routingKey">Routing key.</param>
62+
void SendString(string message, string exchangeName, string routingKey);
63+
64+
/// <summary>
65+
/// Send a delayed message.
66+
/// </summary>
67+
/// <param name="message">Message.</param>
68+
/// <param name="exchangeName">Exchange name.</param>
69+
/// <param name="routingKey">Routing key.</param>
70+
/// <param name="secondsDelay">Delay time.</param>
71+
void SendString(string message, string exchangeName, string routingKey, int secondsDelay);
72+
73+
/// <summary>
74+
/// Send a message.
75+
/// </summary>
76+
/// <param name="bytes">Byte array message.</param>
77+
/// <param name="properties">Message properties.</param>
78+
/// <param name="exchangeName">Exchange name.</param>
79+
/// <param name="routingKey">Routing key.</param>
80+
void Send(byte[] bytes, IBasicProperties properties, string exchangeName, string routingKey);
81+
82+
/// <summary>
83+
/// Send a delayed message.
84+
/// </summary>
85+
/// <param name="bytes">Byte array message.</param>
86+
/// <param name="properties">Message properties.</param>
87+
/// <param name="exchangeName">Exchange name.</param>
88+
/// <param name="routingKey">Routing key.</param>
89+
/// <param name="secondsDelay">Delay time.</param>
90+
void Send(byte[] bytes, IBasicProperties properties, string exchangeName, string routingKey, int secondsDelay);
91+
92+
/// <summary>
93+
/// Send a message asynchronously.
94+
/// </summary>
95+
/// <typeparam name="T">Model class.</typeparam>
96+
/// <param name="object">Object message.</param>
97+
/// <param name="exchangeName">Exchange name.</param>
98+
/// <param name="routingKey">Routing key.</param>
99+
Task SendAsync<T>(T @object, string exchangeName, string routingKey) where T : class;
100+
101+
/// <summary>
102+
/// Send a delayed message asynchronously.
103+
/// </summary>
104+
/// <typeparam name="T">Model class.</typeparam>
105+
/// <param name="object">Object message.</param>
106+
/// <param name="exchangeName">Exchange name.</param>
107+
/// <param name="routingKey">Routing key.</param>
108+
/// <param name="secondsDelay">Delay time.</param>
109+
Task SendAsync<T>(T @object, string exchangeName, string routingKey, int secondsDelay) where T : class;
110+
111+
/// <summary>
112+
/// Send a message asynchronously.
113+
/// </summary>
114+
/// <param name="json">Json message.</param>
115+
/// <param name="exchangeName">Exchange name.</param>
116+
/// <param name="routingKey">Routing key.</param>
117+
Task SendJsonAsync(string json, string exchangeName, string routingKey);
118+
119+
/// <summary>
120+
/// Send a delayed message asynchronously.
121+
/// </summary>
122+
/// <param name="json">Json message.</param>
123+
/// <param name="exchangeName">Exchange name.</param>
124+
/// <param name="routingKey">Routing key.</param>
125+
/// <param name="secondsDelay">Delay time.</param>
126+
Task SendJsonAsync(string json, string exchangeName, string routingKey, int secondsDelay);
127+
128+
/// <summary>
129+
/// Send a message asynchronously.
130+
/// </summary>
131+
/// <param name="message">Message.</param>
132+
/// <param name="exchangeName">Exchange name.</param>
133+
/// <param name="routingKey">Routing key.</param>
134+
Task SendStringAsync(string message, string exchangeName, string routingKey);
135+
136+
/// <summary>
137+
/// Send a delayed message asynchronously.
138+
/// </summary>
139+
/// <param name="message">Message.</param>
140+
/// <param name="exchangeName">Exchange name.</param>
141+
/// <param name="routingKey">Routing key.</param>
142+
/// <param name="secondsDelay">Delay time.</param>
143+
Task SendStringAsync(string message, string exchangeName, string routingKey, int secondsDelay);
144+
145+
/// <summary>
146+
/// Send a message asynchronously.
147+
/// </summary>
148+
/// <param name="bytes">Byte array message.</param>
149+
/// <param name="properties">Message properties.</param>
150+
/// <param name="exchangeName">Exchange name.</param>
151+
/// <param name="routingKey">Routing key.</param>
152+
Task SendAsync(byte[] bytes, IBasicProperties properties, string exchangeName, string routingKey);
153+
154+
/// <summary>
155+
/// Send a delayed message asynchronously.
156+
/// </summary>
157+
/// <param name="bytes">Byte array message.</param>
158+
/// <param name="properties">Message properties.</param>
159+
/// <param name="exchangeName">Exchange name.</param>
160+
/// <param name="routingKey">Routing key.</param>
161+
/// <param name="secondsDelay">Delay time.</param>
162+
Task SendAsync(byte[] bytes, IBasicProperties properties, string exchangeName, string routingKey, int secondsDelay);
163+
}
164+
}

0 commit comments

Comments
 (0)