Skip to content

Commit 4e60782

Browse files
authored
Merge pull request #39 from danielgerlag/azure-storage
Azure storage
2 parents 747233c + 1c671e2 commit 4e60782

File tree

41 files changed

+424
-286
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+424
-286
lines changed

WorkflowCore.sln

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11

22
Microsoft Visual Studio Solution File, Format Version 12.00
33
# Visual Studio 15
4-
VisualStudioVersion = 15.0.26430.6
4+
VisualStudioVersion = 15.0.26228.4
55
MinimumVisualStudioVersion = 10.0.40219.1
66
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{EF47161E-E399-451C-BDE8-E92AAD3BD761}"
77
EndProject
@@ -42,8 +42,6 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "WorkflowCore.Persistence.Sq
4242
EndProject
4343
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "WorkflowCore.LockProviders.Redlock", "src\providers\WorkflowCore.LockProviders.Redlock\WorkflowCore.LockProviders.Redlock.csproj", "{05250D58-A59E-4212-8D55-E7BC0396E9F5}"
4444
EndProject
45-
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "WorkflowCore.LockProviders.ZooKeeper", "src\providers\WorkflowCore.LockProviders.ZooKeeper\WorkflowCore.LockProviders.ZooKeeper.csproj", "{3BBA51EE-765A-4772-A940-7F853E1010FC}"
46-
EndProject
4745
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "WorkflowCore.QueueProviders.RabbitMQ", "src\providers\WorkflowCore.QueueProviders.RabbitMQ\WorkflowCore.QueueProviders.RabbitMQ.csproj", "{AFAD87C7-B2EE-451E-BA7E-3F5A91358C48}"
4846
EndProject
4947
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "WorkflowCore.Sample06", "src\samples\WorkflowCore.Sample06\WorkflowCore.Sample06.csproj", "{8FEAFD74-C304-4F75-BA38-4686BE55C891}"
@@ -88,6 +86,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "WorkflowCore.LockProviders.
8886
EndProject
8987
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "WorkflowCore.Sample13", "src\samples\WorkflowCore.Sample13\WorkflowCore.Sample13.csproj", "{77C49ACA-203E-428C-A4DB-114DFE454988}"
9088
EndProject
89+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "WorkflowCore.Providers.Azure", "src\providers\WorkflowCore.Providers.Azure\WorkflowCore.Providers.Azure.csproj", "{A2374B7C-4198-40B3-B8FE-FAC3DB3F2539}"
90+
EndProject
9191
Global
9292
GlobalSection(SolutionConfigurationPlatforms) = preSolution
9393
Debug|Any CPU = Debug|Any CPU
@@ -142,10 +142,6 @@ Global
142142
{05250D58-A59E-4212-8D55-E7BC0396E9F5}.Debug|Any CPU.Build.0 = Debug|Any CPU
143143
{05250D58-A59E-4212-8D55-E7BC0396E9F5}.Release|Any CPU.ActiveCfg = Release|Any CPU
144144
{05250D58-A59E-4212-8D55-E7BC0396E9F5}.Release|Any CPU.Build.0 = Release|Any CPU
145-
{3BBA51EE-765A-4772-A940-7F853E1010FC}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
146-
{3BBA51EE-765A-4772-A940-7F853E1010FC}.Debug|Any CPU.Build.0 = Debug|Any CPU
147-
{3BBA51EE-765A-4772-A940-7F853E1010FC}.Release|Any CPU.ActiveCfg = Release|Any CPU
148-
{3BBA51EE-765A-4772-A940-7F853E1010FC}.Release|Any CPU.Build.0 = Release|Any CPU
149145
{AFAD87C7-B2EE-451E-BA7E-3F5A91358C48}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
150146
{AFAD87C7-B2EE-451E-BA7E-3F5A91358C48}.Debug|Any CPU.Build.0 = Debug|Any CPU
151147
{AFAD87C7-B2EE-451E-BA7E-3F5A91358C48}.Release|Any CPU.ActiveCfg = Release|Any CPU
@@ -234,6 +230,10 @@ Global
234230
{77C49ACA-203E-428C-A4DB-114DFE454988}.Debug|Any CPU.Build.0 = Debug|Any CPU
235231
{77C49ACA-203E-428C-A4DB-114DFE454988}.Release|Any CPU.ActiveCfg = Release|Any CPU
236232
{77C49ACA-203E-428C-A4DB-114DFE454988}.Release|Any CPU.Build.0 = Release|Any CPU
233+
{A2374B7C-4198-40B3-B8FE-FAC3DB3F2539}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
234+
{A2374B7C-4198-40B3-B8FE-FAC3DB3F2539}.Debug|Any CPU.Build.0 = Debug|Any CPU
235+
{A2374B7C-4198-40B3-B8FE-FAC3DB3F2539}.Release|Any CPU.ActiveCfg = Release|Any CPU
236+
{A2374B7C-4198-40B3-B8FE-FAC3DB3F2539}.Release|Any CPU.Build.0 = Release|Any CPU
237237
EndGlobalSection
238238
GlobalSection(SolutionProperties) = preSolution
239239
HideSolutionNode = FALSE
@@ -254,7 +254,6 @@ Global
254254
{9274B938-3996-4FBA-AE2F-0C82009B1116} = {2EEE6ABD-EE9B-473F-AF2D-6DABB85D7BA2}
255255
{86BC1E05-E9CE-4E53-B324-885A2FDBCE74} = {2EEE6ABD-EE9B-473F-AF2D-6DABB85D7BA2}
256256
{05250D58-A59E-4212-8D55-E7BC0396E9F5} = {2EEE6ABD-EE9B-473F-AF2D-6DABB85D7BA2}
257-
{3BBA51EE-765A-4772-A940-7F853E1010FC} = {2EEE6ABD-EE9B-473F-AF2D-6DABB85D7BA2}
258257
{AFAD87C7-B2EE-451E-BA7E-3F5A91358C48} = {2EEE6ABD-EE9B-473F-AF2D-6DABB85D7BA2}
259258
{8FEAFD74-C304-4F75-BA38-4686BE55C891} = {5080DB09-CBE8-4C45-9957-C3BB7651755E}
260259
{37B598A8-B054-4ABA-884D-96AEF2511600} = {E6CEAD8D-F565-471E-A0DC-676F54EAEDEB}
@@ -277,5 +276,6 @@ Global
277276
{F9F8F9CD-01D9-468B-856D-6A87F0762A01} = {E6CEAD8D-F565-471E-A0DC-676F54EAEDEB}
278277
{AAE2E9F9-37EF-4AE1-A200-D37417C9040C} = {2EEE6ABD-EE9B-473F-AF2D-6DABB85D7BA2}
279278
{77C49ACA-203E-428C-A4DB-114DFE454988} = {5080DB09-CBE8-4C45-9957-C3BB7651755E}
279+
{A2374B7C-4198-40B3-B8FE-FAC3DB3F2539} = {2EEE6ABD-EE9B-473F-AF2D-6DABB85D7BA2}
280280
EndGlobalSection
281281
EndGlobal

