Skip to content

Commit 6c6eb43

Browse files
committed
support for executing a sub-workflow added + test
1 parent 35a104a commit 6c6eb43

File tree

13 files changed

+249
-5
lines changed

13 files changed

+249
-5
lines changed

src/WorkflowCore.Testing/WorkflowCore.Testing.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="3.1.0" />
1313
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="3.1.0" />
1414
<PackageReference Include="Microsoft.Extensions.Logging" Version="3.1.0" />
15+
<PackageReference Include="xunit.abstractions" Version="2.0.3" />
1516
</ItemGroup>
1617

1718
<ItemGroup>

src/WorkflowCore/Interface/IWorkflowModifier.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,5 +183,15 @@ IStepBuilder<TData, Activity> Activity(string activityName, Expression<Func<TDat
183183
/// <returns></returns>
184184
IStepBuilder<TData, Activity> Activity(Expression<Func<TData, IStepExecutionContext, string>> activityName, Expression<Func<TData, object>> parameters = null,
185185
Expression<Func<TData, DateTime>> effectiveDate = null, Expression<Func<TData, bool>> cancelCondition = null);
186+
187+
/// <summary>
188+
/// Execute a sub-workflow
189+
/// </summary>
190+
/// <param name="subWorkflowId">Id of the sub-workflow to start</param>
191+
/// <param name="parameters">The data to pass to the sub-workflow</param>
192+
/// <param name="cancelCondition">A condition that when true will cancel this sub-workflow</param>
193+
/// <returns></returns>
194+
IStepBuilder<TData, SubWorkflowStepBody> SubWorkflow(string subWorkflowId, Expression<Func<TData, object>> parameters = null,
195+
Expression<Func<TData, bool>> cancelCondition = null);
186196
}
187197
}

src/WorkflowCore/Models/ExecutionPointer.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,10 @@ public IReadOnlyCollection<string> Scope
5151
get => _scope;
5252
set => _scope = new List<string>(value);
5353
}
54+
55+
public bool IsComplete => Status == PointerStatus.Complete;
56+
57+
public bool HasChildren => Children?.Count > 0;
5458
}
5559

5660
public enum PointerStatus
Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,57 @@
11
using System;
2+
using Microsoft.Extensions.DependencyInjection;
3+
using Microsoft.Extensions.Logging;
24
using WorkflowCore.Interface;
35
using WorkflowCore.Models;
6+
using WorkflowCore.Models.LifeCycleEvents;
47

58
namespace WorkflowCore.Primitives
69
{
710
public class SubWorkflowStepBody : StepBody
811
{
12+
private readonly IScopeProvider _scopeProvider;
13+
14+
public SubWorkflowStepBody(IScopeProvider scopeProvider)
15+
{
16+
_scopeProvider = scopeProvider;
17+
}
18+
919
public override ExecutionResult Run(IStepExecutionContext context)
1020
{
11-
// TODO: What is this supposed to do?
12-
throw new NotImplementedException();
21+
var eventKey = context.ExecutionPointer.EventKey;
22+
23+
var scope = _scopeProvider.CreateScope(context);
24+
var workflowController = scope.ServiceProvider.GetRequiredService<IWorkflowController>();
25+
var logger = scope.ServiceProvider.GetRequiredService<ILoggerFactory>().CreateLogger(
26+
typeof(SubWorkflowStepBody).Namespace + "." + nameof(SubWorkflowStepBody));
27+
28+
if (!context.ExecutionPointer.EventPublished)
29+
{
30+
var result = workflowController.StartWorkflow(SubWorkflowId, context.Workflow.Data, context.Workflow.Id).Result;
31+
32+
logger.LogDebug("Started sub workflow {Name} with id='{SubId}' from workflow {WorkflowDefinitionId} ({Id})",
33+
SubWorkflowId, result, context.Workflow.WorkflowDefinitionId, context.Workflow.Id);
34+
35+
logger.LogDebug("Workflow {Name} ({SubId}) is waiting for event WorkflowCompleted with key='{EventKey}'",
36+
SubWorkflowId, result, result);
37+
38+
var effectiveDate = DateTime.MinValue;
39+
return ExecutionResult.WaitForEvent(nameof(WorkflowCompleted), result, effectiveDate);
40+
}
41+
42+
logger.LogDebug("Sub workflow {Name} ({SubId}) completed", SubWorkflowId,
43+
context.ExecutionPointer.EventKey);
44+
45+
var persistenceProvider = scope.ServiceProvider.GetRequiredService<IPersistenceProvider>();
46+
47+
Result = persistenceProvider.GetWorkflowInstance(context.ExecutionPointer.EventKey).Result.Data;
48+
return ExecutionResult.Next();
1349
}
50+
51+
public string SubWorkflowId { get; set; }
52+
53+
public object Parameters { get; set; }
54+
55+
public object Result { get; set; }
1456
}
1557
}

