Skip to content

Commit 0ef8be8

Browse files
committed
Merge branch 'master' into issue-1072
# Conflicts: # src/WorkflowCore/Services/BackgroundTasks/WorkflowConsumer.cs
2 parents 60cfe3d + 97974b2 commit 0ef8be8

File tree

32 files changed

+606
-123
lines changed

32 files changed

+606
-123
lines changed

.github/FUNDING.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
# These are supported funding model platforms
2+
3+
github: [danielgerlag]

src/Directory.Build.props

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@
44
<PackageLicenseUrl>https://github.com/danielgerlag/workflow-core/blob/master/LICENSE.md</PackageLicenseUrl>
55
<RepositoryType>git</RepositoryType>
66
<RepositoryUrl>https://github.com/danielgerlag/workflow-core.git</RepositoryUrl>
7-
<Version>3.6.4</Version>
8-
<AssemblyVersion>3.6.4.0</AssemblyVersion>
9-
<FileVersion>3.6.4.0</FileVersion>
7+
<Version>3.7.0</Version>
8+
<AssemblyVersion>3.7.0.0</AssemblyVersion>
9+
<FileVersion>3.7.0.0</FileVersion>
1010
<PackageIconUrl>https://github.com/danielgerlag/workflow-core/raw/master/src/logo.png</PackageIconUrl>
11-
<PackageVersion>3.6.4</PackageVersion>
11+
<PackageVersion>3.7.0</PackageVersion>
1212
</PropertyGroup>
1313
</Project>

src/WorkflowCore.DSL/Models/v1/StepSourceV1.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ namespace WorkflowCore.Models.DefinitionStorage.v1
77
public class StepSourceV1
88
{
99
public string StepType { get; set; }
10-
10+
1111
public string Id { get; set; }
1212

1313
public string Name { get; set; }
@@ -29,8 +29,9 @@ public class StepSourceV1
2929
public ExpandoObject Inputs { get; set; } = new ExpandoObject();
3030

3131
public Dictionary<string, string> Outputs { get; set; } = new Dictionary<string, string>();
32-
32+
3333
public Dictionary<string, string> SelectNextStep { get; set; } = new Dictionary<string, string>();
3434

35+
public bool ProceedOnCancel { get; set; } = false;
3536
}
3637
}

src/WorkflowCore.DSL/Services/DefinitionLoader.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ private WorkflowStepCollection ConvertSteps(ICollection<StepSourceV1> source, Ty
101101
targetStep.ErrorBehavior = nextStep.ErrorBehavior;
102102
targetStep.RetryInterval = nextStep.RetryInterval;
103103
targetStep.ExternalId = $"{nextStep.Id}";
104+
targetStep.ProceedOnCancel = nextStep.ProceedOnCancel;
104105

105106
AttachInputs(nextStep, dataType, stepType, targetStep);
106107
AttachOutputs(nextStep, dataType, stepType, targetStep);

src/WorkflowCore/Interface/IWorkflowModifier.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,5 +152,15 @@ IContainerStepBuilder<TData, Recur, TStepBody> Recur(Expression<Func<TData, Time
152152
IStepBuilder<TData, Activity> Activity(string activityName, Expression<Func<TData, object>> parameters = null,
153153
Expression<Func<TData, DateTime>> effectiveDate = null, Expression<Func<TData, bool>> cancelCondition = null);
154154

155+
/// <summary>
156+
/// Wait here until an external activity is complete
157+
/// </summary>
158+
/// <param name="activityName">The name used to identify the activity to wait for</param>
159+
/// <param name="parameters">The data to pass the external activity worker</param>
160+
/// <param name="effectiveDate">Listen for events as of this effective date</param>
161+
/// <param name="cancelCondition">A conditon that when true will cancel this WaitFor</param>
162+
/// <returns></returns>
163+
IStepBuilder<TData, Activity> Activity(Expression<Func<TData, IStepExecutionContext, string>> activityName, Expression<Func<TData, object>> parameters = null,
164+
Expression<Func<TData, DateTime>> effectiveDate = null, Expression<Func<TData, bool>> cancelCondition = null);
155165
}
156166
}

src/WorkflowCore/Interface/Persistence/IWorkflowRepository.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ public interface IWorkflowRepository
1212

1313
Task PersistWorkflow(WorkflowInstance workflow, CancellationToken cancellationToken = default);
1414

15+
Task PersistWorkflow(WorkflowInstance workflow, List<EventSubscription> subscriptions, CancellationToken cancellationToken = default);
16+
1517
Task<IEnumerable<string>> GetRunnableInstances(DateTime asAt, CancellationToken cancellationToken = default);
1618

