Skip to content

Commit d082e25

Browse files
committed
azure locks
1 parent 8bcd781 commit d082e25

File tree

4 files changed

+48
-36
lines changed

4 files changed

+48
-36
lines changed

src/WorkflowCore/Services/EventThread.cs

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using System.Linq;
55
using System.Text;
66
using System.Threading;
7+
using System.Threading.Tasks;
78
using WorkflowCore.Interface;
89
using WorkflowCore.Models;
910

@@ -41,30 +42,30 @@ public void Stop()
4142
_thread.Join();
4243
}
4344

44-
private void RunEvents()
45+
private async void RunEvents()
4546
{
4647
while (!_shutdown)
4748
{
4849
try
4950
{
50-
var eventId = _queueProvider.DequeueWork(QueueType.Event).Result;
51+
var eventId = await _queueProvider.DequeueWork(QueueType.Event);
5152
if (eventId != null)
5253
{
53-
if (_lockProvider.AcquireLock($"evt:{eventId}").Result)
54+
if (await _lockProvider.AcquireLock($"evt:{eventId}"))
5455
{
5556
try
5657
{
57-
var evt = _persistenceStore.GetEvent(eventId).Result;
58+
var evt = await _persistenceStore.GetEvent(eventId);
5859
if (evt.EventTime <= DateTime.Now.ToUniversalTime())
5960
{
60-
var subs = _persistenceStore.GetSubcriptions(evt.EventName, evt.EventKey, evt.EventTime).Result;
61+
var subs = await _persistenceStore.GetSubcriptions(evt.EventName, evt.EventKey, evt.EventTime);
6162
var success = true;
6263

63-
foreach (var sub in subs)
64-
success = success && SeedSubscription(evt, sub);
64+
foreach (var sub in subs.ToList())
65+
success = success && await SeedSubscription(evt, sub);
6566

6667
if (success)
67-
_persistenceStore.MarkEventProcessed(eventId).Wait();
68+
await _persistenceStore.MarkEventProcessed(eventId);
6869
}
6970
}
7071
catch (Exception ex)
@@ -73,7 +74,7 @@ private void RunEvents()
7374
}
7475
finally
7576
{
76-
_lockProvider.ReleaseLock($"evt:{eventId}");
77+
await _lockProvider.ReleaseLock($"evt:{eventId}");
7778
}
7879
}
7980
else
@@ -84,7 +85,7 @@ private void RunEvents()
8485
}
8586
else
8687
{
87-
Thread.Sleep(_options.IdleTime); //no work
88+
await Task.Delay(_options.IdleTime); //no work
8889
}
8990

9091
}
@@ -95,13 +96,13 @@ private void RunEvents()
9596
}
9697
}
9798

98-
private bool SeedSubscription(Event evt, EventSubscription sub)
99+
private async Task<bool> SeedSubscription(Event evt, EventSubscription sub)
99100
{
100-
if (_lockProvider.AcquireLock(sub.WorkflowId).Result)
101+
if (await _lockProvider.AcquireLock(sub.WorkflowId))
101102
{
102103
try
103104
{
104-
var workflow = _persistenceStore.GetWorkflowInstance(sub.WorkflowId).Result;
105+
var workflow = await _persistenceStore.GetWorkflowInstance(sub.WorkflowId);
105106
var pointers = workflow.ExecutionPointers.Where(p => p.EventName == sub.EventName && p.EventKey == sub.EventKey && !p.EventPublished);
106107
foreach (var p in pointers)
107108
{
@@ -110,8 +111,8 @@ private bool SeedSubscription(Event evt, EventSubscription sub)
110111
p.Active = true;
111112
}
112113
workflow.NextExecution = 0;
113-
_persistenceStore.PersistWorkflow(workflow).Wait();
114-
_persistenceStore.TerminateSubscription(sub.Id).Wait();
114+
await _persistenceStore.PersistWorkflow(workflow);
115+
await _persistenceStore.TerminateSubscription(sub.Id);
115116
return true;
116117
}
117118
catch (Exception ex)
@@ -121,8 +122,8 @@ private bool SeedSubscription(Event evt, EventSubscription sub)
121122
}
122123
finally
123124
{
124-
_lockProvider.ReleaseLock(sub.WorkflowId).Wait();
125-
_queueProvider.QueueWork(sub.WorkflowId, QueueType.Workflow);
125+
await _lockProvider.ReleaseLock(sub.WorkflowId);
126+
await _queueProvider.QueueWork(sub.WorkflowId, QueueType.Workflow);
126127
}
127128
}
128129
else