src/WorkflowCore/Primitives/WaitFor.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@ public override ExecutionResult Run(IStepExecutionContext context)
2525
effectiveDate = EffectiveDate;
2626
}
2727

28-
return ExecutionResult.WaitForEvent(EventName, EventKey, effectiveDate);
28+
var eventKey = context.Workflow.Reference ?? EventKey;
29+
return ExecutionResult.WaitForEvent(EventName, eventKey, effectiveDate);
2930
}
3031

3132
EventData = context.ExecutionPointer.EventData;

src/WorkflowCore/ServiceCollectionExtensions.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ public static IServiceCollection AddWorkflow(this IServiceCollection services, A
8181
services.AddTransient<ISyncWorkflowRunner, SyncWorkflowRunner>();
8282

8383
services.AddTransient<Foreach>();
84+
services.AddTransient<SubWorkflowStepBody>();
8485

8586
return services;
8687
}

src/WorkflowCore/Services/ExecutionResultProcessor.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,16 @@ public ExecutionResultProcessor(IExecutionPointerFactory pointerFactory, IDateTi
2929

3030
public void ProcessExecutionResult(WorkflowInstance workflow, WorkflowDefinition def, ExecutionPointer pointer, WorkflowStep step, ExecutionResult result, WorkflowExecutorResult workflowResult)
3131
{
32+
var stepInfo = $"{step.Name ?? step.BodyType.Name} ({step.Id})";
33+
3234
pointer.PersistenceData = result.PersistenceData;
3335
pointer.Outcome = result.OutcomeValue;
3436
if (result.SleepFor.HasValue)
3537
{
3638
pointer.SleepUntil = _datetimeProvider.UtcNow.Add(result.SleepFor.Value);
3739
pointer.Status = PointerStatus.Sleeping;
40+
_logger.LogDebug("Step {StepName} on workflow {WorkflowDefinitionId} ({WorkflowId}) will sleep for {SleepUntil}",
41+
stepInfo, workflow.WorkflowDefinitionId, workflow.Id, result.SleepFor.Value);
3842
}
3943

4044
if (!string.IsNullOrEmpty(result.EventName))
@@ -54,6 +58,9 @@ public void ProcessExecutionResult(WorkflowInstance workflow, WorkflowDefinition
5458
SubscribeAsOf = result.EventAsOf,
5559
SubscriptionData = result.SubscriptionData
5660
});
61+
62+
_logger.LogDebug("Step {StepName} on workflow {WorkflowDefinitionId} ({WorkflowId}) waiting for event {EventName}",
63+
stepInfo, workflow.WorkflowDefinitionId, workflow.Id, pointer.EventName);
5764
}
5865

5966
if (result.Proceed)
@@ -87,6 +94,9 @@ public void ProcessExecutionResult(WorkflowInstance workflow, WorkflowDefinition
8794
WorkflowDefinitionId = workflow.WorkflowDefinitionId,
8895
Version = workflow.Version
8996
});
97+
98+
_logger.LogDebug("Step {StepName} on workflow {WorkflowDefinitionId} ({WorkflowId}) completed",
99+
stepInfo, workflow.WorkflowDefinitionId, workflow.Id);
90100
}
91101
else
92102
{

src/WorkflowCore/Services/FluentBuilders/StepBuilder.cs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -588,5 +588,24 @@ public IStepBuilder<TData, Activity> Activity(Expression<Func<TData, IStepExecut
588588
Step.Outcomes.Add(new ValueOutcome { NextStep = newStep.Id });
589589
return stepBuilder;
590590
}
591+
592+
public IStepBuilder<TData, SubWorkflowStepBody> SubWorkflow(
593+
string subWorkflowId,
594+
Expression<Func<TData, object>> parameters = null,
595+
Expression<Func<TData, bool>> cancelCondition = null)
596+
{
597+
var newStep = new WorkflowStep<SubWorkflowStepBody>();
598+
newStep.CancelCondition = cancelCondition;
599+
600+
WorkflowBuilder.AddStep(newStep);
601+
var stepBuilder = new StepBuilder<TData, SubWorkflowStepBody>(WorkflowBuilder, newStep);
602+
stepBuilder.Input((step) => step.SubWorkflowId, (data) => subWorkflowId);
603+
604+
if (parameters != null)
605+
stepBuilder.Input((step) => step.Parameters, parameters);
606+
607+
Step.Outcomes.Add(new ValueOutcome { NextStep = newStep.Id });
608+
return stepBuilder;
609+
}
591610
}
592611
}