1719
[Obsolete]

src/WorkflowCore/Services/BackgroundTasks/QueueConsumer.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,11 @@ public virtual void Start()
5353
public virtual void Stop()
5454
{
5555
_cancellationTokenSource.Cancel();
56-
DispatchTask.Wait();
57-
DispatchTask = null;
56+
if (DispatchTask != null)
57+
{
58+
DispatchTask.Wait();
59+
DispatchTask = null;
60+
}
5861
}
5962

6063
private async Task Execute()

src/WorkflowCore/Services/BackgroundTasks/WorkflowConsumer.cs

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance
5555
finally
5656
{
5757
WorkflowActivity.Enrich(result);
58-
await _persistenceStore.PersistWorkflow(workflow, cancellationToken);
58+
await _persistenceStore.PersistWorkflow(workflow, result.Subscriptions, cancellationToken);
5959
await QueueProvider.QueueWork(itemId, QueueType.Index);
6060
_greylist.Remove($"wf:{itemId}");
6161
}
@@ -68,7 +68,7 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance
6868
{
6969
foreach (var sub in result.Subscriptions)
7070
{
71-
await SubscribeEvent(sub, _persistenceStore, cancellationToken);
71+
await TryProcessSubscription(sub, _persistenceStore, cancellationToken);
7272
}
7373

7474
await _persistenceStore.PersistErrors(result.Errors, cancellationToken);
@@ -98,12 +98,8 @@ await _persistenceStore.ScheduleCommand(new ScheduledCommand()
9898

9999
}
100100

101-
private async Task SubscribeEvent(EventSubscription subscription, IPersistenceProvider persistenceStore, CancellationToken cancellationToken)
101+
private async Task TryProcessSubscription(EventSubscription subscription, IPersistenceProvider persistenceStore, CancellationToken cancellationToken)
102102
{
103-
//TODO: move to own class
104-
Logger.LogDebug("Subscribing to event {EventName} {EventKey} for workflow {WorkflowId} step {StepId}", subscription.EventName, subscription.EventKey, subscription.WorkflowId, subscription.StepId);
105-
106-
await persistenceStore.CreateEventSubscription(subscription, cancellationToken);
107103
if (subscription.EventName != Event.EventTypeActivity)
108104
{
109105
var events = await persistenceStore.GetEvents(subscription.EventName, subscription.EventKey, subscription.SubscribeAsOf, cancellationToken);

src/WorkflowCore/Services/DefaultProviders/MemoryPersistenceProvider.cs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,25 @@ public async Task PersistWorkflow(WorkflowInstance workflow, CancellationToken _
4646
}
4747
}
4848

49+
public async Task PersistWorkflow(WorkflowInstance workflow, List<EventSubscription> subscriptions, CancellationToken cancellationToken = default)
50+
{
51+
lock (_instances)
52+
{
53+
var existing = _instances.First(x => x.Id == workflow.Id);
54+
_instances.Remove(existing);
55+
_instances.Add(workflow);
56+
57+
lock (_subscriptions)
58+
{
59+
foreach (var subscription in subscriptions)
60+
{
61+
subscription.Id = Guid.NewGuid().ToString();
62+
_subscriptions.Add(subscription);
63+
}
64+
}
65+
}
66+
}
67+
4968
public async Task<IEnumerable<string>> GetRunnableInstances(DateTime asAt, CancellationToken _ = default)
5069
{
5170
lock (_instances)

src/WorkflowCore/Services/DefaultProviders/TransientMemoryPersistenceProvider.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,16 @@ public TransientMemoryPersistenceProvider(ISingletonMemoryProvider innerService)
5050

5151
public Task PersistWorkflow(WorkflowInstance workflow, CancellationToken _ = default) => _innerService.PersistWorkflow(workflow);
5252

53+
public async Task PersistWorkflow(WorkflowInstance workflow, List<EventSubscription> subscriptions, CancellationToken cancellationToken = default)
54+
{
55+
await PersistWorkflow(workflow, cancellationToken);
56+
57+
foreach(var subscription in subscriptions)
58+
{
59+
await CreateEventSubscription(subscription, cancellationToken);
60+
}
61+
}
62+
5363
public Task TerminateSubscription(string eventSubscriptionId, CancellationToken _ = default) => _innerService.TerminateSubscription(eventSubscriptionId);
5464
public Task<EventSubscription> GetSubscription(string eventSubscriptionId, CancellationToken _ = default) => _innerService.GetSubscription(eventSubscriptionId);
5565

0 commit comments

Comments
 (0)