Skip to content

Commit 71d758d

Browse files
committed
Support dedup status when starting orchestration
1 parent ce4a7c7 commit 71d758d

File tree

6 files changed

+329
-7
lines changed

6 files changed

+329
-7
lines changed

src/Abstractions/TaskOptions.cs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,4 +134,17 @@ public record StartOrchestrationOptions(string? InstanceId = null, DateTimeOffse
134134
/// Gets the version to associate with the orchestration instance.
135135
/// </summary>
136136
public TaskVersion? Version { get; init; }
137+
138+
/// <summary>
139+
/// Gets the orchestration runtime statuses that should be considered for deduplication.
140+
/// If an orchestration instance with the same instance ID already exists and is in one of these statuses,
141+
/// the creation will throw an <see cref="OrchestrationAlreadyExistsException"/> instead of creating a new instance.
142+
/// This enables idempotent orchestration creation.
143+
/// </summary>
144+
/// <remarks>
145+
/// The status names should match the values from <see cref="Microsoft.DurableTask.Client.OrchestrationRuntimeStatus"/> enum
146+
/// (e.g., "Completed", "Failed", "Terminated", "Canceled").
147+
/// For type-safe usage, use extension methods from <see cref="Microsoft.DurableTask.Client.StartOrchestrationOptionsExtensions"/>.
148+
/// </remarks>
149+
public IReadOnlyList<string>? DedupeStatuses { get; init; }
137150
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
// Copyright (c) Microsoft Corporation.
2+
// Licensed under the MIT License.
3+
4+
namespace Microsoft.DurableTask.Client;
5+
6+
/// <summary>
7+
/// Extension methods for <see cref="StartOrchestrationOptions"/> to provide type-safe deduplication status configuration.
8+
/// </summary>
9+
public static class StartOrchestrationOptionsExtensions
10+
{
11+
/// <summary>
12+
/// Gets the terminal orchestration runtime statuses that are valid for deduplication.
13+
/// These are the statuses that can be used to prevent replacement of an existing orchestration instance.
14+
/// </summary>
15+
public static readonly OrchestrationRuntimeStatus[] ValidDedupeStatuses = new[]
16+
{
17+
OrchestrationRuntimeStatus.Completed,
18+
OrchestrationRuntimeStatus.Failed,
19+
OrchestrationRuntimeStatus.Terminated,
20+
OrchestrationRuntimeStatus.Canceled,
21+
};
22+
23+
/// <summary>
24+
/// Creates a new <see cref="StartOrchestrationOptions"/> with the specified deduplication statuses.
25+
/// </summary>
26+
/// <param name="options">The base options to extend.</param>
27+
/// <param name="dedupeStatuses">The orchestration runtime statuses that should be considered for deduplication.</param>
28+
/// <returns>A new <see cref="StartOrchestrationOptions"/> instance with the deduplication statuses set.</returns>
29+
public static StartOrchestrationOptions WithDedupeStatuses(
30+
this StartOrchestrationOptions options,
31+
params OrchestrationRuntimeStatus[] dedupeStatuses)
32+
{
33+
return options with
34+
{
35+
DedupeStatuses = dedupeStatuses.Select(s => s.ToString()).ToList(),
36+
};
37+
}
38+
}

src/Client/Grpc/GrpcDurableTaskClient.cs

Lines changed: 46 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// Copyright (c) Microsoft Corporation.
22
// Licensed under the MIT License.
33

4+
using System.Collections.Immutable;
45
using System.Diagnostics;
56
using System.Text;
67
using DurableTask.Core.History;
@@ -91,7 +92,7 @@ public override async Task<string> ScheduleNewOrchestrationInstanceAsync(
9192
version = this.options.DefaultVersion;
9293
}
9394

94-
var request = new P.CreateInstanceRequest
95+
P.CreateInstanceRequest request = new()
9596
{
9697
Name = orchestratorName.Name,
9798
Version = version,
@@ -122,6 +123,47 @@ public override async Task<string> ScheduleNewOrchestrationInstanceAsync(
122123
request.ScheduledStartTimestamp = Timestamp.FromDateTimeOffset(startAt.Value.ToUniversalTime());
123124
}
124125

126+
// Set orchestration ID reuse policy for deduplication support
127+
// Note: This requires the protobuf to support OrchestrationIdReusePolicy field
128+
// If the protobuf doesn't support it yet, this will need to be updated when the protobuf is updated
129+
if (options?.DedupeStatuses != null && options.DedupeStatuses.Count > 0)
130+
{
131+
// Parse and validate all status strings to enum first
132+
ImmutableHashSet<OrchestrationRuntimeStatus> dedupeStatuses = options.DedupeStatuses
133+
.Select(s =>
134+
{
135+
if (!System.Enum.TryParse<OrchestrationRuntimeStatus>(s, ignoreCase: true, out OrchestrationRuntimeStatus status))
136+
{
137+
string validStatuses = string.Join(", ", StartOrchestrationOptionsExtensions.ValidDedupeStatuses.Select(ts => ts.ToString()));
138+
throw new ArgumentException(
139+
$"Invalid orchestration runtime status for deduplication: '{s}'. Valid statuses for deduplication are: {validStatuses}",
140+
nameof(options.DedupeStatuses));
141+
}
142+
143+
return status;
144+
}).ToImmutableHashSet();
145+
146+
P.OrchestrationIdReusePolicy policy = new();
147+
148+
// The policy uses "replaceableStatus" - these are statuses that CAN be replaced
149+
// dedupeStatuses are statuses that should NOT be replaced (should throw exception)
150+
// So we add terminal statuses that are NOT in dedupeStatuses to replaceableStatus
151+
// This matches the logic in AAPT-DTMB ProtoUtils.Convert
152+
foreach (OrchestrationRuntimeStatus terminalStatus in StartOrchestrationOptionsExtensions.ValidDedupeStatuses)
153+
{
154+
if (!dedupeStatuses.Contains(terminalStatus))
155+
{
156+
policy.ReplaceableStatus.Add(terminalStatus.ToGrpcStatus());
157+
}
158+
}
159+
160+
// Only set if we have replaceable statuses
161+
if (policy.ReplaceableStatus.Count > 0)
162+
{
163+
request.OrchestrationIdReusePolicy = policy;
164+
}
165+
}
166+
125167
using Activity? newActivity = TraceHelper.StartActivityForNewOrchestration(request);
126168

127169
P.CreateInstanceResponse? result = await this.sidecarClient.StartInstanceAsync(
@@ -405,7 +447,7 @@ public override async Task<string> RestartAsync(
405447
Check.NotNullOrEmpty(instanceId);
406448
Check.NotEntity(this.options.EnableEntitySupport, instanceId);
407449

408-
var request = new P.RestartInstanceRequest
450+
P.RestartInstanceRequest request = new P.RestartInstanceRequest
409451
{
410452
InstanceId = instanceId,
411453
RestartWithNewInstanceId = restartWithNewInstanceId,
@@ -441,7 +483,7 @@ public override async Task RewindInstanceAsync(
441483
Check.NotNullOrEmpty(instanceId);
442484
Check.NotEntity(this.options.EnableEntitySupport, instanceId);
443485

444-
var request = new P.RewindInstanceRequest
486+
P.RewindInstanceRequest request = new P.RewindInstanceRequest
445487
{
446488
InstanceId = instanceId,
447489
Reason = reason,
@@ -573,7 +615,7 @@ async Task<PurgeResult> PurgeInstancesCoreAsync(
573615

574616
OrchestrationMetadata CreateMetadata(P.OrchestrationState state, bool includeInputsAndOutputs)
575617
{
576-
var metadata = new OrchestrationMetadata(state.Name, state.InstanceId)
618+
OrchestrationMetadata metadata = new OrchestrationMetadata(state.Name, state.InstanceId)
577619
{
578620
CreatedAt = state.CreatedTimestamp.ToDateTimeOffset(),
579621
LastUpdatedAt = state.LastUpdatedTimestamp.ToDateTimeOffset(),

src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
using DurableTask.Core;
88
using DurableTask.Core.History;
99
using DurableTask.Core.Query;
10+
using Microsoft.DurableTask.Client;
1011
using Microsoft.DurableTask.Client.Entities;
1112
using Microsoft.Extensions.DependencyInjection;
1213
using Microsoft.Extensions.Options;
@@ -192,7 +193,25 @@ public override async Task<string> ScheduleNewOrchestrationInstanceAsync(
192193
},
193194
};
194195

195-
await this.Client.CreateTaskOrchestrationAsync(message);
196+
Core.OrchestrationStatus[]? dedupeStatuses = null;
197+
if (options?.DedupeStatuses != null && options.DedupeStatuses.Count > 0)
198+
{
199+
dedupeStatuses = options.DedupeStatuses
200+
.Select(s =>
201+
{
202+
if (!Enum.TryParse<OrchestrationRuntimeStatus>(s, ignoreCase: true, out var status))
203+
{
204+
var validStatuses = string.Join(", ", StartOrchestrationOptionsExtensions.ValidDedupeStatuses.Select(ts => ts.ToString()));
205+
throw new ArgumentException(
206+
$"Invalid orchestration runtime status for deduplication: '{s}'. Valid statuses for deduplication are: {validStatuses}",
207+
nameof(options.DedupeStatuses));
208+
}
209+
return status.ConvertToCore();
210+
})
211+
.ToArray();
212+
}
213+
214+
await this.Client.CreateTaskOrchestrationAsync(message, dedupeStatuses);
196215
return instanceId;
197216
}
198217

src/InProcessTestHost/Sidecar/Grpc/TaskHubGrpcServer.cs

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
1-
// Copyright (c) Microsoft Corporation.
1+
// Copyright (c) Microsoft Corporation.
22
// Licensed under the MIT License.
33

44
using System.Collections.Concurrent;
55
using System.Diagnostics;
66
using System.Globalization;
77
using DurableTask.Core;
8+
using DurableTask.Core.Exceptions;
89
using DurableTask.Core.History;
910
using DurableTask.Core.Query;
1011
using Google.Protobuf.WellKnownTypes;
@@ -202,6 +203,34 @@ async Task WaitForWorkItemClientConnection()
202203

203204
try
204205
{
206+
// Convert OrchestrationIdReusePolicy to dedupeStatuses
207+
// The policy uses "replaceableStatus" - these are statuses that CAN be replaced
208+
// dedupeStatuses are statuses that should NOT be replaced (should throw exception)
209+
// So dedupeStatuses = all terminal statuses MINUS replaceableStatus
210+
OrchestrationStatus[]? dedupeStatuses = null;
211+
if (request.OrchestrationIdReusePolicy != null && request.OrchestrationIdReusePolicy.ReplaceableStatus.Count > 0)
212+
{
213+
var terminalStatuses = new HashSet<OrchestrationStatus>
214+
{
215+
OrchestrationStatus.Completed,
216+
OrchestrationStatus.Failed,
217+
OrchestrationStatus.Terminated,
218+
OrchestrationStatus.Canceled,
219+
};
220+
221+
// Remove replaceable statuses from terminal statuses to get dedupe statuses
222+
foreach (P.OrchestrationStatus replaceableStatus in request.OrchestrationIdReusePolicy.ReplaceableStatus)
223+
{
224+
terminalStatuses.Remove((OrchestrationStatus)replaceableStatus);
225+
}
226+
227+
// Only set dedupeStatuses if there are any statuses that should not be replaced
228+
if (terminalStatuses.Count > 0)
229+
{
230+
dedupeStatuses = terminalStatuses.ToArray();
231+
}
232+
}
233+
205234
await this.client.CreateTaskOrchestrationAsync(
206235
new TaskMessage
207236
{
@@ -216,7 +245,14 @@ await this.client.CreateTaskOrchestrationAsync(
216245
: null
217246
},
218247
OrchestrationInstance = instance,
219-
});
248+
},
249+
dedupeStatuses);
250+
}
251+
catch (OrchestrationAlreadyExistsException e)
252+
{
253+
// Convert to gRPC exception
254+
this.log.LogWarning(e, "Orchestration with ID {InstanceId} already exists", instance.InstanceId);
255+
throw new RpcException(new Status(StatusCode.AlreadyExists, e.Message));
220256
}
221257
catch (Exception e)
222258
{

0 commit comments

Comments
 (0)