src/WorkflowCore/Interface/IDistributedLockProvider.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@ public interface IDistributedLockProvider
1515

1616
Task ReleaseLock(string Id);
1717

18-
void Start();
18+
Task Start();
1919

20-
void Stop();
20+
Task Stop();
2121
}
2222
}

src/WorkflowCore/Interface/IQueueProvider.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@ public interface IQueueProvider : IDisposable
2727
/// <returns></returns>
2828
Task<string> DequeueWork(QueueType queue);
2929

30-
void Start();
31-
void Stop();
30+
Task Start();
31+
Task Stop();
3232

3333
}
3434

src/WorkflowCore/Services/EventThread.cs

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using System.Linq;
55
using System.Text;
66
using System.Threading;
7+
using System.Threading.Tasks;
78
using WorkflowCore.Interface;
89
using WorkflowCore.Models;
910

@@ -41,30 +42,30 @@ public void Stop()
4142
_thread.Join();
4243
}
4344

44-
private void RunEvents()
45+
private async void RunEvents()
4546
{
4647
while (!_shutdown)
4748
{
4849
try
4950
{
50-
var eventId = _queueProvider.DequeueWork(QueueType.Event).Result;
51+
var eventId = await _queueProvider.DequeueWork(QueueType.Event);
5152
if (eventId != null)
5253
{
53-
if (_lockProvider.AcquireLock($"evt:{eventId}").Result)
54+
if (await _lockProvider.AcquireLock($"evt:{eventId}"))
5455
{
5556
try
5657
{
57-
var evt = _persistenceStore.GetEvent(eventId).Result;
58+
var evt = await _persistenceStore.GetEvent(eventId);
5859
if (evt.EventTime <= DateTime.Now.ToUniversalTime())
5960
{
60-
var subs = _persistenceStore.GetSubcriptions(evt.EventName, evt.EventKey, evt.EventTime).Result;
61+
var subs = await _persistenceStore.GetSubcriptions(evt.EventName, evt.EventKey, evt.EventTime);
6162
var success = true;
6263

63-
foreach (var sub in subs)
64-
success = success && SeedSubscription(evt, sub);
64+
foreach (var sub in subs.ToList())
65+
success = success && await SeedSubscription(evt, sub);
6566

6667
if (success)
67-
_persistenceStore.MarkEventProcessed(eventId).Wait();
68+
await _persistenceStore.MarkEventProcessed(eventId);
6869
}
6970
}
7071
catch (Exception ex)
@@ -73,7 +74,7 @@ private void RunEvents()
7374
}
7475
finally
7576
{
76-
_lockProvider.ReleaseLock($"evt:{eventId}");
77+
await _lockProvider.ReleaseLock($"evt:{eventId}");
7778
}
7879
}
7980
else
@@ -84,7 +85,7 @@ private void RunEvents()
8485
}
8586
else
8687
{
87-
Thread.Sleep(_options.IdleTime); //no work
88+
await Task.Delay(_options.IdleTime); //no work
8889
}
8990

9091
}
@@ -95,13 +96,13 @@ private void RunEvents()
9596
}
9697
}
9798

