Skip to content

Commit 090334d

Browse files
author
Pavlo Korobov
committed
RabbitMQProvider.cs improvements
1 parent b80fc13 commit 090334d

File tree

4 files changed

+85
-24
lines changed

4 files changed

+85
-24
lines changed
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
using System;
2+
using System.Linq;
3+
using WorkflowCore.Interface;
4+
5+
namespace WorkflowCore.QueueProviders.RabbitMQ.Interfaces
6+
{
7+
public interface IRabbitMqQueueNameProvider
8+
{
9+
string GetQueueName(QueueType queue);
10+
}
11+
}

src/providers/WorkflowCore.QueueProviders.RabbitMQ/ServiceCollectionExtensions.cs

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,52 @@
22
using System;
33
using System.Collections.Generic;
44
using System.Linq;
5-
using System.Threading.Tasks;
5+
using Microsoft.Extensions.DependencyInjection.Extensions;
6+
using WorkflowCore.Interface;
67
using WorkflowCore.Models;
8+
using WorkflowCore.QueueProviders.RabbitMQ.Interfaces;
79
using WorkflowCore.QueueProviders.RabbitMQ.Services;
810

911
namespace Microsoft.Extensions.DependencyInjection
1012
{
13+
public delegate IConnection RabbitMqConnectionFactory(IServiceProvider sp, string clientProvidedName);
14+
1115
public static class ServiceCollectionExtensions
1216
{
1317
public static WorkflowOptions UseRabbitMQ(this WorkflowOptions options, IConnectionFactory connectionFactory)
1418
{
15-
options.UseQueueProvider(sp => new RabbitMQProvider(connectionFactory));
19+
if (options == null) throw new ArgumentNullException(nameof(options));
20+
if (connectionFactory == null) throw new ArgumentNullException(nameof(connectionFactory));
21+
22+
return options
23+
.UseRabbitMQ((sp, name) => connectionFactory.CreateConnection(name));
24+
}
25+
26+
public static WorkflowOptions UseRabbitMQ(this WorkflowOptions options,
27+
IConnectionFactory connectionFactory,
28+
IEnumerable<string> hostnames)
29+
{
30+
if (options == null) throw new ArgumentNullException(nameof(options));
31+
if (connectionFactory == null) throw new ArgumentNullException(nameof(connectionFactory));
32+
if (hostnames == null) throw new ArgumentNullException(nameof(hostnames));
33+
34+
return options
35+
.UseRabbitMQ((sp, name) => connectionFactory.CreateConnection(hostnames.ToList(), name));
36+
}
37+
38+
public static WorkflowOptions UseRabbitMQ(this WorkflowOptions options, RabbitMqConnectionFactory rabbitMqConnectionFactory)
39+
{
40+
if (options == null) throw new ArgumentNullException(nameof(options));
41+
if (rabbitMqConnectionFactory == null) throw new ArgumentNullException(nameof(rabbitMqConnectionFactory));
42+
43+
options.Services.AddSingleton(rabbitMqConnectionFactory);
44+
options.Services.TryAddTransient<IRabbitMqQueueNameProvider, DefaultRabbitMqQueueNameProvider>();
45+
options.UseQueueProvider(RabbitMqQueueProviderFactory);
46+
1647
return options;
1748
}
49+
50+
private static IQueueProvider RabbitMqQueueProviderFactory(IServiceProvider sp)
51+
=> new RabbitMQProvider(sp);
1852
}
1953
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
using WorkflowCore.Interface;
2+
using WorkflowCore.QueueProviders.RabbitMQ.Interfaces;
3+
4+
namespace WorkflowCore.QueueProviders.RabbitMQ.Services
5+
{
6+
public class DefaultRabbitMqQueueNameProvider : IRabbitMqQueueNameProvider
7+
{
8+
public string GetQueueName(QueueType queue)
9+
{
10+
switch (queue)
11+
{
12+
case QueueType.Workflow:
13+
return "wfc.workflow_queue";
14+
case QueueType.Event:
15+
return "wfc.event_queue";
16+
case QueueType.Index:
17+
return "wfc.index_queue";
18+
default:
19+
return null;
20+
}
21+
}
22+
}
23+
}

src/providers/WorkflowCore.QueueProviders.RabbitMQ/Services/RabbitMQProvider.cs

Lines changed: 15 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -7,23 +7,30 @@
77
using System.Text;
88
using System.Threading;
99
using System.Threading.Tasks;
10+
using Microsoft.Extensions.DependencyInjection;
1011
using WorkflowCore.Interface;
1112
using WorkflowCore.Models;
13+
using WorkflowCore.QueueProviders.RabbitMQ.Interfaces;
1214

1315
namespace WorkflowCore.QueueProviders.RabbitMQ.Services
1416
{
1517
#pragma warning disable CS1998 // Async method lacks 'await' operators and will run synchronously
1618
public class RabbitMQProvider : IQueueProvider
1719
{
18-
private readonly IConnectionFactory _connectionFactory;
20+
private readonly IRabbitMqQueueNameProvider _queueNameProvider;
21+
private readonly RabbitMqConnectionFactory _rabbitMqConnectionFactory;
22+
private readonly IServiceProvider _serviceProvider;
23+
1924
private IConnection _connection = null;
2025
private static JsonSerializerSettings SerializerSettings = new JsonSerializerSettings() { TypeNameHandling = TypeNameHandling.All };
2126

2227
public bool IsDequeueBlocking => false;
2328

24-
public RabbitMQProvider(IConnectionFactory connectionFactory)
29+
public RabbitMQProvider(IServiceProvider serviceProvider)
2530
{
26-
_connectionFactory = connectionFactory;
31+
_serviceProvider = serviceProvider;
32+
_queueNameProvider = _serviceProvider.GetRequiredService<IRabbitMqQueueNameProvider>();
33+
_rabbitMqConnectionFactory = _serviceProvider.GetRequiredService<RabbitMqConnectionFactory>();
2734
}
2835

2936
public async Task QueueWork(string id, QueueType queue)
@@ -33,9 +40,9 @@ public async Task QueueWork(string id, QueueType queue)
3340

3441
using (var channel = _connection.CreateModel())
3542
{
36-
channel.QueueDeclare(queue: GetQueueName(queue), durable: true, exclusive: false, autoDelete: false, arguments: null);
43+
channel.QueueDeclare(queue: _queueNameProvider.GetQueueName(queue), durable: true, exclusive: false, autoDelete: false, arguments: null);
3744
var body = Encoding.UTF8.GetBytes(id);
38-
channel.BasicPublish(exchange: "", routingKey: GetQueueName(queue), basicProperties: null, body: body);
45+
channel.BasicPublish(exchange: "", routingKey: _queueNameProvider.GetQueueName(queue), basicProperties: null, body: body);
3946
}
4047
}
4148

@@ -46,15 +53,15 @@ public async Task<string> DequeueWork(QueueType queue, CancellationToken cancell
4653

4754
using (var channel = _connection.CreateModel())
4855
{
49-
channel.QueueDeclare(queue: GetQueueName(queue),
56+
channel.QueueDeclare(queue: _queueNameProvider.GetQueueName(queue),
5057
durable: true,
5158
exclusive: false,
5259
autoDelete: false,
5360
arguments: null);
5461

5562
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
5663

57-
var msg = channel.BasicGet(GetQueueName(queue), false);
64+
var msg = channel.BasicGet(_queueNameProvider.GetQueueName(queue), false);
5865
if (msg != null)
5966
{
6067
var data = Encoding.UTF8.GetString(msg.Body);
@@ -76,7 +83,7 @@ public void Dispose()
7683

7784
public async Task Start()
7885
{
79-
_connection = _connectionFactory.CreateConnection("Workflow-Core");
86+
_connection = _rabbitMqConnectionFactory(_serviceProvider, "Workflow-Core");
8087
}
8188

8289
public async Task Stop()
@@ -88,20 +95,6 @@ public async Task Stop()
8895
}
8996
}
9097

91-
private string GetQueueName(QueueType queue)
92-
{
93-
switch (queue)
94-
{
95-
case QueueType.Workflow:
96-
return "wfc.workflow_queue";
97-
case QueueType.Event:
98-
return "wfc.event_queue";
99-
case QueueType.Index:
100-
return "wfc.index_queue";
101-
}
102-
return null;
103-
}
104-
10598
}
10699
#pragma warning restore CS1998 // Async method lacks 'await' operators and will run synchronously
107100
}

0 commit comments

Comments
 (0)