Skip to content

Commit a61db8b

Browse files
authored
Adds workflow replay-safe logger (#1434)
* Removed obsolete type Signed-off-by: Whit Waldo <[email protected]> * Added missing using Signed-off-by: Whit Waldo <[email protected]> * Adding interface for IWorkflowContext for replayability concerns Signed-off-by: Whit Waldo <[email protected]> * Removed unused IConfiguration Signed-off-by: Whit Waldo <[email protected]> * Added ReplaySafeLogger type Signed-off-by: Whit Waldo <[email protected]> * Building out functionality to expose ReplayLogger in workflow context Signed-off-by: Whit Waldo <[email protected]> * Added license information to file Signed-off-by: Whit Waldo <[email protected]> * Removed unnecessary file Signed-off-by: Whit Waldo <[email protected]> * Updated copyright header for different project, made some tweaks for nullability errors Signed-off-by: Whit Waldo <[email protected]> * Added virtual methods that use the already-available ILoggerFactory to create the ReplaySafeLogger on the WorkflowContext Signed-off-by: Whit Waldo <[email protected]> * Removed unnecessary registration Signed-off-by: Whit Waldo <[email protected]> * Updated example to demonstrate using ReplaySafeLogger in the orchestration context Signed-off-by: Whit Waldo <[email protected]> * Tweaks on visibility and abstraction so that the methods are available in the context made visible to workflow developers Signed-off-by: Whit Waldo <[email protected]> * Removed obsolete type registrations Signed-off-by: Whit Waldo <[email protected]> * Simplified argument null check Signed-off-by: Whit Waldo <[email protected]> * Removed since-removed code leftover from merge Signed-off-by: Whit Waldo <[email protected]> * Added documentation demonstrating how to access the replay-safe logger Signed-off-by: Whit Waldo <[email protected]> * Removed unnecessary and separate ReplaySafeLogger in favor of method to create it off the TaskOrchestrationContext (innerContext) Signed-off-by: Whit Waldo <[email protected]> --------- Signed-off-by: Whit Waldo <[email protected]>
1 parent 2849ec6 commit a61db8b

File tree

8 files changed

+148
-58
lines changed

8 files changed

+148
-58
lines changed

daprdocs/content/en/dotnet-sdk-docs/dotnet-workflow/dotnet-workflowclient-usage.md

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,4 +74,64 @@ builder.Services.AddDaprWorkflow(options => {
7474

7575
var app = builder.Build();
7676
await app.RunAsync();
77-
```
77+
```
78+
79+
## Injecting Services into Workflow Activities
80+
Workflow activities support the same dependency injection that developers have come to expect of modern C# applications. Assuming a proper
81+
registration at startup, any such type can be injected into the constructor of the workflow activity and available to utilize during
82+
the execution of the workflow. This makes it simple to add logging via an injected `ILogger` or access to other Dapr
83+
building blocks by injecting `DaprClient` or `DaprJobsClient`, for example.
84+
85+
```csharp
86+
internal sealed class SquareNumberActivity : WorkflowActivity<int, int>
87+
{
88+
private readonly ILogger _logger;
89+
90+
public MyActivity(ILogger logger)
91+
{
92+
this._logger = logger;
93+
}
94+
95+
public override Task<int> RunAsync(WorkflowActivityContext context, int input)
96+
{
97+
this._logger.LogInformation("Squaring the value {number}", input);
98+
var result = input * input;
99+
this._logger.LogInformation("Got a result of {squareResult}", result);
100+
101+
return Task.FromResult(result);
102+
}
103+
}
104+
```
105+
106+
### Using ILogger in Workflow
107+
Because workflows must be deterministic, it is not possible to inject arbitrary services into them. For example,
108+
if you were able to inject a standard `ILogger` into a workflow and it needed to be replayed because of an error,
109+
subsequent replay from the event source log would result in the log recording additional operations that didn't actually
110+
take place a second or third time because their results were sourced from the log. This has the potential to introduce
111+
a significant amount of confusion. Rather, a replay-safe logger is made available for use within workflows. It will only
112+
log events the first time the workflow runs and will not log anything whenever the workflow is being replaced.
113+
114+
This logger can be retrieved from a method present on the `WorkflowContext` available on your workflow instance and
115+
otherwise used precisely as you might otherwise use an `ILogger` instance.
116+
117+
An end-to-end sample demonstrating this can be seen in the
118+
[.NET SDK repository](https://github.com/dapr/dotnet-sdk/blob/master/examples/Workflow/WorkflowConsoleApp/Workflows/OrderProcessingWorkflow.cs)
119+
but a brief extraction of this sample is available below.
120+
121+
```csharp
122+
public class OrderProcessingWorkflow : Workflow<OrderPayload, OrderResult>
123+
{
124+
public override async Task<OrderResult> RunAsync(WorkflowContext context, OrderPayload order)
125+
{
126+
string orderId = context.InstanceId;
127+
var logger = context.CreateReplaySafeLogger<OrderProcessingWorkflow>(); //Use this method to access the logger instance
128+
129+
logger.LogInformation("Received order {orderId} for {quantity} {name} at ${totalCost}", orderId, order.Quantity, order.Name, order.TotalCost);
130+
131+
//...
132+
}
133+
}
134+
```
135+
136+
137+

examples/Workflow/WorkflowConsoleApp/Workflows/OrderProcessingWorkflow.cs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using Dapr.Workflow;
2+
using Microsoft.Extensions.Logging;
23
using WorkflowConsoleApp.Activities;
34

45
namespace WorkflowConsoleApp.Workflows
@@ -16,7 +17,10 @@ public class OrderProcessingWorkflow : Workflow<OrderPayload, OrderResult>
1617
public override async Task<OrderResult> RunAsync(WorkflowContext context, OrderPayload order)
1718
{
1819
string orderId = context.InstanceId;
20+
var logger = context.CreateReplaySafeLogger<OrderProcessingWorkflow>();
1921

22+
logger.LogInformation("Received order {orderId} for {quantity} {name} at ${totalCost}", orderId, order.Quantity, order.Name, order.TotalCost);
23+
2024
// Notify the user that an order has come through
2125
await context.CallActivityAsync(
2226
nameof(NotifyActivity),
@@ -31,6 +35,8 @@ await context.CallActivityAsync(
3135
// If there is insufficient inventory, fail and let the user know
3236
if (!result.Success)
3337
{
38+
logger.LogError("Insufficient inventory for {orderName}", order.Name);
39+
3440
// End the workflow here since we don't have sufficient inventory
3541
await context.CallActivityAsync(
3642
nameof(NotifyActivity),
@@ -39,8 +45,10 @@ await context.CallActivityAsync(
3945
}
4046

4147
// Require orders over a certain threshold to be approved
42-
if (order.TotalCost > 50000)
48+
const int threshold = 50000;
49+
if (order.TotalCost > threshold)
4350
{
51+
logger.LogInformation("Requesting manager approval since total cost {totalCost} exceeds threshold {threshold}", order.TotalCost, threshold);
4452
// Request manager approval for the order
4553
await context.CallActivityAsync(nameof(RequestApprovalActivity), order);
4654

@@ -51,9 +59,13 @@ await context.CallActivityAsync(
5159
ApprovalResult approvalResult = await context.WaitForExternalEventAsync<ApprovalResult>(
5260
eventName: "ManagerApproval",
5361
timeout: TimeSpan.FromSeconds(30));
62+
63+
logger.LogInformation("Approval result: {approvalResult}", approvalResult);
5464
context.SetCustomStatus($"Approval result: {approvalResult}");
5565
if (approvalResult == ApprovalResult.Rejected)
5666
{
67+
logger.LogWarning("Order was rejected by approver");
68+
5769
// The order was rejected, end the workflow here
5870
await context.CallActivityAsync(
5971
nameof(NotifyActivity),
@@ -63,6 +75,8 @@ await context.CallActivityAsync(
6375
}
6476
catch (TaskCanceledException)
6577
{
78+
logger.LogError("Cancelling order because it didn't receive an approval");
79+
6680
// An approval timeout results in automatic order cancellation
6781
await context.CallActivityAsync(
6882
nameof(NotifyActivity),
@@ -72,6 +86,7 @@ await context.CallActivityAsync(
7286
}
7387

7488
// There is enough inventory available so the user can purchase the item(s). Process their payment
89+
logger.LogInformation("Processing payment as sufficient inventory is available");
7590
await context.CallActivityAsync(
7691
nameof(ProcessPaymentActivity),
7792
new PaymentRequest(RequestId: orderId, order.Name, order.Quantity, order.TotalCost),
@@ -88,13 +103,15 @@ await context.CallActivityAsync(
88103
catch (WorkflowTaskFailedException e)
89104
{
90105
// Let them know their payment processing failed
106+
logger.LogError("Order {orderId} failed! Details: {errorMessage}", orderId, e.FailureDetails.ErrorMessage);
91107
await context.CallActivityAsync(
92108
nameof(NotifyActivity),
93109
new Notification($"Order {orderId} Failed! Details: {e.FailureDetails.ErrorMessage}"));
94110
return new OrderResult(Processed: false);
95111
}
96112

97113
// Let them know their payment was processed
114+
logger.LogError("Order {orderId} has completed!", orderId);
98115
await context.CallActivityAsync(
99116
nameof(NotifyActivity),
100117
new Notification($"Order {orderId} has completed!"));

src/Dapr.Workflow/DaprWorkflowContext.cs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
// limitations under the License.
1212
// ------------------------------------------------------------------------
1313

14+
using Microsoft.Extensions.Logging;
15+
1416
namespace Dapr.Workflow
1517
{
1618
using System;
@@ -34,7 +36,7 @@ internal DaprWorkflowContext(TaskOrchestrationContext innerContext)
3436
public override DateTime CurrentUtcDateTime => this.innerContext.CurrentUtcDateTime;
3537

3638
public override bool IsReplaying => this.innerContext.IsReplaying;
37-
39+
3840
public override Task CallActivityAsync(string name, object? input = null, WorkflowTaskOptions? options = null)
3941
{
4042
return WrapExceptions(this.innerContext.CallActivityAsync(name, input, options?.ToDurableTaskOptions()));
@@ -95,6 +97,25 @@ public override Guid NewGuid()
9597
return this.innerContext.NewGuid();
9698
}
9799

100+
/// <summary>
101+
/// Returns an instance of <see cref="ILogger"/> that is replay-safe, meaning that the logger only
102+
/// writes logs when the orchestrator is not replaying previous history.
103+
/// </summary>
104+
/// <param name="categoryName">The logger's category name.</param>
105+
/// <returns>An instance of <see cref="ILogger"/> that is replay-safe.</returns>
106+
public override ILogger CreateReplaySafeLogger(string categoryName) =>
107+
this.innerContext.CreateReplaySafeLogger(categoryName);
108+
109+
/// <inheritdoc cref="CreateReplaySafeLogger(string)" />
110+
/// <param name="type">The type to derive the category name from.</param>
111+
public override ILogger CreateReplaySafeLogger(Type type) =>
112+
this.innerContext.CreateReplaySafeLogger(type);
113+
114+
/// <inheritdoc cref="CreateReplaySafeLogger(string)" />
115+
/// <typeparam name="T">The type to derive category name from.</typeparam>
116+
public override ILogger CreateReplaySafeLogger<T>() =>
117+
this.innerContext.CreateReplaySafeLogger<T>();
118+
98119
static async Task WrapExceptions(Task task)
99120
{
100121
try
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
namespace Dapr.Workflow;
2+
3+
/// <summary>
4+
/// Provides functionality available to orchestration code.
5+
/// </summary>
6+
public interface IWorkflowContext
7+
{
8+
/// <summary>
9+
/// Gets a value indicating whether the orchestration or operation is currently replaying itself.
10+
/// </summary>
11+
/// <remarks>
12+
/// This property is useful when there is logic that needs to run only when *not* replaying. For example,
13+
/// certain types of application logging may become too noisy when duplicated as part of replay. The
14+
/// application code could check to see whether the function is being replayed and then issue
15+
/// the log statements when this value is <c>false</c>.
16+
/// </remarks>
17+
/// <value>
18+
/// <c>true</c> if the orchestration or operation is currently being replayed; otherwise <c>false</c>.
19+
/// </value>
20+
bool IsReplaying { get; }
21+
}

src/Dapr.Workflow/WorkflowContext.cs

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
// limitations under the License.
1212
// ------------------------------------------------------------------------
1313

14+
using Microsoft.Extensions.Logging;
15+
1416
namespace Dapr.Workflow
1517
{
1618
using System;
@@ -21,13 +23,13 @@ namespace Dapr.Workflow
2123
/// Context object used by workflow implementations to perform actions such as scheduling activities, durable timers, waiting for
2224
/// external events, and for getting basic information about the current workflow instance.
2325
/// </summary>
24-
public abstract class WorkflowContext
26+
public abstract class WorkflowContext : IWorkflowContext
2527
{
2628
/// <summary>
2729
/// Gets the name of the current workflow.
2830
/// </summary>
2931
public abstract string Name { get; }
30-
32+
3133
/// <summary>
3234
/// Gets the instance ID of the current workflow.
3335
/// </summary>
@@ -271,6 +273,22 @@ public virtual Task CallChildWorkflowAsync(
271273
{
272274
return this.CallChildWorkflowAsync<object>(workflowName, input, options);
273275
}
276+
277+
/// <summary>
278+
/// Returns an instance of <see cref="ILogger"/> that is replay-safe, meaning that the logger only
279+
/// writes logs when the orchestrator is not replaying previous history.
280+
/// </summary>
281+
/// <param name="categoryName">The logger's category name.</param>
282+
/// <returns>An instance of <see cref="ILogger"/> that is replay-safe.</returns>
283+
public abstract ILogger CreateReplaySafeLogger(string categoryName);
284+
285+
/// <inheritdoc cref="CreateReplaySafeLogger(string)" />
286+
/// <param name="type">The type to derive the category name from.</param>
287+
public abstract ILogger CreateReplaySafeLogger(Type type);
288+
289+
/// <inheritdoc cref="CreateReplaySafeLogger(string)" />
290+
/// <typeparam name="T">The type to derive category name from.</typeparam>
291+
public abstract ILogger CreateReplaySafeLogger<T>();
274292

275293
/// <summary>
276294
/// Restarts the workflow with a new input and clears its history.

src/Dapr.Workflow/WorkflowEngineClient.cs

Lines changed: 0 additions & 34 deletions
This file was deleted.

src/Dapr.Workflow/WorkflowLoggingService.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,9 @@ internal sealed class WorkflowLoggingService : IHostedService
2929
private static readonly HashSet<string> registeredWorkflows = new();
3030
private static readonly HashSet<string> registeredActivities = new();
3131

32-
public WorkflowLoggingService(ILogger<WorkflowLoggingService> logger, IConfiguration configuration)
32+
public WorkflowLoggingService(ILogger<WorkflowLoggingService> logger)
3333
{
3434
this.logger = logger;
35-
3635
}
3736
public Task StartAsync(CancellationToken cancellationToken)
3837
{

src/Dapr.Workflow/WorkflowServiceCollectionExtensions.cs

Lines changed: 5 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -35,50 +35,38 @@ public static IServiceCollection AddDaprWorkflow(
3535
Action<WorkflowRuntimeOptions> configure,
3636
ServiceLifetime lifetime = ServiceLifetime.Singleton)
3737
{
38-
if (serviceCollection == null)
39-
{
40-
throw new ArgumentNullException(nameof(serviceCollection));
41-
}
38+
ArgumentNullException.ThrowIfNull(serviceCollection, nameof(serviceCollection));
4239

4340
serviceCollection.AddDaprClient(lifetime: lifetime);
4441
serviceCollection.AddHttpClient();
4542
serviceCollection.AddHostedService<WorkflowLoggingService>();
46-
43+
4744
switch (lifetime)
4845
{
4946
case ServiceLifetime.Singleton:
50-
#pragma warning disable CS0618 // Type or member is obsolete - keeping around temporarily - replaced by DaprWorkflowClient
51-
serviceCollection.TryAddSingleton<WorkflowEngineClient>();
52-
#pragma warning restore CS0618 // Type or member is obsolete
5347
serviceCollection.TryAddSingleton<DaprWorkflowClient>();
5448
serviceCollection.TryAddSingleton<WorkflowRuntimeOptions>();
5549
break;
5650
case ServiceLifetime.Scoped:
57-
#pragma warning disable CS0618 // Type or member is obsolete - keeping around temporarily - replaced by DaprWorkflowClient
58-
serviceCollection.TryAddScoped<WorkflowEngineClient>();
59-
#pragma warning restore CS0618 // Type or member is obsolete
6051
serviceCollection.TryAddScoped<DaprWorkflowClient>();
6152
serviceCollection.TryAddScoped<WorkflowRuntimeOptions>();
6253
break;
6354
case ServiceLifetime.Transient:
64-
#pragma warning disable CS0618 // Type or member is obsolete - keeping around temporarily - replaced by DaprWorkflowClient
65-
serviceCollection.TryAddTransient<WorkflowEngineClient>();
66-
#pragma warning restore CS0618 // Type or member is obsolete
6755
serviceCollection.TryAddTransient<DaprWorkflowClient>();
6856
serviceCollection.TryAddTransient<WorkflowRuntimeOptions>();
6957
break;
7058
default:
7159
throw new ArgumentOutOfRangeException(nameof(lifetime), lifetime, null);
7260
}
73-
61+
7462
serviceCollection.AddOptions<WorkflowRuntimeOptions>().Configure(configure);
75-
63+
7664
//Register the factory and force resolution so the Durable Task client and worker can be registered
7765
using (var scope = serviceCollection.BuildServiceProvider().CreateScope())
7866
{
7967
var httpClientFactory = scope.ServiceProvider.GetRequiredService<IHttpClientFactory>();
8068
var configuration = scope.ServiceProvider.GetService<IConfiguration>();
81-
69+
8270
var factory = new DaprWorkflowClientBuilderFactory(configuration, httpClientFactory);
8371
factory.CreateClientBuilder(serviceCollection, configure);
8472
}

0 commit comments

Comments
 (0)