diff --git a/README.md b/README.md index c325158..6239bd5 100644 --- a/README.md +++ b/README.md @@ -31,6 +31,7 @@ Prerequisites: * [OpenTelemetry](src/OpenTelemetry) - Demonstrates how to set up OpenTelemetry tracing and metrics for both the client and worker, using both the .NET metrics API and internal forwarding from the Core SDK. * [Patching](src/Patching) - Alter workflows safely with Patch and DeprecatePatch. * [Polling](src/Polling) - Recommended implementation of an activity that needs to periodically poll an external resource waiting its successful completion. +* [RefreshingClient](src/RefreshingClient) - Demonstrates how to periodically refresh the Temporal client in a Worker every 2 hours. * [SafeMessageHandlers](src/SafeMessageHandlers) - Use `Semaphore` to ensure operations are atomically processed in a workflow. * [Saga](src/Saga) - Demonstrates how to implement a saga pattern. * [Schedules](src/Schedules) - How to schedule workflows to be run at specific times in the future. diff --git a/TemporalioSamples.sln b/TemporalioSamples.sln index d181ea5..7afff65 100644 --- a/TemporalioSamples.sln +++ b/TemporalioSamples.sln @@ -1,4 +1,4 @@ - + Microsoft Visual Studio Solution File, Format Version 12.00 # Visual Studio Version 17 VisualStudioVersion = 17.0.31903.59 @@ -98,6 +98,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.EnvConfig EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.Timer", "src\Timer\TemporalioSamples.Timer.csproj", "{B37B3E98-4B04-48B8-9017-F0EDEDC7BD98}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.RefreshingClient", "src\RefreshingClient\TemporalioSamples.RefreshingClient.csproj", "{9654050C-AA1E-4376-BA4E-8190D1842818}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -588,6 +590,18 @@ Global {B37B3E98-4B04-48B8-9017-F0EDEDC7BD98}.Release|x64.Build.0 = Release|Any CPU {B37B3E98-4B04-48B8-9017-F0EDEDC7BD98}.Release|x86.ActiveCfg = Release|Any CPU {B37B3E98-4B04-48B8-9017-F0EDEDC7BD98}.Release|x86.Build.0 = Release|Any CPU + {9654050C-AA1E-4376-BA4E-8190D1842818}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {9654050C-AA1E-4376-BA4E-8190D1842818}.Debug|Any CPU.Build.0 = Debug|Any CPU + {9654050C-AA1E-4376-BA4E-8190D1842818}.Debug|x64.ActiveCfg = Debug|Any CPU + {9654050C-AA1E-4376-BA4E-8190D1842818}.Debug|x64.Build.0 = Debug|Any CPU + {9654050C-AA1E-4376-BA4E-8190D1842818}.Debug|x86.ActiveCfg = Debug|Any CPU + {9654050C-AA1E-4376-BA4E-8190D1842818}.Debug|x86.Build.0 = Debug|Any CPU + {9654050C-AA1E-4376-BA4E-8190D1842818}.Release|Any CPU.ActiveCfg = Release|Any CPU + {9654050C-AA1E-4376-BA4E-8190D1842818}.Release|Any CPU.Build.0 = Release|Any CPU + {9654050C-AA1E-4376-BA4E-8190D1842818}.Release|x64.ActiveCfg = Release|Any CPU + {9654050C-AA1E-4376-BA4E-8190D1842818}.Release|x64.Build.0 = Release|Any CPU + {9654050C-AA1E-4376-BA4E-8190D1842818}.Release|x86.ActiveCfg = Release|Any CPU + {9654050C-AA1E-4376-BA4E-8190D1842818}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -637,5 +651,6 @@ Global {8BE23F78-7178-4924-AB45-4AF74454CC97} = {18E26AEE-5DA3-7BF8-A1AD-13A28A6C7BA3} {52CE80AF-09C3-4209-8A21-6CFFAA3B2B01} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC} {B37B3E98-4B04-48B8-9017-F0EDEDC7BD98} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC} + {9654050C-AA1E-4376-BA4E-8190D1842818} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC} EndGlobalSection EndGlobal diff --git a/src/RefreshingClient/MyActivities.cs b/src/RefreshingClient/MyActivities.cs new file mode 100644 index 0000000..7533255 --- /dev/null +++ b/src/RefreshingClient/MyActivities.cs @@ -0,0 +1,23 @@ +namespace TemporalioSamples.RefreshingClient; + +using Temporalio.Activities; + +public class MyActivities +{ + private readonly MyDatabaseClient dbClient = new(); + + // Activities can be static and/or sync + [Activity] + public static string DoStaticThing() => "some-static-value"; + + // Activities can be methods that can access state + [Activity] + public Task SelectFromDatabaseAsync(string table) => + dbClient.SelectValueAsync(table); + + public class MyDatabaseClient + { + public Task SelectValueAsync(string table) => + Task.FromResult($"some-db-value from table {table}"); + } +} \ No newline at end of file diff --git a/src/RefreshingClient/MyWorkflow.workflow.cs b/src/RefreshingClient/MyWorkflow.workflow.cs new file mode 100644 index 0000000..b9be245 --- /dev/null +++ b/src/RefreshingClient/MyWorkflow.workflow.cs @@ -0,0 +1,33 @@ +namespace TemporalioSamples.RefreshingClient; + +using Microsoft.Extensions.Logging; +using Temporalio.Workflows; + +[Workflow] +public class MyWorkflow +{ + [WorkflowRun] + public async Task RunAsync() + { + // Run an async instance method activity. + var result1 = await Workflow.ExecuteActivityAsync( + (MyActivities act) => act.SelectFromDatabaseAsync("some-db-table"), + new() + { + StartToCloseTimeout = TimeSpan.FromMinutes(5), + }); + Workflow.Logger.LogInformation("Activity instance method result: {Result}", result1); + + // Run a sync static method activity. + var result2 = await Workflow.ExecuteActivityAsync( + () => MyActivities.DoStaticThing(), + new() + { + StartToCloseTimeout = TimeSpan.FromMinutes(5), + }); + Workflow.Logger.LogInformation("Activity static method result: {Result}", result2); + + // We'll go ahead and return this result + return result2; + } +} \ No newline at end of file diff --git a/src/RefreshingClient/Program.cs b/src/RefreshingClient/Program.cs new file mode 100644 index 0000000..8f2cb11 --- /dev/null +++ b/src/RefreshingClient/Program.cs @@ -0,0 +1,112 @@ +using Microsoft.Extensions.Logging; +using Temporalio.Client; +using Temporalio.Client.EnvConfig; +using Temporalio.Worker; +using TemporalioSamples.RefreshingClient; + +async Task CreateClientAsync() +{ + var connectOptions = ClientEnvConfig.LoadClientConnectOptions(); + if (string.IsNullOrEmpty(connectOptions.TargetHost)) + { + connectOptions.TargetHost = "localhost:7233"; + } + connectOptions.LoggerFactory = LoggerFactory.Create(builder => + builder. + AddSimpleConsole(options => options.TimestampFormat = "[HH:mm:ss] "). + SetMinimumLevel(LogLevel.Information)); + return await TemporalClient.ConnectAsync(connectOptions); +} + +async Task RunWorkerAsync(TemporalClient client) +{ + // Cancellation token cancelled on ctrl+c + using var tokenSource = new CancellationTokenSource(); + Console.CancelKeyPress += (_, eventArgs) => + { + tokenSource.Cancel(); + eventArgs.Cancel = true; + }; + + // Create an activity instance with some state + var activities = new MyActivities(); + + // Run worker until cancelled + Console.WriteLine("Running worker"); + using var worker = new TemporalWorker( + client, + new TemporalWorkerOptions(taskQueue: "activity-simple-sample"). + AddActivity(activities.SelectFromDatabaseAsync). + AddActivity(MyActivities.DoStaticThing). + AddWorkflow()); + + var replaceWorkerClient = (TemporalClient newClient) => + { + worker.Client = newClient; + Console.WriteLine("Worker's client has been refreshed."); + return Task.FromResult(true); + }; + + try + { + await Task.WhenAll(ClientRefreshAsync(replaceWorkerClient, tokenSource.Token), worker.ExecuteAsync(tokenSource.Token)); + } + catch (OperationCanceledException) + { + Console.WriteLine("Worker cancelled"); + } +} + +async Task ExecuteWorkflowAsync(TemporalClient client) +{ + Console.WriteLine("Executing workflow"); + await client.ExecuteWorkflowAsync( + (MyWorkflow wf) => wf.RunAsync(), + new(id: "activity-simple-workflow-id", taskQueue: "activity-simple-sample")); +} + +async Task ClientRefreshAsync(Func asyncFunc, CancellationToken cancellationToken) +{ + // Change the frequency of rotation as per your requirements + Console.WriteLine("This program will refresh its Temporal client every 2 hours."); + await RunRecurringTaskAsync(TimeSpan.FromHours(2), cancellationToken, asyncFunc); +} + +async Task RunRecurringTaskAsync(TimeSpan interval, CancellationToken cancellationToken, Func asyncFunc) +{ + while (!cancellationToken.IsCancellationRequested) + { + try + { + await Task.Delay(interval, cancellationToken); + Console.WriteLine("Refreshing client..."); + var client = await CreateClientAsync(); + await asyncFunc(client); + } + catch (OperationCanceledException) + { + Console.WriteLine("Refreshing task cancelled."); + break; + } +#pragma warning disable CA1031 // Do not catch general exception types + catch (Exception ex) + { + Console.WriteLine($"Error: {ex.Message}"); + // Continue running even if one iteration fails + } +#pragma warning restore CA1031 // Do not catch general exception types + } +} + +var client = await CreateClientAsync(); +switch (args.ElementAtOrDefault(0)) +{ + case "worker": + await RunWorkerAsync(client); + break; + case "workflow": + await ExecuteWorkflowAsync(client); + break; + default: + throw new ArgumentException("Must pass 'worker' or 'workflow' as the single argument"); +} \ No newline at end of file diff --git a/src/RefreshingClient/README.md b/src/RefreshingClient/README.md new file mode 100644 index 0000000..c585f67 --- /dev/null +++ b/src/RefreshingClient/README.md @@ -0,0 +1,17 @@ +# Refreshing Client + +This sample demonstrates how to periodically refresh the Temporal client in a Worker. +The Worker program refreshes the Temporal client every 2 hours, which is useful for scenarios requiring credential mTLS or api key rotation. + +`ClientRefreshAsync` accepts a Func to deliver a new client, to replace the callers Worker client. + +To run, first see [README.md](../../README.md) for prerequisites. Then, run the following from this directory +in a separate terminal to start the worker: + + dotnet run worker + +Then in another terminal, run a workflow every second from this directory: + + watch -n1 dotnet run workflow + +This will show logs in the worker window of the workflow running. \ No newline at end of file diff --git a/src/RefreshingClient/TemporalioSamples.RefreshingClient.csproj b/src/RefreshingClient/TemporalioSamples.RefreshingClient.csproj new file mode 100644 index 0000000..967ff4c --- /dev/null +++ b/src/RefreshingClient/TemporalioSamples.RefreshingClient.csproj @@ -0,0 +1,7 @@ + + + + Exe + + +