Skip to content

Commit 0b50ab7

Browse files
committed
performance tweaks for event bursts
1 parent f812132 commit 0b50ab7

File tree

7 files changed

+109
-11
lines changed

7 files changed

+109
-11
lines changed
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
namespace WorkflowCore.Interface
2+
{
3+
public interface IGreyList
4+
{
5+
void Add(string id);
6+
void Remove(string id);
7+
bool Contains(string id);
8+
}
9+
}

src/WorkflowCore/ServiceCollectionExtensions.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ public static IServiceCollection AddWorkflow(this IServiceCollection services, A
4848
services.AddTransient<IWorkflowErrorHandler, TerminateHandler>();
4949
services.AddTransient<IWorkflowErrorHandler, SuspendHandler>();
5050

51+
services.AddSingleton<IGreyList, GreyList>();
5152
services.AddSingleton<IWorkflowController, WorkflowController>();
5253
services.AddSingleton<IActivityController, ActivityController>();
5354
services.AddSingleton<IWorkflowHost, WorkflowHost>();

src/WorkflowCore/Services/BackgroundTasks/EventConsumer.cs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,15 @@ internal class EventConsumer : QueueConsumer, IBackgroundTask
1616
private readonly IEventRepository _eventRepository;
1717
private readonly IDistributedLockProvider _lockProvider;
1818
private readonly IDateTimeProvider _datetimeProvider;
19+
private readonly IGreyList _greylist;
1920
protected override int MaxConcurrentItems => 2;
2021
protected override QueueType Queue => QueueType.Event;
2122

22-
public EventConsumer(IWorkflowRepository workflowRepository, ISubscriptionRepository subscriptionRepository, IEventRepository eventRepository, IQueueProvider queueProvider, ILoggerFactory loggerFactory, IServiceProvider serviceProvider, IWorkflowRegistry registry, IDistributedLockProvider lockProvider, WorkflowOptions options, IDateTimeProvider datetimeProvider)
23+
public EventConsumer(IWorkflowRepository workflowRepository, ISubscriptionRepository subscriptionRepository, IEventRepository eventRepository, IQueueProvider queueProvider, ILoggerFactory loggerFactory, IServiceProvider serviceProvider, IWorkflowRegistry registry, IDistributedLockProvider lockProvider, WorkflowOptions options, IDateTimeProvider datetimeProvider, IGreyList greylist)
2324
: base(queueProvider, loggerFactory, options)
2425
{
2526
_workflowRepository = workflowRepository;
27+
_greylist = greylist;
2628
_subscriptionRepository = subscriptionRepository;
2729
_eventRepository = eventRepository;
2830
_lockProvider = lockProvider;
@@ -41,6 +43,11 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance
4143
{
4244
cancellationToken.ThrowIfCancellationRequested();
4345
var evt = await _eventRepository.GetEvent(itemId);
46+
if (evt.IsProcessed)
47+
{
48+
_greylist.Add($"evt:{evt.Id}");
49+
return;
50+
}
4451
if (evt.EventTime <= _datetimeProvider.UtcNow)
4552
{
4653
IEnumerable<EventSubscription> subs = null;
@@ -60,7 +67,7 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance
6067
subs = await _subscriptionRepository.GetSubscriptions(evt.EventName, evt.EventKey, evt.EventTime);
6168
}
6269

63-
var toQueue = new List<string>();
70+
var toQueue = new HashSet<string>();
6471
var complete = true;
6572

6673
foreach (var sub in subs.ToList())
@@ -79,7 +86,7 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance
7986
}
8087
}
8188

82-
private async Task<bool> SeedSubscription(Event evt, EventSubscription sub, List<string> toQueue, CancellationToken cancellationToken)
89+
private async Task<bool> SeedSubscription(Event evt, EventSubscription sub, HashSet<string> toQueue, CancellationToken cancellationToken)
8390
{
8491
foreach (var eventId in await _eventRepository.GetEvents(sub.EventName, sub.EventKey, sub.SubscribeAsOf))
8592
{

src/WorkflowCore/Services/BackgroundTasks/RunnablePoller.cs

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,14 @@ internal class RunnablePoller : IBackgroundTask
1313
private readonly IDistributedLockProvider _lockProvider;
1414
private readonly IQueueProvider _queueProvider;
1515
private readonly ILogger _logger;
16+
private readonly IGreyList _greylist;
1617
private readonly WorkflowOptions _options;
1718
private Timer _pollTimer;
1819

19-
public RunnablePoller(IPersistenceProvider persistenceStore, IQueueProvider queueProvider, ILoggerFactory loggerFactory, IServiceProvider serviceProvider, IWorkflowRegistry registry, IDistributedLockProvider lockProvider, WorkflowOptions options)
20+
public RunnablePoller(IPersistenceProvider persistenceStore, IQueueProvider queueProvider, ILoggerFactory loggerFactory, IServiceProvider serviceProvider, IWorkflowRegistry registry, IDistributedLockProvider lockProvider, IGreyList greylist, WorkflowOptions options)
2021
{
2122
_persistenceStore = persistenceStore;
23+
_greylist = greylist;
2224
_queueProvider = queueProvider;
2325
_logger = loggerFactory.CreateLogger<RunnablePoller>();
2426
_lockProvider = lockProvider;
@@ -55,7 +57,13 @@ private async void PollRunnables(object target)
5557
var runnables = await _persistenceStore.GetRunnableInstances(DateTime.Now);
5658
foreach (var item in runnables)
5759
{
60+
if (_greylist.Contains($"wf:{item}"))
61+
{
62+
_logger.LogDebug($"Got greylisted workflow {item}");
63+
continue;
64+
}
5865
_logger.LogDebug("Got runnable instance {0}", item);
66+
_greylist.Add($"wf:{item}");
5967
await _queueProvider.QueueWork(item, QueueType.Workflow);
6068
}
6169
}
@@ -80,8 +88,15 @@ private async void PollRunnables(object target)
8088
var events = await _persistenceStore.GetRunnableEvents(DateTime.Now);
8189
foreach (var item in events.ToList())
8290
{
91+
if (_greylist.Contains($"evt:{item}"))
92+
{
93+
_logger.LogDebug($"Got greylisted event {item}");
94+
_greylist.Add($"evt:{item}");
95+
continue;
96+
}
8397
_logger.LogDebug($"Got unprocessed event {item}");
84-
await _queueProvider.QueueWork(item, QueueType.Event);
98+
_greylist.Add($"evt:{item}");
99+
await _queueProvider.QueueWork(item, QueueType.Event);
85100
}
86101
}
87102
finally

src/WorkflowCore/Services/BackgroundTasks/WorkflowConsumer.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,16 @@ internal class WorkflowConsumer : QueueConsumer, IBackgroundTask
1414
private readonly IDateTimeProvider _datetimeProvider;
1515
private readonly IPersistenceProvider _persistenceStore;
1616
private readonly IWorkflowExecutor _executor;
17+
private readonly IGreyList _greylist;
1718

1819
protected override int MaxConcurrentItems => Options.MaxConcurrentWorkflows;
1920
protected override QueueType Queue => QueueType.Workflow;
2021

21-
public WorkflowConsumer(IPersistenceProvider persistenceProvider, IQueueProvider queueProvider, ILoggerFactory loggerFactory, IServiceProvider serviceProvider, IWorkflowRegistry registry, IDistributedLockProvider lockProvider, IWorkflowExecutor executor, IDateTimeProvider datetimeProvider, WorkflowOptions options)
22+
public WorkflowConsumer(IPersistenceProvider persistenceProvider, IQueueProvider queueProvider, ILoggerFactory loggerFactory, IServiceProvider serviceProvider, IWorkflowRegistry registry, IDistributedLockProvider lockProvider, IWorkflowExecutor executor, IDateTimeProvider datetimeProvider, IGreyList greylist, WorkflowOptions options)
2223
: base(queueProvider, loggerFactory, options)
2324
{
2425
_persistenceStore = persistenceProvider;
26+
_greylist = greylist;
2527
_executor = executor;
2628
_lockProvider = lockProvider;
2729
_datetimeProvider = datetimeProvider;
@@ -43,7 +45,7 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance
4345
cancellationToken.ThrowIfCancellationRequested();
4446
workflow = await _persistenceStore.GetWorkflowInstance(itemId);
4547
if (workflow.Status == WorkflowStatus.Runnable)
46-
{
48+
{
4749
try
4850
{
4951
result = await _executor.Execute(workflow);
@@ -52,6 +54,7 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance
5254
{
5355
await _persistenceStore.PersistWorkflow(workflow);
5456
await QueueProvider.QueueWork(itemId, QueueType.Index);
57+
_greylist.Remove($"wf:{itemId}");
5558
}
5659
}
5760
}

src/WorkflowCore/Services/GreyList.cs

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
using Microsoft.Extensions.Logging;
2+
using System;
3+
using System.Collections.Concurrent;
4+
using System.Threading;
5+
using WorkflowCore.Interface;
6+
7+
namespace WorkflowCore.Services
8+
{
9+
public class GreyList : IGreyList, IDisposable
10+
{
11+
private readonly Timer _cycleTimer;
12+
private readonly ConcurrentDictionary<string, DateTime> _list;
13+
private readonly ILogger _logger;
14+
private const int CYCLE_TIME = 600;
15+
16+
public GreyList(ILoggerFactory loggerFactory)
17+
{
18+
_logger = loggerFactory.CreateLogger<GreyList>();
19+
_list = new ConcurrentDictionary<string, DateTime>();
20+
_cycleTimer = new Timer(new TimerCallback(Cycle), null, TimeSpan.FromSeconds(CYCLE_TIME), TimeSpan.FromSeconds(CYCLE_TIME));
21+
}
22+
23+
public void Add(string id)
24+
{
25+
_list.AddOrUpdate(id, DateTime.Now, (key, val) => DateTime.Now);
26+
}
27+
28+
public bool Contains(string id)
29+
{
30+
if (!_list.TryGetValue(id, out var start))
31+
return false;
32+
33+
var result = start > (DateTime.Now.AddMinutes(-2));
34+
35+
if (!result)
36+
_list.TryRemove(id, out var _);
37+
38+
return result;
39+
}
40+
41+
private void Cycle(object target)
42+
{
43+
try
44+
{
45+
_list.Clear();
46+
}
47+
catch (Exception ex)
48+
{
49+
_logger.LogError(ex, ex.Message);
50+
}
51+
}
52+
53+
public void Dispose()
54+
{
55+
_cycleTimer.Dispose();
56+
}
57+
58+
public void Remove(string id)
59+
{
60+
_list.TryRemove(id, out var _);
61+
}
62+
}
63+
}

src/WorkflowCore/WorkflowCore.csproj

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,12 @@
1515
<GenerateAssemblyCompanyAttribute>false</GenerateAssemblyCompanyAttribute>
1616
<GenerateAssemblyProductAttribute>false</GenerateAssemblyProductAttribute>
1717
<Description>Workflow Core is a light weight workflow engine targeting .NET Standard.</Description>
18-
<Version>3.2.0</Version>
19-
<AssemblyVersion>3.2.0.0</AssemblyVersion>
20-
<FileVersion>3.2.0.0</FileVersion>
18+
<Version>3.2.1</Version>
19+
<AssemblyVersion>3.2.1.0</AssemblyVersion>
20+
<FileVersion>3.2.1.0</FileVersion>
2121
<PackageReleaseNotes></PackageReleaseNotes>
2222
<PackageIconUrl>https://github.com/danielgerlag/workflow-core/raw/master/src/logo.png</PackageIconUrl>
23-
<PackageVersion>3.2.0</PackageVersion>
23+
<PackageVersion>3.2.1</PackageVersion>
2424
</PropertyGroup>
2525

2626
<ItemGroup>

0 commit comments

Comments
 (0)