Skip to content

Commit 896bff1

Browse files
authored
Adopted approach more akin to DurableTask for generating deterministic guids across replays (#1742)
Signed-off-by: Whit Waldo <whit.waldo@innovian.net>
1 parent 919e895 commit 896bff1

File tree

4 files changed

+140
-7
lines changed

4 files changed

+140
-7
lines changed

src/Dapr.Workflow/Worker/Internal/WorkflowOrchestrationContext.cs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,9 @@ internal sealed class WorkflowOrchestrationContext : WorkflowContext
6161
private readonly Dictionary<string, HistoryEvent> _unmatchedCompletionsByExecutionId = new(StringComparer.Ordinal);
6262

6363

64-
// Parse instance ID as GUID or generate one
64+
// Parse execution/instance ID as GUID or derive a deterministic namespace from the ID
6565
private readonly Guid _instanceGuid;
66+
private static readonly Guid InstanceIdNamespace = new("6f927a2e-9c7e-4a1d-9b8d-7a86f2e7f62f");
6667

6768
private readonly string? _appId;
6869
private int _sequenceNumber;
@@ -73,13 +74,17 @@ internal sealed class WorkflowOrchestrationContext : WorkflowContext
7374
private bool _turnInitialized;
7475

7576
public WorkflowOrchestrationContext(string name, string instanceId, DateTime currentUtcDateTime,
76-
IWorkflowSerializer workflowSerializer, ILoggerFactory loggerFactory, WorkflowVersionTracker versionTracker, string? appId = null)
77+
IWorkflowSerializer workflowSerializer, ILoggerFactory loggerFactory, WorkflowVersionTracker versionTracker,
78+
string? appId = null, string? executionId = null)
7779
{
7880
_workflowSerializer = workflowSerializer;
7981
_loggerFactory = loggerFactory;
8082
_logger = loggerFactory.CreateLogger<WorkflowOrchestrationContext>() ??
8183
throw new ArgumentNullException(nameof(loggerFactory));
82-
_instanceGuid = Guid.TryParse(instanceId, out var guid) ? guid : Guid.NewGuid();
84+
var guidSeed = !string.IsNullOrWhiteSpace(executionId) ? executionId : instanceId;
85+
_instanceGuid = Guid.TryParse(guidSeed, out var guid)
86+
? guid
87+
: CreateGuidFromName(InstanceIdNamespace, Encoding.UTF8.GetBytes(guidSeed));
8388
Name = name;
8489
InstanceId = instanceId;
8590
_currentUtcDateTime = currentUtcDateTime;
@@ -334,9 +339,8 @@ public override void ContinueAsNew(object? newInput = null, bool preserveUnproce
334339
/// <inheritdoc />
335340
public override Guid NewGuid()
336341
{
337-
// Create deterministic Guid based on instance ID and counter
338342
var guidCounter = _guidCounter++;
339-
var name = $"{InstanceId}_{guidCounter}"; // Stable name
343+
var name = $"{InstanceId}_{guidCounter}"; // Stable per execution and replay
340344
return CreateGuidFromName(_instanceGuid, Encoding.UTF8.GetBytes(name));
341345
}
342346

src/Dapr.Workflow/Worker/WorkflowWorker.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,7 @@ private async Task<OrchestratorResponse> HandleOrchestratorResponseAsync(Orchest
310310

311311
// Initialize the context with the FULL history
312312
var context = new WorkflowOrchestrationContext(workflowName, request.InstanceId, currentUtcDateTime,
313-
_serializer, loggerFactory, versionTracker, appId);
313+
_serializer, loggerFactory, versionTracker, appId, request.ExecutionId);
314314

315315
// Deserialize the input
316316
object? input = string.IsNullOrEmpty(serializedInput)

test/Dapr.IntegrationTest.Workflow/ReplaySafetyTests.cs

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,74 @@ public async Task Workflow_ShouldUseDeterministicGuidGeneration()
110110
Assert.All(guids, g => Assert.NotEqual(Guid.Empty, g));
111111
}
112112

113+
[Fact]
114+
public async Task NewGuid_ShouldRemainStableAcrossReplays()
115+
{
116+
var componentsDir = TestDirectoryManager.CreateTestDirectory("workflow-components");
117+
var workflowInstanceId = Guid.NewGuid().ToString();
118+
119+
await using var environment = await DaprTestEnvironment.CreateWithPooledNetworkAsync(needsActorState: true);
120+
await environment.StartAsync();
121+
122+
var harness = new DaprHarnessBuilder(componentsDir)
123+
.WithEnvironment(environment)
124+
.BuildWorkflow();
125+
await using var testApp = await DaprHarnessBuilder.ForHarness(harness)
126+
.ConfigureServices(builder =>
127+
{
128+
builder.Services.AddDaprWorkflowBuilder(
129+
configureRuntime: opt =>
130+
{
131+
opt.RegisterWorkflow<ReplayGuidWorkflow>();
132+
},
133+
configureClient: (sp, clientBuilder) =>
134+
{
135+
var config = sp.GetRequiredService<IConfiguration>();
136+
var grpcEndpoint = config["DAPR_GRPC_ENDPOINT"];
137+
if (!string.IsNullOrEmpty(grpcEndpoint))
138+
clientBuilder.UseGrpcEndpoint(grpcEndpoint);
139+
});
140+
})
141+
.BuildAndStartAsync();
142+
143+
using var scope = testApp.CreateScope();
144+
var daprWorkflowClient = scope.ServiceProvider.GetRequiredService<DaprWorkflowClient>();
145+
146+
await daprWorkflowClient.ScheduleNewWorkflowAsync(nameof(ReplayGuidWorkflow), workflowInstanceId);
147+
148+
using var statusCts = new CancellationTokenSource(TimeSpan.FromMinutes(1));
149+
var initialStatus = await WaitForGuidStatusAsync(daprWorkflowClient, workflowInstanceId, phase: 0, statusCts.Token);
150+
var expectedGuid = initialStatus.Guid;
151+
152+
await daprWorkflowClient.RaiseEventAsync(workflowInstanceId, ReplayGuidWorkflow.Event1Name, "go", statusCts.Token);
153+
var replayStatus = await WaitForGuidStatusAsync(daprWorkflowClient, workflowInstanceId, phase: 1, statusCts.Token);
154+
Assert.Equal(expectedGuid, replayStatus.Guid);
155+
156+
await daprWorkflowClient.RaiseEventAsync(workflowInstanceId, ReplayGuidWorkflow.Event2Name, "go", statusCts.Token);
157+
158+
var result = await daprWorkflowClient.WaitForWorkflowCompletionAsync(workflowInstanceId, cancellation: statusCts.Token);
159+
Assert.Equal(WorkflowRuntimeStatus.Completed, result.RuntimeStatus);
160+
Assert.Equal(expectedGuid, result.ReadOutputAs<string>());
161+
}
162+
163+
private static async Task<ReplayGuidStatus> WaitForGuidStatusAsync(
164+
DaprWorkflowClient client,
165+
string instanceId,
166+
int phase,
167+
CancellationToken cancellationToken)
168+
{
169+
while (true)
170+
{
171+
cancellationToken.ThrowIfCancellationRequested();
172+
var state = await client.GetWorkflowStateAsync(instanceId, getInputsAndOutputs: true, cancellation: cancellationToken);
173+
var status = state?.ReadCustomStatusAs<ReplayGuidStatus>();
174+
if (status is not null && status.Phase == phase && !string.IsNullOrWhiteSpace(status.Guid))
175+
return status;
176+
177+
await Task.Delay(TimeSpan.FromMilliseconds(200), cancellationToken);
178+
}
179+
}
180+
113181
private sealed partial class LoggingWorkflow : Workflow<string, string>
114182
{
115183
public override async Task<string> RunAsync(WorkflowContext context, string input)
@@ -147,6 +215,28 @@ public override Task<Guid[]> RunAsync(WorkflowContext context, object? input)
147215
}
148216
}
149217

