Skip to content

Commit 8c8bee9

Browse files
authored
Merge branch 'danielgerlag:master' into master
2 parents 2ff5282 + f3042be commit 8c8bee9

File tree

7 files changed

+153
-16
lines changed

7 files changed

+153
-16
lines changed

src/Directory.Build.props

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@
44
<PackageLicenseUrl>https://github.com/danielgerlag/workflow-core/blob/master/LICENSE.md</PackageLicenseUrl>
55
<RepositoryType>git</RepositoryType>
66
<RepositoryUrl>https://github.com/danielgerlag/workflow-core.git</RepositoryUrl>
7-
<Version>3.5.1</Version>
8-
<AssemblyVersion>3.5.1.0</AssemblyVersion>
9-
<FileVersion>3.5.1.0</FileVersion>
7+
<Version>3.5.3</Version>
8+
<AssemblyVersion>3.5.3.0</AssemblyVersion>
9+
<FileVersion>3.5.3.0</FileVersion>
1010
<PackageIconUrl>https://github.com/danielgerlag/workflow-core/raw/master/src/logo.png</PackageIconUrl>
11-
<PackageVersion>3.5.1</PackageVersion>
11+
<PackageVersion>3.5.3</PackageVersion>
1212
</PropertyGroup>
1313
</Project>

src/WorkflowCore/Services/BackgroundTasks/EventConsumer.cs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,13 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance
7474
complete = complete && await SeedSubscription(evt, sub, toQueue, cancellationToken);
7575

7676
if (complete)
77+
{
7778
await _eventRepository.MarkEventProcessed(itemId, cancellationToken);
79+
}
80+
else
81+
{
82+
_greylist.Remove($"evt:{evt.Id}");
83+
}
7884

7985
foreach (var eventId in toQueue)
8086
await QueueProvider.QueueWork(eventId, QueueType.Event);
@@ -87,7 +93,7 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance
8793
}
8894

8995
private async Task<bool> SeedSubscription(Event evt, EventSubscription sub, HashSet<string> toQueue, CancellationToken cancellationToken)
90-
{
96+
{
9197
foreach (var eventId in await _eventRepository.GetEvents(sub.EventName, sub.EventKey, sub.SubscribeAsOf, cancellationToken))
9298
{
9399
if (eventId == evt.Id)

src/WorkflowCore/Services/BackgroundTasks/RunnablePoller.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,6 @@ private async void PollRunnables(object target)
9393
if (_greylist.Contains($"evt:{item}"))
9494
{
9595
_logger.LogDebug($"Got greylisted event {item}");
96-
_greylist.Add($"evt:{item}");
9796
continue;
9897
}
9998
_logger.LogDebug($"Got unprocessed event {item}");

src/WorkflowCore/Services/BackgroundTasks/WorkflowConsumer.cs

Lines changed: 38 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,10 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance
3535
Logger.LogInformation("Workflow locked {0}", itemId);
3636
return;
3737
}
38-
38+
3939
WorkflowInstance workflow = null;
4040
WorkflowExecutorResult result = null;
41-
41+
4242
try
4343
{
4444
cancellationToken.ThrowIfCancellationRequested();
@@ -77,22 +77,53 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance
7777
}
7878
}
7979
}
80-
80+
8181
}
82-
82+
8383
private async Task SubscribeEvent(EventSubscription subscription, IPersistenceProvider persistenceStore, CancellationToken cancellationToken)
8484
{
8585
//TODO: move to own class
8686
Logger.LogDebug("Subscribing to event {0} {1} for workflow {2} step {3}", subscription.EventName, subscription.EventKey, subscription.WorkflowId, subscription.StepId);
87-
87+
8888
await persistenceStore.CreateEventSubscription(subscription, cancellationToken);
8989
if (subscription.EventName != Event.EventTypeActivity)
9090
{
9191
var events = await persistenceStore.GetEvents(subscription.EventName, subscription.EventKey, subscription.SubscribeAsOf, cancellationToken);
92+
9293
foreach (var evt in events)
9394
{
94-
await persistenceStore.MarkEventUnprocessed(evt, cancellationToken);
95-
await QueueProvider.QueueWork(evt, QueueType.Event);
95+
var eventKey = $"evt:{evt}";
96+
bool acquiredLock = false;
97+
try
98+
{
99+
acquiredLock = await _lockProvider.AcquireLock(eventKey, cancellationToken);
100+
int attempt = 0;
101+
while (!acquiredLock && attempt < 10)
102+
{
103+
await Task.Delay(Options.IdleTime, cancellationToken);
104+
acquiredLock = await _lockProvider.AcquireLock(eventKey, cancellationToken);
105+
106+
attempt++;
107+
}
108+
109+
if (!acquiredLock)
110+
{
111+
Logger.LogWarning($"Failed to lock {evt}");
112+
}
113+
else
114+
{
115+
_greylist.Remove(eventKey);
116+
await persistenceStore.MarkEventUnprocessed(evt, cancellationToken);
117+
await QueueProvider.QueueWork(evt, QueueType.Event);
118+
}
119+
}
120+
finally
121+
{
122+
if (acquiredLock)
123+
{
124+
await _lockProvider.ReleaseLock(eventKey);
125+
}
126+
}
96127
}
97128
}
98129
}

