Skip to content

Commit 8bcd781

Browse files
committed
azure storage lock and queue providers
1 parent 27926fa commit 8bcd781

File tree

8 files changed

+272
-6
lines changed

8 files changed

+272
-6
lines changed

WorkflowCore.sln

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "WorkflowCore.LockProviders.
8888
EndProject
8989
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "WorkflowCore.Sample13", "src\samples\WorkflowCore.Sample13\WorkflowCore.Sample13.csproj", "{77C49ACA-203E-428C-A4DB-114DFE454988}"
9090
EndProject
91+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "WorkflowCore.Providers.Azure", "src\providers\WorkflowCore.Providers.Azure\WorkflowCore.Providers.Azure.csproj", "{A2374B7C-4198-40B3-B8FE-FAC3DB3F2539}"
92+
EndProject
9193
Global
9294
GlobalSection(SolutionConfigurationPlatforms) = preSolution
9395
Debug|Any CPU = Debug|Any CPU
@@ -234,6 +236,10 @@ Global
234236
{77C49ACA-203E-428C-A4DB-114DFE454988}.Debug|Any CPU.Build.0 = Debug|Any CPU
235237
{77C49ACA-203E-428C-A4DB-114DFE454988}.Release|Any CPU.ActiveCfg = Release|Any CPU
236238
{77C49ACA-203E-428C-A4DB-114DFE454988}.Release|Any CPU.Build.0 = Release|Any CPU
239+
{A2374B7C-4198-40B3-B8FE-FAC3DB3F2539}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
240+
{A2374B7C-4198-40B3-B8FE-FAC3DB3F2539}.Debug|Any CPU.Build.0 = Debug|Any CPU
241+
{A2374B7C-4198-40B3-B8FE-FAC3DB3F2539}.Release|Any CPU.ActiveCfg = Release|Any CPU
242+
{A2374B7C-4198-40B3-B8FE-FAC3DB3F2539}.Release|Any CPU.Build.0 = Release|Any CPU
237243
EndGlobalSection
238244
GlobalSection(SolutionProperties) = preSolution
239245
HideSolutionNode = FALSE
@@ -277,5 +283,6 @@ Global
277283
{F9F8F9CD-01D9-468B-856D-6A87F0762A01} = {E6CEAD8D-F565-471E-A0DC-676F54EAEDEB}
278284
{AAE2E9F9-37EF-4AE1-A200-D37417C9040C} = {2EEE6ABD-EE9B-473F-AF2D-6DABB85D7BA2}
279285
{77C49ACA-203E-428C-A4DB-114DFE454988} = {5080DB09-CBE8-4C45-9957-C3BB7651755E}
286+
{A2374B7C-4198-40B3-B8FE-FAC3DB3F2539} = {2EEE6ABD-EE9B-473F-AF2D-6DABB85D7BA2}
280287
EndGlobalSection
281288
EndGlobal
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Text;
4+
using Microsoft.WindowsAzure.Storage.Blob;
5+
6+
namespace WorkflowCore.Providers.Azure.Models
7+
{
8+
class ControlledLock
9+
{
10+
public string Id { get; set; }
11+
public string LeaseId { get; set; }
12+
public CloudBlockBlob Blob { get; set; }
13+
14+
public ControlledLock(string id, string leaseId, CloudBlockBlob blob)
15+
{
16+
Id = id;
17+
LeaseId = leaseId;
18+
Blob = blob;
19+
}
20+
}
21+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Linq;
4+
using System.Threading.Tasks;
5+
using Microsoft.Extensions.Logging;
6+
using WorkflowCore.Models;
7+
using WorkflowCore.Providers.Azure.Services;
8+
9+
namespace Microsoft.Extensions.DependencyInjection
10+
{
11+
public static class ServiceCollectionExtensions
12+
{
13+
public static WorkflowOptions UseAzureSyncronization(this WorkflowOptions options, string connectionString)
14+
{
15+
options.UseQueueProvider(sp => new AzureStorageQueueProvider(connectionString, sp.GetService<ILoggerFactory>()));
16+
options.UseDistributedLockManager(sp => new AzureLockManager(connectionString, sp.GetService<ILoggerFactory>()));
17+
return options;
18+
}
19+
}
20+
}
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Linq;
4+
using System.Text;
5+
using System.Threading;
6+
using System.Threading.Tasks;
7+
using Microsoft.Extensions.Logging;
8+
using Microsoft.WindowsAzure.Storage;
9+
using Microsoft.WindowsAzure.Storage.Blob;
10+
using WorkflowCore.Interface;
11+
using WorkflowCore.Providers.Azure.Models;
12+
13+
namespace WorkflowCore.Providers.Azure.Services
14+
{
15+
public class AzureLockManager: IDistributedLockProvider
16+
{
17+
private readonly CloudBlobClient _client;
18+
private readonly CloudBlobContainer _container;
19+
private readonly ILogger _logger;
20+
private readonly List<ControlledLock> _locks = new List<ControlledLock>();
21+
private Timer _renewTimer;
22+
private TimeSpan LockTimeout => TimeSpan.FromMinutes(5);
23+
private TimeSpan RenewInterval => TimeSpan.FromMinutes(3);
24+
25+
public AzureLockManager(string connectionString, ILoggerFactory logFactory)
26+
{
27+
_logger = logFactory.CreateLogger<AzureLockManager>();
28+
var account = CloudStorageAccount.Parse(connectionString);
29+
_client = account.CreateCloudBlobClient();
30+
31+
_container = _client.GetContainerReference("workflowcore-locks");
32+
_container.CreateIfNotExistsAsync().Wait();
33+
}
34+
35+
public async Task<bool> AcquireLock(string Id)
36+
{
37+
var blob = _container.GetBlockBlobReference(Id);
38+
39+
if (!await blob.ExistsAsync())
40+
{
41+
await blob.UploadTextAsync(string.Empty);
42+
}
43+
44+
try
45+
{
46+
var leaseId = await blob.AcquireLeaseAsync(LockTimeout);
47+
lock (_locks)
48+
{
49+
_locks.Add(new ControlledLock(Id, leaseId, blob));
50+
}
51+
return true;
52+
}
53+
catch (StorageException ex)
54+
{
55+
_logger.LogDebug($"Failed to acquire lock {Id} - {ex.Message}");
56+
return false;
57+
}
58+
}
59+
60+
public async Task ReleaseLock(string Id)
61+
{
62+
ControlledLock entry = null;
63+
lock (_locks)
64+
{
65+
entry = _locks.FirstOrDefault(x => x.Id == Id);
66+
}
67+
68+
if (entry != null)
69+
{
70+
await entry.Blob.ReleaseLeaseAsync(AccessCondition.GenerateLeaseCondition(entry.Id));
71+
lock (_locks)
72+
{
73+
_locks.Remove(entry);
74+
}
75+
}
76+
}
77+
78+
public void Start()
79+
{
80+
_renewTimer = new Timer(RenewLeases, null, RenewInterval, RenewInterval);
81+
}
82+
83+
public void Stop()
84+
{
85+
if (_renewTimer == null)
86+
return;
87+
88+
_renewTimer.Dispose();
89+
_renewTimer = null;
90+
}
91+
92+
private void RenewLeases(object state)
93+
{
94+
_logger.LogDebug("Renewing active leases");
95+
lock (_locks)
96+
{
97+
foreach (var entry in _locks)
98+
RenewLock(entry);
99+
}
100+
}
101+
102+
private async Task RenewLock(ControlledLock entry)
103+
{
104+
try
105+
{
106+
await entry.Blob.RenewLeaseAsync(AccessCondition.GenerateLeaseCondition(entry.LeaseId));
107+
}
108+
catch (Exception ex)
109+
{
110+
_logger.LogError($"Error renewing lease - {ex.Message}");
111+
}
112+
}
113+
}
114+
115+
}
116+
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Text;
4+
using System.Threading.Tasks;
5+
using Microsoft.Extensions.Logging;
6+
using Microsoft.WindowsAzure.Storage;
7+
using Microsoft.WindowsAzure.Storage.Queue;
8+
using WorkflowCore.Interface;
9+
10+
namespace WorkflowCore.Providers.Azure.Services
11+
{
12+
public class AzureStorageQueueProvider : IQueueProvider
13+
{
14+
private readonly ILogger _logger;
15+
private readonly CloudQueue _workflowQueue;
16+
private readonly CloudQueue _eventQueue;
17+
18+
public AzureStorageQueueProvider(string connectionString, ILoggerFactory logFactory)
19+
{
20+
_logger = logFactory.CreateLogger<AzureStorageQueueProvider>();
21+
var account = CloudStorageAccount.Parse(connectionString);
22+
var client = account.CreateCloudQueueClient();
23+
24+
_workflowQueue = client.GetQueueReference("workflowcore-workflows");
25+
_eventQueue = client.GetQueueReference("workflowcore-events");
26+
27+
_workflowQueue.CreateIfNotExistsAsync().Wait();
28+
_eventQueue.CreateIfNotExistsAsync().Wait();
29+
}
30+
31+
public async Task QueueWork(string id, QueueType queue)
32+
{
33+
var msg = new CloudQueueMessage(id);
34+
35+
switch (queue)
36+
{
37+
case QueueType.Workflow:
38+
await _workflowQueue.AddMessageAsync(msg);
39+
break;
40+
case QueueType.Event:
41+
await _eventQueue.AddMessageAsync(msg);
42+
break;
43+
}
44+
}
45+
46+
public async Task<string> DequeueWork(QueueType queue)
47+
{
48+
CloudQueue cloudQueue = null;
49+
switch (queue)
50+
{
51+
case QueueType.Workflow:
52+
cloudQueue = _workflowQueue;
53+
break;
54+
case QueueType.Event:
55+
cloudQueue = _eventQueue;
56+
break;
57+
}
58+
59+
if (cloudQueue == null)
60+
return null;
61+
62+
var msg = await cloudQueue.GetMessageAsync();
63+
64+
if (msg == null)
65+
return null;
66+
67+
await cloudQueue.DeleteMessageAsync(msg);
68+
return msg.AsString;
69+
}
70+
71+
public void Start()
72+
{
73+
}
74+
75+
public void Stop()
76+
{
77+
}
78+
79+
public void Dispose()
80+
{
81+
}
82+
}
83+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<TargetFramework>netstandard1.3</TargetFramework>
5+
</PropertyGroup>
6+
7+
<ItemGroup>
8+
<PackageReference Include="WindowsAzure.Storage" Version="8.1.3" />
9+
</ItemGroup>
10+
11+
<ItemGroup>
12+
<ProjectReference Include="..\..\WorkflowCore\WorkflowCore.csproj" />
13+
</ItemGroup>
14+
15+
</Project>

src/samples/WorkflowCore.Sample04/Program.cs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,19 +40,22 @@ private static IServiceProvider ConfigureServices()
4040
//setup dependency injection
4141
IServiceCollection services = new ServiceCollection();
4242
services.AddLogging();
43-
services.AddWorkflow();
43+
//services.AddWorkflow();
4444
//services.AddWorkflow(x => x.UseMongoDB(@"mongodb://localhost:27017", "workflow"));
4545
//services.AddWorkflow(x => x.UseSqlServer(@"Server=.\SQLEXPRESS;Database=WorkflowCore;Trusted_Connection=True;", true, true));
4646
//services.AddWorkflow(x => x.UsePostgreSQL(@"Server=127.0.0.1;Port=5432;Database=workflow;User Id=postgres;", true, true));
4747
//services.AddWorkflow(x => x.UseSqlite(@"Data Source=database.db;", true));
48+
49+
services.AddWorkflow(x => x.UseAzureSyncronization(@"UseDevelopmentStorage=true"));
50+
4851
//redis = ConnectionMultiplexer.Connect("127.0.0.1");
4952
//services.AddWorkflow(x =>
5053
//{
51-
// x.UseMongoDB(@"mongodb://192.168.0.12:27017", "workflow");
52-
//x.UseZeroMQLocking(5551, "192.168.0.29:5551".Split(';'));
53-
//x.UseZeroMQQueuing(5552, "192.168.0.29:5552".Split(';'));
54-
//x.UseRabbitMQ(new ConnectionFactory() { HostName = "localhost" });
55-
//x.UseRedlock(redis);
54+
// x.UseMongoDB(@"mongodb://192.168.0.12:27017", "workflow");
55+
//x.UseZeroMQLocking(5551, "192.168.0.29:5551".Split(';'));
56+
//x.UseZeroMQQueuing(5552, "192.168.0.29:5552".Split(';'));
57+
//x.UseRabbitMQ(new ConnectionFactory() { HostName = "localhost" });
58+
//x.UseRedlock(redis);
5659
//});
5760

5861

src/samples/WorkflowCore.Sample04/WorkflowCore.Sample04.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
<ProjectReference Include="..\..\providers\WorkflowCore.Persistence.PostgreSQL\WorkflowCore.Persistence.PostgreSQL.csproj" />
1717
<ProjectReference Include="..\..\providers\WorkflowCore.Persistence.Sqlite\WorkflowCore.Persistence.Sqlite.csproj" />
1818
<ProjectReference Include="..\..\providers\WorkflowCore.Persistence.SqlServer\WorkflowCore.Persistence.SqlServer.csproj" />
19+
<ProjectReference Include="..\..\providers\WorkflowCore.Providers.Azure\WorkflowCore.Providers.Azure.csproj" />
1920
<ProjectReference Include="..\..\WorkflowCore\WorkflowCore.csproj" />
2021
<ProjectReference Include="..\..\providers\WorkflowCore.LockProviders.Redlock\WorkflowCore.LockProviders.Redlock.csproj" />
2122
<ProjectReference Include="..\..\providers\WorkflowCore.LockProviders.ZeroMQ\WorkflowCore.LockProviders.ZeroMQ.csproj" />

0 commit comments

Comments
 (0)