Skip to content

Commit 2aaf7bd

Browse files
committed
Add middleware runner, step executor, and ability to run middleware around workflow
steps and before/after workflow Add sample with a sample middleware for retrying and log correlation as well as workflow pre/post samples Add async overloads for StartWorkflow and WaitForWorkflowToComplete in integration tests Add error handling of post workflow middleware Add docs for workflow middleware
1 parent 32fedfd commit 2aaf7bd

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+1713
-58
lines changed

WorkflowCore.sln

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ScratchPad", "test\ScratchP
147147
EndProject
148148
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "WorkflowCore.Tests.QueueProviders.RabbitMQ", "test\WorkflowCore.Tests.QueueProviders.RabbitMQ\WorkflowCore.Tests.QueueProviders.RabbitMQ.csproj", "{54DE20BA-EBA7-4BF0-9BD9-F03766849716}"
149149
EndProject
150+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "WorkflowCore.Sample19", "src\samples\WorkflowCore.Sample19\WorkflowCore.Sample19.csproj", "{1223ED47-3E5E-4960-B70D-DFAF550F6666}"
151+
EndProject
150152
Global
151153
GlobalSection(SolutionConfigurationPlatforms) = preSolution
152154
Debug|Any CPU = Debug|Any CPU
@@ -361,6 +363,10 @@ Global
361363
{54DE20BA-EBA7-4BF0-9BD9-F03766849716}.Debug|Any CPU.Build.0 = Debug|Any CPU
362364
{54DE20BA-EBA7-4BF0-9BD9-F03766849716}.Release|Any CPU.ActiveCfg = Release|Any CPU
363365
{54DE20BA-EBA7-4BF0-9BD9-F03766849716}.Release|Any CPU.Build.0 = Release|Any CPU
366+
{1223ED47-3E5E-4960-B70D-DFAF550F6666}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
367+
{1223ED47-3E5E-4960-B70D-DFAF550F6666}.Debug|Any CPU.Build.0 = Debug|Any CPU
368+
{1223ED47-3E5E-4960-B70D-DFAF550F6666}.Release|Any CPU.ActiveCfg = Release|Any CPU
369+
{1223ED47-3E5E-4960-B70D-DFAF550F6666}.Release|Any CPU.Build.0 = Release|Any CPU
364370
EndGlobalSection
365371
GlobalSection(SolutionProperties) = preSolution
366372
HideSolutionNode = FALSE
@@ -421,6 +427,7 @@ Global
421427
{E32CF21A-29CC-46D1-8044-FCC327F2B281} = {5080DB09-CBE8-4C45-9957-C3BB7651755E}
422428
{51BB7DCD-01DD-453D-A1E7-17E5E3DBB14C} = {E6CEAD8D-F565-471E-A0DC-676F54EAEDEB}
423429
{54DE20BA-EBA7-4BF0-9BD9-F03766849716} = {E6CEAD8D-F565-471E-A0DC-676F54EAEDEB}
430+
{1223ED47-3E5E-4960-B70D-DFAF550F6666} = {5080DB09-CBE8-4C45-9957-C3BB7651755E}
424431
EndGlobalSection
425432
GlobalSection(ExtensibilityGlobals) = postSolution
426433
SolutionGuid = {DC0FA8D3-6449-4FDA-BB46-ECF58FAD23B4}

docs/samples.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,4 +32,6 @@
3232

