Skip to content

Commit 1a7d9a9

Browse files
committed
reorg
resource pools
1 parent 5af8d23 commit 1a7d9a9

20 files changed

+197
-159
lines changed

src/WorkflowCore/Interface/IPersistenceProvider.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ public interface IPersistenceProvider
1616

1717
Task PersistWorkflow(WorkflowInstance workflow);
1818

19-
Task<IEnumerable<string>> GetRunnableInstances();
19+
Task<IEnumerable<string>> GetRunnableInstances(DateTime asAt);
2020

2121
Task<IEnumerable<WorkflowInstance>> GetWorkflowInstances(WorkflowStatus? status, string type, DateTime? createdFrom, DateTime? createdTo, int skip, int take);
2222

@@ -32,7 +32,7 @@ public interface IPersistenceProvider
3232

3333
Task<Event> GetEvent(string id);
3434

35-
Task<IEnumerable<string>> GetRunnableEvents();
35+
Task<IEnumerable<string>> GetRunnableEvents(DateTime asAt);
3636

3737
Task<IEnumerable<string>> GetEvents(string eventName, string eventKey, DateTime asOf);
3838

src/WorkflowCore/ServiceCollectionExtensions.cs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@
77
using WorkflowCore.Services;
88
using WorkflowCore.Models;
99
using Microsoft.Extensions.Logging;
10+
using Microsoft.Extensions.ObjectPool;
1011
using WorkflowCore.Primitives;
12+
using WorkflowCore.Services.BackgroundTasks;
1113

1214
namespace Microsoft.Extensions.DependencyInjection
1315
{
@@ -24,15 +26,18 @@ public static void AddWorkflow(this IServiceCollection services, Action<Workflow
2426
services.AddSingleton<IWorkflowRegistry, WorkflowRegistry>();
2527
services.AddSingleton<WorkflowOptions>(options);
2628

27-
services.AddTransient<IBackgroundTask, WorkflowTaskDispatcher>();
28-
services.AddTransient<IBackgroundTask, EventTaskDispatcher>();
29+
services.AddTransient<IBackgroundTask, WorkflowConsumer>();
30+
services.AddTransient<IBackgroundTask, EventConsumer>();
2931
services.AddTransient<IBackgroundTask, RunnablePoller>();
3032

3133
services.AddSingleton<IWorkflowHost, WorkflowHost>();
3234
services.AddTransient<IWorkflowExecutor, WorkflowExecutor>();
3335
services.AddTransient<IWorkflowBuilder, WorkflowBuilder>();
3436
services.AddTransient<IDateTimeProvider, DateTimeProvider>();
3537

38+
services.AddTransient<IPooledObjectPolicy<IPersistenceProvider>, InjectedObjectPoolPolicy<IPersistenceProvider>>();
39+
services.AddTransient<IPooledObjectPolicy<IWorkflowExecutor>, InjectedObjectPoolPolicy<IWorkflowExecutor>>();
40+
3641
services.AddTransient<Foreach>();
3742
}
3843
}

src/WorkflowCore/Services/EventTaskDispatcher.cs renamed to src/WorkflowCore/Services/BackgroundTasks/EventConsumer.cs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,22 @@
1-
using Microsoft.Extensions.Logging;
2-
using System;
3-
using System.Collections.Generic;
1+
using System;
42
using System.Linq;
5-
using System.Text;
63
using System.Threading;
74
using System.Threading.Tasks;
5+
using Microsoft.Extensions.Logging;
86
using WorkflowCore.Interface;
97
using WorkflowCore.Models;
108

