Skip to content

Commit be233c9

Browse files
committed
Atomic message handler sample
1 parent 1b00616 commit be233c9

14 files changed

+539
-12
lines changed

.editorconfig

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ dotnet_diagnostic.SA1515.severity = none
9696

9797
# Do not require XML doc in samples
9898
dotnet_diagnostic.SA1600.severity = none
99+
dotnet_diagnostic.SA1602.severity = none
99100

100101
# Do not require file header
101102
dotnet_diagnostic.SA1633.severity = none

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ Prerequisites:
1515
* [ActivityHeartbeatingCancellation](src/ActivityHeartbeatingCancellation) - How to use heartbeating and cancellation handling in an activity.
1616
* [ActivitySimple](src/ActivitySimple) - Simple workflow that runs simple activities.
1717
* [ActivityWorker](src/ActivityWorker) - Use .NET activities from a workflow in another language.
18+
* [AtomicMessageHandlers](src/AtomicMessageHandlers) - Use `SemaphoreSlim` to ensure operations are atomically processed in a workflow.
1819
* [AspNet](src/AspNet) - Demonstration of a generic host worker and an ASP.NET workflow starter.
1920
* [ClientMtls](src/ClientMtls) - How to use client certificate authentication, e.g. for Temporal Cloud.
2021
* [ContextPropagation](src/ContextPropagation) - Context propagation via interceptors.

