Skip to content

Commit a0fcca2

Browse files
authored
Merge pull request #404 from glucaci/servicebus
Add Azure ServiceBus provider
2 parents 54022e6 + a00f3c6 commit a0fcca2

File tree

6 files changed

+155
-10
lines changed

6 files changed

+155
-10
lines changed

src/providers/WorkflowCore.Providers.Azure/README.md

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,21 +2,33 @@
22

33
* Provides [DLM](https://en.wikipedia.org/wiki/Distributed_lock_manager) support on [Workflow Core](../../README.md) using Azure Blob Storage leases.
44
* Provides Queueing support on [Workflow Core](../../README.md) using Azure Storage queues.
5+
* Provides event hub support on [Workflow Core](../../README.md) backed by Azure Service Bus.
56

67
This makes it possible to have a cluster of nodes processing your workflows.
78

89
## Installing
910

1011
Install the NuGet package "WorkflowCore.Providers.Azure"
1112

13+
Using Nuget package console
1214
```
1315
PM> Install-Package WorkflowCore.Providers.Azure
1416
```
17+
Using .NET CLI
18+
```
19+
dotnet add package WorkflowCore.Providers.Azure
20+
```
1521

1622
## Usage
1723

18-
Use the .UseAzureSyncronization extension method when building your service provider.
24+
Use the `IServiceCollection` extension methods when building your service provider
25+
* .UseAzureSynchronization
26+
* .UseAzureServiceBusEventHub
1927

2028
```C#
21-
services.AddWorkflow(x => x.UseAzureSyncronization("azure storage connection string"));
29+
services.AddWorkflow(options =>
30+
{
31+
options.UseAzureSynchronization("azure storage connection string");
32+
options.UseAzureServiceBusEventHub("service bus connection string", "topic name", "subscription name");
33+
});
2234
```
Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,28 @@
1-
using System;
2-
using System.Collections.Generic;
3-
using System.Linq;
4-
using System.Threading.Tasks;
5-
using Microsoft.Extensions.Logging;
1+
using Microsoft.Extensions.Logging;
62
using WorkflowCore.Models;
73
using WorkflowCore.Providers.Azure.Services;
84

95
namespace Microsoft.Extensions.DependencyInjection
106
{
117
public static class ServiceCollectionExtensions
128
{
13-
public static WorkflowOptions UseAzureSyncronization(this WorkflowOptions options, string connectionString)
9+
public static WorkflowOptions UseAzureSynchronization(this WorkflowOptions options, string connectionString)
1410
{
1511
options.UseQueueProvider(sp => new AzureStorageQueueProvider(connectionString, sp.GetService<ILoggerFactory>()));
1612
options.UseDistributedLockManager(sp => new AzureLockManager(connectionString, sp.GetService<ILoggerFactory>()));
1713
return options;
1814
}
15+
16+
public static WorkflowOptions UseAzureServiceBusEventHub(
17+
this WorkflowOptions options,
18+
string connectionString,
19+
string topicName,
20+
string subscriptionName)
21+
{
22+
options.UseEventHub(sp => new ServiceBusLifeCycleEventHub(
23+
connectionString, topicName, subscriptionName, sp.GetService<ILoggerFactory>()));
24+
25+
return options;
26+
}
1927
}
2028
}
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Text;
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
using Microsoft.Azure.ServiceBus;
7+
using Microsoft.Extensions.Logging;
8+
using Newtonsoft.Json;
9+
using WorkflowCore.Interface;
10+
using WorkflowCore.Models.LifeCycleEvents;
11+
12+
namespace WorkflowCore.Providers.Azure.Services
13+
{
14+
public class ServiceBusLifeCycleEventHub : ILifeCycleEventHub
15+
{
16+
private readonly ITopicClient _topicClient;
17+
private readonly ILogger _logger;
18+
private readonly ISubscriptionClient _subscriptionClient;
19+
private readonly ICollection<Action<LifeCycleEvent>> _subscribers =
20+
new HashSet<Action<LifeCycleEvent>>();
21+
private readonly JsonSerializerSettings _serializerSettings =
22+
new JsonSerializerSettings
23+
{
24+
TypeNameHandling = TypeNameHandling.All,
25+
ReferenceLoopHandling = ReferenceLoopHandling.Error,
26+
};
27+
28+
public ServiceBusLifeCycleEventHub(
29+
string connectionString,
30+
string topicName,
31+
string subscriptionName,
32+
ILoggerFactory logFactory)
33+
{
34+
_subscriptionClient = new SubscriptionClient(
35+
connectionString, topicName, subscriptionName);
36+
_topicClient = new TopicClient(connectionString, topicName);
37+
_logger = logFactory.CreateLogger(GetType());
38+
}
39+
40+
public async Task PublishNotification(LifeCycleEvent evt)
41+
{
42+
var payload = JsonConvert.SerializeObject(evt, _serializerSettings);
43+
var message = new Message(Encoding.Default.GetBytes(payload))
44+
{
45+
Label = evt.Reference
46+
};
47+
48+
await _topicClient.SendAsync(message);
49+
}
50+
51+
public void Subscribe(Action<LifeCycleEvent> action)
52+
{
53+
_subscribers.Add(action);
54+
}
55+
56+
public Task Start()
57+
{
58+
var sessionHandlerOptions = new SessionHandlerOptions(ExceptionHandler)
59+
{
60+
MaxConcurrentSessions = 1,
61+
AutoComplete = false
62+
};
63+
64+
_subscriptionClient.RegisterSessionHandler(
65+
MessageHandler, sessionHandlerOptions);
66+
67+
return Task.CompletedTask;
68+
}
69+
70+
public async Task Stop()
71+
{
72+
await _topicClient.CloseAsync();
73+
await _subscriptionClient.CloseAsync();
74+
}
75+
76+
private async Task MessageHandler(
77+
IMessageSession messageSession,
78+
Message message,
79+
CancellationToken cancellationToken)
80+
{
81+
try
82+
{
83+
var payload = Encoding.Default.GetString(message.Body);
84+
var evt = JsonConvert.DeserializeObject<LifeCycleEvent>(
85+
payload, _serializerSettings);
86+
87+
NotifySubscribers(evt);
88+
89+
await _subscriptionClient
90+
.CompleteAsync(message.SystemProperties.LockToken)
91+
.ConfigureAwait(false);
92+
}
93+
catch
94+
{
95+
await _subscriptionClient
96+
.AbandonAsync(message.SystemProperties.LockToken);
97+
}
98+
}
99+
100+
private Task ExceptionHandler(ExceptionReceivedEventArgs arg)
101+
{
102+
_logger.LogWarning(
103+
default, arg.Exception, "Error on receiving events");
104+
105+
return Task.CompletedTask;
106+
}
107+
108+
private void NotifySubscribers(LifeCycleEvent evt)
109+
{
110+
foreach (var subscriber in _subscribers)
111+
{
112+
try
113+
{
114+
subscriber(evt);
115+
}
116+
catch (Exception ex)
117+
{
118+
_logger.LogWarning(
119+
default, ex, $"Error on event subscriber: {ex.Message}");
120+
}
121+
}
122+
}
123+
}
124+
}

src/providers/WorkflowCore.Providers.Azure/WorkflowCore.Providers.Azure.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
</PropertyGroup>
2020

2121
<ItemGroup>
22+
<PackageReference Include="Microsoft.Azure.ServiceBus" Version="4.0.0" />
2223
<PackageReference Include="WindowsAzure.Storage" Version="9.3.3" />
2324
</ItemGroup>
2425

src/samples/WorkflowCore.Sample04/Program.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ private static IServiceProvider ConfigureServices()
5151

5252
//services.AddWorkflow(x =>
5353
//{
54-
// x.UseAzureSyncronization(@"UseDevelopmentStorage=true");
54+
// x.UseAzureSynchronization(@"UseDevelopmentStorage=true");
5555
// x.UseMongoDB(@"mongodb://localhost:27017", "workflow9999");
5656
//});
5757

src/samples/WorkflowCore.Sample13/Program.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ private static IServiceProvider ConfigureServices()
4141

4242
//services.AddWorkflow(x =>
4343
//{
44-
// x.UseAzureSyncronization(@"UseDevelopmentStorage=true");
44+
// x.UseAzureSynchronization(@"UseDevelopmentStorage=true");
4545
// x.UseMongoDB(@"mongodb://localhost:27017", "workflow-test002");
4646
//});
4747

0 commit comments

Comments
 (0)