Skip to content

Commit f41e185

Browse files
authored
Merge pull request #655 from shirados90/feature/rabbit-mq-multinode-connection
Feature/rabbit mq multinode connection
2 parents b80fc13 + d56579a commit f41e185

File tree

8 files changed

+146
-24
lines changed

8 files changed

+146
-24
lines changed

WorkflowCore.sln

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "WorkflowCore.Sample09s", "s
145145
EndProject
146146
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ScratchPad", "test\ScratchPad\ScratchPad.csproj", "{51BB7DCD-01DD-453D-A1E7-17E5E3DBB14C}"
147147
EndProject
148+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "WorkflowCore.Tests.QueueProviders.RabbitMQ", "test\WorkflowCore.Tests.QueueProviders.RabbitMQ\WorkflowCore.Tests.QueueProviders.RabbitMQ.csproj", "{54DE20BA-EBA7-4BF0-9BD9-F03766849716}"
149+
EndProject
148150
Global
149151
GlobalSection(SolutionConfigurationPlatforms) = preSolution
150152
Debug|Any CPU = Debug|Any CPU
@@ -355,6 +357,10 @@ Global
355357
{51BB7DCD-01DD-453D-A1E7-17E5E3DBB14C}.Debug|Any CPU.Build.0 = Debug|Any CPU
356358
{51BB7DCD-01DD-453D-A1E7-17E5E3DBB14C}.Release|Any CPU.ActiveCfg = Release|Any CPU
357359
{51BB7DCD-01DD-453D-A1E7-17E5E3DBB14C}.Release|Any CPU.Build.0 = Release|Any CPU
360+
{54DE20BA-EBA7-4BF0-9BD9-F03766849716}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
361+
{54DE20BA-EBA7-4BF0-9BD9-F03766849716}.Debug|Any CPU.Build.0 = Debug|Any CPU
362+
{54DE20BA-EBA7-4BF0-9BD9-F03766849716}.Release|Any CPU.ActiveCfg = Release|Any CPU
363+
{54DE20BA-EBA7-4BF0-9BD9-F03766849716}.Release|Any CPU.Build.0 = Release|Any CPU
358364
EndGlobalSection
359365
GlobalSection(SolutionProperties) = preSolution
360366
HideSolutionNode = FALSE
@@ -414,6 +420,7 @@ Global
414420
{5BE6D628-B9DB-4C76-AAEB-8F3800509A84} = {5080DB09-CBE8-4C45-9957-C3BB7651755E}
415421
{E32CF21A-29CC-46D1-8044-FCC327F2B281} = {5080DB09-CBE8-4C45-9957-C3BB7651755E}
416422
{51BB7DCD-01DD-453D-A1E7-17E5E3DBB14C} = {E6CEAD8D-F565-471E-A0DC-676F54EAEDEB}
423+
{54DE20BA-EBA7-4BF0-9BD9-F03766849716} = {E6CEAD8D-F565-471E-A0DC-676F54EAEDEB}
417424
EndGlobalSection
418425
GlobalSection(ExtensibilityGlobals) = postSolution
419426
SolutionGuid = {DC0FA8D3-6449-4FDA-BB46-ECF58FAD23B4}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
using System;
2+
using System.Linq;
3+
using WorkflowCore.Interface;
4+
5+
namespace WorkflowCore.QueueProviders.RabbitMQ.Interfaces
6+
{
7+
public interface IRabbitMqQueueNameProvider
8+
{
9+
string GetQueueName(QueueType queue);
10+
}
11+
}

src/providers/WorkflowCore.QueueProviders.RabbitMQ/ServiceCollectionExtensions.cs

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,54 @@
22
using System;
33
using System.Collections.Generic;
44
using System.Linq;
5-
using System.Threading.Tasks;
5+
using Microsoft.Extensions.DependencyInjection.Extensions;
6+
using WorkflowCore.Interface;
67
using WorkflowCore.Models;
8+
using WorkflowCore.QueueProviders.RabbitMQ.Interfaces;
79
using WorkflowCore.QueueProviders.RabbitMQ.Services;
810