3333
[Exposing a REST API](https://github.com/danielgerlag/workflow-core/tree/master/src/samples/WebApiSample)
3434

35-
[Human(User) Workflow](https://github.com/danielgerlag/workflow-core/tree/master/src/samples/WorkflowCore.Sample08)
35+
[Human(User) Workflow](https://github.com/danielgerlag/workflow-core/tree/master/src/samples/WorkflowCore.Sample08)
36+
37+
[Workflow Middleware](https://github.com/danielgerlag/workflow-core/tree/master/src/samples/WorkflowCore.Sample19)

docs/workflow-middleware.md

Lines changed: 279 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,279 @@
1+
# Workflow Middleware
2+
3+
Workflows can be extended with Middleware that run before/after workflows start/complete as well as around workflow steps to provide flexibility in implementing cross-cutting concerns such as [log correlation](https://www.frakkingsweet.com/net-core-log-correlation-easy-access-to-headers/), [retries](https://docs.microsoft.com/en-us/dotnet/architecture/microservices/implement-resilient-applications/implement-http-call-retries-exponential-backoff-polly), and other use-cases.
4+
5+
This is done by implementing and registering `IWorkflowMiddleware` for workflows or `IWorkflowStepMiddleware` for steps.
6+
7+
## Step Middleware
8+
9+
Step middleware lets you run additional code around the execution of a given step and alter its behavior. Implementing a step middleware should look familiar to anyone familiar with [ASP.NET Core's middleware pipeline](https://docs.microsoft.com/en-us/aspnet/core/fundamentals/middleware/?view=aspnetcore-3.1) or [`HttpClient`'s `DelegatingHandler` middleware](https://docs.microsoft.com/en-us/aspnet/core/fundamentals/http-requests?view=aspnetcore-3.1#outgoing-request-middleware).
10+
11+
### Usage
12+
13+
First, create your own middleware class that implements `IWorkflowStepMiddleware`. Here's an example of a middleware that adds workflow ID and step ID to the log correlation context of every workflow step in your app.
14+
15+
**Important:** You must make sure to call `next()` as part of your middleware. If you do not do this, your step will never run.
16+
17+
```cs
18+
public class LogCorrelationStepMiddleware : IWorkflowStepMiddleware
19+
{
20+
private readonly ILogger<LogCorrelationStepMiddleware> _log;
21+
22+
public LogCorrelationStepMiddleware(
23+
ILogger<LogCorrelationStepMiddleware> log)
24+
{
25+
_log = log;
26+
}
27+
28+
public async Task<ExecutionResult> HandleAsync(
29+
IStepExecutionContext context,
30+
IStepBody body,
31+
WorkflowStepDelegate next)
32+
{
33+
var workflowId = context.Workflow.Id;
34+
var stepId = context.Step.Id;
35+
36+
// Uses log scope to add a few attributes to the scope
37+
using (_log.BeginScope("{@WorkflowId}", workflowId))
38+
using (_log.BeginScope("{@StepId}", stepId))
39+
{
40+
// Calling next ensures step gets executed
41+
return await next();
42+
}
43+
}
44+
}
45+
```
46+
47+
Here's another example of a middleware that uses the [Polly](https://github.com/App-vNext/Polly) dotnet resiliency library to implement retries on workflow steps based off a custom retry policy.
48+
49+
```cs
50+
public class PollyRetryStepMiddleware : IWorkflowStepMiddleware
51+
{
52+
private const string StepContextKey = "WorkflowStepContext";
53+
private const int MaxRetries = 3;
54+
private readonly ILogger<PollyRetryStepMiddleware> _log;
55+
56+
public PollyRetryMiddleware(ILogger<PollyRetryStepMiddleware> log)
57+
{
58+
_log = log;
59+
}
60+
61+
// Consult Polly's docs for more information on how to build
62+
// retry policies:
63+
// https://github.com/App-vNext/Polly
64+
public IAsyncPolicy<ExecutionResult> GetRetryPolicy() =>
65+
Policy<ExecutionResult>
66+
.Handle<TimeoutException>()
67+
.RetryAsync(
68+
MaxRetries,
69+
(result, retryCount, context) =>
70+
UpdateRetryCount(
71+
result.Exception,
72+
retryCount,
73+
context[StepContextKey] as IStepExecutionContext)
74+
);
75+
76+
public async Task<ExecutionResult> HandleAsync(
77+
IStepExecutionContext context,
78+
IStepBody body,
79+
WorkflowStepDelegate next
80+
)
81+
{
82+
return await GetRetryPolicy().ExecuteAsync(
83+
ctx => next(),
84+
// The step execution context gets passed down so that
85+
// the step is accessible within the retry policy
86+
new Dictionary<string, object>
87+
{
88+
{ StepContextKey, context }
89+
});
90+
}
91+
92+
private Task UpdateRetryCount(
93+
Exception exception,
94+
int retryCount,
95+
IStepExecutionContext stepContext)
96+
{
97+
var stepInstance = stepContext.ExecutionPointer;
98+
stepInstance.RetryCount = retryCount;
99+
return Task.CompletedTask;
100+
}
101+
}
102+
```
103+
104+
## Pre/Post Workflow Middleware
105+
106+
Workflow middleware run either before a workflow starts or after a workflow completes and can be used to hook into the workflow lifecycle or alter the workflow itself before it is started.
107+
108+
### Pre Workflow Middleware
109+
110+
These middleware get run before the workflow is started and can potentially alter properties on the `WorkflowInstance`.
111+
112+
The following example illustrates setting the `Description` property on the `WorkflowInstance` using a middleware that interprets the data on the passed workflow. This is useful in cases where you want the description of the workflow to be derived from the data passed to the workflow.
113+
114+
Note that you use `WorkflowMiddlewarePhase.PreWorkflow` to specify that it runs before the workflow starts.
115+
116+
**Important:** You should call `next` as part of the workflow middleware to ensure that the next workflow in the chain runs.
117+
118+
```cs
119+
// AddDescriptionWorkflowMiddleware.cs
120+
public class AddDescriptionWorkflowMiddleware : IWorkflowMiddleware
121+
{
122+
public WorkflowMiddlewarePhase Phase =>
123+
WorkflowMiddlewarePhase.PreWorkflow;
124+
125+
public Task HandleAsync(
126+
WorkflowInstance workflow,
127+
WorkflowDelegate next
128+
)
129+
{
130+
if (workflow.Data is IDescriptiveWorkflowParams descriptiveParams)
131+
{
132+
workflow.Description = descriptiveParams.Description;
133+
}
134+
135+
return next();
136+
}
137+
}
138+
139+
// IDescriptiveWorkflowParams.cs
140+
public interface IDescriptiveWorkflowParams
141+
{
142+
string Description { get; }
143+
}
144+
145+
// MyWorkflowParams.cs
146+
public MyWorkflowParams : IDescriptiveWorkflowParams
147+
{
148+
public string Description => $"Run task '{TaskName}'";
149+
150+
public string TaskName { get; set; }
151+
}
152+
```
153+
154+
### Exception Handling in Pre Workflow Middleware
155+
156+
Pre workflow middleware exception handling gets treated differently from post workflow middleware. Since the middleware runs before the workflow starts, any exceptions thrown within a pre workflow middleware will bubble up to the `StartWorkflow` method and it is up to the caller of `StartWorkflow` to handle the exception and act accordingly.
157+
158+
```cs
159+
public async Task MyMethodThatStartsAWorkflow()
160+
{
161+
try
162+
{
163+
await host.StartWorkflow("HelloWorld", 1, null);
164+
}
165+
catch(Exception ex)
166+
{
167+
// Handle the exception appropriately
168+
}
169+
}
170+
```
171+
172+
### Post Workflow Middleware
173+
174+
These middleware get run after the workflow has completed and can be used to perform additional actions for all workflows in your app.
175+
176+
The following example illustrates how you can use a post workflow middleware to print a summary of the workflow to console.
177+
178+
Note that you use `WorkflowMiddlewarePhase.PostWorkflow` to specify that it runs after the workflow completes.
179+
180+
**Important:** You should call `next` as part of the workflow middleware to ensure that the next workflow in the chain runs.
181+
182+
```cs
183+
public class PrintWorkflowSummaryMiddleware : IWorkflowMiddleware
184+
{
185+
private readonly ILogger<PrintWorkflowSummaryMiddleware> _log;
186+
187+
public PrintWorkflowSummaryMiddleware(
188+
ILogger<PrintWorkflowSummaryMiddleware> log
189+
)
190+
{
191+
_log = log;
192+
}
193+
194+
public WorkflowMiddlewarePhase Phase =>
195+
WorkflowMiddlewarePhase.PostWorkflow;
196+
197+
public Task HandleAsync(
198+
WorkflowInstance workflow,
199+
WorkflowDelegate next
200+
)
201+
{
202+
if (!workflow.CompleteTime.HasValue)
203+
{
204+
return next();
205+
}
206+
207+
var duration = workflow.CompleteTime.Value - workflow.CreateTime;
208+
_log.LogInformation($@"Workflow {workflow.Description} completed in {duration:g}");
209+
210+
foreach (var step in workflow.ExecutionPointers)
211+
{
212+
var stepName = step.StepName;
213+
var stepDuration = (step.EndTime - step.StartTime) ?? TimeSpan.Zero;
214+
_log.LogInformation($" - Step {stepName} completed in {stepDuration:g}");
215+
}
216+
217+
return next();
218+
}
219+
}
220+
```
221+
222+
### Exception Handling in Post Workflow Middleware
223+
224+
Post workflow middleware exception handling gets treated differently from pre workflow middleware. At the time that the workflow completes, your workflow has ran already so an uncaught exception would be difficult to act on.
225+
226+
By default, if a workflow middleware throws an exception, it will be logged and the workflow will complete as normal. This behavior can be changed, however.
227+
228+
To override the default post workflow error handling for all workflows in your app, just register a new `IWorkflowMiddlewareErrorHandler` in the dependency injection framework with your custom behavior as follows.
229+
230+
```cs
231+
// CustomMiddlewareErrorHandler.cs
232+
public class CustomHandler : IWorkflowMiddlewareErrorHandler
233+
{
234+
public Task HandleAsync(Exception ex)
235+
{
236+
// Handle your error asynchronously
237+
}
238+
}
239+
240+
// Startup.cs
241+
public void ConfigureServices(IServiceCollection services)
242+
{
243+
// Other workflow configuration
244+
services.AddWorkflow();
245+
246+
// Should go after .AddWorkflow()
247+
services.AddTransient<IWorkflowMiddlewareErrorHandler, CustomHandler>();
248+
}
249+
```
250+
251+
## Registering Middleware
252+
253+
In order for middleware to take effect, they must be registered with the built-in dependency injection framework using the convenience helpers.
254+
255+
**Note:** Middleware will be run in the order that they are registered with middleware that are registered earlier running earlier in the chain and finishing later in the chain. For pre/post workflow middleware, all pre middleware will be run before a workflow starts and all post middleware will be run after a workflow completes.
256+
257+
```cs
258+
public class Startup
259+
{
260+
public void ConfigureServices(IServiceCollection services)
261+
{
262+
...
263+
264+
// Add workflow middleware
265+
services.AddWorkflowMiddleware<AddDescriptionWorkflowMiddleware>();
266+
services.AddWorkflowMiddleware<PrintWorkflowSummaryMiddleware>();
267+
268+
// Add step middleware
269+
services.AddWorkflowStepMiddleware<LogCorrelationStepMiddleware>();
270+
services.AddWorkflowStepMiddleware<PollyRetryMiddleware>();
271+
272+
...
273+
}
274+
}
275+
```
276+
277+
## More Information
278+
279+
See the [Workflow Middleware](https://github.com/danielgerlag/workflow-core/tree/master/src/samples/WorkflowCore.Sample19) sample for full examples of workflow middleware in action.

mkdocs.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ nav:
99
- Saga transactions: sagas.md
1010
- JSON / YAML Definitions: json-yaml.md
1111
- Persistence: persistence.md
12+
- Middleware: workflow-middleware.md
1213
- Multi-node clusters: multi-node-clusters.md
1314
- ASP.NET Core: using-with-aspnet-core.md
1415
- Elasticsearch plugin: elastic-search.md

src/WorkflowCore.DSL/Models/v1/DefinitionSourceV1.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
using System;
22
using System.Collections.Generic;
3-
using System.Text;
43

54
namespace WorkflowCore.Models.DefinitionStorage.v1
65
{

src/WorkflowCore.DSL/Services/DefinitionLoader.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ private WorkflowStepCollection ConvertSteps(ICollection<StepSourceV1> source, Ty
9393

9494
AttachInputs(nextStep, dataType, stepType, targetStep);
9595
AttachOutputs(nextStep, dataType, stepType, targetStep);
96-
96+
9797
if (nextStep.Do != null)
9898
{
9999
foreach (var branch in nextStep.Do)
@@ -242,7 +242,7 @@ private void AttachOutcomes(StepSourceV1 source, Type dataType, WorkflowStep ste
242242
var outcomeParameter = Expression.Parameter(typeof(object), "outcome");
243243

244244
foreach (var nextStep in source.SelectNextStep)
245-
{
245+
{
246246
var sourceDelegate = DynamicExpressionParser.ParseLambda(new[] { dataParameter, outcomeParameter }, typeof(object), nextStep.Value).Compile();
247247
Expression<Func<object, object, bool>> sourceExpr = (data, outcome) => System.Convert.ToBoolean(sourceDelegate.DynamicInvoke(data, outcome));
248248
step.Outcomes.Add(new ExpressionOutcome<object>(sourceExpr)
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
using System.Threading.Tasks;
2+
using WorkflowCore.Models;
3+
4+
namespace WorkflowCore.Interface
5+
{
6+
/// <summary>
7+
/// Executes a workflow step.
8+
/// </summary>
9+
public interface IStepExecutor
10+
{
11+
/// <summary>
12+
/// Runs the passed <see cref="IStepBody"/> in the given <see cref="IStepExecutionContext"/>.
13+
/// </summary>
14+
/// <param name="context">The <see cref="IStepExecutionContext"/> in which to execute the step.</param>
15+
/// <param name="body">The <see cref="IStepBody"/> body.</param>
16+
/// <returns>A <see cref="Task{ExecutionResult}"/> to wait for the result of running the step</returns>
17+
Task<ExecutionResult> ExecuteStep(
18+
IStepExecutionContext context,
19+
IStepBody body
20+
);
21+
}
22+
}

src/WorkflowCore/Interface/IWorkflowBuilder.cs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ public interface IWorkflowBuilder
99
{
1010
List<WorkflowStep> Steps { get; }
1111

12-
int LastStep { get; }
12+
int LastStep { get; }
1313

1414
IWorkflowBuilder<T> UseData<T>();
1515

@@ -21,7 +21,7 @@ public interface IWorkflowBuilder
2121
}
2222

2323
public interface IWorkflowBuilder<TData> : IWorkflowBuilder, IWorkflowModifier<TData, InlineStepBody>
24-
{
24+
{
2525
IStepBuilder<TData, TStep> StartWith<TStep>(Action<IStepBuilder<TData, TStep>> stepSetup = null) where TStep : IStepBody;
2626

2727
IStepBuilder<TData, InlineStepBody> StartWith(Func<IStepExecutionContext, ExecutionResult> body);
@@ -33,6 +33,5 @@ public interface IWorkflowBuilder<TData> : IWorkflowBuilder, IWorkflowModifier<T
3333
IWorkflowBuilder<TData> UseDefaultErrorBehavior(WorkflowErrorHandling behavior, TimeSpan? retryInterval = null);
3434

3535
IWorkflowBuilder<TData> CreateBranch();
36-
3736
}
38-
}
37+
}

0 commit comments

Comments
 (0)