From d678366ff56f7cec4679e97b9faa00eecf1c4c16 Mon Sep 17 00:00:00 2001 From: Rick Ross Date: Tue, 23 Jul 2024 14:17:53 -0400 Subject: [PATCH 01/19] Added CounterInterceptor Sample --- README.md | 1 + src/CounterInterceptor/ClientCounter.cs | 89 +++++++++++++++ src/CounterInterceptor/Constants.cs | 7 ++ src/CounterInterceptor/MyActivities.cs | 19 +++ .../MyChildWorkflow.workflow.cs | 21 ++++ src/CounterInterceptor/MyWorkflow.workflow.cs | 56 +++++++++ src/CounterInterceptor/Program.cs | 108 ++++++++++++++++++ src/CounterInterceptor/README.md | 28 +++++ .../SimpleClientCallsInterceptor.cs | 65 +++++++++++ .../SimpleCounterWorkerInterceptor.cs | 99 ++++++++++++++++ ...emporalioSamples.CounterInterceptor.csproj | 7 ++ src/CounterInterceptor/WorkerCounter.cs | 91 +++++++++++++++ 12 files changed, 591 insertions(+) create mode 100644 src/CounterInterceptor/ClientCounter.cs create mode 100644 src/CounterInterceptor/Constants.cs create mode 100644 src/CounterInterceptor/MyActivities.cs create mode 100644 src/CounterInterceptor/MyChildWorkflow.workflow.cs create mode 100644 src/CounterInterceptor/MyWorkflow.workflow.cs create mode 100644 src/CounterInterceptor/Program.cs create mode 100644 src/CounterInterceptor/README.md create mode 100644 src/CounterInterceptor/SimpleClientCallsInterceptor.cs create mode 100644 src/CounterInterceptor/SimpleCounterWorkerInterceptor.cs create mode 100644 src/CounterInterceptor/TemporalioSamples.CounterInterceptor.csproj create mode 100644 src/CounterInterceptor/WorkerCounter.cs diff --git a/README.md b/README.md index cd6234d..3243047 100644 --- a/README.md +++ b/README.md @@ -17,6 +17,7 @@ Prerequisites: * [ActivityWorker](src/ActivityWorker) - Use .NET activities from a workflow in another language. * [AspNet](src/AspNet) - Demonstration of a generic host worker and an ASP.NET workflow starter. * [ClientMtls](src/ClientMtls) - How to use client certificate authentication, e.g. for Temporal Cloud. +* [CounterInterceptor](src/CounterInterceptor/) - Simple Workflow and Client Interceptors example. * [ContextPropagation](src/ContextPropagation) - Context propagation via interceptors. * [DependencyInjection](src/DependencyInjection) - How to inject dependencies in activities and use generic hosts for workers * [Encryption](src/Encryption) - End-to-end encryption with Temporal payload codecs. diff --git a/src/CounterInterceptor/ClientCounter.cs b/src/CounterInterceptor/ClientCounter.cs new file mode 100644 index 0000000..8e66414 --- /dev/null +++ b/src/CounterInterceptor/ClientCounter.cs @@ -0,0 +1,89 @@ +namespace TermporalioSamples.CounterInterceptor; + +using System.Numerics; + +public class ClientCounter +{ + private const string NumberOfWorkflowExecutions = "numOfWorkflowExec"; + private const string NumberOfSignals = "numOfSignals"; + private const string NumberOfQueries = "numOfQueries"; + private static Dictionary> perWorkflowIdDictionary = + new Dictionary>(); + + public static string Info() + { + string result = string.Empty; + foreach (var item in perWorkflowIdDictionary) + { + var info = item.Value; + result = result + + "\n** Workflow ID: " + item.Key + + "\n\tTotal Number of Workflow Exec: " + info[NumberOfWorkflowExecutions] + + "\n\tTotal Number of Signals: " + info[NumberOfSignals] + + "\n\tTotal Number of Queries: " + info[NumberOfQueries]; + } + + return result; + } + + public BigInteger? NumOfWorkflowExecutions(string workflowId) + { + return perWorkflowIdDictionary[workflowId][NumberOfWorkflowExecutions]; + } + + public BigInteger? NumOfSignals(string workflowId) + { + return perWorkflowIdDictionary[workflowId][NumberOfSignals]; + } + + public BigInteger? NumOfQueries(string workflowId) + { + return perWorkflowIdDictionary[workflowId][NumberOfQueries]; + } + + public void AddStartInvocation(string workflowId) + { + Add(workflowId, NumberOfWorkflowExecutions); + } + + public void AddSignalInvocation(string workflowId) + { + Add(workflowId, NumberOfSignals); + } + + public void AddQueryInvocation(string workflowId) + { + Add(workflowId, NumberOfQueries); + } + + // Creates a default counter info map for a workflowid + private static Dictionary GetDefaultInfoMap() + { + return new Dictionary() + { + { NumberOfWorkflowExecutions, 0 }, + { NumberOfSignals, 0 }, + { NumberOfQueries, 0 }, + }; + } + + private void Add(string workflowId, string type) + { + if (!perWorkflowIdDictionary.TryGetValue(workflowId, out Dictionary? value)) + { + value = GetDefaultInfoMap(); + perWorkflowIdDictionary.Add(workflowId, value); + } + + if (value[type] == null) + { + value[type] = 1; + } + else + { + var current = value[type]; + var next = current + 1; + value[type] = next; + } + } +} \ No newline at end of file diff --git a/src/CounterInterceptor/Constants.cs b/src/CounterInterceptor/Constants.cs new file mode 100644 index 0000000..35ed588 --- /dev/null +++ b/src/CounterInterceptor/Constants.cs @@ -0,0 +1,7 @@ +namespace TermporalioSamples.CounterInterceptor; + +public static class Constants +{ + public const string TaskQueue = "counter-interceptor-sample"; + public const string ChildWorkflowId = "TestInterceptorChildWorkflow"; +} \ No newline at end of file diff --git a/src/CounterInterceptor/MyActivities.cs b/src/CounterInterceptor/MyActivities.cs new file mode 100644 index 0000000..b6459e1 --- /dev/null +++ b/src/CounterInterceptor/MyActivities.cs @@ -0,0 +1,19 @@ +namespace TermporalioSamples.CounterInterceptor; + +using System.Diagnostics; +using Temporalio.Activities; + +public class MyActivities +{ + [Activity] + public string SayHello(string name, string title) + { + return "Hello " + title + " " + name; + } + + [Activity] + public string SayGoodBye(string name, string title) + { + return "Goodbye " + title + " " + name; + } +} diff --git a/src/CounterInterceptor/MyChildWorkflow.workflow.cs b/src/CounterInterceptor/MyChildWorkflow.workflow.cs new file mode 100644 index 0000000..6bb465c --- /dev/null +++ b/src/CounterInterceptor/MyChildWorkflow.workflow.cs @@ -0,0 +1,21 @@ +namespace TermporalioSamples.CounterInterceptor; + +using Temporalio.Workflows; + +[Workflow] +public class MyChildWorkflow +{ + private readonly ActivityOptions activityOptions = new() + { + StartToCloseTimeout = TimeSpan.FromSeconds(10), + }; + + [WorkflowRun] + public async Task ExecChildAsync(string name, string title) + { + string result = await Workflow.ExecuteActivityAsync((MyActivities act) => act.SayHello(name, title), activityOptions); + result += await Workflow.ExecuteActivityAsync((MyActivities act) => act.SayGoodBye(name, title), activityOptions); + + return result; + } +} \ No newline at end of file diff --git a/src/CounterInterceptor/MyWorkflow.workflow.cs b/src/CounterInterceptor/MyWorkflow.workflow.cs new file mode 100644 index 0000000..8bec052 --- /dev/null +++ b/src/CounterInterceptor/MyWorkflow.workflow.cs @@ -0,0 +1,56 @@ +namespace TermporalioSamples.CounterInterceptor; + +using Temporalio.Workflows; + +[Workflow] +public class MyWorkflow +{ + private string name = string.Empty; + private string title = string.Empty; + private bool exit; // automatically defaults to false + + [WorkflowRun] + public async Task ExecAsync() + { + // wait for greeting info + await Workflow.WaitConditionAsync(() => name != null && title != null); + + // Execute Child Workflow + string result = await Workflow.ExecuteChildWorkflowAsync( + (MyChildWorkflow wf) => wf.ExecChildAsync(name, title), + new() + { + Id = Constants.ChildWorkflowId, + }); + + // Wait for exit signal + await Workflow.WaitConditionAsync(() => exit != false); + + return result; + } + + [WorkflowSignal] + public async Task SignalNameAndTitleAsync(string name, string title) + { + this.name = name; + this.title = title; + } + + [WorkflowQuery] + public string QueryName() + { + return name; + } + + [WorkflowQuery] + public string QueryTitle() + { + return title; + } + + [WorkflowSignal] + public async Task ExitAsync() + { + this.exit = true; + } +} \ No newline at end of file diff --git a/src/CounterInterceptor/Program.cs b/src/CounterInterceptor/Program.cs new file mode 100644 index 0000000..8c1001b --- /dev/null +++ b/src/CounterInterceptor/Program.cs @@ -0,0 +1,108 @@ +namespace TermporalioSamples.CounterInterceptor; + +using Temporalio.Client; +using Temporalio.Worker; + +internal class Program +{ + private static async Task Main(string[] args) + { + static string GetEnvVarWithDefault(string envName, string defaultValue) + { + string? value = Environment.GetEnvironmentVariable(envName); + if (string.IsNullOrEmpty(value)) + { + return defaultValue; + } + return value; + } + + var address = GetEnvVarWithDefault("TEMPORAL_ADDRESS", "127.0.0.1:7233"); + var temporalNamespace = GetEnvVarWithDefault("TEMPORAL_NAMESPACE", "default"); + var tlsCertPath = GetEnvVarWithDefault("TEMPORAL_TLS_CERT", string.Empty); + var tlsKeyPath = GetEnvVarWithDefault("TEMPORAL_TLS_KEY", string.Empty); + TlsOptions? tls = null; + if (!string.IsNullOrEmpty(tlsCertPath) && !string.IsNullOrEmpty(tlsKeyPath)) + { + tls = new() + { + ClientCert = await File.ReadAllBytesAsync(tlsCertPath), + ClientPrivateKey = await File.ReadAllBytesAsync(tlsKeyPath), + }; + } + + var client = await TemporalClient.ConnectAsync( + options: new(address) + { + Namespace = temporalNamespace, + Tls = tls, + Interceptors = new[] + { + new SimpleClientCallsInterceptor(), + }, + }); + + using var tokenSource = new CancellationTokenSource(); + Console.CancelKeyPress += (_, eventArgs) => + { + tokenSource.Cancel(); + eventArgs.Cancel = true; + }; + + var activities = new MyActivities(); + + var workerOptions = new TemporalWorkerOptions(Constants.TaskQueue). + AddAllActivities(activities). + AddWorkflow(). + AddWorkflow(); + + workerOptions.Interceptors = new[] { new SimpleCounterWorkerInterceptor() }; + + using var worker = new TemporalWorker( + client, + workerOptions); + + // Run worker until cancelled + Console.WriteLine("Running worker..."); + try + { + // Start the workers + var workerResult = worker.ExecuteAsync(tokenSource.Token); + + // start the workflow + var handle = await client.StartWorkflowAsync( + (MyWorkflow wf) => wf.ExecAsync(), + new(id: Guid.NewGuid().ToString(), taskQueue: Constants.TaskQueue)); + + Console.WriteLine("Sending name and title to workflow"); + await handle.SignalAsync(wf => wf.SignalNameAndTitleAsync("John", "Customer")); + + var name = await handle.QueryAsync(wf => wf.QueryName()); + var title = await handle.QueryAsync(wf => wf.QueryTitle()); + + // send exit signal to workflow + await handle.SignalAsync(wf => wf.ExitAsync()); + + var result = await handle.GetResultAsync(); + + Console.WriteLine($"Workflow result is {result}", result); + + Console.WriteLine("Query results: "); + Console.WriteLine("Name: " + name); + Console.WriteLine("Title: " + title); + + // print worker counter info + Console.WriteLine("Collected Worker Counter Info: "); + Console.WriteLine(WorkerCounter.Info()); + + // print client counter info + Console.WriteLine(); + Console.WriteLine("Collected Client Counter Info:"); + Console.WriteLine(ClientCounter.Info()); + } + catch (OperationCanceledException) + { + Console.WriteLine("Worker cancelled"); + } + } +} \ No newline at end of file diff --git a/src/CounterInterceptor/README.md b/src/CounterInterceptor/README.md new file mode 100644 index 0000000..1dbb81d --- /dev/null +++ b/src/CounterInterceptor/README.md @@ -0,0 +1,28 @@ +# dotnet-counter-interceptor +The sample demonstrates: +- the use of a simple Worker Workflow Interceptor that counts the number of Workflow Executions, Child Workflow Executions, and Activity Executions as well as the number of Signals and Queries. It is based +off of the [Java Sample](https://github.com/temporalio/samples-java/tree/main) located [here](https://github.com/temporalio/samples-java/tree/main/core/src/main/java/io/temporal/samples/countinterceptor) +- the use of a simple Client Workflow Interceptor that counts the number of Workflow Executions as well as the number of Signals and Queries. + +## Start local Temporal Server +```bash +# run only once +temporal server start-dev +``` + +## Run Worker Locally +```bash +# make sure you have temporal server running (see section above) +dotnet run +``` + +## Run Worker using Temporal Cloud +```bash +# set up environment variables +export TEMPORAL_NAMESPACE=. +export TEMPORAL_ADDRESS=..tmprl.cloud:7233 +export TEMPORAL_TLS_CERT=/path/to/cert +export TEMPORAL_TLS_KEY=/path/to/key +# run the worker +dotnet run +``` \ No newline at end of file diff --git a/src/CounterInterceptor/SimpleClientCallsInterceptor.cs b/src/CounterInterceptor/SimpleClientCallsInterceptor.cs new file mode 100644 index 0000000..819d77a --- /dev/null +++ b/src/CounterInterceptor/SimpleClientCallsInterceptor.cs @@ -0,0 +1,65 @@ +namespace TermporalioSamples.CounterInterceptor; + +using Temporalio.Client; +using Temporalio.Client.Interceptors; + +public class SimpleClientCallsInterceptor : IClientInterceptor +{ + private ClientCounter clientCounter; + + public SimpleClientCallsInterceptor() + { + this.clientCounter = new ClientCounter(); + } + + public ClientOutboundInterceptor InterceptClient(ClientOutboundInterceptor nextInterceptor) => + new ClientOutbound(this, nextInterceptor); + + private class ClientOutbound : ClientOutboundInterceptor + { + private SimpleClientCallsInterceptor root; + + public ClientOutbound(SimpleClientCallsInterceptor root, ClientOutboundInterceptor next) + : base(next) => this.root = root; + + public override Task> StartWorkflowAsync( + StartWorkflowInput input) + { + var id = CheckId(input.Options.Id); + root.clientCounter.AddStartInvocation(id); + return base.StartWorkflowAsync(input); + } + + public override Task SignalWorkflowAsync(SignalWorkflowInput input) + { + var id = CheckId(input.Id); + root.clientCounter.AddSignalInvocation(id); + return base.SignalWorkflowAsync(input); + } + + public override Task QueryWorkflowAsync(QueryWorkflowInput input) + { + var id = CheckId(input.Id); + root.clientCounter.AddQueryInvocation(id); + return base.QueryWorkflowAsync(input); + } + + public override Task> StartWorkflowUpdateAsync( + StartWorkflowUpdateInput input) + { + // not tracking this + return base.StartWorkflowUpdateAsync(input); + } + + private static string CheckId(string? id) + { + var returnValue = "None"; + if (id != null) + { + returnValue = id; + } + + return returnValue; + } + } +} \ No newline at end of file diff --git a/src/CounterInterceptor/SimpleCounterWorkerInterceptor.cs b/src/CounterInterceptor/SimpleCounterWorkerInterceptor.cs new file mode 100644 index 0000000..07fbf1c --- /dev/null +++ b/src/CounterInterceptor/SimpleCounterWorkerInterceptor.cs @@ -0,0 +1,99 @@ +namespace TermporalioSamples.CounterInterceptor; + +using Temporalio.Activities; +using Temporalio.Worker.Interceptors; +using Temporalio.Workflows; + +public class SimpleCounterWorkerInterceptor : IWorkerInterceptor +{ + public WorkflowInboundInterceptor InterceptWorkflow(WorkflowInboundInterceptor nextInterceptor) => + new WorkflowInbound(this, nextInterceptor); + + public ActivityInboundInterceptor InterceptActivity(ActivityInboundInterceptor nextInterceptor) => + new ActivityInbound(this, nextInterceptor); + + private class WorkflowInbound : WorkflowInboundInterceptor + { + private readonly SimpleCounterWorkerInterceptor root; + + internal WorkflowInbound(SimpleCounterWorkerInterceptor root, WorkflowInboundInterceptor next) + : base(next) => this.root = root; + + public override void Init(WorkflowOutboundInterceptor outbound) + { + base.Init(new WorkflowOutbound(root, outbound)); + } + + public override Task ExecuteWorkflowAsync(ExecuteWorkflowInput input) + { + WorkerCounter.Add(Workflow.Info.WorkflowId, WorkerCounter.NumberOfWorkflowExecutions); + return base.ExecuteWorkflowAsync(input); + } + + public override Task HandleSignalAsync(HandleSignalInput input) + { + WorkerCounter.Add(Workflow.Info.WorkflowId, WorkerCounter.NumberOfSignals); + return base.HandleSignalAsync(input); + } + + public override object? HandleQuery(HandleQueryInput input) + { + WorkerCounter.Add(Workflow.Info.WorkflowId, WorkerCounter.NumberOfQueries); + return base.HandleQuery(input); + } + + public override void ValidateUpdate(HandleUpdateInput input) + { + // not monitoring + base.ValidateUpdate(input); + } + } + + private sealed class WorkflowOutbound : WorkflowOutboundInterceptor + { + private readonly SimpleCounterWorkerInterceptor root; + + internal WorkflowOutbound(SimpleCounterWorkerInterceptor root, WorkflowOutboundInterceptor next) + : base(next) => this.root = root; + + public override Task ScheduleActivityAsync( + ScheduleActivityInput input) + { + return base.ScheduleActivityAsync(input); + } + + public override Task SignalChildWorkflowAsync(SignalChildWorkflowInput input) + { + return base.SignalChildWorkflowAsync(input); + } + + public override Task SignalExternalWorkflowAsync(SignalExternalWorkflowInput input) + { + return base.SignalExternalWorkflowAsync(input); + } + + public override Task> StartChildWorkflowAsync( + StartChildWorkflowInput input) + { + WorkerCounter.Add(Workflow.Info.WorkflowId, WorkerCounter.NumberOfChildWorkflowExecutions); + return base.StartChildWorkflowAsync(input); + } + } + + private sealed class ActivityInbound : ActivityInboundInterceptor + { + private readonly SimpleCounterWorkerInterceptor root; + + internal ActivityInbound(SimpleCounterWorkerInterceptor root, ActivityInboundInterceptor next) + : base(next) + { + this.root = root; + } + + public override Task ExecuteActivityAsync(ExecuteActivityInput input) + { + WorkerCounter.Add(ActivityExecutionContext.Current.Info.WorkflowId, WorkerCounter.NumberOfActivityExecutions); + return base.ExecuteActivityAsync(input); + } + } +} \ No newline at end of file diff --git a/src/CounterInterceptor/TemporalioSamples.CounterInterceptor.csproj b/src/CounterInterceptor/TemporalioSamples.CounterInterceptor.csproj new file mode 100644 index 0000000..967ff4c --- /dev/null +++ b/src/CounterInterceptor/TemporalioSamples.CounterInterceptor.csproj @@ -0,0 +1,7 @@ + + + + Exe + + + diff --git a/src/CounterInterceptor/WorkerCounter.cs b/src/CounterInterceptor/WorkerCounter.cs new file mode 100644 index 0000000..4a21182 --- /dev/null +++ b/src/CounterInterceptor/WorkerCounter.cs @@ -0,0 +1,91 @@ +namespace TermporalioSamples.CounterInterceptor; + +using System.Numerics; + +public static class WorkerCounter +{ + public const string NumberOfWorkflowExecutions = "numOfWorkflowExec"; + public const string NumberOfChildWorkflowExecutions = "numOfChildWorkflowExec"; + public const string NumberOfActivityExecutions = "numOfActivityExec"; + public const string NumberOfSignals = "numOfSignals"; + public const string NumberOfQueries = "numOfQueries"; + + private static Dictionary> perWorkflowIdDictionary = + new Dictionary>(); + + public static void Add(string workflowId, string type) + { + if (!perWorkflowIdDictionary.TryGetValue(workflowId, out Dictionary? value)) + { + value = DefaultInfoMap(); + perWorkflowIdDictionary.Add(workflowId, value); + } + + if (value[type] == null) + { + value[type] = 1; + } + else + { + var current = value[type]; + var next = current + 1; + value[type] = next; + } + } + + public static BigInteger? NumOfWorkflowExecutions(string workflowId) + { + return perWorkflowIdDictionary[workflowId][NumberOfWorkflowExecutions]; + } + + public static BigInteger? NumOfChildWorkflowExecutions(string workflowId) + { + return perWorkflowIdDictionary[workflowId][NumberOfChildWorkflowExecutions]; + } + + public static BigInteger? NumOfActivityExecutions(string workflowId) + { + return perWorkflowIdDictionary[workflowId][NumberOfActivityExecutions]; + } + + public static BigInteger? NumOfSignals(string workflowId) + { + return perWorkflowIdDictionary[workflowId][NumberOfSignals]; + } + + public static BigInteger? NumOfQueries(string workflowId) + { + return perWorkflowIdDictionary[workflowId][NumberOfQueries]; + } + + public static string Info() + { + string result = string.Empty; + foreach (var item in perWorkflowIdDictionary) + { + var info = item.Value; + result = result + + "\n** Workflow ID: " + item.Key + + "\n\tTotal Number of Workflow Exec: " + info[NumberOfWorkflowExecutions] + + "\n\tTotal Number of Child Worflow Exec: " + info[NumberOfChildWorkflowExecutions] + + "\n\tTotal Number of Activity Exec: " + info[NumberOfActivityExecutions] + + "\n\tTotal Number of Signals: " + info[NumberOfSignals] + + "\n\tTotal Number of Queries: " + info[NumberOfQueries]; + } + + return result; + } + + // Creates a default counter info map for a workflowid + private static Dictionary DefaultInfoMap() + { + return new Dictionary() + { + { NumberOfWorkflowExecutions, 0 }, + { NumberOfChildWorkflowExecutions, 0 }, + { NumberOfActivityExecutions, 0 }, + { NumberOfSignals, 0 }, + { NumberOfQueries, 0 }, + }; + } +} \ No newline at end of file From f428229361bbd20b8dd6298300430ec4829171f1 Mon Sep 17 00:00:00 2001 From: Rick Ross Date: Tue, 23 Jul 2024 15:39:25 -0400 Subject: [PATCH 02/19] Fixed spelling error. Added tests --- src/CounterInterceptor/ClientCounter.cs | 8 +- src/CounterInterceptor/Constants.cs | 2 +- src/CounterInterceptor/MyActivities.cs | 2 +- .../MyChildWorkflow.workflow.cs | 2 +- src/CounterInterceptor/MyWorkflow.workflow.cs | 2 +- src/CounterInterceptor/Program.cs | 2 +- .../SimpleClientCallsInterceptor.cs | 2 +- .../SimpleCounterWorkerInterceptor.cs | 2 +- src/CounterInterceptor/WorkerCounter.cs | 2 +- tests/CounterInterceptor/MyWorkflowTests.cs | 76 +++++++++++++++++++ tests/TemporalioSamples.Tests.csproj | 1 + 11 files changed, 89 insertions(+), 12 deletions(-) create mode 100644 tests/CounterInterceptor/MyWorkflowTests.cs diff --git a/src/CounterInterceptor/ClientCounter.cs b/src/CounterInterceptor/ClientCounter.cs index 8e66414..f8966a3 100644 --- a/src/CounterInterceptor/ClientCounter.cs +++ b/src/CounterInterceptor/ClientCounter.cs @@ -1,4 +1,4 @@ -namespace TermporalioSamples.CounterInterceptor; +namespace TemporalioSamples.CounterInterceptor; using System.Numerics; @@ -26,17 +26,17 @@ public static string Info() return result; } - public BigInteger? NumOfWorkflowExecutions(string workflowId) + public static BigInteger? NumOfWorkflowExecutions(string workflowId) { return perWorkflowIdDictionary[workflowId][NumberOfWorkflowExecutions]; } - public BigInteger? NumOfSignals(string workflowId) + public static BigInteger? NumOfSignals(string workflowId) { return perWorkflowIdDictionary[workflowId][NumberOfSignals]; } - public BigInteger? NumOfQueries(string workflowId) + public static BigInteger? NumOfQueries(string workflowId) { return perWorkflowIdDictionary[workflowId][NumberOfQueries]; } diff --git a/src/CounterInterceptor/Constants.cs b/src/CounterInterceptor/Constants.cs index 35ed588..a25a077 100644 --- a/src/CounterInterceptor/Constants.cs +++ b/src/CounterInterceptor/Constants.cs @@ -1,4 +1,4 @@ -namespace TermporalioSamples.CounterInterceptor; +namespace TemporalioSamples.CounterInterceptor; public static class Constants { diff --git a/src/CounterInterceptor/MyActivities.cs b/src/CounterInterceptor/MyActivities.cs index b6459e1..2e07dbc 100644 --- a/src/CounterInterceptor/MyActivities.cs +++ b/src/CounterInterceptor/MyActivities.cs @@ -1,4 +1,4 @@ -namespace TermporalioSamples.CounterInterceptor; +namespace TemporalioSamples.CounterInterceptor; using System.Diagnostics; using Temporalio.Activities; diff --git a/src/CounterInterceptor/MyChildWorkflow.workflow.cs b/src/CounterInterceptor/MyChildWorkflow.workflow.cs index 6bb465c..ee590d5 100644 --- a/src/CounterInterceptor/MyChildWorkflow.workflow.cs +++ b/src/CounterInterceptor/MyChildWorkflow.workflow.cs @@ -1,4 +1,4 @@ -namespace TermporalioSamples.CounterInterceptor; +namespace TemporalioSamples.CounterInterceptor; using Temporalio.Workflows; diff --git a/src/CounterInterceptor/MyWorkflow.workflow.cs b/src/CounterInterceptor/MyWorkflow.workflow.cs index 8bec052..470f08c 100644 --- a/src/CounterInterceptor/MyWorkflow.workflow.cs +++ b/src/CounterInterceptor/MyWorkflow.workflow.cs @@ -1,4 +1,4 @@ -namespace TermporalioSamples.CounterInterceptor; +namespace TemporalioSamples.CounterInterceptor; using Temporalio.Workflows; diff --git a/src/CounterInterceptor/Program.cs b/src/CounterInterceptor/Program.cs index 8c1001b..44882b0 100644 --- a/src/CounterInterceptor/Program.cs +++ b/src/CounterInterceptor/Program.cs @@ -1,4 +1,4 @@ -namespace TermporalioSamples.CounterInterceptor; +namespace TemporalioSamples.CounterInterceptor; using Temporalio.Client; using Temporalio.Worker; diff --git a/src/CounterInterceptor/SimpleClientCallsInterceptor.cs b/src/CounterInterceptor/SimpleClientCallsInterceptor.cs index 819d77a..438c48c 100644 --- a/src/CounterInterceptor/SimpleClientCallsInterceptor.cs +++ b/src/CounterInterceptor/SimpleClientCallsInterceptor.cs @@ -1,4 +1,4 @@ -namespace TermporalioSamples.CounterInterceptor; +namespace TemporalioSamples.CounterInterceptor; using Temporalio.Client; using Temporalio.Client.Interceptors; diff --git a/src/CounterInterceptor/SimpleCounterWorkerInterceptor.cs b/src/CounterInterceptor/SimpleCounterWorkerInterceptor.cs index 07fbf1c..dd7a305 100644 --- a/src/CounterInterceptor/SimpleCounterWorkerInterceptor.cs +++ b/src/CounterInterceptor/SimpleCounterWorkerInterceptor.cs @@ -1,4 +1,4 @@ -namespace TermporalioSamples.CounterInterceptor; +namespace TemporalioSamples.CounterInterceptor; using Temporalio.Activities; using Temporalio.Worker.Interceptors; diff --git a/src/CounterInterceptor/WorkerCounter.cs b/src/CounterInterceptor/WorkerCounter.cs index 4a21182..f6f733e 100644 --- a/src/CounterInterceptor/WorkerCounter.cs +++ b/src/CounterInterceptor/WorkerCounter.cs @@ -1,4 +1,4 @@ -namespace TermporalioSamples.CounterInterceptor; +namespace TemporalioSamples.CounterInterceptor; using System.Numerics; diff --git a/tests/CounterInterceptor/MyWorkflowTests.cs b/tests/CounterInterceptor/MyWorkflowTests.cs new file mode 100644 index 0000000..6dc786e --- /dev/null +++ b/tests/CounterInterceptor/MyWorkflowTests.cs @@ -0,0 +1,76 @@ +using Temporalio.Client; +using Temporalio.Testing; +using Temporalio.Worker; +using TemporalioSamples.CounterInterceptor; +using Xunit; + +namespace TemporalioSamples.Tests.CounterInterceptor; + +public class MyWorkflowTests +{ + [Fact] + public async Task RunAsync_CounterInterceptor() + { + await using var env = await WorkflowEnvironment.StartLocalAsync(); + + // add the interceptor to the client + var clientOptions = (TemporalClientOptions)env.Client.Options.Clone(); + clientOptions.Interceptors = new[] + { + new SimpleClientCallsInterceptor(), + }; + + var client = new TemporalClient(env.Client.Connection, clientOptions); + + var workerOptions = new TemporalWorkerOptions( + TemporalioSamples.CounterInterceptor.Constants.TaskQueue). + AddAllActivities(new MyActivities()). + AddWorkflow(). + AddWorkflow(); + + workerOptions.Interceptors = new[] { new SimpleCounterWorkerInterceptor() }; + + var parentWorkflowId = "ParentWorkflowId"; + var childWorkflowId = TemporalioSamples.CounterInterceptor.Constants.ChildWorkflowId; + + using var worker = new TemporalWorker(client, workerOptions); + await worker.ExecuteAsync(async () => + { + var handle = await client.StartWorkflowAsync( + (MyWorkflow wf) => wf.ExecAsync(), + new( + id: parentWorkflowId, + taskQueue: TemporalioSamples.CounterInterceptor.Constants.TaskQueue)); + + await handle.SignalAsync(wf => wf.SignalNameAndTitleAsync("John", "Customer")); + + var name = await handle.QueryAsync(wf => wf.QueryName()); + var title = await handle.QueryAsync(wf => wf.QueryTitle()); + + // send exit signal to workflow + await handle.SignalAsync(wf => wf.ExitAsync()); + + // Wait for the workflow to complete + var result = await handle.GetResultAsync(); + + // validate that the worker counters have the correct numbers for the parent + Assert.Equal(1, WorkerCounter.NumOfWorkflowExecutions(parentWorkflowId)); + Assert.Equal(1, WorkerCounter.NumOfChildWorkflowExecutions(parentWorkflowId)); + Assert.Equal(0, WorkerCounter.NumOfActivityExecutions(parentWorkflowId)); + Assert.Equal(2, WorkerCounter.NumOfSignals(parentWorkflowId)); + Assert.Equal(2, WorkerCounter.NumOfQueries(parentWorkflowId)); + + // validate the worker counters have the correct numbers for the child + Assert.Equal(1, WorkerCounter.NumOfWorkflowExecutions(childWorkflowId)); + Assert.Equal(0, WorkerCounter.NumOfChildWorkflowExecutions(childWorkflowId)); + Assert.Equal(2, WorkerCounter.NumOfActivityExecutions(childWorkflowId)); + Assert.Equal(0, WorkerCounter.NumOfSignals(childWorkflowId)); + Assert.Equal(0, WorkerCounter.NumOfQueries(childWorkflowId)); + + // validate the client counters have correct numbers + Assert.Equal(1, ClientCounter.NumOfWorkflowExecutions(parentWorkflowId)); + Assert.Equal(2, ClientCounter.NumOfSignals(parentWorkflowId)); + Assert.Equal(2, ClientCounter.NumOfQueries(parentWorkflowId)); + }); + } +} \ No newline at end of file diff --git a/tests/TemporalioSamples.Tests.csproj b/tests/TemporalioSamples.Tests.csproj index f4a0ed8..2c3b60a 100644 --- a/tests/TemporalioSamples.Tests.csproj +++ b/tests/TemporalioSamples.Tests.csproj @@ -21,6 +21,7 @@ + From 26fdbee8a4d66884bf73e257661e9bcae7c50226 Mon Sep 17 00:00:00 2001 From: Rick Ross Date: Tue, 23 Jul 2024 15:49:58 -0400 Subject: [PATCH 03/19] Fixed whitespace format issue --- src/CounterInterceptor/SimpleCounterWorkerInterceptor.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/CounterInterceptor/SimpleCounterWorkerInterceptor.cs b/src/CounterInterceptor/SimpleCounterWorkerInterceptor.cs index dd7a305..81e2735 100644 --- a/src/CounterInterceptor/SimpleCounterWorkerInterceptor.cs +++ b/src/CounterInterceptor/SimpleCounterWorkerInterceptor.cs @@ -21,7 +21,7 @@ internal WorkflowInbound(SimpleCounterWorkerInterceptor root, WorkflowInboundInt public override void Init(WorkflowOutboundInterceptor outbound) { - base.Init(new WorkflowOutbound(root, outbound)); + base.Init(new WorkflowOutbound(root, outbound)); } public override Task ExecuteWorkflowAsync(ExecuteWorkflowInput input) From c68e75dc8919d3daed5079f95257d1c43d6ea4f1 Mon Sep 17 00:00:00 2001 From: Rick Ross Date: Fri, 26 Jul 2024 10:47:27 -0400 Subject: [PATCH 04/19] Changes from PR suggestions --- README.md | 2 +- src/CounterInterceptor/ClientCounter.cs | 29 +++++------ src/CounterInterceptor/Constants.cs | 7 --- .../MyChildWorkflow.workflow.cs | 10 ++-- src/CounterInterceptor/MyWorkflow.workflow.cs | 28 ++++------- src/CounterInterceptor/Program.cs | 48 +++++-------------- src/CounterInterceptor/WorkerCounter.cs | 47 ++++++++---------- tests/CounterInterceptor/MyWorkflowTests.cs | 42 ++++++++-------- 8 files changed, 77 insertions(+), 136 deletions(-) delete mode 100644 src/CounterInterceptor/Constants.cs diff --git a/README.md b/README.md index 3243047..ba67ecb 100644 --- a/README.md +++ b/README.md @@ -17,8 +17,8 @@ Prerequisites: * [ActivityWorker](src/ActivityWorker) - Use .NET activities from a workflow in another language. * [AspNet](src/AspNet) - Demonstration of a generic host worker and an ASP.NET workflow starter. * [ClientMtls](src/ClientMtls) - How to use client certificate authentication, e.g. for Temporal Cloud. -* [CounterInterceptor](src/CounterInterceptor/) - Simple Workflow and Client Interceptors example. * [ContextPropagation](src/ContextPropagation) - Context propagation via interceptors. +* [CounterInterceptor](src/CounterInterceptor/) - Simple Workflow and Client Interceptors example. * [DependencyInjection](src/DependencyInjection) - How to inject dependencies in activities and use generic hosts for workers * [Encryption](src/Encryption) - End-to-end encryption with Temporal payload codecs. * [Mutex](src/Mutex) - How to implement a mutex as a workflow. Demonstrates how to avoid race conditions or parallel mutually exclusive operations on the same resource. diff --git a/src/CounterInterceptor/ClientCounter.cs b/src/CounterInterceptor/ClientCounter.cs index f8966a3..49b7b62 100644 --- a/src/CounterInterceptor/ClientCounter.cs +++ b/src/CounterInterceptor/ClientCounter.cs @@ -7,8 +7,8 @@ public class ClientCounter private const string NumberOfWorkflowExecutions = "numOfWorkflowExec"; private const string NumberOfSignals = "numOfSignals"; private const string NumberOfQueries = "numOfQueries"; - private static Dictionary> perWorkflowIdDictionary = - new Dictionary>(); + private static Dictionary> perWorkflowIdDictionary = + new(); public static string Info() { @@ -26,17 +26,17 @@ public static string Info() return result; } - public static BigInteger? NumOfWorkflowExecutions(string workflowId) + public static uint NumOfWorkflowExecutions(string workflowId) { return perWorkflowIdDictionary[workflowId][NumberOfWorkflowExecutions]; } - public static BigInteger? NumOfSignals(string workflowId) + public static uint NumOfSignals(string workflowId) { return perWorkflowIdDictionary[workflowId][NumberOfSignals]; } - public static BigInteger? NumOfQueries(string workflowId) + public static uint NumOfQueries(string workflowId) { return perWorkflowIdDictionary[workflowId][NumberOfQueries]; } @@ -57,9 +57,9 @@ public void AddQueryInvocation(string workflowId) } // Creates a default counter info map for a workflowid - private static Dictionary GetDefaultInfoMap() + private static Dictionary GetDefaultInfoMap() { - return new Dictionary() + return new Dictionary() { { NumberOfWorkflowExecutions, 0 }, { NumberOfSignals, 0 }, @@ -69,21 +69,14 @@ public void AddQueryInvocation(string workflowId) private void Add(string workflowId, string type) { - if (!perWorkflowIdDictionary.TryGetValue(workflowId, out Dictionary? value)) + if (!perWorkflowIdDictionary.TryGetValue(workflowId, out Dictionary? value)) { value = GetDefaultInfoMap(); perWorkflowIdDictionary.Add(workflowId, value); } - if (value[type] == null) - { - value[type] = 1; - } - else - { - var current = value[type]; - var next = current + 1; - value[type] = next; - } + var current = value[type]; + var next = current + 1; + value[type] = next; } } \ No newline at end of file diff --git a/src/CounterInterceptor/Constants.cs b/src/CounterInterceptor/Constants.cs deleted file mode 100644 index a25a077..0000000 --- a/src/CounterInterceptor/Constants.cs +++ /dev/null @@ -1,7 +0,0 @@ -namespace TemporalioSamples.CounterInterceptor; - -public static class Constants -{ - public const string TaskQueue = "counter-interceptor-sample"; - public const string ChildWorkflowId = "TestInterceptorChildWorkflow"; -} \ No newline at end of file diff --git a/src/CounterInterceptor/MyChildWorkflow.workflow.cs b/src/CounterInterceptor/MyChildWorkflow.workflow.cs index ee590d5..3b24389 100644 --- a/src/CounterInterceptor/MyChildWorkflow.workflow.cs +++ b/src/CounterInterceptor/MyChildWorkflow.workflow.cs @@ -11,11 +11,7 @@ public class MyChildWorkflow }; [WorkflowRun] - public async Task ExecChildAsync(string name, string title) - { - string result = await Workflow.ExecuteActivityAsync((MyActivities act) => act.SayHello(name, title), activityOptions); - result += await Workflow.ExecuteActivityAsync((MyActivities act) => act.SayGoodBye(name, title), activityOptions); - - return result; - } + public async Task RunAsync(string name, string title) => + await Workflow.ExecuteActivityAsync((MyActivities act) => act.SayHello(name, title), activityOptions) + + await Workflow.ExecuteActivityAsync((MyActivities act) => act.SayGoodBye(name, title), activityOptions); } \ No newline at end of file diff --git a/src/CounterInterceptor/MyWorkflow.workflow.cs b/src/CounterInterceptor/MyWorkflow.workflow.cs index 470f08c..a9bd7c3 100644 --- a/src/CounterInterceptor/MyWorkflow.workflow.cs +++ b/src/CounterInterceptor/MyWorkflow.workflow.cs @@ -10,21 +10,18 @@ public class MyWorkflow private bool exit; // automatically defaults to false [WorkflowRun] - public async Task ExecAsync() + public async Task RunAsync() { // wait for greeting info await Workflow.WaitConditionAsync(() => name != null && title != null); // Execute Child Workflow - string result = await Workflow.ExecuteChildWorkflowAsync( - (MyChildWorkflow wf) => wf.ExecChildAsync(name, title), - new() - { - Id = Constants.ChildWorkflowId, - }); + var result = await Workflow.ExecuteChildWorkflowAsync( + (MyChildWorkflow wf) => wf.RunAsync(name, title), + new() { Id = "counter-interceptor-child" }); // Wait for exit signal - await Workflow.WaitConditionAsync(() => exit != false); + await Workflow.WaitConditionAsync(() => exit); return result; } @@ -37,20 +34,11 @@ public async Task SignalNameAndTitleAsync(string name, string title) } [WorkflowQuery] - public string QueryName() - { - return name; - } + public string Name { get; private set; } = string.Empty; [WorkflowQuery] - public string QueryTitle() - { - return title; - } + public string Title { get; private set; } = string.Empty; [WorkflowSignal] - public async Task ExitAsync() - { - this.exit = true; - } + public async Task ExitAsync() => exit = true; } \ No newline at end of file diff --git a/src/CounterInterceptor/Program.cs b/src/CounterInterceptor/Program.cs index 44882b0..8e86bdf 100644 --- a/src/CounterInterceptor/Program.cs +++ b/src/CounterInterceptor/Program.cs @@ -7,35 +7,9 @@ internal class Program { private static async Task Main(string[] args) { - static string GetEnvVarWithDefault(string envName, string defaultValue) - { - string? value = Environment.GetEnvironmentVariable(envName); - if (string.IsNullOrEmpty(value)) - { - return defaultValue; - } - return value; - } - - var address = GetEnvVarWithDefault("TEMPORAL_ADDRESS", "127.0.0.1:7233"); - var temporalNamespace = GetEnvVarWithDefault("TEMPORAL_NAMESPACE", "default"); - var tlsCertPath = GetEnvVarWithDefault("TEMPORAL_TLS_CERT", string.Empty); - var tlsKeyPath = GetEnvVarWithDefault("TEMPORAL_TLS_KEY", string.Empty); - TlsOptions? tls = null; - if (!string.IsNullOrEmpty(tlsCertPath) && !string.IsNullOrEmpty(tlsKeyPath)) - { - tls = new() - { - ClientCert = await File.ReadAllBytesAsync(tlsCertPath), - ClientPrivateKey = await File.ReadAllBytesAsync(tlsKeyPath), - }; - } - var client = await TemporalClient.ConnectAsync( - options: new(address) + options: new("localhost:7233") { - Namespace = temporalNamespace, - Tls = tls, Interceptors = new[] { new SimpleClientCallsInterceptor(), @@ -51,7 +25,9 @@ static string GetEnvVarWithDefault(string envName, string defaultValue) var activities = new MyActivities(); - var workerOptions = new TemporalWorkerOptions(Constants.TaskQueue). + var taskQueue = "CounterInterceptorTaskQueue"; + + var workerOptions = new TemporalWorkerOptions(taskQueue). AddAllActivities(activities). AddWorkflow(). AddWorkflow(); @@ -69,18 +45,18 @@ static string GetEnvVarWithDefault(string envName, string defaultValue) // Start the workers var workerResult = worker.ExecuteAsync(tokenSource.Token); - // start the workflow + // Start the workflow var handle = await client.StartWorkflowAsync( - (MyWorkflow wf) => wf.ExecAsync(), - new(id: Guid.NewGuid().ToString(), taskQueue: Constants.TaskQueue)); + (MyWorkflow wf) => wf.RunAsync(), + new(id: Guid.NewGuid().ToString(), taskQueue: taskQueue)); Console.WriteLine("Sending name and title to workflow"); await handle.SignalAsync(wf => wf.SignalNameAndTitleAsync("John", "Customer")); - var name = await handle.QueryAsync(wf => wf.QueryName()); - var title = await handle.QueryAsync(wf => wf.QueryTitle()); + var name = await handle.QueryAsync(wf => wf.Name); + var title = await handle.QueryAsync(wf => wf.Title); - // send exit signal to workflow + // Send exit signal to workflow await handle.SignalAsync(wf => wf.ExitAsync()); var result = await handle.GetResultAsync(); @@ -91,11 +67,11 @@ static string GetEnvVarWithDefault(string envName, string defaultValue) Console.WriteLine("Name: " + name); Console.WriteLine("Title: " + title); - // print worker counter info + // Print worker counter info Console.WriteLine("Collected Worker Counter Info: "); Console.WriteLine(WorkerCounter.Info()); - // print client counter info + // Print client counter info Console.WriteLine(); Console.WriteLine("Collected Client Counter Info:"); Console.WriteLine(ClientCounter.Info()); diff --git a/src/CounterInterceptor/WorkerCounter.cs b/src/CounterInterceptor/WorkerCounter.cs index f6f733e..bf11e74 100644 --- a/src/CounterInterceptor/WorkerCounter.cs +++ b/src/CounterInterceptor/WorkerCounter.cs @@ -10,50 +10,43 @@ public static class WorkerCounter public const string NumberOfSignals = "numOfSignals"; public const string NumberOfQueries = "numOfQueries"; - private static Dictionary> perWorkflowIdDictionary = - new Dictionary>(); + private static Dictionary> perWorkflowIdDictionary = + new Dictionary>(); public static void Add(string workflowId, string type) { - if (!perWorkflowIdDictionary.TryGetValue(workflowId, out Dictionary? value)) + if (!perWorkflowIdDictionary.TryGetValue(workflowId, out Dictionary? value)) { value = DefaultInfoMap(); perWorkflowIdDictionary.Add(workflowId, value); } - if (value[type] == null) - { - value[type] = 1; - } - else - { - var current = value[type]; - var next = current + 1; - value[type] = next; - } + var current = value[type]; + var next = current + 1; + value[type] = next; } - public static BigInteger? NumOfWorkflowExecutions(string workflowId) + public static uint NumOfWorkflowExecutions(string workflowId) { return perWorkflowIdDictionary[workflowId][NumberOfWorkflowExecutions]; } - public static BigInteger? NumOfChildWorkflowExecutions(string workflowId) + public static uint NumOfChildWorkflowExecutions(string workflowId) { return perWorkflowIdDictionary[workflowId][NumberOfChildWorkflowExecutions]; } - public static BigInteger? NumOfActivityExecutions(string workflowId) + public static uint NumOfActivityExecutions(string workflowId) { return perWorkflowIdDictionary[workflowId][NumberOfActivityExecutions]; } - public static BigInteger? NumOfSignals(string workflowId) + public static uint NumOfSignals(string workflowId) { return perWorkflowIdDictionary[workflowId][NumberOfSignals]; } - public static BigInteger? NumOfQueries(string workflowId) + public static uint NumOfQueries(string workflowId) { return perWorkflowIdDictionary[workflowId][NumberOfQueries]; } @@ -63,23 +56,23 @@ public static string Info() string result = string.Empty; foreach (var item in perWorkflowIdDictionary) { - var info = item.Value; + var itemInfo = item.Value; result = result + "\n** Workflow ID: " + item.Key + - "\n\tTotal Number of Workflow Exec: " + info[NumberOfWorkflowExecutions] + - "\n\tTotal Number of Child Worflow Exec: " + info[NumberOfChildWorkflowExecutions] + - "\n\tTotal Number of Activity Exec: " + info[NumberOfActivityExecutions] + - "\n\tTotal Number of Signals: " + info[NumberOfSignals] + - "\n\tTotal Number of Queries: " + info[NumberOfQueries]; + "\n\tTotal Number of Workflow Exec: " + itemInfo[NumberOfWorkflowExecutions] + + "\n\tTotal Number of Child Worflow Exec: " + itemInfo[NumberOfChildWorkflowExecutions] + + "\n\tTotal Number of Activity Exec: " + itemInfo[NumberOfActivityExecutions] + + "\n\tTotal Number of Signals: " + itemInfo[NumberOfSignals] + + "\n\tTotal Number of Queries: " + itemInfo[NumberOfQueries]; } return result; } - // Creates a default counter info map for a workflowid - private static Dictionary DefaultInfoMap() + // Creates a default counter info map for a workflowId + private static Dictionary DefaultInfoMap() { - return new Dictionary() + return new Dictionary() { { NumberOfWorkflowExecutions, 0 }, { NumberOfChildWorkflowExecutions, 0 }, diff --git a/tests/CounterInterceptor/MyWorkflowTests.cs b/tests/CounterInterceptor/MyWorkflowTests.cs index 6dc786e..44bd478 100644 --- a/tests/CounterInterceptor/MyWorkflowTests.cs +++ b/tests/CounterInterceptor/MyWorkflowTests.cs @@ -22,8 +22,9 @@ public async Task RunAsync_CounterInterceptor() var client = new TemporalClient(env.Client.Connection, clientOptions); - var workerOptions = new TemporalWorkerOptions( - TemporalioSamples.CounterInterceptor.Constants.TaskQueue). + var taskQueue = "counter-interceptor-test-task-queue"; + + var workerOptions = new TemporalWorkerOptions(taskQueue). AddAllActivities(new MyActivities()). AddWorkflow(). AddWorkflow(); @@ -31,21 +32,22 @@ public async Task RunAsync_CounterInterceptor() workerOptions.Interceptors = new[] { new SimpleCounterWorkerInterceptor() }; var parentWorkflowId = "ParentWorkflowId"; - var childWorkflowId = TemporalioSamples.CounterInterceptor.Constants.ChildWorkflowId; + // Be sure that this matches the ID in the Workflow + var childWorkflowId = "counter-interceptor-child"; using var worker = new TemporalWorker(client, workerOptions); await worker.ExecuteAsync(async () => { var handle = await client.StartWorkflowAsync( - (MyWorkflow wf) => wf.ExecAsync(), + (MyWorkflow wf) => wf.RunAsync(), new( id: parentWorkflowId, - taskQueue: TemporalioSamples.CounterInterceptor.Constants.TaskQueue)); + taskQueue: taskQueue)); await handle.SignalAsync(wf => wf.SignalNameAndTitleAsync("John", "Customer")); - var name = await handle.QueryAsync(wf => wf.QueryName()); - var title = await handle.QueryAsync(wf => wf.QueryTitle()); + var name = await handle.QueryAsync(wf => wf.Name); + var title = await handle.QueryAsync(wf => wf.Title); // send exit signal to workflow await handle.SignalAsync(wf => wf.ExitAsync()); @@ -54,23 +56,23 @@ await worker.ExecuteAsync(async () => var result = await handle.GetResultAsync(); // validate that the worker counters have the correct numbers for the parent - Assert.Equal(1, WorkerCounter.NumOfWorkflowExecutions(parentWorkflowId)); - Assert.Equal(1, WorkerCounter.NumOfChildWorkflowExecutions(parentWorkflowId)); - Assert.Equal(0, WorkerCounter.NumOfActivityExecutions(parentWorkflowId)); - Assert.Equal(2, WorkerCounter.NumOfSignals(parentWorkflowId)); - Assert.Equal(2, WorkerCounter.NumOfQueries(parentWorkflowId)); + Assert.Equal(1U, WorkerCounter.NumOfWorkflowExecutions(parentWorkflowId)); + Assert.Equal(1U, WorkerCounter.NumOfChildWorkflowExecutions(parentWorkflowId)); + Assert.Equal(0U, WorkerCounter.NumOfActivityExecutions(parentWorkflowId)); + Assert.Equal(2U, WorkerCounter.NumOfSignals(parentWorkflowId)); + Assert.Equal(2U, WorkerCounter.NumOfQueries(parentWorkflowId)); // validate the worker counters have the correct numbers for the child - Assert.Equal(1, WorkerCounter.NumOfWorkflowExecutions(childWorkflowId)); - Assert.Equal(0, WorkerCounter.NumOfChildWorkflowExecutions(childWorkflowId)); - Assert.Equal(2, WorkerCounter.NumOfActivityExecutions(childWorkflowId)); - Assert.Equal(0, WorkerCounter.NumOfSignals(childWorkflowId)); - Assert.Equal(0, WorkerCounter.NumOfQueries(childWorkflowId)); + Assert.Equal(1U, WorkerCounter.NumOfWorkflowExecutions(childWorkflowId)); + Assert.Equal(0U, WorkerCounter.NumOfChildWorkflowExecutions(childWorkflowId)); + Assert.Equal(2U, WorkerCounter.NumOfActivityExecutions(childWorkflowId)); + Assert.Equal(0U, WorkerCounter.NumOfSignals(childWorkflowId)); + Assert.Equal(0U, WorkerCounter.NumOfQueries(childWorkflowId)); // validate the client counters have correct numbers - Assert.Equal(1, ClientCounter.NumOfWorkflowExecutions(parentWorkflowId)); - Assert.Equal(2, ClientCounter.NumOfSignals(parentWorkflowId)); - Assert.Equal(2, ClientCounter.NumOfQueries(parentWorkflowId)); + Assert.Equal(1U, ClientCounter.NumOfWorkflowExecutions(parentWorkflowId)); + Assert.Equal(2U, ClientCounter.NumOfSignals(parentWorkflowId)); + Assert.Equal(2U, ClientCounter.NumOfQueries(parentWorkflowId)); }); } } \ No newline at end of file From ec9da55c719240478f820a9f5732f929208f7fbb Mon Sep 17 00:00:00 2001 From: Rick Ross Date: Mon, 29 Jul 2024 18:32:01 -0400 Subject: [PATCH 05/19] Addressing PR comments; Fixed Query bug --- TemporalioSamples.sln | 7 ++ src/CounterInterceptor/ClientCounter.cs | 71 +++++++------- src/CounterInterceptor/MyActivities.cs | 2 +- src/CounterInterceptor/MyWorkflow.workflow.cs | 4 +- src/CounterInterceptor/Program.cs | 4 +- .../SimpleClientCallsInterceptor.cs | 8 +- .../SimpleCounterWorkerInterceptor.cs | 16 +--- src/CounterInterceptor/WorkerCounter.cs | 93 +++++++++++-------- 8 files changed, 109 insertions(+), 96 deletions(-) diff --git a/TemporalioSamples.sln b/TemporalioSamples.sln index b45058f..60a57aa 100644 --- a/TemporalioSamples.sln +++ b/TemporalioSamples.sln @@ -55,6 +55,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.WorkflowU EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.ContextPropagation", "src\ContextPropagation\TemporalioSamples.ContextPropagation.csproj", "{7B797D20-485F-441D-8E71-AF7E315FA9CF}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.CounterInterceptor", "src\CounterInterceptor\TemporalioSamples.CounterInterceptor.csproj", "{EB05B946-4DD3-4992-B84B-4438B5846ED5}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -149,6 +151,10 @@ Global {7B797D20-485F-441D-8E71-AF7E315FA9CF}.Debug|Any CPU.Build.0 = Debug|Any CPU {7B797D20-485F-441D-8E71-AF7E315FA9CF}.Release|Any CPU.ActiveCfg = Release|Any CPU {7B797D20-485F-441D-8E71-AF7E315FA9CF}.Release|Any CPU.Build.0 = Release|Any CPU + {EB05B946-4DD3-4992-B84B-4438B5846ED5}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {EB05B946-4DD3-4992-B84B-4438B5846ED5}.Debug|Any CPU.Build.0 = Debug|Any CPU + {EB05B946-4DD3-4992-B84B-4438B5846ED5}.Release|Any CPU.ActiveCfg = Release|Any CPU + {EB05B946-4DD3-4992-B84B-4438B5846ED5}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -178,5 +184,6 @@ Global {B79F07F7-3429-4C58-84C3-08587F748B2D} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC} {B3DB7B8C-7BD3-4A53-A809-AB6279B1A630} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC} {7B797D20-485F-441D-8E71-AF7E315FA9CF} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC} + {EB05B946-4DD3-4992-B84B-4438B5846ED5} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC} EndGlobalSection EndGlobal diff --git a/src/CounterInterceptor/ClientCounter.cs b/src/CounterInterceptor/ClientCounter.cs index 49b7b62..286c087 100644 --- a/src/CounterInterceptor/ClientCounter.cs +++ b/src/CounterInterceptor/ClientCounter.cs @@ -1,44 +1,49 @@ namespace TemporalioSamples.CounterInterceptor; +public record ClientCounts +{ + public uint Executions { get; internal set; } + + public uint Signals { get; internal set; } -using System.Numerics; + public uint Queries { get; internal set; } + + public override string ToString() + { + return + "\n\tTotal Number of Workflow Exec: " + Executions + + "\n\tTotal Number of Signals: " + Signals + + "\n\tTotal Number of Queries: " + Queries; + } +} public class ClientCounter { private const string NumberOfWorkflowExecutions = "numOfWorkflowExec"; private const string NumberOfSignals = "numOfSignals"; private const string NumberOfQueries = "numOfQueries"; - private static Dictionary> perWorkflowIdDictionary = + private static Dictionary perWorkflowIdDictionary = new(); public static string Info() { - string result = string.Empty; - foreach (var item in perWorkflowIdDictionary) - { - var info = item.Value; - result = result + - "\n** Workflow ID: " + item.Key + - "\n\tTotal Number of Workflow Exec: " + info[NumberOfWorkflowExecutions] + - "\n\tTotal Number of Signals: " + info[NumberOfSignals] + - "\n\tTotal Number of Queries: " + info[NumberOfQueries]; - } - - return result; + return string.Join( + "\n", + perWorkflowIdDictionary.Select(kvp => $"** Workflow ID: {kvp.Key} {kvp.Value}")); } public static uint NumOfWorkflowExecutions(string workflowId) { - return perWorkflowIdDictionary[workflowId][NumberOfWorkflowExecutions]; + return perWorkflowIdDictionary[workflowId].Executions; } public static uint NumOfSignals(string workflowId) { - return perWorkflowIdDictionary[workflowId][NumberOfSignals]; + return perWorkflowIdDictionary[workflowId].Signals; } public static uint NumOfQueries(string workflowId) { - return perWorkflowIdDictionary[workflowId][NumberOfQueries]; + return perWorkflowIdDictionary[workflowId].Queries; } public void AddStartInvocation(string workflowId) @@ -56,27 +61,27 @@ public void AddQueryInvocation(string workflowId) Add(workflowId, NumberOfQueries); } - // Creates a default counter info map for a workflowid - private static Dictionary GetDefaultInfoMap() - { - return new Dictionary() - { - { NumberOfWorkflowExecutions, 0 }, - { NumberOfSignals, 0 }, - { NumberOfQueries, 0 }, - }; - } - private void Add(string workflowId, string type) { - if (!perWorkflowIdDictionary.TryGetValue(workflowId, out Dictionary? value)) + if (!perWorkflowIdDictionary.TryGetValue(workflowId, out ClientCounts? value)) { - value = GetDefaultInfoMap(); + value = new ClientCounts(); perWorkflowIdDictionary.Add(workflowId, value); } - var current = value[type]; - var next = current + 1; - value[type] = next; + switch (type) + { + case NumberOfWorkflowExecutions: + value.Executions++; + break; + case NumberOfQueries: + value.Queries++; + break; + case NumberOfSignals: + value.Signals++; + break; + default: + throw new NotImplementedException("Unknown type: " + type); + } } } \ No newline at end of file diff --git a/src/CounterInterceptor/MyActivities.cs b/src/CounterInterceptor/MyActivities.cs index 2e07dbc..d96d12b 100644 --- a/src/CounterInterceptor/MyActivities.cs +++ b/src/CounterInterceptor/MyActivities.cs @@ -16,4 +16,4 @@ public string SayGoodBye(string name, string title) { return "Goodbye " + title + " " + name; } -} +} \ No newline at end of file diff --git a/src/CounterInterceptor/MyWorkflow.workflow.cs b/src/CounterInterceptor/MyWorkflow.workflow.cs index a9bd7c3..d74974b 100644 --- a/src/CounterInterceptor/MyWorkflow.workflow.cs +++ b/src/CounterInterceptor/MyWorkflow.workflow.cs @@ -34,10 +34,10 @@ public async Task SignalNameAndTitleAsync(string name, string title) } [WorkflowQuery] - public string Name { get; private set; } = string.Empty; + public string Name { get => name; } [WorkflowQuery] - public string Title { get; private set; } = string.Empty; + public string Title { get => title; } [WorkflowSignal] public async Task ExitAsync() => exit = true; diff --git a/src/CounterInterceptor/Program.cs b/src/CounterInterceptor/Program.cs index 8e86bdf..574bef4 100644 --- a/src/CounterInterceptor/Program.cs +++ b/src/CounterInterceptor/Program.cs @@ -64,8 +64,8 @@ private static async Task Main(string[] args) Console.WriteLine($"Workflow result is {result}", result); Console.WriteLine("Query results: "); - Console.WriteLine("Name: " + name); - Console.WriteLine("Title: " + title); + Console.WriteLine($"\tName: {name}", name); + Console.WriteLine($"\tTitle: {title}", title); // Print worker counter info Console.WriteLine("Collected Worker Counter Info: "); diff --git a/src/CounterInterceptor/SimpleClientCallsInterceptor.cs b/src/CounterInterceptor/SimpleClientCallsInterceptor.cs index 438c48c..7558fc0 100644 --- a/src/CounterInterceptor/SimpleClientCallsInterceptor.cs +++ b/src/CounterInterceptor/SimpleClientCallsInterceptor.cs @@ -53,13 +53,7 @@ public override Task> StartWorkflowUpdateAsync ExecuteWorkflowAsync(ExecuteWorkflowInput input) @@ -41,20 +41,14 @@ public override Task HandleSignalAsync(HandleSignalInput input) WorkerCounter.Add(Workflow.Info.WorkflowId, WorkerCounter.NumberOfQueries); return base.HandleQuery(input); } - - public override void ValidateUpdate(HandleUpdateInput input) - { - // not monitoring - base.ValidateUpdate(input); - } } private sealed class WorkflowOutbound : WorkflowOutboundInterceptor { - private readonly SimpleCounterWorkerInterceptor root; - - internal WorkflowOutbound(SimpleCounterWorkerInterceptor root, WorkflowOutboundInterceptor next) - : base(next) => this.root = root; + internal WorkflowOutbound(WorkflowOutboundInterceptor next) + : base(next) + { + } public override Task ScheduleActivityAsync( ScheduleActivityInput input) diff --git a/src/CounterInterceptor/WorkerCounter.cs b/src/CounterInterceptor/WorkerCounter.cs index bf11e74..3ab7fac 100644 --- a/src/CounterInterceptor/WorkerCounter.cs +++ b/src/CounterInterceptor/WorkerCounter.cs @@ -1,6 +1,26 @@ namespace TemporalioSamples.CounterInterceptor; +public record WorkflowCounts +{ + public uint Executions { get; internal set; } + + public uint Signals { get; internal set; } + + public uint Queries { get; internal set; } + + public uint ChildExecutions { get; internal set; } -using System.Numerics; + public uint ActivityExecutions { get; internal set; } + + public override string ToString() + { + return + "\n\tTotal Number of Workflow Exec: " + Executions + + "\n\tTotal Number of Child Worflow Exec: " + ChildExecutions + + "\n\tTotal Number of Activity Exec: " + ActivityExecutions + + "\n\tTotal Number of Signals: " + Signals + + "\n\tTotal Number of Queries: " + Queries; + } +} public static class WorkerCounter { @@ -10,75 +30,68 @@ public static class WorkerCounter public const string NumberOfSignals = "numOfSignals"; public const string NumberOfQueries = "numOfQueries"; - private static Dictionary> perWorkflowIdDictionary = - new Dictionary>(); + private static Dictionary perWorkflowIdDictionary = + new Dictionary(); public static void Add(string workflowId, string type) { - if (!perWorkflowIdDictionary.TryGetValue(workflowId, out Dictionary? value)) + if (!perWorkflowIdDictionary.TryGetValue(workflowId, out WorkflowCounts? value)) { - value = DefaultInfoMap(); + value = new WorkflowCounts(); perWorkflowIdDictionary.Add(workflowId, value); } - var current = value[type]; - var next = current + 1; - value[type] = next; + switch (type) + { + case NumberOfActivityExecutions: + value.ActivityExecutions++; + break; + case NumberOfChildWorkflowExecutions: + value.ChildExecutions++; + break; + case NumberOfQueries: + value.Queries++; + break; + case NumberOfSignals: + value.Signals++; + break; + case NumberOfWorkflowExecutions: + value.Executions++; + break; + default: + throw new NotImplementedException($"Unknown type: " + type); + } } public static uint NumOfWorkflowExecutions(string workflowId) { - return perWorkflowIdDictionary[workflowId][NumberOfWorkflowExecutions]; + return perWorkflowIdDictionary[workflowId].Executions; } public static uint NumOfChildWorkflowExecutions(string workflowId) { - return perWorkflowIdDictionary[workflowId][NumberOfChildWorkflowExecutions]; + return perWorkflowIdDictionary[workflowId].ChildExecutions; } public static uint NumOfActivityExecutions(string workflowId) { - return perWorkflowIdDictionary[workflowId][NumberOfActivityExecutions]; + return perWorkflowIdDictionary[workflowId].ActivityExecutions; } public static uint NumOfSignals(string workflowId) { - return perWorkflowIdDictionary[workflowId][NumberOfSignals]; + return perWorkflowIdDictionary[workflowId].Signals; } public static uint NumOfQueries(string workflowId) { - return perWorkflowIdDictionary[workflowId][NumberOfQueries]; + return perWorkflowIdDictionary[workflowId].Queries; } public static string Info() { - string result = string.Empty; - foreach (var item in perWorkflowIdDictionary) - { - var itemInfo = item.Value; - result = result + - "\n** Workflow ID: " + item.Key + - "\n\tTotal Number of Workflow Exec: " + itemInfo[NumberOfWorkflowExecutions] + - "\n\tTotal Number of Child Worflow Exec: " + itemInfo[NumberOfChildWorkflowExecutions] + - "\n\tTotal Number of Activity Exec: " + itemInfo[NumberOfActivityExecutions] + - "\n\tTotal Number of Signals: " + itemInfo[NumberOfSignals] + - "\n\tTotal Number of Queries: " + itemInfo[NumberOfQueries]; - } - - return result; - } - - // Creates a default counter info map for a workflowId - private static Dictionary DefaultInfoMap() - { - return new Dictionary() - { - { NumberOfWorkflowExecutions, 0 }, - { NumberOfChildWorkflowExecutions, 0 }, - { NumberOfActivityExecutions, 0 }, - { NumberOfSignals, 0 }, - { NumberOfQueries, 0 }, - }; + return string.Join( + "\n", + perWorkflowIdDictionary.Select(kvp => $"** Workflow ID: {kvp.Key} {kvp.Value}")); } } \ No newline at end of file From 31c0743edd2a2c3d10805232ef4fe950bcd60f63 Mon Sep 17 00:00:00 2001 From: Rick Ross Date: Tue, 30 Jul 2024 08:00:43 -0400 Subject: [PATCH 06/19] Refactored to use properties --- src/CounterInterceptor/MyWorkflow.workflow.cs | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/src/CounterInterceptor/MyWorkflow.workflow.cs b/src/CounterInterceptor/MyWorkflow.workflow.cs index d74974b..f93e74d 100644 --- a/src/CounterInterceptor/MyWorkflow.workflow.cs +++ b/src/CounterInterceptor/MyWorkflow.workflow.cs @@ -5,19 +5,17 @@ namespace TemporalioSamples.CounterInterceptor; [Workflow] public class MyWorkflow { - private string name = string.Empty; - private string title = string.Empty; private bool exit; // automatically defaults to false [WorkflowRun] public async Task RunAsync() { // wait for greeting info - await Workflow.WaitConditionAsync(() => name != null && title != null); + await Workflow.WaitConditionAsync(() => Name != null && Title != null); // Execute Child Workflow var result = await Workflow.ExecuteChildWorkflowAsync( - (MyChildWorkflow wf) => wf.RunAsync(name, title), + (MyChildWorkflow wf) => wf.RunAsync(Name, Title), new() { Id = "counter-interceptor-child" }); // Wait for exit signal @@ -29,15 +27,15 @@ public async Task RunAsync() [WorkflowSignal] public async Task SignalNameAndTitleAsync(string name, string title) { - this.name = name; - this.title = title; + Name = name; + Title = title; } [WorkflowQuery] - public string Name { get => name; } + public string Name { get; private set; } = string.Empty; [WorkflowQuery] - public string Title { get => title; } + public string Title { get; private set; } = string.Empty; [WorkflowSignal] public async Task ExitAsync() => exit = true; From ca90698cdad5ee50ea1e350123cf10440641dbc1 Mon Sep 17 00:00:00 2001 From: Rick Ross Date: Tue, 30 Jul 2024 09:49:36 -0400 Subject: [PATCH 07/19] Made task queue a guid. Moved test to be in alphabetical order --- tests/CounterInterceptor/MyWorkflowTests.cs | 2 +- tests/TemporalioSamples.Tests.csproj | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/CounterInterceptor/MyWorkflowTests.cs b/tests/CounterInterceptor/MyWorkflowTests.cs index 44bd478..7cedbc7 100644 --- a/tests/CounterInterceptor/MyWorkflowTests.cs +++ b/tests/CounterInterceptor/MyWorkflowTests.cs @@ -22,7 +22,7 @@ public async Task RunAsync_CounterInterceptor() var client = new TemporalClient(env.Client.Connection, clientOptions); - var taskQueue = "counter-interceptor-test-task-queue"; + var taskQueue = Guid.NewGuid().ToString(); var workerOptions = new TemporalWorkerOptions(taskQueue). AddAllActivities(new MyActivities()). diff --git a/tests/TemporalioSamples.Tests.csproj b/tests/TemporalioSamples.Tests.csproj index 2c3b60a..1191fca 100644 --- a/tests/TemporalioSamples.Tests.csproj +++ b/tests/TemporalioSamples.Tests.csproj @@ -21,8 +21,8 @@ - + From 7ac67a94cce6dd12ddd06fc1bc99484c02320da2 Mon Sep 17 00:00:00 2001 From: Rick Ross Date: Tue, 30 Jul 2024 14:52:12 -0400 Subject: [PATCH 08/19] Updated readme. Consistent case for comments --- src/CounterInterceptor/MyWorkflow.workflow.cs | 4 ++-- src/CounterInterceptor/README.md | 6 +++--- src/CounterInterceptor/SimpleClientCallsInterceptor.cs | 2 +- tests/CounterInterceptor/MyWorkflowTests.cs | 10 +++++----- 4 files changed, 11 insertions(+), 11 deletions(-) diff --git a/src/CounterInterceptor/MyWorkflow.workflow.cs b/src/CounterInterceptor/MyWorkflow.workflow.cs index f93e74d..08e4e55 100644 --- a/src/CounterInterceptor/MyWorkflow.workflow.cs +++ b/src/CounterInterceptor/MyWorkflow.workflow.cs @@ -5,12 +5,12 @@ namespace TemporalioSamples.CounterInterceptor; [Workflow] public class MyWorkflow { - private bool exit; // automatically defaults to false + private bool exit; // Automatically defaults to false [WorkflowRun] public async Task RunAsync() { - // wait for greeting info + // Wait for greeting info await Workflow.WaitConditionAsync(() => Name != null && Title != null); // Execute Child Workflow diff --git a/src/CounterInterceptor/README.md b/src/CounterInterceptor/README.md index 1dbb81d..da5334c 100644 --- a/src/CounterInterceptor/README.md +++ b/src/CounterInterceptor/README.md @@ -1,8 +1,8 @@ # dotnet-counter-interceptor The sample demonstrates: -- the use of a simple Worker Workflow Interceptor that counts the number of Workflow Executions, Child Workflow Executions, and Activity Executions as well as the number of Signals and Queries. It is based -off of the [Java Sample](https://github.com/temporalio/samples-java/tree/main) located [here](https://github.com/temporalio/samples-java/tree/main/core/src/main/java/io/temporal/samples/countinterceptor) -- the use of a simple Client Workflow Interceptor that counts the number of Workflow Executions as well as the number of Signals and Queries. +- the use of a Worker Workflow Interceptor that counts the number of Workflow Executions, Child Workflow Executions, and Activity Executions and the number of Signals and Queries. It is based +off of the [Java sample](https://github.com/temporalio/samples-java/tree/main) located [here](https://github.com/temporalio/samples-java/tree/main/core/src/main/java/io/temporal/samples/countinterceptor) +- the use of a Client Workflow Interceptor that counts the number of Workflow Executions and the number of Signals and Queries. ## Start local Temporal Server ```bash diff --git a/src/CounterInterceptor/SimpleClientCallsInterceptor.cs b/src/CounterInterceptor/SimpleClientCallsInterceptor.cs index 7558fc0..fa013c9 100644 --- a/src/CounterInterceptor/SimpleClientCallsInterceptor.cs +++ b/src/CounterInterceptor/SimpleClientCallsInterceptor.cs @@ -47,7 +47,7 @@ public override Task QueryWorkflowAsync(QueryWorkflowInput inp public override Task> StartWorkflowUpdateAsync( StartWorkflowUpdateInput input) { - // not tracking this + // Not tracking this return base.StartWorkflowUpdateAsync(input); } diff --git a/tests/CounterInterceptor/MyWorkflowTests.cs b/tests/CounterInterceptor/MyWorkflowTests.cs index 7cedbc7..f28d250 100644 --- a/tests/CounterInterceptor/MyWorkflowTests.cs +++ b/tests/CounterInterceptor/MyWorkflowTests.cs @@ -13,7 +13,7 @@ public async Task RunAsync_CounterInterceptor() { await using var env = await WorkflowEnvironment.StartLocalAsync(); - // add the interceptor to the client + // Add the interceptor to the client var clientOptions = (TemporalClientOptions)env.Client.Options.Clone(); clientOptions.Interceptors = new[] { @@ -49,27 +49,27 @@ await worker.ExecuteAsync(async () => var name = await handle.QueryAsync(wf => wf.Name); var title = await handle.QueryAsync(wf => wf.Title); - // send exit signal to workflow + // Send exit signal to workflow await handle.SignalAsync(wf => wf.ExitAsync()); // Wait for the workflow to complete var result = await handle.GetResultAsync(); - // validate that the worker counters have the correct numbers for the parent + // Validate that the worker counters have the correct numbers for the parent Assert.Equal(1U, WorkerCounter.NumOfWorkflowExecutions(parentWorkflowId)); Assert.Equal(1U, WorkerCounter.NumOfChildWorkflowExecutions(parentWorkflowId)); Assert.Equal(0U, WorkerCounter.NumOfActivityExecutions(parentWorkflowId)); Assert.Equal(2U, WorkerCounter.NumOfSignals(parentWorkflowId)); Assert.Equal(2U, WorkerCounter.NumOfQueries(parentWorkflowId)); - // validate the worker counters have the correct numbers for the child + // Validate the worker counters have the correct numbers for the child Assert.Equal(1U, WorkerCounter.NumOfWorkflowExecutions(childWorkflowId)); Assert.Equal(0U, WorkerCounter.NumOfChildWorkflowExecutions(childWorkflowId)); Assert.Equal(2U, WorkerCounter.NumOfActivityExecutions(childWorkflowId)); Assert.Equal(0U, WorkerCounter.NumOfSignals(childWorkflowId)); Assert.Equal(0U, WorkerCounter.NumOfQueries(childWorkflowId)); - // validate the client counters have correct numbers + // Validate the client counters have correct numbers Assert.Equal(1U, ClientCounter.NumOfWorkflowExecutions(parentWorkflowId)); Assert.Equal(2U, ClientCounter.NumOfSignals(parentWorkflowId)); Assert.Equal(2U, ClientCounter.NumOfQueries(parentWorkflowId)); From 34c83afcb76db85da8bc7f714ed3e3c20325420f Mon Sep 17 00:00:00 2001 From: Rick Ross Date: Fri, 9 Aug 2024 14:16:27 -0400 Subject: [PATCH 09/19] Addressing PR comments --- src/CounterInterceptor/ClientCounter.cs | 87 --------------- src/CounterInterceptor/Program.cs | 10 +- .../SimpleClientCallsInterceptor.cs | 86 +++++++++++---- .../SimpleCounterWorkerInterceptor.cs | 103 +++++++++++++++--- src/CounterInterceptor/WorkerCounter.cs | 97 ----------------- tests/CounterInterceptor/MyWorkflowTests.cs | 33 +++--- 6 files changed, 175 insertions(+), 241 deletions(-) delete mode 100644 src/CounterInterceptor/ClientCounter.cs delete mode 100644 src/CounterInterceptor/WorkerCounter.cs diff --git a/src/CounterInterceptor/ClientCounter.cs b/src/CounterInterceptor/ClientCounter.cs deleted file mode 100644 index 286c087..0000000 --- a/src/CounterInterceptor/ClientCounter.cs +++ /dev/null @@ -1,87 +0,0 @@ -namespace TemporalioSamples.CounterInterceptor; -public record ClientCounts -{ - public uint Executions { get; internal set; } - - public uint Signals { get; internal set; } - - public uint Queries { get; internal set; } - - public override string ToString() - { - return - "\n\tTotal Number of Workflow Exec: " + Executions + - "\n\tTotal Number of Signals: " + Signals + - "\n\tTotal Number of Queries: " + Queries; - } -} - -public class ClientCounter -{ - private const string NumberOfWorkflowExecutions = "numOfWorkflowExec"; - private const string NumberOfSignals = "numOfSignals"; - private const string NumberOfQueries = "numOfQueries"; - private static Dictionary perWorkflowIdDictionary = - new(); - - public static string Info() - { - return string.Join( - "\n", - perWorkflowIdDictionary.Select(kvp => $"** Workflow ID: {kvp.Key} {kvp.Value}")); - } - - public static uint NumOfWorkflowExecutions(string workflowId) - { - return perWorkflowIdDictionary[workflowId].Executions; - } - - public static uint NumOfSignals(string workflowId) - { - return perWorkflowIdDictionary[workflowId].Signals; - } - - public static uint NumOfQueries(string workflowId) - { - return perWorkflowIdDictionary[workflowId].Queries; - } - - public void AddStartInvocation(string workflowId) - { - Add(workflowId, NumberOfWorkflowExecutions); - } - - public void AddSignalInvocation(string workflowId) - { - Add(workflowId, NumberOfSignals); - } - - public void AddQueryInvocation(string workflowId) - { - Add(workflowId, NumberOfQueries); - } - - private void Add(string workflowId, string type) - { - if (!perWorkflowIdDictionary.TryGetValue(workflowId, out ClientCounts? value)) - { - value = new ClientCounts(); - perWorkflowIdDictionary.Add(workflowId, value); - } - - switch (type) - { - case NumberOfWorkflowExecutions: - value.Executions++; - break; - case NumberOfQueries: - value.Queries++; - break; - case NumberOfSignals: - value.Signals++; - break; - default: - throw new NotImplementedException("Unknown type: " + type); - } - } -} \ No newline at end of file diff --git a/src/CounterInterceptor/Program.cs b/src/CounterInterceptor/Program.cs index 574bef4..9ef7e5d 100644 --- a/src/CounterInterceptor/Program.cs +++ b/src/CounterInterceptor/Program.cs @@ -7,12 +7,13 @@ internal class Program { private static async Task Main(string[] args) { + var clientInterceptor = new SimpleClientCallsInterceptor(); var client = await TemporalClient.ConnectAsync( options: new("localhost:7233") { Interceptors = new[] { - new SimpleClientCallsInterceptor(), + clientInterceptor, }, }); @@ -32,7 +33,8 @@ private static async Task Main(string[] args) AddWorkflow(). AddWorkflow(); - workerOptions.Interceptors = new[] { new SimpleCounterWorkerInterceptor() }; + var workerInterceptor = new SimpleCounterWorkerInterceptor(); + workerOptions.Interceptors = new[] { workerInterceptor }; using var worker = new TemporalWorker( client, @@ -69,12 +71,12 @@ private static async Task Main(string[] args) // Print worker counter info Console.WriteLine("Collected Worker Counter Info: "); - Console.WriteLine(WorkerCounter.Info()); + Console.WriteLine(workerInterceptor.Info()); // Print client counter info Console.WriteLine(); Console.WriteLine("Collected Client Counter Info:"); - Console.WriteLine(ClientCounter.Info()); + Console.WriteLine(clientInterceptor.Info()); } catch (OperationCanceledException) { diff --git a/src/CounterInterceptor/SimpleClientCallsInterceptor.cs b/src/CounterInterceptor/SimpleClientCallsInterceptor.cs index fa013c9..dbff909 100644 --- a/src/CounterInterceptor/SimpleClientCallsInterceptor.cs +++ b/src/CounterInterceptor/SimpleClientCallsInterceptor.cs @@ -3,18 +3,68 @@ namespace TemporalioSamples.CounterInterceptor; using Temporalio.Client; using Temporalio.Client.Interceptors; -public class SimpleClientCallsInterceptor : IClientInterceptor +public record ClientCounts { - private ClientCounter clientCounter; + public uint Executions { get; internal set; } - public SimpleClientCallsInterceptor() - { - this.clientCounter = new ClientCounter(); - } + public uint Signals { get; internal set; } + + public uint Queries { get; internal set; } + + public override string ToString() => + $"\n\tTotal Number of Workflow Exec: {Executions}\n\t" + + $"Total Number of Signals: {Signals}\n\t" + + $"Total Number of Queries: {Queries}"; +} + +public class SimpleClientCallsInterceptor : IClientInterceptor +{ + private const string NumberOfWorkflowExecutions = "numOfWorkflowExec"; + private const string NumberOfSignals = "numOfSignals"; + private const string NumberOfQueries = "numOfQueries"; + private static Dictionary clientDictionary = new(); public ClientOutboundInterceptor InterceptClient(ClientOutboundInterceptor nextInterceptor) => new ClientOutbound(this, nextInterceptor); + public string Info() => + string.Join( + "\n", + clientDictionary.Select(kvp => $"** Workflow ID: {kvp.Key} {kvp.Value}")); + + public uint NumOfWorkflowExecutions(string workflowId) => + clientDictionary[workflowId].Executions; + + public uint NumOfSignals(string workflowId) => + clientDictionary[workflowId].Signals; + + public uint NumOfQueries(string workflowId) => + clientDictionary[workflowId].Queries; + + private void Add(string workflowId, string type) + { + if (!clientDictionary.TryGetValue(workflowId, out ClientCounts? value)) + { + value = new ClientCounts(); + clientDictionary.Add(workflowId, value); + } + + switch (type) + { + case NumberOfWorkflowExecutions: + value.Executions++; + break; + case NumberOfQueries: + value.Queries++; + break; + case NumberOfSignals: + value.Signals++; + break; + default: + throw new NotImplementedException("Unknown type: " + type); + } + } + private class ClientOutbound : ClientOutboundInterceptor { private SimpleClientCallsInterceptor root; @@ -25,35 +75,23 @@ public ClientOutbound(SimpleClientCallsInterceptor root, ClientOutboundIntercept public override Task> StartWorkflowAsync( StartWorkflowInput input) { - var id = CheckId(input.Options.Id); - root.clientCounter.AddStartInvocation(id); + var id = input.Options.Id ?? "None"; + root.Add(id, NumberOfWorkflowExecutions); return base.StartWorkflowAsync(input); } public override Task SignalWorkflowAsync(SignalWorkflowInput input) { - var id = CheckId(input.Id); - root.clientCounter.AddSignalInvocation(id); + var id = input.Id ?? "None"; + root.Add(id, NumberOfSignals); return base.SignalWorkflowAsync(input); } public override Task QueryWorkflowAsync(QueryWorkflowInput input) { - var id = CheckId(input.Id); - root.clientCounter.AddQueryInvocation(id); + var id = input.Id ?? "None"; + root.Add(id, NumberOfQueries); return base.QueryWorkflowAsync(input); } - - public override Task> StartWorkflowUpdateAsync( - StartWorkflowUpdateInput input) - { - // Not tracking this - return base.StartWorkflowUpdateAsync(input); - } - - private static string CheckId(string? id) - { - return id ??= "None"; - } } } \ No newline at end of file diff --git a/src/CounterInterceptor/SimpleCounterWorkerInterceptor.cs b/src/CounterInterceptor/SimpleCounterWorkerInterceptor.cs index 039378c..3e1ab9c 100644 --- a/src/CounterInterceptor/SimpleCounterWorkerInterceptor.cs +++ b/src/CounterInterceptor/SimpleCounterWorkerInterceptor.cs @@ -4,14 +4,92 @@ namespace TemporalioSamples.CounterInterceptor; using Temporalio.Worker.Interceptors; using Temporalio.Workflows; +public record WorkflowCounts +{ + public uint Executions { get; internal set; } + + public uint Signals { get; internal set; } + + public uint Queries { get; internal set; } + + public uint ChildExecutions { get; internal set; } + + public uint ActivityExecutions { get; internal set; } + + public override string ToString() => + $"\n\tTotal Number of Workflow Exec: {Executions}\n\t" + + $"Total Number of Child Workflow Exec: {ChildExecutions}\n\t" + + $"Total Number of Activity Exec: {ActivityExecutions}\n\t" + + $"Total Number of Signals: {Signals}\n\t" + + $"Total Number of Queries: {Queries}"; +} + public class SimpleCounterWorkerInterceptor : IWorkerInterceptor { + private const string NumberOfWorkflowExecutions = "numOfWorkflowExec"; + private const string NumberOfChildWorkflowExecutions = "numOfChildWorkflowExec"; + private const string NumberOfActivityExecutions = "numOfActivityExec"; + private const string NumberOfSignals = "numOfSignals"; + private const string NumberOfQueries = "numOfQueries"; + + private Dictionary counterDictionary = new Dictionary(); + public WorkflowInboundInterceptor InterceptWorkflow(WorkflowInboundInterceptor nextInterceptor) => new WorkflowInbound(this, nextInterceptor); public ActivityInboundInterceptor InterceptActivity(ActivityInboundInterceptor nextInterceptor) => new ActivityInbound(this, nextInterceptor); + public string Info() => + string.Join( + "\n", + counterDictionary.Select(kvp => $"** Workflow ID: {kvp.Key} {kvp.Value}")); + + public uint NumOfWorkflowExecutions(string workflowId) => + counterDictionary[workflowId].Executions; + + public uint NumOfChildWorkflowExecutions(string workflowId) => + counterDictionary[workflowId].ChildExecutions; + + public uint NumOfActivityExecutions(string workflowId) => + counterDictionary[workflowId].ActivityExecutions; + + public uint NumOfSignals(string workflowId) => + counterDictionary[workflowId].Signals; + + public uint NumOfQueries(string workflowId) => + counterDictionary[workflowId].Queries; + + private void Add(string workflowId, string type) + { + if (!counterDictionary.TryGetValue(workflowId, out WorkflowCounts? value)) + { + value = new WorkflowCounts(); + counterDictionary.Add(workflowId, value); + } + + switch (type) + { + case NumberOfActivityExecutions: + value.ActivityExecutions++; + break; + case NumberOfChildWorkflowExecutions: + value.ChildExecutions++; + break; + case NumberOfQueries: + value.Queries++; + break; + case NumberOfSignals: + value.Signals++; + break; + case NumberOfWorkflowExecutions: + value.Executions++; + break; + default: + throw new NotImplementedException($"Unknown type: " + type); + } + } + private class WorkflowInbound : WorkflowInboundInterceptor { private readonly SimpleCounterWorkerInterceptor root; @@ -21,34 +99,34 @@ internal WorkflowInbound(SimpleCounterWorkerInterceptor root, WorkflowInboundInt public override void Init(WorkflowOutboundInterceptor outbound) { - base.Init(new WorkflowOutbound(outbound)); + base.Init(new WorkflowOutbound(root, outbound)); } public override Task ExecuteWorkflowAsync(ExecuteWorkflowInput input) { - WorkerCounter.Add(Workflow.Info.WorkflowId, WorkerCounter.NumberOfWorkflowExecutions); + root.Add(Workflow.Info.WorkflowId, NumberOfWorkflowExecutions); return base.ExecuteWorkflowAsync(input); } public override Task HandleSignalAsync(HandleSignalInput input) { - WorkerCounter.Add(Workflow.Info.WorkflowId, WorkerCounter.NumberOfSignals); + root.Add(Workflow.Info.WorkflowId, NumberOfSignals); return base.HandleSignalAsync(input); } public override object? HandleQuery(HandleQueryInput input) { - WorkerCounter.Add(Workflow.Info.WorkflowId, WorkerCounter.NumberOfQueries); + root.Add(Workflow.Info.WorkflowId, NumberOfQueries); return base.HandleQuery(input); } } private sealed class WorkflowOutbound : WorkflowOutboundInterceptor { - internal WorkflowOutbound(WorkflowOutboundInterceptor next) - : base(next) - { - } + private readonly SimpleCounterWorkerInterceptor root; + + internal WorkflowOutbound(SimpleCounterWorkerInterceptor root, WorkflowOutboundInterceptor next) + : base(next) => this.root = root; public override Task ScheduleActivityAsync( ScheduleActivityInput input) @@ -69,7 +147,7 @@ public override Task SignalExternalWorkflowAsync(SignalExternalWorkflowInput inp public override Task> StartChildWorkflowAsync( StartChildWorkflowInput input) { - WorkerCounter.Add(Workflow.Info.WorkflowId, WorkerCounter.NumberOfChildWorkflowExecutions); + root.Add(Workflow.Info.WorkflowId, NumberOfChildWorkflowExecutions); return base.StartChildWorkflowAsync(input); } } @@ -79,14 +157,11 @@ private sealed class ActivityInbound : ActivityInboundInterceptor private readonly SimpleCounterWorkerInterceptor root; internal ActivityInbound(SimpleCounterWorkerInterceptor root, ActivityInboundInterceptor next) - : base(next) - { - this.root = root; - } + : base(next) => this.root = root; public override Task ExecuteActivityAsync(ExecuteActivityInput input) { - WorkerCounter.Add(ActivityExecutionContext.Current.Info.WorkflowId, WorkerCounter.NumberOfActivityExecutions); + root.Add(ActivityExecutionContext.Current.Info.WorkflowId, NumberOfActivityExecutions); return base.ExecuteActivityAsync(input); } } diff --git a/src/CounterInterceptor/WorkerCounter.cs b/src/CounterInterceptor/WorkerCounter.cs deleted file mode 100644 index 3ab7fac..0000000 --- a/src/CounterInterceptor/WorkerCounter.cs +++ /dev/null @@ -1,97 +0,0 @@ -namespace TemporalioSamples.CounterInterceptor; -public record WorkflowCounts -{ - public uint Executions { get; internal set; } - - public uint Signals { get; internal set; } - - public uint Queries { get; internal set; } - - public uint ChildExecutions { get; internal set; } - - public uint ActivityExecutions { get; internal set; } - - public override string ToString() - { - return - "\n\tTotal Number of Workflow Exec: " + Executions + - "\n\tTotal Number of Child Worflow Exec: " + ChildExecutions + - "\n\tTotal Number of Activity Exec: " + ActivityExecutions + - "\n\tTotal Number of Signals: " + Signals + - "\n\tTotal Number of Queries: " + Queries; - } -} - -public static class WorkerCounter -{ - public const string NumberOfWorkflowExecutions = "numOfWorkflowExec"; - public const string NumberOfChildWorkflowExecutions = "numOfChildWorkflowExec"; - public const string NumberOfActivityExecutions = "numOfActivityExec"; - public const string NumberOfSignals = "numOfSignals"; - public const string NumberOfQueries = "numOfQueries"; - - private static Dictionary perWorkflowIdDictionary = - new Dictionary(); - - public static void Add(string workflowId, string type) - { - if (!perWorkflowIdDictionary.TryGetValue(workflowId, out WorkflowCounts? value)) - { - value = new WorkflowCounts(); - perWorkflowIdDictionary.Add(workflowId, value); - } - - switch (type) - { - case NumberOfActivityExecutions: - value.ActivityExecutions++; - break; - case NumberOfChildWorkflowExecutions: - value.ChildExecutions++; - break; - case NumberOfQueries: - value.Queries++; - break; - case NumberOfSignals: - value.Signals++; - break; - case NumberOfWorkflowExecutions: - value.Executions++; - break; - default: - throw new NotImplementedException($"Unknown type: " + type); - } - } - - public static uint NumOfWorkflowExecutions(string workflowId) - { - return perWorkflowIdDictionary[workflowId].Executions; - } - - public static uint NumOfChildWorkflowExecutions(string workflowId) - { - return perWorkflowIdDictionary[workflowId].ChildExecutions; - } - - public static uint NumOfActivityExecutions(string workflowId) - { - return perWorkflowIdDictionary[workflowId].ActivityExecutions; - } - - public static uint NumOfSignals(string workflowId) - { - return perWorkflowIdDictionary[workflowId].Signals; - } - - public static uint NumOfQueries(string workflowId) - { - return perWorkflowIdDictionary[workflowId].Queries; - } - - public static string Info() - { - return string.Join( - "\n", - perWorkflowIdDictionary.Select(kvp => $"** Workflow ID: {kvp.Key} {kvp.Value}")); - } -} \ No newline at end of file diff --git a/tests/CounterInterceptor/MyWorkflowTests.cs b/tests/CounterInterceptor/MyWorkflowTests.cs index f28d250..d49865b 100644 --- a/tests/CounterInterceptor/MyWorkflowTests.cs +++ b/tests/CounterInterceptor/MyWorkflowTests.cs @@ -13,11 +13,13 @@ public async Task RunAsync_CounterInterceptor() { await using var env = await WorkflowEnvironment.StartLocalAsync(); + var clientInterceptor = new SimpleClientCallsInterceptor(); + // Add the interceptor to the client var clientOptions = (TemporalClientOptions)env.Client.Options.Clone(); clientOptions.Interceptors = new[] { - new SimpleClientCallsInterceptor(), + clientInterceptor, }; var client = new TemporalClient(env.Client.Connection, clientOptions); @@ -29,7 +31,8 @@ public async Task RunAsync_CounterInterceptor() AddWorkflow(). AddWorkflow(); - workerOptions.Interceptors = new[] { new SimpleCounterWorkerInterceptor() }; + var workerInterceptor = new SimpleCounterWorkerInterceptor(); + workerOptions.Interceptors = new[] { workerInterceptor }; var parentWorkflowId = "ParentWorkflowId"; // Be sure that this matches the ID in the Workflow @@ -56,23 +59,23 @@ await worker.ExecuteAsync(async () => var result = await handle.GetResultAsync(); // Validate that the worker counters have the correct numbers for the parent - Assert.Equal(1U, WorkerCounter.NumOfWorkflowExecutions(parentWorkflowId)); - Assert.Equal(1U, WorkerCounter.NumOfChildWorkflowExecutions(parentWorkflowId)); - Assert.Equal(0U, WorkerCounter.NumOfActivityExecutions(parentWorkflowId)); - Assert.Equal(2U, WorkerCounter.NumOfSignals(parentWorkflowId)); - Assert.Equal(2U, WorkerCounter.NumOfQueries(parentWorkflowId)); + Assert.Equal(1U, workerInterceptor.NumOfWorkflowExecutions(parentWorkflowId)); + Assert.Equal(1U, workerInterceptor.NumOfChildWorkflowExecutions(parentWorkflowId)); + Assert.Equal(0U, workerInterceptor.NumOfActivityExecutions(parentWorkflowId)); + Assert.Equal(2U, workerInterceptor.NumOfSignals(parentWorkflowId)); + Assert.Equal(2U, workerInterceptor.NumOfQueries(parentWorkflowId)); // Validate the worker counters have the correct numbers for the child - Assert.Equal(1U, WorkerCounter.NumOfWorkflowExecutions(childWorkflowId)); - Assert.Equal(0U, WorkerCounter.NumOfChildWorkflowExecutions(childWorkflowId)); - Assert.Equal(2U, WorkerCounter.NumOfActivityExecutions(childWorkflowId)); - Assert.Equal(0U, WorkerCounter.NumOfSignals(childWorkflowId)); - Assert.Equal(0U, WorkerCounter.NumOfQueries(childWorkflowId)); + Assert.Equal(1U, workerInterceptor.NumOfWorkflowExecutions(childWorkflowId)); + Assert.Equal(0U, workerInterceptor.NumOfChildWorkflowExecutions(childWorkflowId)); + Assert.Equal(2U, workerInterceptor.NumOfActivityExecutions(childWorkflowId)); + Assert.Equal(0U, workerInterceptor.NumOfSignals(childWorkflowId)); + Assert.Equal(0U, workerInterceptor.NumOfQueries(childWorkflowId)); // Validate the client counters have correct numbers - Assert.Equal(1U, ClientCounter.NumOfWorkflowExecutions(parentWorkflowId)); - Assert.Equal(2U, ClientCounter.NumOfSignals(parentWorkflowId)); - Assert.Equal(2U, ClientCounter.NumOfQueries(parentWorkflowId)); + Assert.Equal(1U, clientInterceptor.NumOfWorkflowExecutions(parentWorkflowId)); + Assert.Equal(2U, clientInterceptor.NumOfSignals(parentWorkflowId)); + Assert.Equal(2U, clientInterceptor.NumOfQueries(parentWorkflowId)); }); } } \ No newline at end of file From 7682937b6fe2d6fe0ab1296df5449f0951d86387 Mon Sep 17 00:00:00 2001 From: Rick Ross Date: Thu, 15 Aug 2024 07:24:01 -0400 Subject: [PATCH 10/19] Addressing PR comments --- src/CounterInterceptor/Counts.cs | 54 ++++++ src/CounterInterceptor/MyActivities.cs | 6 +- .../MyCounterInterceptor.cs | 137 ++++++++++++++ src/CounterInterceptor/Program.cs | 23 ++- src/CounterInterceptor/README.md | 11 -- .../SimpleClientCallsInterceptor.cs | 97 ---------- .../SimpleCounterWorkerInterceptor.cs | 168 ------------------ tests/CounterInterceptor/MyWorkflowTests.cs | 33 ++-- 8 files changed, 219 insertions(+), 310 deletions(-) create mode 100644 src/CounterInterceptor/Counts.cs create mode 100644 src/CounterInterceptor/MyCounterInterceptor.cs delete mode 100644 src/CounterInterceptor/SimpleClientCallsInterceptor.cs delete mode 100644 src/CounterInterceptor/SimpleCounterWorkerInterceptor.cs diff --git a/src/CounterInterceptor/Counts.cs b/src/CounterInterceptor/Counts.cs new file mode 100644 index 0000000..742b0d5 --- /dev/null +++ b/src/CounterInterceptor/Counts.cs @@ -0,0 +1,54 @@ +public record Counts +{ + public Counts() + { + clientExecutions = 0; + clientQueries = 0; + clientSignals = 0; + workflowExecutions = 0; + workflowSignals = 0; + workflowQueries = 0; + workflowChildExecutions = 0; + workflowActivityExecutions = 0; + } + + private uint clientExecutions; + private uint clientQueries; + private uint clientSignals; + private uint workflowExecutions; + private uint workflowSignals; + private uint workflowQueries; + private uint workflowChildExecutions; + private uint workflowActivityExecutions; + + public ref uint ClientExecutions => ref clientExecutions; + + public ref uint ClientSignals => ref clientSignals; + + public ref uint ClientQueries => ref clientQueries; + + public string ClientInfo() => + $"\n\tTotal Number of Workflow Exec: {ClientExecutions}\n\t" + + $"Total Number of Signals: {ClientSignals}\n\t" + + $"Total Number of Queries: {ClientQueries}"; + + public ref uint WorkflowExecutions => ref workflowExecutions; + + public ref uint WorkflowSignals => ref workflowSignals; + + public ref uint WorkflowQueries => ref workflowQueries; + + public ref uint WorkflowChildExecutions => ref workflowChildExecutions; + + public ref uint WorkflowActivityExecutions => ref workflowActivityExecutions; + + public string WorkflowInfo() => + $"\n\tTotal Number of Workflow Exec: {WorkflowExecutions}\n\t" + + $"Total Number of Child Workflow Exec: {WorkflowChildExecutions}\n\t" + + $"Total Number of Activity Exec: {WorkflowActivityExecutions}\n\t" + + $"Total Number of Signals: {WorkflowSignals}\n\t" + + $"Total Number of Queries: {WorkflowQueries}"; + + public override string ToString() => + ClientInfo() + WorkflowInfo(); +} \ No newline at end of file diff --git a/src/CounterInterceptor/MyActivities.cs b/src/CounterInterceptor/MyActivities.cs index d96d12b..def2a20 100644 --- a/src/CounterInterceptor/MyActivities.cs +++ b/src/CounterInterceptor/MyActivities.cs @@ -6,10 +6,8 @@ namespace TemporalioSamples.CounterInterceptor; public class MyActivities { [Activity] - public string SayHello(string name, string title) - { - return "Hello " + title + " " + name; - } + public string SayHello(string name, string title) => + "Hello " + title + " " + name; [Activity] public string SayGoodBye(string name, string title) diff --git a/src/CounterInterceptor/MyCounterInterceptor.cs b/src/CounterInterceptor/MyCounterInterceptor.cs new file mode 100644 index 0000000..f1241d0 --- /dev/null +++ b/src/CounterInterceptor/MyCounterInterceptor.cs @@ -0,0 +1,137 @@ +namespace TemporalioSamples.CounterInterceptor; + +using System.Collections.Concurrent; +using Temporalio.Activities; +using Temporalio.Client; +using Temporalio.Client.Interceptors; +using Temporalio.Worker.Interceptors; +using Temporalio.Workflows; + +public class MyCounterInterceptor : IClientInterceptor, IWorkerInterceptor +{ + private ConcurrentDictionary counts = new(); + + public ConcurrentDictionary Counts => counts; + + public string WorkerInfo() => + string.Join( + "\n", + counts.Select(kvp => $"** Workflow ID: {kvp.Key} {kvp.Value.WorkflowInfo()}")); + + public string ClientInfo() => + string.Join( + "\n", + counts.Select(kvp => $"** Workflow ID: {kvp.Key} {kvp.Value.ClientInfo()}")); + + public ClientOutboundInterceptor InterceptClient(ClientOutboundInterceptor nextInterceptor) => + new ClientOutbound(this, nextInterceptor); + + public WorkflowInboundInterceptor InterceptWorkflow(WorkflowInboundInterceptor nextInterceptor) => + new WorkflowInbound(this, nextInterceptor); + + public ActivityInboundInterceptor InterceptActivity(ActivityInboundInterceptor nextInterceptor) => + new ActivityInbound(this, nextInterceptor); + + private sealed class ClientOutbound : ClientOutboundInterceptor + { + private MyCounterInterceptor root; + + public ClientOutbound(MyCounterInterceptor root, ClientOutboundInterceptor next) + : base(next) => this.root = root; + + public override Task> StartWorkflowAsync( + StartWorkflowInput input) + { + var id = input.Options.Id ?? "None"; + // Need to add an empty record if none exists + // we don't care if it doesn't add it as we will + // still increment the current value. + root.counts.TryAdd(id, new Counts()); + Interlocked.Increment(ref root.counts[id].ClientExecutions); + return base.StartWorkflowAsync(input); + } + + public override Task SignalWorkflowAsync(SignalWorkflowInput input) + { + var id = input.Id ?? "None"; + Interlocked.Increment(ref root.counts[id].ClientSignals); + return base.SignalWorkflowAsync(input); + } + + public override Task QueryWorkflowAsync(QueryWorkflowInput input) + { + var id = input.Id ?? "None"; + Interlocked.Increment(ref root.counts[id].ClientQueries); + return base.QueryWorkflowAsync(input); + } + } + + private sealed class WorkflowInbound : WorkflowInboundInterceptor + { + private readonly MyCounterInterceptor root; + + internal WorkflowInbound(MyCounterInterceptor root, WorkflowInboundInterceptor next) + : base(next) => this.root = root; + + public override void Init(WorkflowOutboundInterceptor outbound) + { + base.Init(new WorkflowOutbound(root, outbound)); + } + + public override Task ExecuteWorkflowAsync(ExecuteWorkflowInput input) + { + var id = Workflow.Info.WorkflowId; + // Need to add an empty record if none exists + // we don't care if it doesn't add it as we will + // still increment the current value. + root.counts.TryAdd(id, new Counts()); + Interlocked.Increment(ref root.counts[id].WorkflowExecutions); + return base.ExecuteWorkflowAsync(input); + } + + public override Task HandleSignalAsync(HandleSignalInput input) + { + var id = Workflow.Info.WorkflowId; + Interlocked.Increment(ref root.counts[id].WorkflowSignals); + return base.HandleSignalAsync(input); + } + + public override object? HandleQuery(HandleQueryInput input) + { + var id = Workflow.Info.WorkflowId; + Interlocked.Increment(ref root.counts[id].WorkflowQueries); + return base.HandleQuery(input); + } + } + + private sealed class WorkflowOutbound : WorkflowOutboundInterceptor + { + private readonly MyCounterInterceptor root; + + internal WorkflowOutbound(MyCounterInterceptor root, WorkflowOutboundInterceptor next) + : base(next) => this.root = root; + + public override Task> StartChildWorkflowAsync( + StartChildWorkflowInput input) + { + var id = Workflow.Info.WorkflowId; + Interlocked.Increment(ref root.counts[id].WorkflowChildExecutions); + return base.StartChildWorkflowAsync(input); + } + } + + private sealed class ActivityInbound : ActivityInboundInterceptor + { + private readonly MyCounterInterceptor root; + + internal ActivityInbound(MyCounterInterceptor root, ActivityInboundInterceptor next) + : base(next) => this.root = root; + + public override Task ExecuteActivityAsync(ExecuteActivityInput input) + { + var id = ActivityExecutionContext.Current.Info.WorkflowId; + Interlocked.Increment(ref root.counts[id].WorkflowActivityExecutions); + return base.ExecuteActivityAsync(input); + } + } +} \ No newline at end of file diff --git a/src/CounterInterceptor/Program.cs b/src/CounterInterceptor/Program.cs index 9ef7e5d..1fc6ac7 100644 --- a/src/CounterInterceptor/Program.cs +++ b/src/CounterInterceptor/Program.cs @@ -7,13 +7,13 @@ internal class Program { private static async Task Main(string[] args) { - var clientInterceptor = new SimpleClientCallsInterceptor(); + var counterInterceptor = new MyCounterInterceptor(); var client = await TemporalClient.ConnectAsync( options: new("localhost:7233") { Interceptors = new[] { - clientInterceptor, + counterInterceptor, }, }); @@ -33,9 +33,7 @@ private static async Task Main(string[] args) AddWorkflow(). AddWorkflow(); - var workerInterceptor = new SimpleCounterWorkerInterceptor(); - workerOptions.Interceptors = new[] { workerInterceptor }; - + // workerOptions.Interceptors = new[] { counterInterceptor }; using var worker = new TemporalWorker( client, workerOptions); @@ -63,20 +61,21 @@ private static async Task Main(string[] args) var result = await handle.GetResultAsync(); - Console.WriteLine($"Workflow result is {result}", result); + Console.WriteLine($"Workflow result is {result}"); Console.WriteLine("Query results: "); - Console.WriteLine($"\tName: {name}", name); - Console.WriteLine($"\tTitle: {title}", title); + Console.WriteLine($"\tName: {name}"); + Console.WriteLine($"\tTitle: {title}"); // Print worker counter info - Console.WriteLine("Collected Worker Counter Info: "); - Console.WriteLine(workerInterceptor.Info()); + Console.WriteLine("\nCollected Worker Counter Info:\n"); + Console.WriteLine(counterInterceptor.WorkerInfo()); + Console.WriteLine($"Number of workers: {counterInterceptor.Counts.Count}"); // Print client counter info Console.WriteLine(); - Console.WriteLine("Collected Client Counter Info:"); - Console.WriteLine(clientInterceptor.Info()); + Console.WriteLine("Collected Client Counter Info:\n"); + Console.WriteLine(counterInterceptor.ClientInfo()); } catch (OperationCanceledException) { diff --git a/src/CounterInterceptor/README.md b/src/CounterInterceptor/README.md index da5334c..7830df0 100644 --- a/src/CounterInterceptor/README.md +++ b/src/CounterInterceptor/README.md @@ -15,14 +15,3 @@ temporal server start-dev # make sure you have temporal server running (see section above) dotnet run ``` - -## Run Worker using Temporal Cloud -```bash -# set up environment variables -export TEMPORAL_NAMESPACE=. -export TEMPORAL_ADDRESS=..tmprl.cloud:7233 -export TEMPORAL_TLS_CERT=/path/to/cert -export TEMPORAL_TLS_KEY=/path/to/key -# run the worker -dotnet run -``` \ No newline at end of file diff --git a/src/CounterInterceptor/SimpleClientCallsInterceptor.cs b/src/CounterInterceptor/SimpleClientCallsInterceptor.cs deleted file mode 100644 index dbff909..0000000 --- a/src/CounterInterceptor/SimpleClientCallsInterceptor.cs +++ /dev/null @@ -1,97 +0,0 @@ -namespace TemporalioSamples.CounterInterceptor; - -using Temporalio.Client; -using Temporalio.Client.Interceptors; - -public record ClientCounts -{ - public uint Executions { get; internal set; } - - public uint Signals { get; internal set; } - - public uint Queries { get; internal set; } - - public override string ToString() => - $"\n\tTotal Number of Workflow Exec: {Executions}\n\t" + - $"Total Number of Signals: {Signals}\n\t" + - $"Total Number of Queries: {Queries}"; -} - -public class SimpleClientCallsInterceptor : IClientInterceptor -{ - private const string NumberOfWorkflowExecutions = "numOfWorkflowExec"; - private const string NumberOfSignals = "numOfSignals"; - private const string NumberOfQueries = "numOfQueries"; - private static Dictionary clientDictionary = new(); - - public ClientOutboundInterceptor InterceptClient(ClientOutboundInterceptor nextInterceptor) => - new ClientOutbound(this, nextInterceptor); - - public string Info() => - string.Join( - "\n", - clientDictionary.Select(kvp => $"** Workflow ID: {kvp.Key} {kvp.Value}")); - - public uint NumOfWorkflowExecutions(string workflowId) => - clientDictionary[workflowId].Executions; - - public uint NumOfSignals(string workflowId) => - clientDictionary[workflowId].Signals; - - public uint NumOfQueries(string workflowId) => - clientDictionary[workflowId].Queries; - - private void Add(string workflowId, string type) - { - if (!clientDictionary.TryGetValue(workflowId, out ClientCounts? value)) - { - value = new ClientCounts(); - clientDictionary.Add(workflowId, value); - } - - switch (type) - { - case NumberOfWorkflowExecutions: - value.Executions++; - break; - case NumberOfQueries: - value.Queries++; - break; - case NumberOfSignals: - value.Signals++; - break; - default: - throw new NotImplementedException("Unknown type: " + type); - } - } - - private class ClientOutbound : ClientOutboundInterceptor - { - private SimpleClientCallsInterceptor root; - - public ClientOutbound(SimpleClientCallsInterceptor root, ClientOutboundInterceptor next) - : base(next) => this.root = root; - - public override Task> StartWorkflowAsync( - StartWorkflowInput input) - { - var id = input.Options.Id ?? "None"; - root.Add(id, NumberOfWorkflowExecutions); - return base.StartWorkflowAsync(input); - } - - public override Task SignalWorkflowAsync(SignalWorkflowInput input) - { - var id = input.Id ?? "None"; - root.Add(id, NumberOfSignals); - return base.SignalWorkflowAsync(input); - } - - public override Task QueryWorkflowAsync(QueryWorkflowInput input) - { - var id = input.Id ?? "None"; - root.Add(id, NumberOfQueries); - return base.QueryWorkflowAsync(input); - } - } -} \ No newline at end of file diff --git a/src/CounterInterceptor/SimpleCounterWorkerInterceptor.cs b/src/CounterInterceptor/SimpleCounterWorkerInterceptor.cs deleted file mode 100644 index 3e1ab9c..0000000 --- a/src/CounterInterceptor/SimpleCounterWorkerInterceptor.cs +++ /dev/null @@ -1,168 +0,0 @@ -namespace TemporalioSamples.CounterInterceptor; - -using Temporalio.Activities; -using Temporalio.Worker.Interceptors; -using Temporalio.Workflows; - -public record WorkflowCounts -{ - public uint Executions { get; internal set; } - - public uint Signals { get; internal set; } - - public uint Queries { get; internal set; } - - public uint ChildExecutions { get; internal set; } - - public uint ActivityExecutions { get; internal set; } - - public override string ToString() => - $"\n\tTotal Number of Workflow Exec: {Executions}\n\t" + - $"Total Number of Child Workflow Exec: {ChildExecutions}\n\t" + - $"Total Number of Activity Exec: {ActivityExecutions}\n\t" + - $"Total Number of Signals: {Signals}\n\t" + - $"Total Number of Queries: {Queries}"; -} - -public class SimpleCounterWorkerInterceptor : IWorkerInterceptor -{ - private const string NumberOfWorkflowExecutions = "numOfWorkflowExec"; - private const string NumberOfChildWorkflowExecutions = "numOfChildWorkflowExec"; - private const string NumberOfActivityExecutions = "numOfActivityExec"; - private const string NumberOfSignals = "numOfSignals"; - private const string NumberOfQueries = "numOfQueries"; - - private Dictionary counterDictionary = new Dictionary(); - - public WorkflowInboundInterceptor InterceptWorkflow(WorkflowInboundInterceptor nextInterceptor) => - new WorkflowInbound(this, nextInterceptor); - - public ActivityInboundInterceptor InterceptActivity(ActivityInboundInterceptor nextInterceptor) => - new ActivityInbound(this, nextInterceptor); - - public string Info() => - string.Join( - "\n", - counterDictionary.Select(kvp => $"** Workflow ID: {kvp.Key} {kvp.Value}")); - - public uint NumOfWorkflowExecutions(string workflowId) => - counterDictionary[workflowId].Executions; - - public uint NumOfChildWorkflowExecutions(string workflowId) => - counterDictionary[workflowId].ChildExecutions; - - public uint NumOfActivityExecutions(string workflowId) => - counterDictionary[workflowId].ActivityExecutions; - - public uint NumOfSignals(string workflowId) => - counterDictionary[workflowId].Signals; - - public uint NumOfQueries(string workflowId) => - counterDictionary[workflowId].Queries; - - private void Add(string workflowId, string type) - { - if (!counterDictionary.TryGetValue(workflowId, out WorkflowCounts? value)) - { - value = new WorkflowCounts(); - counterDictionary.Add(workflowId, value); - } - - switch (type) - { - case NumberOfActivityExecutions: - value.ActivityExecutions++; - break; - case NumberOfChildWorkflowExecutions: - value.ChildExecutions++; - break; - case NumberOfQueries: - value.Queries++; - break; - case NumberOfSignals: - value.Signals++; - break; - case NumberOfWorkflowExecutions: - value.Executions++; - break; - default: - throw new NotImplementedException($"Unknown type: " + type); - } - } - - private class WorkflowInbound : WorkflowInboundInterceptor - { - private readonly SimpleCounterWorkerInterceptor root; - - internal WorkflowInbound(SimpleCounterWorkerInterceptor root, WorkflowInboundInterceptor next) - : base(next) => this.root = root; - - public override void Init(WorkflowOutboundInterceptor outbound) - { - base.Init(new WorkflowOutbound(root, outbound)); - } - - public override Task ExecuteWorkflowAsync(ExecuteWorkflowInput input) - { - root.Add(Workflow.Info.WorkflowId, NumberOfWorkflowExecutions); - return base.ExecuteWorkflowAsync(input); - } - - public override Task HandleSignalAsync(HandleSignalInput input) - { - root.Add(Workflow.Info.WorkflowId, NumberOfSignals); - return base.HandleSignalAsync(input); - } - - public override object? HandleQuery(HandleQueryInput input) - { - root.Add(Workflow.Info.WorkflowId, NumberOfQueries); - return base.HandleQuery(input); - } - } - - private sealed class WorkflowOutbound : WorkflowOutboundInterceptor - { - private readonly SimpleCounterWorkerInterceptor root; - - internal WorkflowOutbound(SimpleCounterWorkerInterceptor root, WorkflowOutboundInterceptor next) - : base(next) => this.root = root; - - public override Task ScheduleActivityAsync( - ScheduleActivityInput input) - { - return base.ScheduleActivityAsync(input); - } - - public override Task SignalChildWorkflowAsync(SignalChildWorkflowInput input) - { - return base.SignalChildWorkflowAsync(input); - } - - public override Task SignalExternalWorkflowAsync(SignalExternalWorkflowInput input) - { - return base.SignalExternalWorkflowAsync(input); - } - - public override Task> StartChildWorkflowAsync( - StartChildWorkflowInput input) - { - root.Add(Workflow.Info.WorkflowId, NumberOfChildWorkflowExecutions); - return base.StartChildWorkflowAsync(input); - } - } - - private sealed class ActivityInbound : ActivityInboundInterceptor - { - private readonly SimpleCounterWorkerInterceptor root; - - internal ActivityInbound(SimpleCounterWorkerInterceptor root, ActivityInboundInterceptor next) - : base(next) => this.root = root; - - public override Task ExecuteActivityAsync(ExecuteActivityInput input) - { - root.Add(ActivityExecutionContext.Current.Info.WorkflowId, NumberOfActivityExecutions); - return base.ExecuteActivityAsync(input); - } - } -} \ No newline at end of file diff --git a/tests/CounterInterceptor/MyWorkflowTests.cs b/tests/CounterInterceptor/MyWorkflowTests.cs index d49865b..0af1d67 100644 --- a/tests/CounterInterceptor/MyWorkflowTests.cs +++ b/tests/CounterInterceptor/MyWorkflowTests.cs @@ -13,13 +13,13 @@ public async Task RunAsync_CounterInterceptor() { await using var env = await WorkflowEnvironment.StartLocalAsync(); - var clientInterceptor = new SimpleClientCallsInterceptor(); + var counterInterceptor = new MyCounterInterceptor(); // Add the interceptor to the client var clientOptions = (TemporalClientOptions)env.Client.Options.Clone(); clientOptions.Interceptors = new[] { - clientInterceptor, + counterInterceptor, }; var client = new TemporalClient(env.Client.Connection, clientOptions); @@ -31,9 +31,6 @@ public async Task RunAsync_CounterInterceptor() AddWorkflow(). AddWorkflow(); - var workerInterceptor = new SimpleCounterWorkerInterceptor(); - workerOptions.Interceptors = new[] { workerInterceptor }; - var parentWorkflowId = "ParentWorkflowId"; // Be sure that this matches the ID in the Workflow var childWorkflowId = "counter-interceptor-child"; @@ -59,23 +56,23 @@ await worker.ExecuteAsync(async () => var result = await handle.GetResultAsync(); // Validate that the worker counters have the correct numbers for the parent - Assert.Equal(1U, workerInterceptor.NumOfWorkflowExecutions(parentWorkflowId)); - Assert.Equal(1U, workerInterceptor.NumOfChildWorkflowExecutions(parentWorkflowId)); - Assert.Equal(0U, workerInterceptor.NumOfActivityExecutions(parentWorkflowId)); - Assert.Equal(2U, workerInterceptor.NumOfSignals(parentWorkflowId)); - Assert.Equal(2U, workerInterceptor.NumOfQueries(parentWorkflowId)); + Assert.Equal(1U, counterInterceptor.Counts[parentWorkflowId].WorkflowExecutions); + Assert.Equal(1U, counterInterceptor.Counts[parentWorkflowId].WorkflowChildExecutions); + Assert.Equal(0U, counterInterceptor.Counts[parentWorkflowId].WorkflowActivityExecutions); + Assert.Equal(2U, counterInterceptor.Counts[parentWorkflowId].WorkflowSignals); + Assert.Equal(2U, counterInterceptor.Counts[parentWorkflowId].WorkflowQueries); // Validate the worker counters have the correct numbers for the child - Assert.Equal(1U, workerInterceptor.NumOfWorkflowExecutions(childWorkflowId)); - Assert.Equal(0U, workerInterceptor.NumOfChildWorkflowExecutions(childWorkflowId)); - Assert.Equal(2U, workerInterceptor.NumOfActivityExecutions(childWorkflowId)); - Assert.Equal(0U, workerInterceptor.NumOfSignals(childWorkflowId)); - Assert.Equal(0U, workerInterceptor.NumOfQueries(childWorkflowId)); + Assert.Equal(1U, counterInterceptor.Counts[childWorkflowId].WorkflowExecutions); + Assert.Equal(0U, counterInterceptor.Counts[childWorkflowId].WorkflowChildExecutions); + Assert.Equal(2U, counterInterceptor.Counts[childWorkflowId].WorkflowActivityExecutions); + Assert.Equal(0U, counterInterceptor.Counts[childWorkflowId].WorkflowSignals); + Assert.Equal(0U, counterInterceptor.Counts[childWorkflowId].WorkflowQueries); // Validate the client counters have correct numbers - Assert.Equal(1U, clientInterceptor.NumOfWorkflowExecutions(parentWorkflowId)); - Assert.Equal(2U, clientInterceptor.NumOfSignals(parentWorkflowId)); - Assert.Equal(2U, clientInterceptor.NumOfQueries(parentWorkflowId)); + Assert.Equal(1U, counterInterceptor.Counts[parentWorkflowId].ClientExecutions); + Assert.Equal(2U, counterInterceptor.Counts[parentWorkflowId].ClientSignals); + Assert.Equal(2U, counterInterceptor.Counts[parentWorkflowId].ClientQueries); }); } } \ No newline at end of file From 37692a7c3b18f743d98d7307c5d14d5e09703346 Mon Sep 17 00:00:00 2001 From: Rick Ross Date: Thu, 15 Aug 2024 09:07:06 -0400 Subject: [PATCH 11/19] Removed/Added CounterInterceptor project to avoid conflict with head --- TemporalioSamples.sln | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/TemporalioSamples.sln b/TemporalioSamples.sln index 60a57aa..05fda7b 100644 --- a/TemporalioSamples.sln +++ b/TemporalioSamples.sln @@ -55,7 +55,7 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.WorkflowU EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.ContextPropagation", "src\ContextPropagation\TemporalioSamples.ContextPropagation.csproj", "{7B797D20-485F-441D-8E71-AF7E315FA9CF}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.CounterInterceptor", "src\CounterInterceptor\TemporalioSamples.CounterInterceptor.csproj", "{EB05B946-4DD3-4992-B84B-4438B5846ED5}" +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.CounterInterceptor", "src\CounterInterceptor\TemporalioSamples.CounterInterceptor.csproj", "{C06DA866-D78E-4BD7-BAD9-53718DC40AF0}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution @@ -151,10 +151,10 @@ Global {7B797D20-485F-441D-8E71-AF7E315FA9CF}.Debug|Any CPU.Build.0 = Debug|Any CPU {7B797D20-485F-441D-8E71-AF7E315FA9CF}.Release|Any CPU.ActiveCfg = Release|Any CPU {7B797D20-485F-441D-8E71-AF7E315FA9CF}.Release|Any CPU.Build.0 = Release|Any CPU - {EB05B946-4DD3-4992-B84B-4438B5846ED5}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {EB05B946-4DD3-4992-B84B-4438B5846ED5}.Debug|Any CPU.Build.0 = Debug|Any CPU - {EB05B946-4DD3-4992-B84B-4438B5846ED5}.Release|Any CPU.ActiveCfg = Release|Any CPU - {EB05B946-4DD3-4992-B84B-4438B5846ED5}.Release|Any CPU.Build.0 = Release|Any CPU + {C06DA866-D78E-4BD7-BAD9-53718DC40AF0}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {C06DA866-D78E-4BD7-BAD9-53718DC40AF0}.Debug|Any CPU.Build.0 = Debug|Any CPU + {C06DA866-D78E-4BD7-BAD9-53718DC40AF0}.Release|Any CPU.ActiveCfg = Release|Any CPU + {C06DA866-D78E-4BD7-BAD9-53718DC40AF0}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -184,6 +184,6 @@ Global {B79F07F7-3429-4C58-84C3-08587F748B2D} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC} {B3DB7B8C-7BD3-4A53-A809-AB6279B1A630} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC} {7B797D20-485F-441D-8E71-AF7E315FA9CF} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC} - {EB05B946-4DD3-4992-B84B-4438B5846ED5} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC} + {C06DA866-D78E-4BD7-BAD9-53718DC40AF0} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC} EndGlobalSection EndGlobal From d548089decffabab4d0bfd357badab89c93b9413 Mon Sep 17 00:00:00 2001 From: Rick Ross Date: Thu, 15 Aug 2024 09:26:59 -0400 Subject: [PATCH 12/19] Resolving merge conflict --- TemporalioSamples.sln | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/TemporalioSamples.sln b/TemporalioSamples.sln index 05fda7b..df5871f 100644 --- a/TemporalioSamples.sln +++ b/TemporalioSamples.sln @@ -55,7 +55,9 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.WorkflowU EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.ContextPropagation", "src\ContextPropagation\TemporalioSamples.ContextPropagation.csproj", "{7B797D20-485F-441D-8E71-AF7E315FA9CF}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.CounterInterceptor", "src\CounterInterceptor\TemporalioSamples.CounterInterceptor.csproj", "{C06DA866-D78E-4BD7-BAD9-53718DC40AF0}" +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.SafeMessageHandlers", "src\SafeMessageHandlers\TemporalioSamples.SafeMessageHandlers.csproj", "{FAF5984A-5B1C-4686-B056-A4F2AC0E8EB7}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.CounterInterceptor", "src\CounterInterceptor\TemporalioSamples.CounterInterceptor.csproj", "{F9C44936-8BF9-4919-BB66-8F1888E22AEB}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution @@ -151,10 +153,14 @@ Global {7B797D20-485F-441D-8E71-AF7E315FA9CF}.Debug|Any CPU.Build.0 = Debug|Any CPU {7B797D20-485F-441D-8E71-AF7E315FA9CF}.Release|Any CPU.ActiveCfg = Release|Any CPU {7B797D20-485F-441D-8E71-AF7E315FA9CF}.Release|Any CPU.Build.0 = Release|Any CPU - {C06DA866-D78E-4BD7-BAD9-53718DC40AF0}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {C06DA866-D78E-4BD7-BAD9-53718DC40AF0}.Debug|Any CPU.Build.0 = Debug|Any CPU - {C06DA866-D78E-4BD7-BAD9-53718DC40AF0}.Release|Any CPU.ActiveCfg = Release|Any CPU - {C06DA866-D78E-4BD7-BAD9-53718DC40AF0}.Release|Any CPU.Build.0 = Release|Any CPU + {FAF5984A-5B1C-4686-B056-A4F2AC0E8EB7}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {FAF5984A-5B1C-4686-B056-A4F2AC0E8EB7}.Debug|Any CPU.Build.0 = Debug|Any CPU + {FAF5984A-5B1C-4686-B056-A4F2AC0E8EB7}.Release|Any CPU.ActiveCfg = Release|Any CPU + {FAF5984A-5B1C-4686-B056-A4F2AC0E8EB7}.Release|Any CPU.Build.0 = Release|Any CPU + {F9C44936-8BF9-4919-BB66-8F1888E22AEB}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {F9C44936-8BF9-4919-BB66-8F1888E22AEB}.Debug|Any CPU.Build.0 = Debug|Any CPU + {F9C44936-8BF9-4919-BB66-8F1888E22AEB}.Release|Any CPU.ActiveCfg = Release|Any CPU + {F9C44936-8BF9-4919-BB66-8F1888E22AEB}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -184,6 +190,7 @@ Global {B79F07F7-3429-4C58-84C3-08587F748B2D} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC} {B3DB7B8C-7BD3-4A53-A809-AB6279B1A630} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC} {7B797D20-485F-441D-8E71-AF7E315FA9CF} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC} - {C06DA866-D78E-4BD7-BAD9-53718DC40AF0} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC} + {FAF5984A-5B1C-4686-B056-A4F2AC0E8EB7} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC} + {F9C44936-8BF9-4919-BB66-8F1888E22AEB} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC} EndGlobalSection EndGlobal From c0131fd9fe9441da689d75223126a68e14ae9526 Mon Sep 17 00:00:00 2001 From: Rick Ross Date: Thu, 15 Aug 2024 16:53:52 -0400 Subject: [PATCH 13/19] Updated tests to use base class --- tests/CounterInterceptor/MyWorkflowTests.cs | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/tests/CounterInterceptor/MyWorkflowTests.cs b/tests/CounterInterceptor/MyWorkflowTests.cs index 0af1d67..479046b 100644 --- a/tests/CounterInterceptor/MyWorkflowTests.cs +++ b/tests/CounterInterceptor/MyWorkflowTests.cs @@ -1,28 +1,31 @@ using Temporalio.Client; -using Temporalio.Testing; using Temporalio.Worker; using TemporalioSamples.CounterInterceptor; using Xunit; +using Xunit.Abstractions; namespace TemporalioSamples.Tests.CounterInterceptor; -public class MyWorkflowTests +public class MyWorkflowTests : WorkflowEnvironmentTestBase { + public MyWorkflowTests(ITestOutputHelper output, WorkflowEnvironment env) + : base(output, env) + { + } + [Fact] public async Task RunAsync_CounterInterceptor() { - await using var env = await WorkflowEnvironment.StartLocalAsync(); - var counterInterceptor = new MyCounterInterceptor(); // Add the interceptor to the client - var clientOptions = (TemporalClientOptions)env.Client.Options.Clone(); + var clientOptions = (TemporalClientOptions)Client.Options.Clone(); clientOptions.Interceptors = new[] { counterInterceptor, }; - var client = new TemporalClient(env.Client.Connection, clientOptions); + var client = new TemporalClient(Client.Connection, clientOptions); var taskQueue = Guid.NewGuid().ToString(); From 4557ffeeaf28915c9bc444c05d39e8b608f35e60 Mon Sep 17 00:00:00 2001 From: Rick Ross Date: Fri, 16 Aug 2024 16:01:17 -0400 Subject: [PATCH 14/19] Addressing PR comments --- src/CounterInterceptor/Counts.cs | 8 ++-- .../MyCounterInterceptor.cs | 43 ++++++++----------- src/CounterInterceptor/MyWorkflow.workflow.cs | 3 +- src/CounterInterceptor/Program.cs | 20 ++------- tests/CounterInterceptor/MyWorkflowTests.cs | 4 +- 5 files changed, 29 insertions(+), 49 deletions(-) diff --git a/src/CounterInterceptor/Counts.cs b/src/CounterInterceptor/Counts.cs index 742b0d5..fd0ef4d 100644 --- a/src/CounterInterceptor/Counts.cs +++ b/src/CounterInterceptor/Counts.cs @@ -5,7 +5,7 @@ public Counts() clientExecutions = 0; clientQueries = 0; clientSignals = 0; - workflowExecutions = 0; + workflowReplays = 0; workflowSignals = 0; workflowQueries = 0; workflowChildExecutions = 0; @@ -15,7 +15,7 @@ public Counts() private uint clientExecutions; private uint clientQueries; private uint clientSignals; - private uint workflowExecutions; + private uint workflowReplays; private uint workflowSignals; private uint workflowQueries; private uint workflowChildExecutions; @@ -32,7 +32,7 @@ public string ClientInfo() => $"Total Number of Signals: {ClientSignals}\n\t" + $"Total Number of Queries: {ClientQueries}"; - public ref uint WorkflowExecutions => ref workflowExecutions; + public ref uint WorkflowReplays => ref workflowReplays; public ref uint WorkflowSignals => ref workflowSignals; @@ -43,7 +43,7 @@ public string ClientInfo() => public ref uint WorkflowActivityExecutions => ref workflowActivityExecutions; public string WorkflowInfo() => - $"\n\tTotal Number of Workflow Exec: {WorkflowExecutions}\n\t" + + $"\n\tTotal Number of Workflow Replays: {WorkflowReplays}\n\t" + $"Total Number of Child Workflow Exec: {WorkflowChildExecutions}\n\t" + $"Total Number of Activity Exec: {WorkflowActivityExecutions}\n\t" + $"Total Number of Signals: {WorkflowSignals}\n\t" + diff --git a/src/CounterInterceptor/MyCounterInterceptor.cs b/src/CounterInterceptor/MyCounterInterceptor.cs index f1241d0..ae86ce6 100644 --- a/src/CounterInterceptor/MyCounterInterceptor.cs +++ b/src/CounterInterceptor/MyCounterInterceptor.cs @@ -9,19 +9,17 @@ namespace TemporalioSamples.CounterInterceptor; public class MyCounterInterceptor : IClientInterceptor, IWorkerInterceptor { - private ConcurrentDictionary counts = new(); - - public ConcurrentDictionary Counts => counts; + public ConcurrentDictionary Counts { get; } = new(); public string WorkerInfo() => string.Join( "\n", - counts.Select(kvp => $"** Workflow ID: {kvp.Key} {kvp.Value.WorkflowInfo()}")); + Counts.Select(kvp => $"** Workflow ID: {kvp.Key} {kvp.Value.WorkflowInfo()}")); public string ClientInfo() => string.Join( "\n", - counts.Select(kvp => $"** Workflow ID: {kvp.Key} {kvp.Value.ClientInfo()}")); + Counts.Select(kvp => $"** Workflow ID: {kvp.Key} {kvp.Value.ClientInfo()}")); public ClientOutboundInterceptor InterceptClient(ClientOutboundInterceptor nextInterceptor) => new ClientOutbound(this, nextInterceptor); @@ -32,6 +30,9 @@ public WorkflowInboundInterceptor InterceptWorkflow(WorkflowInboundInterceptor n public ActivityInboundInterceptor InterceptActivity(ActivityInboundInterceptor nextInterceptor) => new ActivityInbound(this, nextInterceptor); + private void Increment(string id, Action increment) => + increment(Counts.GetOrAdd(id, _ => new())); + private sealed class ClientOutbound : ClientOutboundInterceptor { private MyCounterInterceptor root; @@ -43,25 +44,21 @@ public override Task> StartWorkflowAsync Interlocked.Increment(ref root.Counts[id].ClientExecutions)); return base.StartWorkflowAsync(input); } public override Task SignalWorkflowAsync(SignalWorkflowInput input) { - var id = input.Id ?? "None"; - Interlocked.Increment(ref root.counts[id].ClientSignals); + var id = input.Id; + root.Increment(id, c => Interlocked.Increment(ref root.Counts[id].ClientSignals)); return base.SignalWorkflowAsync(input); } public override Task QueryWorkflowAsync(QueryWorkflowInput input) { - var id = input.Id ?? "None"; - Interlocked.Increment(ref root.counts[id].ClientQueries); + var id = input.Id; + root.Increment(id, c => Interlocked.Increment(ref root.Counts[id].ClientQueries)); return base.QueryWorkflowAsync(input); } } @@ -73,33 +70,27 @@ private sealed class WorkflowInbound : WorkflowInboundInterceptor internal WorkflowInbound(MyCounterInterceptor root, WorkflowInboundInterceptor next) : base(next) => this.root = root; - public override void Init(WorkflowOutboundInterceptor outbound) - { + public override void Init(WorkflowOutboundInterceptor outbound) => base.Init(new WorkflowOutbound(root, outbound)); - } public override Task ExecuteWorkflowAsync(ExecuteWorkflowInput input) { var id = Workflow.Info.WorkflowId; - // Need to add an empty record if none exists - // we don't care if it doesn't add it as we will - // still increment the current value. - root.counts.TryAdd(id, new Counts()); - Interlocked.Increment(ref root.counts[id].WorkflowExecutions); + root.Increment(id, c => Interlocked.Increment(ref root.Counts[id].WorkflowReplays)); return base.ExecuteWorkflowAsync(input); } public override Task HandleSignalAsync(HandleSignalInput input) { var id = Workflow.Info.WorkflowId; - Interlocked.Increment(ref root.counts[id].WorkflowSignals); + root.Increment(id, c => Interlocked.Increment(ref root.Counts[id].WorkflowSignals)); return base.HandleSignalAsync(input); } public override object? HandleQuery(HandleQueryInput input) { var id = Workflow.Info.WorkflowId; - Interlocked.Increment(ref root.counts[id].WorkflowQueries); + root.Increment(id, c => Interlocked.Increment(ref root.Counts[id].WorkflowQueries)); return base.HandleQuery(input); } } @@ -115,7 +106,7 @@ public override Task> StartChildWorkflow StartChildWorkflowInput input) { var id = Workflow.Info.WorkflowId; - Interlocked.Increment(ref root.counts[id].WorkflowChildExecutions); + root.Increment(id, c => Interlocked.Increment(ref root.Counts[id].WorkflowChildExecutions)); return base.StartChildWorkflowAsync(input); } } @@ -130,7 +121,7 @@ internal ActivityInbound(MyCounterInterceptor root, ActivityInboundInterceptor n public override Task ExecuteActivityAsync(ExecuteActivityInput input) { var id = ActivityExecutionContext.Current.Info.WorkflowId; - Interlocked.Increment(ref root.counts[id].WorkflowActivityExecutions); + root.Increment(id, c => Interlocked.Increment(ref root.Counts[id].WorkflowActivityExecutions)); return base.ExecuteActivityAsync(input); } } diff --git a/src/CounterInterceptor/MyWorkflow.workflow.cs b/src/CounterInterceptor/MyWorkflow.workflow.cs index 08e4e55..bf0652c 100644 --- a/src/CounterInterceptor/MyWorkflow.workflow.cs +++ b/src/CounterInterceptor/MyWorkflow.workflow.cs @@ -11,7 +11,8 @@ public class MyWorkflow public async Task RunAsync() { // Wait for greeting info - await Workflow.WaitConditionAsync(() => Name != null && Title != null); + await Workflow.WaitConditionAsync(() => + !string.IsNullOrEmpty(Name) && !string.IsNullOrEmpty(Title)); // Execute Child Workflow var result = await Workflow.ExecuteChildWorkflowAsync( diff --git a/src/CounterInterceptor/Program.cs b/src/CounterInterceptor/Program.cs index 1fc6ac7..114c155 100644 --- a/src/CounterInterceptor/Program.cs +++ b/src/CounterInterceptor/Program.cs @@ -17,13 +17,6 @@ private static async Task Main(string[] args) }, }); - using var tokenSource = new CancellationTokenSource(); - Console.CancelKeyPress += (_, eventArgs) => - { - tokenSource.Cancel(); - eventArgs.Cancel = true; - }; - var activities = new MyActivities(); var taskQueue = "CounterInterceptorTaskQueue"; @@ -40,11 +33,10 @@ private static async Task Main(string[] args) // Run worker until cancelled Console.WriteLine("Running worker..."); - try - { - // Start the workers - var workerResult = worker.ExecuteAsync(tokenSource.Token); + // Start the workers + await worker.ExecuteAsync(async () => + { // Start the workflow var handle = await client.StartWorkflowAsync( (MyWorkflow wf) => wf.RunAsync(), @@ -76,10 +68,6 @@ private static async Task Main(string[] args) Console.WriteLine(); Console.WriteLine("Collected Client Counter Info:\n"); Console.WriteLine(counterInterceptor.ClientInfo()); - } - catch (OperationCanceledException) - { - Console.WriteLine("Worker cancelled"); - } + }); } } \ No newline at end of file diff --git a/tests/CounterInterceptor/MyWorkflowTests.cs b/tests/CounterInterceptor/MyWorkflowTests.cs index 479046b..47523bd 100644 --- a/tests/CounterInterceptor/MyWorkflowTests.cs +++ b/tests/CounterInterceptor/MyWorkflowTests.cs @@ -59,14 +59,14 @@ await worker.ExecuteAsync(async () => var result = await handle.GetResultAsync(); // Validate that the worker counters have the correct numbers for the parent - Assert.Equal(1U, counterInterceptor.Counts[parentWorkflowId].WorkflowExecutions); + Assert.Equal(1U, counterInterceptor.Counts[parentWorkflowId].WorkflowReplays); Assert.Equal(1U, counterInterceptor.Counts[parentWorkflowId].WorkflowChildExecutions); Assert.Equal(0U, counterInterceptor.Counts[parentWorkflowId].WorkflowActivityExecutions); Assert.Equal(2U, counterInterceptor.Counts[parentWorkflowId].WorkflowSignals); Assert.Equal(2U, counterInterceptor.Counts[parentWorkflowId].WorkflowQueries); // Validate the worker counters have the correct numbers for the child - Assert.Equal(1U, counterInterceptor.Counts[childWorkflowId].WorkflowExecutions); + Assert.Equal(1U, counterInterceptor.Counts[childWorkflowId].WorkflowReplays); Assert.Equal(0U, counterInterceptor.Counts[childWorkflowId].WorkflowChildExecutions); Assert.Equal(2U, counterInterceptor.Counts[childWorkflowId].WorkflowActivityExecutions); Assert.Equal(0U, counterInterceptor.Counts[childWorkflowId].WorkflowSignals); From c7b6e31b8b3258f7276519f88970e9de7bb0253a Mon Sep 17 00:00:00 2001 From: Rick Ross Date: Fri, 16 Aug 2024 16:45:22 -0400 Subject: [PATCH 15/19] Added logic for detecting replay; fixed test --- .../MyCounterInterceptor.cs | 32 ++++++++++++++----- tests/CounterInterceptor/MyWorkflowTests.cs | 2 +- 2 files changed, 25 insertions(+), 9 deletions(-) diff --git a/src/CounterInterceptor/MyCounterInterceptor.cs b/src/CounterInterceptor/MyCounterInterceptor.cs index ae86ce6..22fb9fd 100644 --- a/src/CounterInterceptor/MyCounterInterceptor.cs +++ b/src/CounterInterceptor/MyCounterInterceptor.cs @@ -75,22 +75,34 @@ public override void Init(WorkflowOutboundInterceptor outbound) => public override Task ExecuteWorkflowAsync(ExecuteWorkflowInput input) { - var id = Workflow.Info.WorkflowId; - root.Increment(id, c => Interlocked.Increment(ref root.Counts[id].WorkflowReplays)); + // Count only if we're not replaying + if (!Workflow.Unsafe.IsReplaying) + { + var id = Workflow.Info.WorkflowId; + root.Increment(id, c => Interlocked.Increment(ref root.Counts[id].WorkflowReplays)); + } return base.ExecuteWorkflowAsync(input); } public override Task HandleSignalAsync(HandleSignalInput input) { - var id = Workflow.Info.WorkflowId; - root.Increment(id, c => Interlocked.Increment(ref root.Counts[id].WorkflowSignals)); + // Count only if we're not replaying + if (!Workflow.Unsafe.IsReplaying) + { + var id = Workflow.Info.WorkflowId; + root.Increment(id, c => Interlocked.Increment(ref root.Counts[id].WorkflowSignals)); + } return base.HandleSignalAsync(input); } public override object? HandleQuery(HandleQueryInput input) { - var id = Workflow.Info.WorkflowId; - root.Increment(id, c => Interlocked.Increment(ref root.Counts[id].WorkflowQueries)); + // Count only if we're not replaying + if (!Workflow.Unsafe.IsReplaying) + { + var id = Workflow.Info.WorkflowId; + root.Increment(id, c => Interlocked.Increment(ref root.Counts[id].WorkflowQueries)); + } return base.HandleQuery(input); } } @@ -105,8 +117,12 @@ internal WorkflowOutbound(MyCounterInterceptor root, WorkflowOutboundInterceptor public override Task> StartChildWorkflowAsync( StartChildWorkflowInput input) { - var id = Workflow.Info.WorkflowId; - root.Increment(id, c => Interlocked.Increment(ref root.Counts[id].WorkflowChildExecutions)); + // Count only if we're not replaying + if (!Workflow.Unsafe.IsReplaying) + { + var id = Workflow.Info.WorkflowId; + root.Increment(id, c => Interlocked.Increment(ref root.Counts[id].WorkflowChildExecutions)); + } return base.StartChildWorkflowAsync(input); } } diff --git a/tests/CounterInterceptor/MyWorkflowTests.cs b/tests/CounterInterceptor/MyWorkflowTests.cs index 47523bd..c99f36c 100644 --- a/tests/CounterInterceptor/MyWorkflowTests.cs +++ b/tests/CounterInterceptor/MyWorkflowTests.cs @@ -63,7 +63,7 @@ await worker.ExecuteAsync(async () => Assert.Equal(1U, counterInterceptor.Counts[parentWorkflowId].WorkflowChildExecutions); Assert.Equal(0U, counterInterceptor.Counts[parentWorkflowId].WorkflowActivityExecutions); Assert.Equal(2U, counterInterceptor.Counts[parentWorkflowId].WorkflowSignals); - Assert.Equal(2U, counterInterceptor.Counts[parentWorkflowId].WorkflowQueries); + Assert.Equal(0U, counterInterceptor.Counts[parentWorkflowId].WorkflowQueries); // Validate the worker counters have the correct numbers for the child Assert.Equal(1U, counterInterceptor.Counts[childWorkflowId].WorkflowReplays); From f3a82cc196e1a28bc0b874fc8e7c7025ec988c15 Mon Sep 17 00:00:00 2001 From: Rick Ross Date: Fri, 16 Aug 2024 16:58:48 -0400 Subject: [PATCH 16/19] Changed Count to a class; changed activities to use interpolation --- src/CounterInterceptor/Counts.cs | 15 ++------------- src/CounterInterceptor/MyActivities.cs | 8 +++----- 2 files changed, 5 insertions(+), 18 deletions(-) diff --git a/src/CounterInterceptor/Counts.cs b/src/CounterInterceptor/Counts.cs index fd0ef4d..2949e76 100644 --- a/src/CounterInterceptor/Counts.cs +++ b/src/CounterInterceptor/Counts.cs @@ -1,17 +1,6 @@ -public record Counts +namespace TemporalioSamples.CounterInterceptor; +public class Counts { - public Counts() - { - clientExecutions = 0; - clientQueries = 0; - clientSignals = 0; - workflowReplays = 0; - workflowSignals = 0; - workflowQueries = 0; - workflowChildExecutions = 0; - workflowActivityExecutions = 0; - } - private uint clientExecutions; private uint clientQueries; private uint clientSignals; diff --git a/src/CounterInterceptor/MyActivities.cs b/src/CounterInterceptor/MyActivities.cs index def2a20..d038093 100644 --- a/src/CounterInterceptor/MyActivities.cs +++ b/src/CounterInterceptor/MyActivities.cs @@ -7,11 +7,9 @@ public class MyActivities { [Activity] public string SayHello(string name, string title) => - "Hello " + title + " " + name; + $"Hello {title} {name}"; [Activity] - public string SayGoodBye(string name, string title) - { - return "Goodbye " + title + " " + name; - } + public string SayGoodBye(string name, string title) => + $"Goodby {title} {name}"; } \ No newline at end of file From 078bbe6d97290b1759783edada77c5ab1f2d69a1 Mon Sep 17 00:00:00 2001 From: Rick Ross Date: Fri, 16 Aug 2024 17:26:35 -0400 Subject: [PATCH 17/19] Added testing instructions to README --- src/CounterInterceptor/README.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/CounterInterceptor/README.md b/src/CounterInterceptor/README.md index 7830df0..273fbaa 100644 --- a/src/CounterInterceptor/README.md +++ b/src/CounterInterceptor/README.md @@ -15,3 +15,9 @@ temporal server start-dev # make sure you have temporal server running (see section above) dotnet run ``` + +## Run the tests +```bash +# cd ../../test +dotnet test +``` From fc4fadaa07f2b19b7bde9b350b68af93fb00a161 Mon Sep 17 00:00:00 2001 From: Rick Ross Date: Fri, 16 Aug 2024 17:35:00 -0400 Subject: [PATCH 18/19] Changed output from worker to unique workflows --- src/CounterInterceptor/Program.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/CounterInterceptor/Program.cs b/src/CounterInterceptor/Program.cs index 114c155..a4d500d 100644 --- a/src/CounterInterceptor/Program.cs +++ b/src/CounterInterceptor/Program.cs @@ -62,7 +62,7 @@ await worker.ExecuteAsync(async () => // Print worker counter info Console.WriteLine("\nCollected Worker Counter Info:\n"); Console.WriteLine(counterInterceptor.WorkerInfo()); - Console.WriteLine($"Number of workers: {counterInterceptor.Counts.Count}"); + Console.WriteLine($"Number of unique workflows: {counterInterceptor.Counts.Count}"); // Print client counter info Console.WriteLine(); From c6f5e559d5a1b5735b6229140cec0be4f4b15466 Mon Sep 17 00:00:00 2001 From: Rick Ross Date: Mon, 19 Aug 2024 16:58:58 -0500 Subject: [PATCH 19/19] Updated README to be consistent with others --- src/CounterInterceptor/README.md | 16 +++------------- 1 file changed, 3 insertions(+), 13 deletions(-) diff --git a/src/CounterInterceptor/README.md b/src/CounterInterceptor/README.md index 273fbaa..5ce04d1 100644 --- a/src/CounterInterceptor/README.md +++ b/src/CounterInterceptor/README.md @@ -4,20 +4,10 @@ The sample demonstrates: off of the [Java sample](https://github.com/temporalio/samples-java/tree/main) located [here](https://github.com/temporalio/samples-java/tree/main/core/src/main/java/io/temporal/samples/countinterceptor) - the use of a Client Workflow Interceptor that counts the number of Workflow Executions and the number of Signals and Queries. -## Start local Temporal Server -```bash -# run only once -temporal server start-dev -``` +To run, first see [README.md](https://github.com/temporalio/samples-dotnet/blob/main/README.md) for prerequisites -## Run Worker Locally +## Run Worker and Client ```bash -# make sure you have temporal server running (see section above) +# make sure you have temporal server running dotnet run ``` - -## Run the tests -```bash -# cd ../../test -dotnet test -```