src/WorkflowCore/Services/FluentBuilders/WorkflowBuilder.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -296,10 +296,16 @@ public IStepBuilder<TData, Activity> Activity(string activityName, Expression<Fu
296296
{
297297
return Start().Activity(activityName, parameters, effectiveDate, cancelCondition);
298298
}
299+
299300
public IStepBuilder<TData, Activity> Activity(Expression<Func<TData, IStepExecutionContext, string>> activityName, Expression<Func<TData, object>> parameters = null, Expression<Func<TData, DateTime>> effectiveDate = null, Expression<Func<TData, bool>> cancelCondition = null)
300301
{
301302
return Start().Activity(activityName, parameters, effectiveDate, cancelCondition);
302303
}
304+
305+
public IStepBuilder<TData, SubWorkflowStepBody> SubWorkflow(string subWorkflowId, Expression<Func<TData, object>> parameters = null, Expression<Func<TData, bool>> cancelCondition = null)
306+
{
307+
return Start().SubWorkflow(subWorkflowId, parameters, cancelCondition);
308+
}
303309

304310
private IStepBuilder<TData, InlineStepBody> Start()
305311
{

src/WorkflowCore/Services/WorkflowExecutor.cs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -156,9 +156,11 @@ private async Task ExecuteStep(WorkflowInstance workflow, WorkflowStep step, Exe
156156
CancellationToken = cancellationToken
157157
};
158158

159+
var stepInfo = $"{step.Name ?? step.BodyType.Name} ({step.Id})";
160+
159161
using (var scope = _scopeProvider.CreateScope(context))
160162
{
161-
_logger.LogDebug("Starting step {StepName} on workflow {WorkflowId}", step.Name, workflow.Id);
163+
_logger.LogDebug("Starting step {StepName} on workflow {WorkflowDefinitionId} ({WorkflowId})", stepInfo, workflow.WorkflowDefinitionId, workflow.Id);
162164

163165
IStepBody body = step.ConstructBody(scope.ServiceProvider);
164166
var stepExecutor = scope.ServiceProvider.GetRequiredService<IStepExecutor>();
@@ -221,6 +223,7 @@ private async Task DetermineNextExecutionTime(WorkflowInstance workflow, Workflo
221223

222224
if (workflow.Status == WorkflowStatus.Complete)
223225
{
226+
await OnComplete(workflow, def);
224227
return;
225228
}
226229

@@ -236,7 +239,7 @@ private async Task DetermineNextExecutionTime(WorkflowInstance workflow, Workflo
236239
workflow.NextExecution = Math.Min(pointerSleep, workflow.NextExecution ?? pointerSleep);
237240
}
238241

239-
foreach (var pointer in workflow.ExecutionPointers.Where(x => x.Active && (x.Children ?? new List<string>()).Count > 0))
242+
foreach (var pointer in workflow.ExecutionPointers.Where(x => x.Active && x.HasChildren))
240243
{
241244
if (!workflow.ExecutionPointers.FindByScope(pointer.Id).All(x => x.EndTime.HasValue))
242245
continue;
@@ -256,6 +259,11 @@ private async Task DetermineNextExecutionTime(WorkflowInstance workflow, Workflo
256259
return;
257260
}
258261

262+
await OnComplete(workflow, def);
263+
}
264+
265+
private async Task OnComplete(WorkflowInstance workflow, WorkflowDefinition def)
266+
{
259267
workflow.Status = WorkflowStatus.Complete;
260268
workflow.CompleteTime = _datetimeProvider.UtcNow;
261269

@@ -264,6 +272,8 @@ private async Task DetermineNextExecutionTime(WorkflowInstance workflow, Workflo
264272
var middlewareRunner = scope.ServiceProvider.GetRequiredService<IWorkflowMiddlewareRunner>();
265273
await middlewareRunner.RunPostMiddleware(workflow, def);
266274
}
275+
276+
_logger.LogDebug("Workflow {WorkflowDefinitionId} ({Id}) completed", workflow.WorkflowDefinitionId, workflow.Id);
267277

268278
_publisher.PublishNotification(new WorkflowCompleted
269279
{

0 commit comments

Comments
 (0)