911
namespace Microsoft.Extensions.DependencyInjection
1012
{
13+
public delegate IConnection RabbitMqConnectionFactory(IServiceProvider sp, string clientProvidedName);
14+
1115
public static class ServiceCollectionExtensions
1216
{
1317
public static WorkflowOptions UseRabbitMQ(this WorkflowOptions options, IConnectionFactory connectionFactory)
1418
{
15-
options.UseQueueProvider(sp => new RabbitMQProvider(connectionFactory));
19+
if (options == null) throw new ArgumentNullException(nameof(options));
20+
if (connectionFactory == null) throw new ArgumentNullException(nameof(connectionFactory));
21+
22+
return options
23+
.UseRabbitMQ((sp, name) => connectionFactory.CreateConnection(name));
24+
}
25+
26+
public static WorkflowOptions UseRabbitMQ(this WorkflowOptions options,
27+
IConnectionFactory connectionFactory,
28+
IEnumerable<string> hostnames)
29+
{
30+
if (options == null) throw new ArgumentNullException(nameof(options));
31+
if (connectionFactory == null) throw new ArgumentNullException(nameof(connectionFactory));
32+
if (hostnames == null) throw new ArgumentNullException(nameof(hostnames));
33+
34+
return options
35+
.UseRabbitMQ((sp, name) => connectionFactory.CreateConnection(hostnames.ToList(), name));
36+
}
37+
38+
public static WorkflowOptions UseRabbitMQ(this WorkflowOptions options, RabbitMqConnectionFactory rabbitMqConnectionFactory)
39+
{
40+
if (options == null) throw new ArgumentNullException(nameof(options));
41+
if (rabbitMqConnectionFactory == null) throw new ArgumentNullException(nameof(rabbitMqConnectionFactory));
42+
43+
options.Services.AddSingleton(rabbitMqConnectionFactory);
44+
options.Services.TryAddSingleton<IRabbitMqQueueNameProvider, DefaultRabbitMqQueueNameProvider>();
45+
options.UseQueueProvider(RabbitMqQueueProviderFactory);
46+
1647
return options;
1748
}
49+
50+
private static IQueueProvider RabbitMqQueueProviderFactory(IServiceProvider sp)
51+
=> new RabbitMQProvider(sp,
52+
sp.GetRequiredService<IRabbitMqQueueNameProvider>(),
53+
sp.GetRequiredService<RabbitMqConnectionFactory>());
1854
}
1955
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
using WorkflowCore.Interface;
2+
using WorkflowCore.QueueProviders.RabbitMQ.Interfaces;
3+
4+
namespace WorkflowCore.QueueProviders.RabbitMQ.Services
5+
{
6+
public class DefaultRabbitMqQueueNameProvider : IRabbitMqQueueNameProvider
7+
{
8+
public string GetQueueName(QueueType queue)
9+
{
10+
switch (queue)
11+
{
12+
case QueueType.Workflow:
13+
return "wfc.workflow_queue";
14+
case QueueType.Event:
15+
return "wfc.event_queue";
16+
case QueueType.Index:
17+
return "wfc.index_queue";
18+
default:
19+
return null;
20+
}
21+
}
22+
}
23+
}

src/providers/WorkflowCore.QueueProviders.RabbitMQ/Services/RabbitMQProvider.cs

Lines changed: 17 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -7,23 +7,32 @@
77
using System.Text;
88
using System.Threading;
99
using System.Threading.Tasks;
10+
using Microsoft.Extensions.DependencyInjection;
1011
using WorkflowCore.Interface;
1112
using WorkflowCore.Models;
13+
using WorkflowCore.QueueProviders.RabbitMQ.Interfaces;
1214

1315
namespace WorkflowCore.QueueProviders.RabbitMQ.Services
1416
{
1517
#pragma warning disable CS1998 // Async method lacks 'await' operators and will run synchronously
1618
public class RabbitMQProvider : IQueueProvider
1719
{
18-
private readonly IConnectionFactory _connectionFactory;
20+
private readonly IRabbitMqQueueNameProvider _queueNameProvider;
21+
private readonly RabbitMqConnectionFactory _rabbitMqConnectionFactory;
22+
private readonly IServiceProvider _serviceProvider;
23+
1924
private IConnection _connection = null;
2025
private static JsonSerializerSettings SerializerSettings = new JsonSerializerSettings() { TypeNameHandling = TypeNameHandling.All };
2126

2227
public bool IsDequeueBlocking => false;
2328

24-
public RabbitMQProvider(IConnectionFactory connectionFactory)
29+
public RabbitMQProvider(IServiceProvider serviceProvider,
30+
IRabbitMqQueueNameProvider queueNameProvider,
31+
RabbitMqConnectionFactory connectionFactory)
2532
{
26-
_connectionFactory = connectionFactory;
33+
_serviceProvider = serviceProvider;
34+
_queueNameProvider = queueNameProvider;
35+
_rabbitMqConnectionFactory = connectionFactory;
2736
}
2837

2938
public async Task QueueWork(string id, QueueType queue)
@@ -33,9 +42,9 @@ public async Task QueueWork(string id, QueueType queue)
3342

3443
using (var channel = _connection.CreateModel())
3544
{
36-
channel.QueueDeclare(queue: GetQueueName(queue), durable: true, exclusive: false, autoDelete: false, arguments: null);
45+
channel.QueueDeclare(queue: _queueNameProvider.GetQueueName(queue), durable: true, exclusive: false, autoDelete: false, arguments: null);
3746
var body = Encoding.UTF8.GetBytes(id);
38-
channel.BasicPublish(exchange: "", routingKey: GetQueueName(queue), basicProperties: null, body: body);
47+
channel.BasicPublish(exchange: "", routingKey: _queueNameProvider.GetQueueName(queue), basicProperties: null, body: body);
3948
}
4049
}
4150

@@ -46,15 +55,15 @@ public async Task<string> DequeueWork(QueueType queue, CancellationToken cancell
4655

4756
using (var channel = _connection.CreateModel())
4857
{
49-
channel.QueueDeclare(queue: GetQueueName(queue),
58+
channel.QueueDeclare(queue: _queueNameProvider.GetQueueName(queue),
5059
durable: true,
5160
exclusive: false,
5261
autoDelete: false,
5362
arguments: null);
5463

5564
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
5665

57-
var msg = channel.BasicGet(GetQueueName(queue), false);
66+
var msg = channel.BasicGet(_queueNameProvider.GetQueueName(queue), false);
5867
if (msg != null)
5968
{
6069
var data = Encoding.UTF8.GetString(msg.Body);
@@ -76,7 +85,7 @@ public void Dispose()
7685

7786
public async Task Start()
7887
{
79-
_connection = _connectionFactory.CreateConnection("Workflow-Core");
88+
_connection = _rabbitMqConnectionFactory(_serviceProvider, "Workflow-Core");
8089
}
8190

8291
public async Task Stop()
@@ -88,20 +97,6 @@ public async Task Stop()
8897
}
8998
}
9099

91-
private string GetQueueName(QueueType queue)
92-
{
93-
switch (queue)
94-
{
95-
case QueueType.Workflow:
96-
return "wfc.workflow_queue";
97-
case QueueType.Event:
98-
return "wfc.event_queue";
99-
case QueueType.Index:
100-
return "wfc.index_queue";
101-
}
102-
return null;
103-
}
104-
105100
}
106101
#pragma warning restore CS1998 // Async method lacks 'await' operators and will run synchronously
107102
}

src/providers/WorkflowCore.QueueProviders.RabbitMQ/WorkflowCore.QueueProviders.RabbitMQ.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
<Description>Queue provider for Workflow-core using RabbitMQ</Description>
2020
<AssemblyVersion>2.0.0.0</AssemblyVersion>
2121
<FileVersion>2.0.0.0</FileVersion>
22+
<PackageVersion>2.1.0</PackageVersion>
2223
</PropertyGroup>
2324

2425
<ItemGroup>
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
using FluentAssertions;
2+
using WorkflowCore.Interface;
3+
using WorkflowCore.QueueProviders.RabbitMQ.Services;
4+
using Xunit;
5+
6+
namespace WorkflowCore.Tests.QueueProviders.RabbitMQ.Tests
7+
{
8+
public class DefaultRabbitMqQueueNameProviderTests
9+
{
10+
private readonly DefaultRabbitMqQueueNameProvider _sut;
11+
12+
public DefaultRabbitMqQueueNameProviderTests()
13+
{
14+
_sut = new DefaultRabbitMqQueueNameProvider();
15+
}
16+
17+
[Theory]
18+
[InlineData(QueueType.Event, "wfc.event_queue")]
19+
[InlineData(QueueType.Index, "wfc.index_queue")]
20+
[InlineData(QueueType.Workflow, "wfc.workflow_queue")]
21+
public void GetQueueName_ValidInput_ReturnsValidQueueName(QueueType queueType, string queueName)
22+
{
23+
var result = _sut.GetQueueName(queueType);
24+
25+
result.Should().Be(queueName);
26+
}
27+
}
28+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<TargetFramework>netcoreapp3.1</TargetFramework>
5+
6+
<IsPackable>false</IsPackable>
7+
</PropertyGroup>
8+
9+
<ItemGroup>
10+
<PackageReference Include="FluentAssertions" Version="5.10.3" />
11+
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.5.0" />
12+
<PackageReference Include="xunit" Version="2.4.0" />
13+
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.0" />
14+
<PackageReference Include="coverlet.collector" Version="1.2.0" />
15+
</ItemGroup>
16+
17+
<ItemGroup>
18+
<ProjectReference Include="..\..\src\providers\WorkflowCore.QueueProviders.RabbitMQ\WorkflowCore.QueueProviders.RabbitMQ.csproj" />
19+
</ItemGroup>
20+
21+
</Project>

0 commit comments

Comments
 (0)