src/WorkflowCore/Services/WorkflowThread.cs

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -47,24 +47,24 @@ public void Stop()
4747
/// <summary>
4848
/// Worker thread body
4949
/// </summary>
50-
private void RunWorkflows()
50+
private async void RunWorkflows()
5151
{
5252
while (!_shutdown)
5353
{
5454
try
5555
{
56-
var workflowId = _queueProvider.DequeueWork(QueueType.Workflow).Result;
56+
var workflowId = await _queueProvider.DequeueWork(QueueType.Workflow);
5757
if (workflowId != null)
5858
{
5959
try
6060
{
61-
if (_lockProvider.AcquireLock(workflowId).Result)
61+
if (await _lockProvider.AcquireLock(workflowId))
6262
{
6363
WorkflowInstance workflow = null;
6464
WorkflowExecutorResult result = null;
6565
try
6666
{
67-
workflow = _persistenceStore.GetWorkflowInstance(workflowId).Result;
67+
workflow = await _persistenceStore.GetWorkflowInstance(workflowId);
6868
if (workflow.Status == WorkflowStatus.Runnable)
6969
{
7070
try
@@ -73,22 +73,22 @@ private void RunWorkflows()
7373
}
7474
finally
7575
{
76-
_persistenceStore.PersistWorkflow(workflow).Wait();
76+
await _persistenceStore.PersistWorkflow(workflow);
7777
}
7878
}
7979
}
8080
finally
8181
{
82-
_lockProvider.ReleaseLock(workflowId).Wait();
82+
await _lockProvider.ReleaseLock(workflowId);
8383
if ((workflow != null) && (result != null))
8484
{
8585
foreach (var sub in result.Subscriptions)
86-
SubscribeEvent(sub);
86+
await SubscribeEvent(sub);
8787

88-
_persistenceStore.PersistErrors(result.Errors);
88+
await _persistenceStore.PersistErrors(result.Errors);
8989

9090
if ((workflow.Status == WorkflowStatus.Runnable) && workflow.NextExecution.HasValue && workflow.NextExecution.Value < DateTime.Now.ToUniversalTime().Ticks)
91-
_queueProvider.QueueWork(workflowId, QueueType.Workflow);
91+
await _queueProvider.QueueWork(workflowId, QueueType.Workflow);
9292
}
9393
}
9494
}
@@ -104,7 +104,7 @@ private void RunWorkflows()
104104
}
105105
else
106106
{
107-
Thread.Sleep(_options.IdleTime); //no work
107+
await Task.Delay(_options.IdleTime); //no work
108108
}
109109

110110
}
@@ -115,17 +115,17 @@ private void RunWorkflows()
115115
}
116116
}
117117

118-
private void SubscribeEvent(EventSubscription subscription)
118+
private async Task SubscribeEvent(EventSubscription subscription)
119119
{
120120
//TODO: move to own class
121121
_logger.LogDebug("Subscribing to event {0} {1} for workflow {2} step {3}", subscription.EventName, subscription.EventKey, subscription.WorkflowId, subscription.StepId);
122122

123-
_persistenceStore.CreateEventSubscription(subscription).Wait();
124-
var events = _persistenceStore.GetEvents(subscription.EventName, subscription.EventKey, subscription.SubscribeAsOf).Result;
123+
await _persistenceStore.CreateEventSubscription(subscription);
124+
var events = await _persistenceStore.GetEvents(subscription.EventName, subscription.EventKey, subscription.SubscribeAsOf);
125125
foreach (var evt in events)
126126
{
127-
_persistenceStore.MarkEventUnprocessed(evt).Wait();
128-
_queueProvider.QueueWork(evt, QueueType.Event);
127+
await _persistenceStore.MarkEventUnprocessed(evt);
128+
await _queueProvider.QueueWork(evt, QueueType.Event);
129129
}
130130
}
131131
}

src/providers/WorkflowCore.Providers.Azure/Services/AzureLockManager.cs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@ public class AzureLockManager: IDistributedLockProvider
1919
private readonly ILogger _logger;
2020
private readonly List<ControlledLock> _locks = new List<ControlledLock>();
2121
private Timer _renewTimer;
22-
private TimeSpan LockTimeout => TimeSpan.FromMinutes(5);
23-
private TimeSpan RenewInterval => TimeSpan.FromMinutes(3);
22+
private TimeSpan LockTimeout => TimeSpan.FromMinutes(1);
23+
private TimeSpan RenewInterval => TimeSpan.FromSeconds(45);
2424

2525
public AzureLockManager(string connectionString, ILoggerFactory logFactory)
2626
{
@@ -67,7 +67,14 @@ public async Task ReleaseLock(string Id)
6767

6868
if (entry != null)
6969
{
70-
await entry.Blob.ReleaseLeaseAsync(AccessCondition.GenerateLeaseCondition(entry.Id));
70+
try
71+
{
72+
await entry.Blob.ReleaseLeaseAsync(AccessCondition.GenerateLeaseCondition(entry.Id));
73+
}
74+
catch (Exception ex)
75+
{
76+
_logger.LogError($"Error releasing lock - {ex.Message}");
77+
}
7178
lock (_locks)
7279
{
7380
_locks.Remove(entry);

src/samples/WorkflowCore.Sample04/Program.cs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,11 @@ private static IServiceProvider ConfigureServices()
4646
//services.AddWorkflow(x => x.UsePostgreSQL(@"Server=127.0.0.1;Port=5432;Database=workflow;User Id=postgres;", true, true));
4747
//services.AddWorkflow(x => x.UseSqlite(@"Data Source=database.db;", true));
4848

49-
services.AddWorkflow(x => x.UseAzureSyncronization(@"UseDevelopmentStorage=true"));
49+
services.AddWorkflow(x =>
50+
{
51+
x.UseAzureSyncronization(@"UseDevelopmentStorage=true");
52+
x.UseThreads(1);
53+
});
5054

5155
//redis = ConnectionMultiplexer.Connect("127.0.0.1");
5256
//services.AddWorkflow(x =>

0 commit comments

Comments
 (0)