218+
private sealed class ReplayGuidWorkflow : Workflow<object?, string>
219+
{
220+
public const string Event1Name = "ReplayGuidStep1";
221+
public const string Event2Name = "ReplayGuidStep2";
222+
223+
public override async Task<string> RunAsync(WorkflowContext context, object? input)
224+
{
225+
var guid = context.NewGuid().ToString();
226+
context.SetCustomStatus(new ReplayGuidStatus(0, guid));
227+
228+
await context.WaitForExternalEventAsync<string>(Event1Name);
229+
context.SetCustomStatus(new ReplayGuidStatus(1, guid));
230+
231+
await context.WaitForExternalEventAsync<string>(Event2Name);
232+
context.SetCustomStatus(new ReplayGuidStatus(2, guid));
233+
234+
return guid;
235+
}
236+
}
237+
238+
private sealed record ReplayGuidStatus(int Phase, string Guid);
239+
150240
private sealed class SimpleActivity : WorkflowActivity<string, string>
151241
{
152242
public override Task<string> RunAsync(WorkflowActivityContext context, string input)

test/Dapr.Workflow.Test/Worker/Internal/WorkflowOrchestrationContextTests.cs

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// ------------------------------------------------------------------------
1+
// ------------------------------------------------------------------------
22
// Copyright 2025 The Dapr Authors
33
// Licensed under the Apache License, Version 2.0 (the "License");
44
// you may not use this file except in compliance with the License.
@@ -957,6 +957,45 @@ public void NewGuid_ShouldBeDeterministic_ForSameInstanceIdAndTimestamp()
957957
Assert.Equal(g1, g2);
958958
}
959959