98-
private bool SeedSubscription(Event evt, EventSubscription sub)
99+
private async Task<bool> SeedSubscription(Event evt, EventSubscription sub)
99100
{
100-
if (_lockProvider.AcquireLock(sub.WorkflowId).Result)
101+
if (await _lockProvider.AcquireLock(sub.WorkflowId))
101102
{
102103
try
103104
{
104-
var workflow = _persistenceStore.GetWorkflowInstance(sub.WorkflowId).Result;
105+
var workflow = await _persistenceStore.GetWorkflowInstance(sub.WorkflowId);
105106
var pointers = workflow.ExecutionPointers.Where(p => p.EventName == sub.EventName && p.EventKey == sub.EventKey && !p.EventPublished);
106107
foreach (var p in pointers)
107108
{
@@ -110,8 +111,8 @@ private bool SeedSubscription(Event evt, EventSubscription sub)
110111
p.Active = true;
111112
}
112113
workflow.NextExecution = 0;
113-
_persistenceStore.PersistWorkflow(workflow).Wait();
114-
_persistenceStore.TerminateSubscription(sub.Id).Wait();
114+
await _persistenceStore.PersistWorkflow(workflow);
115+
await _persistenceStore.TerminateSubscription(sub.Id);
115116
return true;
116117
}
117118
catch (Exception ex)
@@ -121,8 +122,8 @@ private bool SeedSubscription(Event evt, EventSubscription sub)
121122
}
122123
finally
123124
{
124-
_lockProvider.ReleaseLock(sub.WorkflowId).Wait();
125-
_queueProvider.QueueWork(sub.WorkflowId, QueueType.Workflow);
125+
await _lockProvider.ReleaseLock(sub.WorkflowId);
126+
await _queueProvider.QueueWork(sub.WorkflowId, QueueType.Workflow);
126127
}
127128
}
128129
else