TemporalioSamples.sln

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.WorkflowU
5555
EndProject
5656
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.ContextPropagation", "src\ContextPropagation\TemporalioSamples.ContextPropagation.csproj", "{7B797D20-485F-441D-8E71-AF7E315FA9CF}"
5757
EndProject
58+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.AtomicMessageHandlers", "src\AtomicMessageHandlers\TemporalioSamples.AtomicMessageHandlers.csproj", "{5E497499-F87C-4DC6-B5DC-F508F31EA172}"
59+
EndProject
5860
Global
5961
GlobalSection(SolutionConfigurationPlatforms) = preSolution
6062
Debug|Any CPU = Debug|Any CPU
@@ -149,6 +151,10 @@ Global
149151
{7B797D20-485F-441D-8E71-AF7E315FA9CF}.Debug|Any CPU.Build.0 = Debug|Any CPU
150152
{7B797D20-485F-441D-8E71-AF7E315FA9CF}.Release|Any CPU.ActiveCfg = Release|Any CPU
151153
{7B797D20-485F-441D-8E71-AF7E315FA9CF}.Release|Any CPU.Build.0 = Release|Any CPU
154+
{5E497499-F87C-4DC6-B5DC-F508F31EA172}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
155+
{5E497499-F87C-4DC6-B5DC-F508F31EA172}.Debug|Any CPU.Build.0 = Debug|Any CPU
156+
{5E497499-F87C-4DC6-B5DC-F508F31EA172}.Release|Any CPU.ActiveCfg = Release|Any CPU
157+
{5E497499-F87C-4DC6-B5DC-F508F31EA172}.Release|Any CPU.Build.0 = Release|Any CPU
152158
EndGlobalSection
153159
GlobalSection(SolutionProperties) = preSolution
154160
HideSolutionNode = FALSE
@@ -178,5 +184,6 @@ Global
178184
{B79F07F7-3429-4C58-84C3-08587F748B2D} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC}
179185
{B3DB7B8C-7BD3-4A53-A809-AB6279B1A630} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC}
180186
{7B797D20-485F-441D-8E71-AF7E315FA9CF} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC}
187+
{5E497499-F87C-4DC6-B5DC-F508F31EA172} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC}
181188
EndGlobalSection
182189
EndGlobal
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
namespace TemporalioSamples.AtomicMessageHandlers;
2+
3+
using Microsoft.Extensions.Logging;
4+
using Temporalio.Activities;
5+
6+
public class ClusterManagerActivities
7+
{
8+
public record AllocateNodesToJobInput(
9+
IList<string> Nodes,
10+
string JobName);
11+
12+
[Activity]
13+
public async Task AllocateNodesToJobAsync(AllocateNodesToJobInput input)
14+
{
15+
ActivityExecutionContext.Current.Logger.LogInformation(
16+
"Assigning nodes {Nodes} to job {TaskName}", input.Nodes, input.JobName);
17+
await Task.Delay(100);
18+
}
19+
20+
public record DeallocateNodesFromJobInput(
21+
IList<string> Nodes,
22+
string JobName);
23+
24+
[Activity]
25+
public async Task DeallocateNodesFromJobAsync(DeallocateNodesFromJobInput input)
26+
{
27+
ActivityExecutionContext.Current.Logger.LogInformation(
28+
"Deallocating nodes {Nodes} from job {TaskName}", input.Nodes, input.JobName);
29+
await Task.Delay(100);
30+
}
31+
32+
public record FindBadNodesInput(
33+
IList<string> Nodes);
34+
35+
[Activity]
36+
public async Task<List<string>> FindBadNodesAsync(FindBadNodesInput input)
37+
{
38+
await Task.Delay(100);
39+
return input.Nodes.
40+
Select((node, index) => index % 5 == 0 ? null : node).
41+
OfType<string>().
42+
ToList();
43+
}
44+
}
Lines changed: 211 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,211 @@
1+
namespace TemporalioSamples.AtomicMessageHandlers;
2+
3+
using Microsoft.Extensions.Logging;
4+
using Temporalio.Exceptions;
5+
using Temporalio.Workflows;
6+
7+
[Workflow]
8+
public sealed class ClusterManagerWorkflow : IDisposable
9+
{
10+
public record State
11+
{
12+
public bool ClusterStarted { get; set; }
13+
14+
public bool ClusterShutdown { get; set; }
15+
16+
public IDictionary<string, string?> Nodes { get; init; } = new Dictionary<string, string?>();
17+
18+
public int MaxAssignedNodes { get; set; }
19+
}
20+
21+
public record Input
22+
{
23+
public State State { get; init; } = new();
24+
25+
public bool TestContinueAsNew { get; init; }
26+
}
27+
28+
public record Result(
29+
int MaxAssignedNodes,
30+
int NumAssignedNodes);
31+
32+
private readonly SemaphoreSlim nodesLock = new(1, 1);
33+
private readonly int maxHistoryLength;
34+
private readonly TimeSpan sleepInterval;
35+
36+
[WorkflowInit]
37+
public ClusterManagerWorkflow(Input input)
38+
{
39+
CurrentState = input.State;
40+
maxHistoryLength = input.TestContinueAsNew ? 120 : int.MaxValue;
41+
sleepInterval = TimeSpan.FromSeconds(input.TestContinueAsNew ? 1 : 600);
42+
}
43+
44+
[WorkflowQuery]
45+
public State CurrentState { get; init; }
46+
47+
[WorkflowRun]
48+
public async Task<Result> RunAsync(Input input)
49+
{
50+
await Workflow.WaitConditionAsync(() => CurrentState.ClusterStarted);
51+
52+
// Perform health checks at intervals
53+
do
54+
{
55+
await PerformHealthChecksAsync();
56+
await Workflow.WaitConditionAsync(
57+
() => CurrentState.ClusterShutdown || ShouldContinueAsNew,
58+
sleepInterval);
59+
60+
// Continue as new if needed
61+
if (ShouldContinueAsNew)
62+
{
63+
Workflow.Logger.LogInformation("Continuing as new");
64+
throw Workflow.CreateContinueAsNewException((ClusterManagerWorkflow wf) => wf.RunAsync(new()
65+
{
66+
State = CurrentState,
67+
TestContinueAsNew = input.TestContinueAsNew,
68+
}));
69+
}
70+
}
71+
while (!CurrentState.ClusterShutdown);
72+
return new(CurrentState.MaxAssignedNodes, NumAssignedNodes);
73+
}
74+
75+
[WorkflowSignal]
76+
public async Task StartClusterAsync()
77+
{
78+
CurrentState.ClusterStarted = true;
79+
foreach (var node in Enumerable.Range(0, 25))
80+
{
81+
CurrentState.Nodes[$"{node}"] = null;
82+
}
83+
Workflow.Logger.LogInformation("Cluster started");
84+
}
85+
86+
[WorkflowSignal]
87+
public async Task ShutdownClusterAsync()
88+
{
89+
await Workflow.WaitConditionAsync(() => CurrentState.ClusterStarted);
90+
CurrentState.ClusterShutdown = true;
91+
Workflow.Logger.LogInformation("Cluster shut down");
92+
}
93+
94+
public record AllocateNodesToJobInput(int NumNodes, string JobName);
95+
96+
[WorkflowUpdate]
97+
public async Task<List<string>> AllocateNodesToJobAsync(AllocateNodesToJobInput input)
98+
{
99+
await Workflow.WaitConditionAsync(() => CurrentState.ClusterStarted);
100+
if (CurrentState.ClusterShutdown)
101+
{
102+
throw new ApplicationFailureException(
103+
"Cannot allocate nodes to a job, cluster is already shut down");
104+
}
105+
await nodesLock.WaitAsync();
106+
try
107+
{
108+
var unassignedNodes = CurrentState.Nodes.
109+
Where(kvp => kvp.Value == null).
110+
Select(kvp => kvp.Key).
111+
ToList();
112+
if (unassignedNodes.Count < input.NumNodes)
113+
{
114+
throw new ApplicationFailureException(
115+
$"Cannot allocate {input.NumNodes} nodes, have only {unassignedNodes.Count} available");
116+
}
117+
var assignedNodes = unassignedNodes[..input.NumNodes];
118+
// This await would be dangerous without nodesLock because it yields control and allows
119+
// interleaving
120+
await Workflow.ExecuteActivityAsync(
121+
(ClusterManagerActivities acts) => acts.AllocateNodesToJobAsync(new(assignedNodes, input.JobName)),
122+
new() { StartToCloseTimeout = TimeSpan.FromSeconds(10) });
123+
foreach (var node in assignedNodes)
124+
{
125+
CurrentState.Nodes[node] = input.JobName;
126+
}
127+
CurrentState.MaxAssignedNodes = int.Max(CurrentState.MaxAssignedNodes, NumAssignedNodes);
128+
return assignedNodes;
129+
}
130+
finally
131+
{
132+
nodesLock.Release();
133+
}
134+
}
135+
136+
public record DeleteJobInput(string JobName);
137+
138+
[WorkflowUpdate]
139+
public async Task DeleteJobAsync(DeleteJobInput input)
140+
{
141+
await Workflow.WaitConditionAsync(() => CurrentState.ClusterStarted);
142+
if (CurrentState.ClusterShutdown)
143+
{
144+
throw new ApplicationFailureException(
145+
"Cannot delete job, cluster is already shut down");
146+
}
147+
await nodesLock.WaitAsync();
148+
try
149+
{
150+
var toUnassign = CurrentState.Nodes.
151+
Where(kvp => kvp.Value == input.JobName).
152+
Select(kvp => kvp.Key).
153+
ToList();
154+
// This await would be dangerous without nodesLock because it yields control and allows
155+
// interleaving
156+
await Workflow.ExecuteActivityAsync(
157+
(ClusterManagerActivities acts) => acts.DeallocateNodesFromJobAsync(new(toUnassign, input.JobName)),
158+
new() { StartToCloseTimeout = TimeSpan.FromSeconds(10) });
159+
foreach (var node in toUnassign)
160+
{
161+
CurrentState.Nodes[node] = null;
162+
}
163+
}
164+
finally
165+
{
166+
nodesLock.Release();
167+
}
168+
}
169+
170+
public void Dispose() => nodesLock.Dispose();
171+
172+
private int NumAssignedNodes =>
173+
CurrentState.Nodes.Count(kvp => kvp.Value is { } val && val != "BAD!");
174+
175+
private bool ShouldContinueAsNew =>
176+
// Don't continue as new while update running
177+
nodesLock.CurrentCount > 0 &&
178+
// Continue if suggested or, for ease of testing, max history reached
179+
(Workflow.ContinueAsNewSuggested || Workflow.CurrentHistoryLength > maxHistoryLength);
180+
181+
private async Task PerformHealthChecksAsync()
182+
{
183+
await nodesLock.WaitAsync();
184+
try
185+
{
186+
// Find bad nodes from the set of non-bad ones. This await would be dangerous without
187+
// nodesLock because it yields control and allows interleaving.
188+
var assignedNodes = CurrentState.Nodes.
189+
Where(kvp => kvp.Value is { } val && val != "BAD!").
190+
Select(kvp => kvp.Value!).
191+
ToList();
192+
var badNodes = await Workflow.ExecuteActivityAsync(
193+
(ClusterManagerActivities acts) => acts.FindBadNodesAsync(new(assignedNodes)),
194+
new()
195+
{
196+
StartToCloseTimeout = TimeSpan.FromSeconds(10),
197+
// This health check is optional, and our lock would block the whole workflow if
198+
// we let it retry forever
199+
RetryPolicy = new() { MaximumAttempts = 1 },
200+
});
201+
foreach (var node in badNodes)
202+
{
203+
CurrentState.Nodes[node] = "BAD!";
204+
}
205+
}
206+
finally
207+
{
208+
nodesLock.Release();
209+
}
210+
}
211+
}

0 commit comments

Comments
 (0)