960+
[Fact]
961+
public void NewGuid_ShouldVary_ForDifferentExecutionIds()
962+
{
963+
var serializer = new JsonWorkflowSerializer(new JsonSerializerOptions(JsonSerializerDefaults.Web));
964+
var tracker = new WorkflowVersionTracker([]);
965+
966+
var c1 = new WorkflowOrchestrationContext("wf", "same-instance",
967+
new DateTime(2025, 01, 01, 0, 0, 0, DateTimeKind.Utc),
968+
serializer, NullLoggerFactory.Instance, tracker, executionId: "exec-1");
969+
970+
var c2 = new WorkflowOrchestrationContext("wf", "same-instance",
971+
new DateTime(2025, 01, 01, 0, 0, 0, DateTimeKind.Utc),
972+
serializer, NullLoggerFactory.Instance, tracker, executionId: "exec-2");
973+
974+
var g1 = c1.NewGuid();
975+
var g2 = c2.NewGuid();
976+
977+
Assert.NotEqual(g1, g2);
978+
}
979+
980+
[Fact]
981+
public void NewGuid_ShouldBeDeterministic_ForNonGuidInstanceId()
982+
{
983+
var serializer = new JsonWorkflowSerializer(new JsonSerializerOptions(JsonSerializerDefaults.Web));
984+
var now = new DateTime(2025, 01, 01, 0, 0, 0, DateTimeKind.Utc);
985+
var tracker = new WorkflowVersionTracker([]);
986+
987+
var c1 = new WorkflowOrchestrationContext("wf", "order-123",
988+
now, serializer, NullLoggerFactory.Instance, tracker);
989+
990+
var c2 = new WorkflowOrchestrationContext("wf", "order-123",
991+
now, serializer, NullLoggerFactory.Instance, tracker);
992+
993+
var g1 = c1.NewGuid();
994+
var g2 = c2.NewGuid();
995+
996+
Assert.Equal(g1, g2);
997+
}
998+
960999
[Fact]
9611000
public async Task CallActivityAsync_ShouldThrowArgumentException_WhenNameIsNullOrWhitespace()
9621001
{

0 commit comments

Comments
 (0)