Skip to content

Commit 2a34537

Browse files
committed
TPL migration
1 parent a0f929e commit 2a34537

File tree

43 files changed

+606
-501
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+606
-501
lines changed

src/WorkflowCore/Interface/IDistributedLockProvider.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using System;
22
using System.Collections.Generic;
33
using System.Linq;
4+
using System.Threading;
45
using System.Threading.Tasks;
56

67
namespace WorkflowCore.Interface
@@ -11,7 +12,7 @@ namespace WorkflowCore.Interface
1112
/// </remarks>
1213
public interface IDistributedLockProvider
1314
{
14-
Task<bool> AcquireLock(string Id);
15+
Task<bool> AcquireLock(string Id, CancellationToken cancellationToken);
1516

1617
Task ReleaseLock(string Id);
1718

src/WorkflowCore/Interface/IQueueProvider.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using System;
22
using System.Collections.Generic;
33
using System.Linq;
4+
using System.Threading;
45
using System.Threading.Tasks;
56
using WorkflowCore.Models;
67

@@ -25,7 +26,9 @@ public interface IQueueProvider : IDisposable
2526
/// If the queue is empty, NULL is returned
2627
/// </summary>
2728
/// <returns></returns>
28-
Task<string> DequeueWork(QueueType queue);
29+
Task<string> DequeueWork(QueueType queue, CancellationToken cancellationToken);
30+
31+
bool IsDequeueBlocking { get; }
2932

3033
Task Start();
3134
Task Stop();

src/WorkflowCore/Models/WorkflowOptions.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ public WorkflowOptions()
2020
{
2121
//set defaults
2222
PollInterval = TimeSpan.FromSeconds(10);
23-
IdleTime = TimeSpan.FromMilliseconds(500);
23+
IdleTime = TimeSpan.FromMilliseconds(100);
2424
ErrorRetryInterval = TimeSpan.FromSeconds(60);
2525

2626
QueueFactory = new Func<IServiceProvider, IQueueProvider>(sp => new SingleNodeQueueProvider());

src/WorkflowCore/Models/WorkflowStep.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,6 @@ public virtual IStepBody ConstructBody(IServiceProvider serviceProvider)
6565
}
6666
return body;
6767
}
68-
6968
}
7069

7170
public enum ExecutionPipelineDirective { Next = 0, Defer = 1, EndWorkflow = 2 }

src/WorkflowCore/ServiceCollectionExtensions.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ public static void AddWorkflow(this IServiceCollection services, Action<Workflow
2424
services.AddSingleton<IWorkflowRegistry, WorkflowRegistry>();
2525
services.AddSingleton<WorkflowOptions>(options);
2626

27-
services.AddTransient<IBackgroundTask, WorkflowTask>();
28-
services.AddTransient<IBackgroundTask, EventTask>();
27+
services.AddTransient<IBackgroundTask, WorkflowTaskDispatcher>();
28+
services.AddTransient<IBackgroundTask, EventTaskDispatcher>();
2929
services.AddTransient<IBackgroundTask, RunnablePoller>();
3030

3131
services.AddSingleton<IWorkflowHost, WorkflowHost>();

src/WorkflowCore/Services/EventTask.cs

Lines changed: 0 additions & 140 deletions
This file was deleted.
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
using Microsoft.Extensions.Logging;
2+
using System;
3+
using System.Collections.Generic;
4+
using System.Linq;
5+
using System.Text;
6+
using System.Threading;
7+
using System.Threading.Tasks;
8+
using WorkflowCore.Interface;
9+
using WorkflowCore.Models;
10+
11+
namespace WorkflowCore.Services
12+
{
13+
class EventTaskDispatcher : QueueTaskDispatcher, IBackgroundTask
14+
{
15+
private readonly IPersistenceProvider _persistenceStore;
16+
private readonly IDistributedLockProvider _lockProvider;
17+
private readonly IDateTimeProvider _datetimeProvider;
18+
19+
protected override QueueType Queue => QueueType.Event;
20+
21+
public EventTaskDispatcher(IPersistenceProvider persistenceStore, IQueueProvider queueProvider, ILoggerFactory loggerFactory, IServiceProvider serviceProvider, IWorkflowRegistry registry, IDistributedLockProvider lockProvider, WorkflowOptions options, IDateTimeProvider datetimeProvider)
22+
: base(queueProvider, loggerFactory, options)
23+
{
24+
_persistenceStore = persistenceStore;
25+
_lockProvider = lockProvider;
26+
_datetimeProvider = datetimeProvider;
27+
}
28+
29+
protected override async Task ProcessItem(string itemId, CancellationToken cancellationToken)
30+
{
31+
if (await _lockProvider.AcquireLock($"evt:{itemId}", cancellationToken))
32+
{
33+
try
34+
{
35+
cancellationToken.ThrowIfCancellationRequested();
36+
var evt = await _persistenceStore.GetEvent(itemId);
37+
if (evt.EventTime <= _datetimeProvider.Now.ToUniversalTime())
38+
{
39+
var subs = await _persistenceStore.GetSubcriptions(evt.EventName, evt.EventKey, evt.EventTime);
40+
var success = true;
41+
42+
foreach (var sub in subs.ToList())
43+
success = success && await SeedSubscription(evt, sub, cancellationToken);
44+
45+
if (success)
46+
await _persistenceStore.MarkEventProcessed(itemId);
47+
}
48+
}
49+
finally
50+
{
51+
await _lockProvider.ReleaseLock($"evt:{itemId}");
52+
}
53+
}
54+
else
55+
{
56+
Logger.LogInformation($"Event locked {itemId}");
57+
}
58+
}
59+
60+
private async Task<bool> SeedSubscription(Event evt, EventSubscription sub, CancellationToken cancellationToken)
61+
{
62+
if (await _lockProvider.AcquireLock(sub.WorkflowId, cancellationToken))
63+
{
64+
try
65+
{
66+
var workflow = await _persistenceStore.GetWorkflowInstance(sub.WorkflowId);
67+
var pointers = workflow.ExecutionPointers.Where(p => p.EventName == sub.EventName && p.EventKey == sub.EventKey && !p.EventPublished);
68+
foreach (var p in pointers)
69+
{
70+
p.EventData = evt.EventData;
71+
p.EventPublished = true;
72+
p.Active = true;
73+
}
74+
workflow.NextExecution = 0;
75+
await _persistenceStore.PersistWorkflow(workflow);
76+
await _persistenceStore.TerminateSubscription(sub.Id);
77+
return true;
78+
}
79+
catch (Exception ex)
80+
{
81+
Logger.LogError(ex.Message);
82+
return false;
83+
}
84+
finally
85+
{
86+
await _lockProvider.ReleaseLock(sub.WorkflowId);
87+
await QueueProvider.QueueWork(sub.WorkflowId, QueueType.Workflow);
88+
}
89+
}
90+
else
91+
{
92+
Logger.LogInformation("Workflow locked {0}", sub.WorkflowId);
93+
return false;
94+
}
95+
}
96+
}
97+
}

0 commit comments

Comments
 (0)