11-
namespace WorkflowCore.Services
9+
namespace WorkflowCore.Services.BackgroundTasks
1210
{
13-
class EventTaskDispatcher : QueueTaskDispatcher, IBackgroundTask
11+
internal class EventConsumer : QueueConsumer, IBackgroundTask
1412
{
1513
private readonly IPersistenceProvider _persistenceStore;
1614
private readonly IDistributedLockProvider _lockProvider;
1715
private readonly IDateTimeProvider _datetimeProvider;
1816

1917
protected override QueueType Queue => QueueType.Event;
2018

21-
public EventTaskDispatcher(IPersistenceProvider persistenceStore, IQueueProvider queueProvider, ILoggerFactory loggerFactory, IServiceProvider serviceProvider, IWorkflowRegistry registry, IDistributedLockProvider lockProvider, WorkflowOptions options, IDateTimeProvider datetimeProvider)
19+
public EventConsumer(IPersistenceProvider persistenceStore, IQueueProvider queueProvider, ILoggerFactory loggerFactory, IServiceProvider serviceProvider, IWorkflowRegistry registry, IDistributedLockProvider lockProvider, WorkflowOptions options, IDateTimeProvider datetimeProvider)
2220
: base(queueProvider, loggerFactory, options)
2321
{
2422
_persistenceStore = persistenceStore;

src/WorkflowCore/Services/QueueTaskDispatcher.cs renamed to src/WorkflowCore/Services/BackgroundTasks/QueueConsumer.cs

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,14 @@
1-
using Microsoft.Extensions.Logging;
2-
using System;
3-
using System.Collections.Generic;
1+
using System;
42
using System.Threading;
53
using System.Threading.Tasks;
64
using System.Threading.Tasks.Dataflow;
5+
using Microsoft.Extensions.Logging;
76
using WorkflowCore.Interface;
87
using WorkflowCore.Models;
98

10-
namespace WorkflowCore.Services
9+
namespace WorkflowCore.Services.BackgroundTasks
1110
{
12-
public abstract class QueueTaskDispatcher : IBackgroundTask
11+
internal abstract class QueueConsumer : IBackgroundTask
1312
{
1413
protected abstract QueueType Queue { get; }
1514
protected virtual int MaxConcurrentItems => Math.Max(Environment.ProcessorCount, 2);
@@ -20,7 +19,7 @@ public abstract class QueueTaskDispatcher : IBackgroundTask
2019
protected Task DispatchTask;
2120
private CancellationTokenSource _cancellationTokenSource;
2221

23-
protected QueueTaskDispatcher(IQueueProvider queueProvider, ILoggerFactory loggerFactory, WorkflowOptions options)
22+
protected QueueConsumer(IQueueProvider queueProvider, ILoggerFactory loggerFactory, WorkflowOptions options)
2423
{
2524
QueueProvider = queueProvider;
2625
Options = options;
@@ -62,20 +61,20 @@ private async void Execute()
6261
{
6362
try
6463
{
65-
if (SpinWait.SpinUntil(() => actionBlock.InputCount == 0, Options.IdleTime))
66-
{
67-
var item = await QueueProvider.DequeueWork(Queue, cancelToken);
64+
if (!SpinWait.SpinUntil(() => actionBlock.InputCount == 0, Options.IdleTime))
65+
continue;
6866

69-
if (item == null)
70-
{
71-
if (!QueueProvider.IsDequeueBlocking)
72-
await Task.Delay(Options.IdleTime, cancelToken);
73-
continue;
74-
}
67+
var item = await QueueProvider.DequeueWork(Queue, cancelToken);
7568

76-
if (!actionBlock.Post(item))
77-
await QueueProvider.QueueWork(item, Queue);
69+
if (item == null)
70+
{
71+
if (!QueueProvider.IsDequeueBlocking)
72+
await Task.Delay(Options.IdleTime, cancelToken);
73+
continue;
7874
}
75+
76+
if (!actionBlock.Post(item))
77+
await QueueProvider.QueueWork(item, Queue);
7978
}
8079
catch (OperationCanceledException)
8180
{

src/WorkflowCore/Services/RunnablePoller.cs renamed to src/WorkflowCore/Services/BackgroundTasks/RunnablePoller.cs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,13 @@
1-
using Microsoft.Extensions.Logging;
2-
using System;
3-
using System.Collections.Generic;
1+
using System;
42
using System.Linq;
5-
using System.Text;
63
using System.Threading;
4+
using Microsoft.Extensions.Logging;
75
using WorkflowCore.Interface;
86
using WorkflowCore.Models;
97

10-
namespace WorkflowCore.Services
8+
namespace WorkflowCore.Services.BackgroundTasks
119
{
12-
class RunnablePoller : IBackgroundTask
10+
internal class RunnablePoller : IBackgroundTask
1311
{
1412
private readonly IPersistenceProvider _persistenceStore;
1513
private readonly IDistributedLockProvider _lockProvider;
@@ -54,7 +52,7 @@ private async void PollRunnables(object target)
5452
try
5553
{
5654
_logger.LogInformation("Polling for runnable workflows");
57-
var runnables = await _persistenceStore.GetRunnableInstances();
55+
var runnables = await _persistenceStore.GetRunnableInstances(DateTime.Now);
5856
foreach (var item in runnables)
5957
{
6058
_logger.LogDebug("Got runnable instance {0}", item);
@@ -79,7 +77,7 @@ private async void PollRunnables(object target)
7977
try
8078
{
8179
_logger.LogInformation("Polling for unprocessed events");
82-
var events = await _persistenceStore.GetRunnableEvents();
80+
var events = await _persistenceStore.GetRunnableEvents(DateTime.Now);
8381
foreach (var item in events.ToList())
8482
{
8583
_logger.LogDebug($"Got unprocessed event {item}");
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
using System;
2+
using System.Threading;
3+
using System.Threading.Tasks;
4+
using Microsoft.Extensions.Logging;
5+
using Microsoft.Extensions.ObjectPool;
6+
using WorkflowCore.Interface;
7+
using WorkflowCore.Models;
8+
9+
namespace WorkflowCore.Services.BackgroundTasks
10+
{
11+
internal class WorkflowConsumer : QueueConsumer, IBackgroundTask
12+
{
13+
private readonly IDistributedLockProvider _lockProvider;
14+
private readonly IDateTimeProvider _datetimeProvider;
15+
private readonly ObjectPool<IPersistenceProvider> _persistenceStorePool;
16+
private readonly ObjectPool<IWorkflowExecutor> _executorPool;
17+
18+
protected override QueueType Queue => QueueType.Workflow;
19+
20+
public WorkflowConsumer(IPooledObjectPolicy<IPersistenceProvider> persistencePoolPolicy, IQueueProvider queueProvider, ILoggerFactory loggerFactory, IServiceProvider serviceProvider, IWorkflowRegistry registry, IDistributedLockProvider lockProvider, IPooledObjectPolicy<IWorkflowExecutor> executorPoolPolicy, IDateTimeProvider datetimeProvider, WorkflowOptions options)
21+
: base(queueProvider, loggerFactory, options)
22+
{
23+
_persistenceStorePool = new DefaultObjectPool<IPersistenceProvider>(persistencePoolPolicy);
24+
_executorPool = new DefaultObjectPool<IWorkflowExecutor>(executorPoolPolicy);
25+
_lockProvider = lockProvider;
26+
_datetimeProvider = datetimeProvider;
27+
}
28+
29+
protected override async Task ProcessItem(string itemId, CancellationToken cancellationToken)
30+
{
31+
if (await _lockProvider.AcquireLock(itemId, cancellationToken))
32+
{
33+
WorkflowInstance workflow = null;
34+
WorkflowExecutorResult result = null;
35+
var persistenceStore = _persistenceStorePool.Get();
36+
try
37+
{
38+
try
39+
{
40+
cancellationToken.ThrowIfCancellationRequested();
41+
workflow = await persistenceStore.GetWorkflowInstance(itemId);
42+
if (workflow.Status == WorkflowStatus.Runnable)
43+
{
44+
var executor = _executorPool.Get();
45+
try
46+
{
47+
result = await executor.Execute(workflow, Options);
48+
}
49+
finally
50+
{
51+
_executorPool.Return(executor);
52+
await persistenceStore.PersistWorkflow(workflow);
53+
}
54+
}
55+
}
56+
finally
57+
{
58+
await _lockProvider.ReleaseLock(itemId);
59+
if ((workflow != null) && (result != null))
60+
{
61+
foreach (var sub in result.Subscriptions)
62+
await SubscribeEvent(sub, persistenceStore);
63+
64+
await persistenceStore.PersistErrors(result.Errors);
65+
66+
var readAheadTicks = _datetimeProvider.Now.Add(Options.PollInterval).ToUniversalTime().Ticks;
67+
68+
if ((workflow.Status == WorkflowStatus.Runnable) && workflow.NextExecution.HasValue && workflow.NextExecution.Value < readAheadTicks)
69+
{
70+
new Task(() => FutureQueue(workflow, cancellationToken)).Start();
71+
}
72+
}
73+
}
74+
}
75+
finally
76+
{
77+
_persistenceStorePool.Return(persistenceStore);
78+
}
79+
}
80+
else
81+
{
82+
Logger.LogInformation("Workflow locked {0}", itemId);
83+
}
84+
}
85+
86+
private async Task SubscribeEvent(EventSubscription subscription, IPersistenceProvider persistenceStore)
87+
{
88+
//TODO: move to own class
89+
Logger.LogDebug("Subscribing to event {0} {1} for workflow {2} step {3}", subscription.EventName, subscription.EventKey, subscription.WorkflowId, subscription.StepId);
90+
91+
await persistenceStore.CreateEventSubscription(subscription);
92+
var events = await persistenceStore.GetEvents(subscription.EventName, subscription.EventKey, subscription.SubscribeAsOf);
93+
foreach (var evt in events)
94+
{
95+
await persistenceStore.MarkEventUnprocessed(evt);
96+
await QueueProvider.QueueWork(evt, QueueType.Event);
97+
}
98+
}
99+
100+
private async void FutureQueue(WorkflowInstance workflow, CancellationToken cancellationToken)
101+
{
102+
try
103+
{
104+
if (!workflow.NextExecution.HasValue)
105+
return;
106+
107+
var target = (workflow.NextExecution.Value - _datetimeProvider.Now.ToUniversalTime().Ticks);
108+
if (target > 0)
109+
await Task.Delay(TimeSpan.FromTicks(target), cancellationToken);
110+
111+
await QueueProvider.QueueWork(workflow.Id, QueueType.Workflow);
112+
}
113+
catch (Exception ex)
114+
{
115+
Logger.LogError(ex.Message);
116+
}
117+
}
118+
}
119+
}

src/WorkflowCore/Services/MemoryPersistenceProvider.cs renamed to src/WorkflowCore/Services/DefaultProviders/MemoryPersistenceProvider.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,11 @@ public async Task PersistWorkflow(WorkflowInstance workflow)
4040
}
4141
}
4242

43-
public async Task<IEnumerable<string>> GetRunnableInstances()
43+
public async Task<IEnumerable<string>> GetRunnableInstances(DateTime asAt)
4444
{
4545
lock (_instances)
4646
{
47-
var now = DateTime.Now.ToUniversalTime().Ticks;
47+
var now = asAt.ToUniversalTime().Ticks;
4848
return _instances.Where(x => x.NextExecution.HasValue && x.NextExecution <= now).Select(x => x.Id).ToList();
4949
}
5050
}
@@ -132,13 +132,13 @@ public async Task MarkEventProcessed(string id)
132132
}
133133
}
134134

135-
public async Task<IEnumerable<string>> GetRunnableEvents()
135+
public async Task<IEnumerable<string>> GetRunnableEvents(DateTime asAt)
136136
{
137137
lock (_events)
138138
{
139139
return _events
140140
.Where(x => !x.IsProcessed)
141-
.Where(x => x.EventTime <= DateTime.Now.ToUniversalTime())
141+
.Where(x => x.EventTime <= asAt.ToUniversalTime())
142142
.Select(x => x.Id)
143143
.ToList();
144144
}
File renamed without changes.
File renamed without changes.
File renamed without changes.

0 commit comments

Comments
 (0)