diff --git a/README.md b/README.md index 00add92..55a1aa4 100644 --- a/README.md +++ b/README.md @@ -18,6 +18,7 @@ Prerequisites: * [AspNet](src/AspNet) - Demonstration of a generic host worker and an ASP.NET workflow starter. * [ClientMtls](src/ClientMtls) - How to use client certificate authentication, e.g. for Temporal Cloud. * [ContextPropagation](src/ContextPropagation) - Context propagation via interceptors. +* [CounterInterceptor](src/CounterInterceptor/) - Simple Workflow and Client Interceptors example. * [DependencyInjection](src/DependencyInjection) - How to inject dependencies in activities and use generic hosts for workers * [Encryption](src/Encryption) - End-to-end encryption with Temporal payload codecs. * [Mutex](src/Mutex) - How to implement a mutex as a workflow. Demonstrates how to avoid race conditions or parallel mutually exclusive operations on the same resource. diff --git a/TemporalioSamples.sln b/TemporalioSamples.sln index 3299e81..df5871f 100644 --- a/TemporalioSamples.sln +++ b/TemporalioSamples.sln @@ -57,6 +57,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.ContextPr EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.SafeMessageHandlers", "src\SafeMessageHandlers\TemporalioSamples.SafeMessageHandlers.csproj", "{FAF5984A-5B1C-4686-B056-A4F2AC0E8EB7}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.CounterInterceptor", "src\CounterInterceptor\TemporalioSamples.CounterInterceptor.csproj", "{F9C44936-8BF9-4919-BB66-8F1888E22AEB}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -155,6 +157,10 @@ Global {FAF5984A-5B1C-4686-B056-A4F2AC0E8EB7}.Debug|Any CPU.Build.0 = Debug|Any CPU {FAF5984A-5B1C-4686-B056-A4F2AC0E8EB7}.Release|Any CPU.ActiveCfg = Release|Any CPU {FAF5984A-5B1C-4686-B056-A4F2AC0E8EB7}.Release|Any CPU.Build.0 = Release|Any CPU + {F9C44936-8BF9-4919-BB66-8F1888E22AEB}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {F9C44936-8BF9-4919-BB66-8F1888E22AEB}.Debug|Any CPU.Build.0 = Debug|Any CPU + {F9C44936-8BF9-4919-BB66-8F1888E22AEB}.Release|Any CPU.ActiveCfg = Release|Any CPU + {F9C44936-8BF9-4919-BB66-8F1888E22AEB}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -185,5 +191,6 @@ Global {B3DB7B8C-7BD3-4A53-A809-AB6279B1A630} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC} {7B797D20-485F-441D-8E71-AF7E315FA9CF} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC} {FAF5984A-5B1C-4686-B056-A4F2AC0E8EB7} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC} + {F9C44936-8BF9-4919-BB66-8F1888E22AEB} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC} EndGlobalSection EndGlobal diff --git a/src/CounterInterceptor/Counts.cs b/src/CounterInterceptor/Counts.cs new file mode 100644 index 0000000..2949e76 --- /dev/null +++ b/src/CounterInterceptor/Counts.cs @@ -0,0 +1,43 @@ +namespace TemporalioSamples.CounterInterceptor; +public class Counts +{ + private uint clientExecutions; + private uint clientQueries; + private uint clientSignals; + private uint workflowReplays; + private uint workflowSignals; + private uint workflowQueries; + private uint workflowChildExecutions; + private uint workflowActivityExecutions; + + public ref uint ClientExecutions => ref clientExecutions; + + public ref uint ClientSignals => ref clientSignals; + + public ref uint ClientQueries => ref clientQueries; + + public string ClientInfo() => + $"\n\tTotal Number of Workflow Exec: {ClientExecutions}\n\t" + + $"Total Number of Signals: {ClientSignals}\n\t" + + $"Total Number of Queries: {ClientQueries}"; + + public ref uint WorkflowReplays => ref workflowReplays; + + public ref uint WorkflowSignals => ref workflowSignals; + + public ref uint WorkflowQueries => ref workflowQueries; + + public ref uint WorkflowChildExecutions => ref workflowChildExecutions; + + public ref uint WorkflowActivityExecutions => ref workflowActivityExecutions; + + public string WorkflowInfo() => + $"\n\tTotal Number of Workflow Replays: {WorkflowReplays}\n\t" + + $"Total Number of Child Workflow Exec: {WorkflowChildExecutions}\n\t" + + $"Total Number of Activity Exec: {WorkflowActivityExecutions}\n\t" + + $"Total Number of Signals: {WorkflowSignals}\n\t" + + $"Total Number of Queries: {WorkflowQueries}"; + + public override string ToString() => + ClientInfo() + WorkflowInfo(); +} \ No newline at end of file diff --git a/src/CounterInterceptor/MyActivities.cs b/src/CounterInterceptor/MyActivities.cs new file mode 100644 index 0000000..d038093 --- /dev/null +++ b/src/CounterInterceptor/MyActivities.cs @@ -0,0 +1,15 @@ +namespace TemporalioSamples.CounterInterceptor; + +using System.Diagnostics; +using Temporalio.Activities; + +public class MyActivities +{ + [Activity] + public string SayHello(string name, string title) => + $"Hello {title} {name}"; + + [Activity] + public string SayGoodBye(string name, string title) => + $"Goodby {title} {name}"; +} \ No newline at end of file diff --git a/src/CounterInterceptor/MyChildWorkflow.workflow.cs b/src/CounterInterceptor/MyChildWorkflow.workflow.cs new file mode 100644 index 0000000..3b24389 --- /dev/null +++ b/src/CounterInterceptor/MyChildWorkflow.workflow.cs @@ -0,0 +1,17 @@ +namespace TemporalioSamples.CounterInterceptor; + +using Temporalio.Workflows; + +[Workflow] +public class MyChildWorkflow +{ + private readonly ActivityOptions activityOptions = new() + { + StartToCloseTimeout = TimeSpan.FromSeconds(10), + }; + + [WorkflowRun] + public async Task RunAsync(string name, string title) => + await Workflow.ExecuteActivityAsync((MyActivities act) => act.SayHello(name, title), activityOptions) + + await Workflow.ExecuteActivityAsync((MyActivities act) => act.SayGoodBye(name, title), activityOptions); +} \ No newline at end of file diff --git a/src/CounterInterceptor/MyCounterInterceptor.cs b/src/CounterInterceptor/MyCounterInterceptor.cs new file mode 100644 index 0000000..22fb9fd --- /dev/null +++ b/src/CounterInterceptor/MyCounterInterceptor.cs @@ -0,0 +1,144 @@ +namespace TemporalioSamples.CounterInterceptor; + +using System.Collections.Concurrent; +using Temporalio.Activities; +using Temporalio.Client; +using Temporalio.Client.Interceptors; +using Temporalio.Worker.Interceptors; +using Temporalio.Workflows; + +public class MyCounterInterceptor : IClientInterceptor, IWorkerInterceptor +{ + public ConcurrentDictionary Counts { get; } = new(); + + public string WorkerInfo() => + string.Join( + "\n", + Counts.Select(kvp => $"** Workflow ID: {kvp.Key} {kvp.Value.WorkflowInfo()}")); + + public string ClientInfo() => + string.Join( + "\n", + Counts.Select(kvp => $"** Workflow ID: {kvp.Key} {kvp.Value.ClientInfo()}")); + + public ClientOutboundInterceptor InterceptClient(ClientOutboundInterceptor nextInterceptor) => + new ClientOutbound(this, nextInterceptor); + + public WorkflowInboundInterceptor InterceptWorkflow(WorkflowInboundInterceptor nextInterceptor) => + new WorkflowInbound(this, nextInterceptor); + + public ActivityInboundInterceptor InterceptActivity(ActivityInboundInterceptor nextInterceptor) => + new ActivityInbound(this, nextInterceptor); + + private void Increment(string id, Action increment) => + increment(Counts.GetOrAdd(id, _ => new())); + + private sealed class ClientOutbound : ClientOutboundInterceptor + { + private MyCounterInterceptor root; + + public ClientOutbound(MyCounterInterceptor root, ClientOutboundInterceptor next) + : base(next) => this.root = root; + + public override Task> StartWorkflowAsync( + StartWorkflowInput input) + { + var id = input.Options.Id ?? "None"; + root.Increment(id, c => Interlocked.Increment(ref root.Counts[id].ClientExecutions)); + return base.StartWorkflowAsync(input); + } + + public override Task SignalWorkflowAsync(SignalWorkflowInput input) + { + var id = input.Id; + root.Increment(id, c => Interlocked.Increment(ref root.Counts[id].ClientSignals)); + return base.SignalWorkflowAsync(input); + } + + public override Task QueryWorkflowAsync(QueryWorkflowInput input) + { + var id = input.Id; + root.Increment(id, c => Interlocked.Increment(ref root.Counts[id].ClientQueries)); + return base.QueryWorkflowAsync(input); + } + } + + private sealed class WorkflowInbound : WorkflowInboundInterceptor + { + private readonly MyCounterInterceptor root; + + internal WorkflowInbound(MyCounterInterceptor root, WorkflowInboundInterceptor next) + : base(next) => this.root = root; + + public override void Init(WorkflowOutboundInterceptor outbound) => + base.Init(new WorkflowOutbound(root, outbound)); + + public override Task ExecuteWorkflowAsync(ExecuteWorkflowInput input) + { + // Count only if we're not replaying + if (!Workflow.Unsafe.IsReplaying) + { + var id = Workflow.Info.WorkflowId; + root.Increment(id, c => Interlocked.Increment(ref root.Counts[id].WorkflowReplays)); + } + return base.ExecuteWorkflowAsync(input); + } + + public override Task HandleSignalAsync(HandleSignalInput input) + { + // Count only if we're not replaying + if (!Workflow.Unsafe.IsReplaying) + { + var id = Workflow.Info.WorkflowId; + root.Increment(id, c => Interlocked.Increment(ref root.Counts[id].WorkflowSignals)); + } + return base.HandleSignalAsync(input); + } + + public override object? HandleQuery(HandleQueryInput input) + { + // Count only if we're not replaying + if (!Workflow.Unsafe.IsReplaying) + { + var id = Workflow.Info.WorkflowId; + root.Increment(id, c => Interlocked.Increment(ref root.Counts[id].WorkflowQueries)); + } + return base.HandleQuery(input); + } + } + + private sealed class WorkflowOutbound : WorkflowOutboundInterceptor + { + private readonly MyCounterInterceptor root; + + internal WorkflowOutbound(MyCounterInterceptor root, WorkflowOutboundInterceptor next) + : base(next) => this.root = root; + + public override Task> StartChildWorkflowAsync( + StartChildWorkflowInput input) + { + // Count only if we're not replaying + if (!Workflow.Unsafe.IsReplaying) + { + var id = Workflow.Info.WorkflowId; + root.Increment(id, c => Interlocked.Increment(ref root.Counts[id].WorkflowChildExecutions)); + } + return base.StartChildWorkflowAsync(input); + } + } + + private sealed class ActivityInbound : ActivityInboundInterceptor + { + private readonly MyCounterInterceptor root; + + internal ActivityInbound(MyCounterInterceptor root, ActivityInboundInterceptor next) + : base(next) => this.root = root; + + public override Task ExecuteActivityAsync(ExecuteActivityInput input) + { + var id = ActivityExecutionContext.Current.Info.WorkflowId; + root.Increment(id, c => Interlocked.Increment(ref root.Counts[id].WorkflowActivityExecutions)); + return base.ExecuteActivityAsync(input); + } + } +} \ No newline at end of file diff --git a/src/CounterInterceptor/MyWorkflow.workflow.cs b/src/CounterInterceptor/MyWorkflow.workflow.cs new file mode 100644 index 0000000..bf0652c --- /dev/null +++ b/src/CounterInterceptor/MyWorkflow.workflow.cs @@ -0,0 +1,43 @@ +namespace TemporalioSamples.CounterInterceptor; + +using Temporalio.Workflows; + +[Workflow] +public class MyWorkflow +{ + private bool exit; // Automatically defaults to false + + [WorkflowRun] + public async Task RunAsync() + { + // Wait for greeting info + await Workflow.WaitConditionAsync(() => + !string.IsNullOrEmpty(Name) && !string.IsNullOrEmpty(Title)); + + // Execute Child Workflow + var result = await Workflow.ExecuteChildWorkflowAsync( + (MyChildWorkflow wf) => wf.RunAsync(Name, Title), + new() { Id = "counter-interceptor-child" }); + + // Wait for exit signal + await Workflow.WaitConditionAsync(() => exit); + + return result; + } + + [WorkflowSignal] + public async Task SignalNameAndTitleAsync(string name, string title) + { + Name = name; + Title = title; + } + + [WorkflowQuery] + public string Name { get; private set; } = string.Empty; + + [WorkflowQuery] + public string Title { get; private set; } = string.Empty; + + [WorkflowSignal] + public async Task ExitAsync() => exit = true; +} \ No newline at end of file diff --git a/src/CounterInterceptor/Program.cs b/src/CounterInterceptor/Program.cs new file mode 100644 index 0000000..a4d500d --- /dev/null +++ b/src/CounterInterceptor/Program.cs @@ -0,0 +1,73 @@ +namespace TemporalioSamples.CounterInterceptor; + +using Temporalio.Client; +using Temporalio.Worker; + +internal class Program +{ + private static async Task Main(string[] args) + { + var counterInterceptor = new MyCounterInterceptor(); + var client = await TemporalClient.ConnectAsync( + options: new("localhost:7233") + { + Interceptors = new[] + { + counterInterceptor, + }, + }); + + var activities = new MyActivities(); + + var taskQueue = "CounterInterceptorTaskQueue"; + + var workerOptions = new TemporalWorkerOptions(taskQueue). + AddAllActivities(activities). + AddWorkflow(). + AddWorkflow(); + + // workerOptions.Interceptors = new[] { counterInterceptor }; + using var worker = new TemporalWorker( + client, + workerOptions); + + // Run worker until cancelled + Console.WriteLine("Running worker..."); + + // Start the workers + await worker.ExecuteAsync(async () => + { + // Start the workflow + var handle = await client.StartWorkflowAsync( + (MyWorkflow wf) => wf.RunAsync(), + new(id: Guid.NewGuid().ToString(), taskQueue: taskQueue)); + + Console.WriteLine("Sending name and title to workflow"); + await handle.SignalAsync(wf => wf.SignalNameAndTitleAsync("John", "Customer")); + + var name = await handle.QueryAsync(wf => wf.Name); + var title = await handle.QueryAsync(wf => wf.Title); + + // Send exit signal to workflow + await handle.SignalAsync(wf => wf.ExitAsync()); + + var result = await handle.GetResultAsync(); + + Console.WriteLine($"Workflow result is {result}"); + + Console.WriteLine("Query results: "); + Console.WriteLine($"\tName: {name}"); + Console.WriteLine($"\tTitle: {title}"); + + // Print worker counter info + Console.WriteLine("\nCollected Worker Counter Info:\n"); + Console.WriteLine(counterInterceptor.WorkerInfo()); + Console.WriteLine($"Number of unique workflows: {counterInterceptor.Counts.Count}"); + + // Print client counter info + Console.WriteLine(); + Console.WriteLine("Collected Client Counter Info:\n"); + Console.WriteLine(counterInterceptor.ClientInfo()); + }); + } +} \ No newline at end of file diff --git a/src/CounterInterceptor/README.md b/src/CounterInterceptor/README.md new file mode 100644 index 0000000..5ce04d1 --- /dev/null +++ b/src/CounterInterceptor/README.md @@ -0,0 +1,13 @@ +# dotnet-counter-interceptor +The sample demonstrates: +- the use of a Worker Workflow Interceptor that counts the number of Workflow Executions, Child Workflow Executions, and Activity Executions and the number of Signals and Queries. It is based +off of the [Java sample](https://github.com/temporalio/samples-java/tree/main) located [here](https://github.com/temporalio/samples-java/tree/main/core/src/main/java/io/temporal/samples/countinterceptor) +- the use of a Client Workflow Interceptor that counts the number of Workflow Executions and the number of Signals and Queries. + +To run, first see [README.md](https://github.com/temporalio/samples-dotnet/blob/main/README.md) for prerequisites + +## Run Worker and Client +```bash +# make sure you have temporal server running +dotnet run +``` diff --git a/src/CounterInterceptor/TemporalioSamples.CounterInterceptor.csproj b/src/CounterInterceptor/TemporalioSamples.CounterInterceptor.csproj new file mode 100644 index 0000000..967ff4c --- /dev/null +++ b/src/CounterInterceptor/TemporalioSamples.CounterInterceptor.csproj @@ -0,0 +1,7 @@ + + + + Exe + + + diff --git a/tests/CounterInterceptor/MyWorkflowTests.cs b/tests/CounterInterceptor/MyWorkflowTests.cs new file mode 100644 index 0000000..c99f36c --- /dev/null +++ b/tests/CounterInterceptor/MyWorkflowTests.cs @@ -0,0 +1,81 @@ +using Temporalio.Client; +using Temporalio.Worker; +using TemporalioSamples.CounterInterceptor; +using Xunit; +using Xunit.Abstractions; + +namespace TemporalioSamples.Tests.CounterInterceptor; + +public class MyWorkflowTests : WorkflowEnvironmentTestBase +{ + public MyWorkflowTests(ITestOutputHelper output, WorkflowEnvironment env) + : base(output, env) + { + } + + [Fact] + public async Task RunAsync_CounterInterceptor() + { + var counterInterceptor = new MyCounterInterceptor(); + + // Add the interceptor to the client + var clientOptions = (TemporalClientOptions)Client.Options.Clone(); + clientOptions.Interceptors = new[] + { + counterInterceptor, + }; + + var client = new TemporalClient(Client.Connection, clientOptions); + + var taskQueue = Guid.NewGuid().ToString(); + + var workerOptions = new TemporalWorkerOptions(taskQueue). + AddAllActivities(new MyActivities()). + AddWorkflow(). + AddWorkflow(); + + var parentWorkflowId = "ParentWorkflowId"; + // Be sure that this matches the ID in the Workflow + var childWorkflowId = "counter-interceptor-child"; + + using var worker = new TemporalWorker(client, workerOptions); + await worker.ExecuteAsync(async () => + { + var handle = await client.StartWorkflowAsync( + (MyWorkflow wf) => wf.RunAsync(), + new( + id: parentWorkflowId, + taskQueue: taskQueue)); + + await handle.SignalAsync(wf => wf.SignalNameAndTitleAsync("John", "Customer")); + + var name = await handle.QueryAsync(wf => wf.Name); + var title = await handle.QueryAsync(wf => wf.Title); + + // Send exit signal to workflow + await handle.SignalAsync(wf => wf.ExitAsync()); + + // Wait for the workflow to complete + var result = await handle.GetResultAsync(); + + // Validate that the worker counters have the correct numbers for the parent + Assert.Equal(1U, counterInterceptor.Counts[parentWorkflowId].WorkflowReplays); + Assert.Equal(1U, counterInterceptor.Counts[parentWorkflowId].WorkflowChildExecutions); + Assert.Equal(0U, counterInterceptor.Counts[parentWorkflowId].WorkflowActivityExecutions); + Assert.Equal(2U, counterInterceptor.Counts[parentWorkflowId].WorkflowSignals); + Assert.Equal(0U, counterInterceptor.Counts[parentWorkflowId].WorkflowQueries); + + // Validate the worker counters have the correct numbers for the child + Assert.Equal(1U, counterInterceptor.Counts[childWorkflowId].WorkflowReplays); + Assert.Equal(0U, counterInterceptor.Counts[childWorkflowId].WorkflowChildExecutions); + Assert.Equal(2U, counterInterceptor.Counts[childWorkflowId].WorkflowActivityExecutions); + Assert.Equal(0U, counterInterceptor.Counts[childWorkflowId].WorkflowSignals); + Assert.Equal(0U, counterInterceptor.Counts[childWorkflowId].WorkflowQueries); + + // Validate the client counters have correct numbers + Assert.Equal(1U, counterInterceptor.Counts[parentWorkflowId].ClientExecutions); + Assert.Equal(2U, counterInterceptor.Counts[parentWorkflowId].ClientSignals); + Assert.Equal(2U, counterInterceptor.Counts[parentWorkflowId].ClientQueries); + }); + } +} \ No newline at end of file diff --git a/tests/TemporalioSamples.Tests.csproj b/tests/TemporalioSamples.Tests.csproj index a13ca8e..a8ba929 100644 --- a/tests/TemporalioSamples.Tests.csproj +++ b/tests/TemporalioSamples.Tests.csproj @@ -22,6 +22,7 @@ +