src/WorkflowCore/Services/ErrorHandlers/SuspendHandler.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ public void Handle(WorkflowInstance workflow, WorkflowDefinition def, ExecutionP
2929
WorkflowDefinitionId = workflow.WorkflowDefinitionId,
3030
Version = workflow.Version
3131
});
32+
33+
step.PrimeForRetry(pointer);
3234
}
3335
}
3436
}

src/providers/WorkflowCore.Persistence.MongoDB/ServiceCollectionExtensions.cs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,17 +9,25 @@ namespace Microsoft.Extensions.DependencyInjection
99
{
1010
public static class ServiceCollectionExtensions
1111
{
12-
public static WorkflowOptions UseMongoDB(this WorkflowOptions options, string mongoUrl, string databaseName)
12+
public static WorkflowOptions UseMongoDB(
13+
this WorkflowOptions options,
14+
string mongoUrl,
15+
string databaseName,
16+
Action<MongoClientSettings> configureClient = default)
1317
{
1418
options.UsePersistence(sp =>
1519
{
16-
var client = new MongoClient(mongoUrl);
20+
var mongoClientSettings = MongoClientSettings.FromConnectionString(mongoUrl);
21+
configureClient?.Invoke(mongoClientSettings);
22+
var client = new MongoClient(mongoClientSettings);
1723
var db = client.GetDatabase(databaseName);
1824
return new MongoPersistenceProvider(db);
1925
});
2026
options.Services.AddTransient<IWorkflowPurger>(sp =>
2127
{
22-
var client = new MongoClient(mongoUrl);
28+
var mongoClientSettings = MongoClientSettings.FromConnectionString(mongoUrl);
29+
configureClient?.Invoke(mongoClientSettings);
30+
var client = new MongoClient(mongoClientSettings);
2331
var db = client.GetDatabase(databaseName);
2432
return new WorkflowPurger(db);
2533
});
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
using System;
2+
using WorkflowCore.Interface;
3+
using WorkflowCore.Models;
4+
using Xunit;
5+
using FluentAssertions;
6+
using WorkflowCore.Testing;
7+
using System.Threading;
8+
9+
namespace WorkflowCore.IntegrationTests.Scenarios
10+
{
11+
public sealed class ParallelEventsScenario
12+
: WorkflowTest<ParallelEventsScenario.ParallelEventsWorkflow, ParallelEventsScenario.MyDataClass>
13+
{
14+
private const string EVENT_KEY = nameof(EVENT_KEY);
15+
16+
public class MyDataClass
17+
{
18+
public string StrValue1 { get; set; }
19+
public string StrValue2 { get; set; }
20+
}
21+
22+
public class ParallelEventsWorkflow : IWorkflow<MyDataClass>
23+
{
24+
public string Id => "EventWorkflow";
25+
public int Version => 1;
26+
public void Build(IWorkflowBuilder<MyDataClass> builder)
27+
{
28+
builder
29+
.StartWith(context => ExecutionResult.Next())
30+
.Parallel()
31+
.Do(then =>
32+
then.WaitFor("Event1", data => EVENT_KEY).Then(x =>
33+
{
34+
Thread.Sleep(300);
35+
return ExecutionResult.Next();
36+
}))
37+
.Do(then =>
38+
then.WaitFor("Event2", data => EVENT_KEY).Then(x =>
39+
{
40+
Thread.Sleep(100);
41+
return ExecutionResult.Next();
42+
}))
43+
.Do(then =>
44+
then.WaitFor("Event3", data => EVENT_KEY).Then(x =>
45+
{
46+
Thread.Sleep(1000);
47+
return ExecutionResult.Next();
48+
}))
49+
.Do(then =>
50+
then.WaitFor("Event4", data => EVENT_KEY).Then(x =>
51+
{
52+
Thread.Sleep(100);
53+
return ExecutionResult.Next();
54+
}))
55+
.Do(then =>
56+
then.WaitFor("Event5", data => EVENT_KEY).Then(x =>
57+
{
58+
Thread.Sleep(100);
59+
return ExecutionResult.Next();
60+
}))
61+
.Join()
62+
.Then(x =>
63+
{
64+
return ExecutionResult.Next();
65+
});
66+
}
67+
}
68+
69+
public ParallelEventsScenario()
70+
{
71+
Setup();
72+
}
73+
74+
[Fact]
75+
public void Scenario()
76+
{
77+
var eventKey = Guid.NewGuid().ToString();
78+
var workflowId = StartWorkflow(new MyDataClass { StrValue1 = eventKey, StrValue2 = eventKey });
79+
Host.PublishEvent("Event1", EVENT_KEY, "Pass1");
80+
Host.PublishEvent("Event2", EVENT_KEY, "Pass2");
81+
Host.PublishEvent("Event3", EVENT_KEY, "Pass3");
82+
Host.PublishEvent("Event4", EVENT_KEY, "Pass4");
83+
Host.PublishEvent("Event5", EVENT_KEY, "Pass5");
84+
85+
WaitForWorkflowToComplete(workflowId, TimeSpan.FromSeconds(30));
86+
87+
GetStatus(workflowId).Should().Be(WorkflowStatus.Complete);
88+
UnhandledStepErrors.Count.Should().Be(0);
89+
}
90+
}
91+
}

0 commit comments

Comments
 (0)