src/WorkflowCore/Services/RunnablePoller.cs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -45,25 +45,25 @@ public void Stop()
4545
/// Poll the persistence store for workflows ready to run.
4646
/// Poll the persistence store for stashed unpublished events
4747
/// </summary>
48-
private void PollRunnables(object target)
48+
private async void PollRunnables(object target)
4949
{
5050
try
5151
{
52-
if (_lockProvider.AcquireLock("poll runnables").Result)
52+
if (await _lockProvider.AcquireLock("poll runnables"))
5353
{
5454
try
5555
{
5656
_logger.LogInformation("Polling for runnable workflows");
57-
var runnables = _persistenceStore.GetRunnableInstances().Result;
57+
var runnables = await _persistenceStore.GetRunnableInstances();
5858
foreach (var item in runnables)
5959
{
6060
_logger.LogDebug("Got runnable instance {0}", item);
61-
_queueProvider.QueueWork(item, QueueType.Workflow);
61+
await _queueProvider.QueueWork(item, QueueType.Workflow);
6262
}
6363
}
6464
finally
6565
{
66-
_lockProvider.ReleaseLock("poll runnables").Wait();
66+
await _lockProvider.ReleaseLock("poll runnables");
6767
}
6868
}
6969
}
@@ -74,21 +74,21 @@ private void PollRunnables(object target)
7474

7575
try
7676
{
77-
if (_lockProvider.AcquireLock("unprocessed events").Result)
77+
if (await _lockProvider.AcquireLock("unprocessed events"))
7878
{
7979
try
8080
{
8181
_logger.LogInformation("Polling for unprocessed events");
82-
var events = _persistenceStore.GetRunnableEvents().Result.ToList();
83-
foreach (var item in events)
82+
var events = await _persistenceStore.GetRunnableEvents();
83+
foreach (var item in events.ToList())
8484
{
8585
_logger.LogDebug($"Got unprocessed event {item}");
86-
_queueProvider.QueueWork(item, QueueType.Event);
86+
await _queueProvider.QueueWork(item, QueueType.Event);
8787
}
8888
}
8989
finally
9090
{
91-
_lockProvider.ReleaseLock("unprocessed events").Wait();
91+
await _lockProvider.ReleaseLock("unprocessed events");
9292
}
9393
}
9494
}

src/WorkflowCore/Services/SingleNodeLockProvider.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,12 @@ public async Task ReleaseLock(string Id)
3737
}
3838
}
3939

40-
public void Start()
40+
public async Task Start()
4141
{
4242

4343
}
4444

45-
public void Stop()
45+
public async Task Stop()
4646
{
4747

4848
}

src/WorkflowCore/Services/SingleNodeQueueProvider.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,11 @@ public async Task<string> DequeueWork(QueueType queue)
3232
return null;
3333
}
3434

35-
public void Start()
35+
public async Task Start()
3636
{
3737
}
3838

39-
public void Stop()
39+
public async Task Stop()
4040
{
4141
}
4242

src/WorkflowCore/Services/WorkflowHost.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -92,8 +92,8 @@ public async Task<string> StartWorkflow<TData>(string workflowId, int? version,
9292
public void Start()
9393
{
9494
_shutdown = false;
95-
QueueProvider.Start();
96-
LockProvider.Start();
95+
QueueProvider.Start().Wait();
96+
LockProvider.Start().Wait();
9797
for (int i = 0; i < Options.ThreadCount; i++)
9898
{
9999
Logger.LogInformation("Starting worker thread #{0}", i);
@@ -167,7 +167,7 @@ public void RegisterWorkflow<TWorkflow, TData>()
167167

168168
public async Task<bool> SuspendWorkflow(string workflowId)
169169
{
170-
if (LockProvider.AcquireLock(workflowId).Result)
170+
if (await LockProvider.AcquireLock(workflowId))
171171
{
172172
try
173173
{
@@ -190,7 +190,7 @@ public async Task<bool> SuspendWorkflow(string workflowId)
190190

191191
public async Task<bool> ResumeWorkflow(string workflowId)
192192
{
193-
if (LockProvider.AcquireLock(workflowId).Result)
193+
if (await LockProvider.AcquireLock(workflowId))
194194
{
195195
bool requeue = false;
196196
try
@@ -217,7 +217,7 @@ public async Task<bool> ResumeWorkflow(string workflowId)
217217

218218
public async Task<bool> TerminateWorkflow(string workflowId)
219219
{
220-
if (LockProvider.AcquireLock(workflowId).Result)
220+
if (await LockProvider.AcquireLock(workflowId))
221221
{
222222
try
223223
{

0 commit comments

Comments
 (0)