Skip to content

Commit f3a7fca

Browse files
authored
Support dedup status when starting orchestration (#542)
1 parent ad96233 commit f3a7fca

File tree

12 files changed

+1294
-32
lines changed

12 files changed

+1294
-32
lines changed

src/Abstractions/TaskOptions.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,4 +134,12 @@ public record StartOrchestrationOptions(string? InstanceId = null, DateTimeOffse
134134
/// Gets the version to associate with the orchestration instance.
135135
/// </summary>
136136
public TaskVersion? Version { get; init; }
137+
138+
/// <summary>
139+
/// Gets the orchestration runtime statuses that should be considered for deduplication.
140+
/// </summary>
141+
/// <remarks>
142+
/// For type-safe usage, use the WithDedupeStatuses extension method.
143+
/// </remarks>
144+
public IReadOnlyList<string>? DedupeStatuses { get; init; }
137145
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
// Copyright (c) Microsoft Corporation.
2+
// Licensed under the MIT License.
3+
4+
using System.Linq;
5+
6+
namespace Microsoft.DurableTask.Client;
7+
8+
/// <summary>
9+
/// Extension methods for <see cref="StartOrchestrationOptions"/> to provide type-safe deduplication status configuration.
10+
/// </summary>
11+
public static class StartOrchestrationOptionsExtensions
12+
{
13+
public static readonly OrchestrationRuntimeStatus[] ValidDedupeStatuses = new[]
14+
{
15+
OrchestrationRuntimeStatus.Completed,
16+
OrchestrationRuntimeStatus.Failed,
17+
OrchestrationRuntimeStatus.Terminated,
18+
OrchestrationRuntimeStatus.Canceled,
19+
};
20+
21+
/// <summary>
22+
/// Creates a new <see cref="StartOrchestrationOptions"/> with the specified deduplication statuses.
23+
/// </summary>
24+
/// <param name="options">The base options to extend.</param>
25+
/// <param name="dedupeStatuses">The orchestration runtime statuses that should be considered for deduplication.</param>
26+
/// <returns>A new <see cref="StartOrchestrationOptions"/> instance with the deduplication statuses set.</returns>
27+
public static StartOrchestrationOptions WithDedupeStatuses(
28+
this StartOrchestrationOptions options,
29+
params OrchestrationRuntimeStatus[] dedupeStatuses)
30+
{
31+
return options with
32+
{
33+
DedupeStatuses = dedupeStatuses.Select(s => s.ToString()).ToList(),
34+
};
35+
}
36+
}

src/Client/Grpc/GrpcDurableTaskClient.cs

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// Copyright (c) Microsoft Corporation.
22
// Licensed under the MIT License.
33

4+
using System.Collections.Immutable;
45
using System.Diagnostics;
56
using System.Text;
67
using DurableTask.Core.History;
@@ -91,7 +92,7 @@ public override async Task<string> ScheduleNewOrchestrationInstanceAsync(
9192
version = this.options.DefaultVersion;
9293
}
9394

94-
var request = new P.CreateInstanceRequest
95+
P.CreateInstanceRequest request = new()
9596
{
9697
Name = orchestratorName.Name,
9798
Version = version,
@@ -122,6 +123,34 @@ public override async Task<string> ScheduleNewOrchestrationInstanceAsync(
122123
request.ScheduledStartTimestamp = Timestamp.FromDateTimeOffset(startAt.Value.ToUniversalTime());
123124
}
124125

126+
// Set orchestration ID reuse policy for deduplication support
127+
// Note: This requires the protobuf to support OrchestrationIdReusePolicy field
128+
// If the protobuf doesn't support it yet, this will need to be updated when the protobuf is updated
129+
if (options?.DedupeStatuses != null && options.DedupeStatuses.Count > 0)
130+
{
131+
// Parse and validate all status strings to enum first
132+
ImmutableHashSet<OrchestrationRuntimeStatus> dedupeStatuses = options.DedupeStatuses
133+
.Select(s =>
134+
{
135+
if (!System.Enum.TryParse<OrchestrationRuntimeStatus>(s, ignoreCase: true, out OrchestrationRuntimeStatus status))
136+
{
137+
throw new ArgumentException(
138+
$"Invalid orchestration runtime status: '{s}' for deduplication.");
139+
}
140+
141+
return status;
142+
}).ToImmutableHashSet();
143+
144+
// Convert dedupe statuses to protobuf statuses and create reuse policy
145+
IEnumerable<P.OrchestrationStatus> dedupeStatusesProto = dedupeStatuses.Select(s => s.ToGrpcStatus());
146+
P.OrchestrationIdReusePolicy? policy = ProtoUtils.ConvertDedupeStatusesToReusePolicy(dedupeStatusesProto);
147+
148+
if (policy != null)
149+
{
150+
request.OrchestrationIdReusePolicy = policy;
151+
}
152+
}
153+
125154
using Activity? newActivity = TraceHelper.StartActivityForNewOrchestration(request);
126155

127156
P.CreateInstanceResponse? result = await this.sidecarClient.StartInstanceAsync(
@@ -405,7 +434,7 @@ public override async Task<string> RestartAsync(
405434
Check.NotNullOrEmpty(instanceId);
406435
Check.NotEntity(this.options.EnableEntitySupport, instanceId);
407436

408-
var request = new P.RestartInstanceRequest
437+
P.RestartInstanceRequest request = new P.RestartInstanceRequest
409438
{
410439
InstanceId = instanceId,
411440
RestartWithNewInstanceId = restartWithNewInstanceId,
@@ -441,7 +470,7 @@ public override async Task RewindInstanceAsync(
441470
Check.NotNullOrEmpty(instanceId);
442471
Check.NotEntity(this.options.EnableEntitySupport, instanceId);
443472

444-
var request = new P.RewindInstanceRequest
473+
P.RewindInstanceRequest request = new P.RewindInstanceRequest
445474
{
446475
InstanceId = instanceId,
447476
Reason = reason,
@@ -573,7 +602,7 @@ async Task<PurgeResult> PurgeInstancesCoreAsync(
573602

574603
OrchestrationMetadata CreateMetadata(P.OrchestrationState state, bool includeInputsAndOutputs)
575604
{
576-
var metadata = new OrchestrationMetadata(state.Name, state.InstanceId)
605+
OrchestrationMetadata metadata = new OrchestrationMetadata(state.Name, state.InstanceId)
577606
{
578607
CreatedAt = state.CreatedTimestamp.ToDateTimeOffset(),
579608
LastUpdatedAt = state.LastUpdatedTimestamp.ToDateTimeOffset(),

src/Client/Grpc/ProtoUtils.cs

Lines changed: 79 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,93 @@
11
// Copyright (c) Microsoft Corporation.
22
// Licensed under the MIT License.
33

4+
using System.Collections.Immutable;
5+
using System.Linq;
46
using P = Microsoft.DurableTask.Protobuf;
57

68
namespace Microsoft.DurableTask.Client.Grpc;
79

810
/// <summary>
911
/// Protobuf helpers and utilities.
1012
/// </summary>
11-
static class ProtoUtils
13+
public static class ProtoUtils
1214
{
15+
/// <summary>
16+
/// Gets the terminal orchestration statuses that are commonly used for deduplication.
17+
/// These are the statuses that can be used in OrchestrationIdReusePolicy.
18+
/// </summary>
19+
/// <returns>An immutable array of terminal orchestration statuses.</returns>
20+
public static ImmutableArray<P.OrchestrationStatus> GetTerminalStatuses()
21+
{
22+
#pragma warning disable CS0618 // Type or member is obsolete - Canceled is intentionally included for compatibility
23+
return ImmutableArray.Create(
24+
P.OrchestrationStatus.Completed,
25+
P.OrchestrationStatus.Failed,
26+
P.OrchestrationStatus.Terminated,
27+
P.OrchestrationStatus.Canceled);
28+
#pragma warning restore CS0618
29+
}
30+
31+
/// <summary>
32+
/// Converts dedupe statuses (statuses that should NOT be replaced) to an OrchestrationIdReusePolicy
33+
/// with replaceable statuses (statuses that CAN be replaced).
34+
/// </summary>
35+
/// <param name="dedupeStatuses">The orchestration statuses that should NOT be replaced. These are statuses for which an exception should be thrown if an orchestration already exists.</param>
36+
/// <returns>An OrchestrationIdReusePolicy with replaceable statuses set, or null if all terminal statuses are dedupe statuses.</returns>
37+
/// <remarks>
38+
/// The policy uses "replaceableStatus" - these are statuses that CAN be replaced.
39+
/// dedupeStatuses are statuses that should NOT be replaced.
40+
/// So replaceableStatus = all terminal statuses MINUS dedupeStatuses.
41+
/// </remarks>
42+
public static P.OrchestrationIdReusePolicy? ConvertDedupeStatusesToReusePolicy(
43+
IEnumerable<P.OrchestrationStatus>? dedupeStatuses)
44+
{
45+
ImmutableArray<P.OrchestrationStatus> terminalStatuses = GetTerminalStatuses();
46+
ImmutableHashSet<P.OrchestrationStatus> dedupeStatusSet = dedupeStatuses?.ToImmutableHashSet() ?? ImmutableHashSet<P.OrchestrationStatus>.Empty;
47+
48+
P.OrchestrationIdReusePolicy policy = new();
49+
50+
// Add terminal statuses that are NOT in dedupeStatuses as replaceable
51+
foreach (P.OrchestrationStatus terminalStatus in terminalStatuses.Where(status => !dedupeStatusSet.Contains(status)))
52+
{
53+
policy.ReplaceableStatus.Add(terminalStatus);
54+
}
55+
56+
// Only return policy if we have replaceable statuses
57+
return policy.ReplaceableStatus.Count > 0 ? policy : null;
58+
}
59+
60+
/// <summary>
61+
/// Converts an OrchestrationIdReusePolicy with replaceable statuses to dedupe statuses
62+
/// (statuses that should NOT be replaced).
63+
/// </summary>
64+
/// <param name="policy">The OrchestrationIdReusePolicy containing replaceable statuses.</param>
65+
/// <returns>An array of orchestration statuses that should NOT be replaced, or null if all terminal statuses are replaceable.</returns>
66+
/// <remarks>
67+
/// The policy uses "replaceableStatus" - these are statuses that CAN be replaced.
68+
/// dedupeStatuses are statuses that should NOT be replaced (should throw exception).
69+
/// So dedupeStatuses = all terminal statuses MINUS replaceableStatus.
70+
/// </remarks>
71+
public static P.OrchestrationStatus[]? ConvertReusePolicyToDedupeStatuses(
72+
P.OrchestrationIdReusePolicy? policy)
73+
{
74+
if (policy == null || policy.ReplaceableStatus.Count == 0)
75+
{
76+
return null;
77+
}
78+
79+
ImmutableArray<P.OrchestrationStatus> terminalStatuses = GetTerminalStatuses();
80+
ImmutableHashSet<P.OrchestrationStatus> replaceableStatusSet = policy.ReplaceableStatus.ToImmutableHashSet();
81+
82+
// Calculate dedupe statuses = terminal statuses - replaceable statuses
83+
P.OrchestrationStatus[] dedupeStatuses = terminalStatuses
84+
.Where(terminalStatus => !replaceableStatusSet.Contains(terminalStatus))
85+
.ToArray();
86+
87+
// Only return if there are dedupe statuses
88+
return dedupeStatuses.Length > 0 ? dedupeStatuses : null;
89+
}
90+
1391
#pragma warning disable 0618 // Referencing Obsolete member. This is intention as we are only converting it.
1492
/// <summary>
1593
/// Converts <see cref="OrchestrationRuntimeStatus" /> to <see cref="P.OrchestrationStatus" />.

src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
using DurableTask.Core;
88
using DurableTask.Core.History;
99
using DurableTask.Core.Query;
10+
using Microsoft.DurableTask.Client;
1011
using Microsoft.DurableTask.Client.Entities;
1112
using Microsoft.Extensions.DependencyInjection;
1213
using Microsoft.Extensions.Options;
@@ -192,7 +193,23 @@ public override async Task<string> ScheduleNewOrchestrationInstanceAsync(
192193
},
193194
};
194195

195-
await this.Client.CreateTaskOrchestrationAsync(message);
196+
Core.OrchestrationStatus[]? dedupeStatuses = null;
197+
if (options?.DedupeStatuses != null && options.DedupeStatuses.Count > 0)
198+
{
199+
dedupeStatuses = options.DedupeStatuses
200+
.Select(s =>
201+
{
202+
if (!Enum.TryParse<OrchestrationRuntimeStatus>(s, ignoreCase: true, out var status))
203+
{
204+
throw new ArgumentException(
205+
$"Invalid orchestration runtime status: '{s}' for deduplication.");
206+
}
207+
return status.ConvertToCore();
208+
})
209+
.ToArray();
210+
}
211+
212+
await this.Client.CreateTaskOrchestrationAsync(message, dedupeStatuses);
196213
return instanceId;
197214
}
198215

@@ -303,7 +320,7 @@ public override async Task<string> RestartAsync(
303320
},
304321
};
305322

306-
await this.Client.CreateTaskOrchestrationAsync(message);
323+
await this.Client.CreateTaskOrchestrationAsync(message, dedupeStatuses: null);
307324
return newInstanceId;
308325
}
309326

src/InProcessTestHost/Sidecar/Grpc/TaskHubGrpcServer.cs

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,17 @@
1-
// Copyright (c) Microsoft Corporation.
1+
// Copyright (c) Microsoft Corporation.
22
// Licensed under the MIT License.
33

44
using System.Collections.Concurrent;
55
using System.Diagnostics;
66
using System.Globalization;
7+
using System.Linq;
78
using DurableTask.Core;
9+
using DurableTask.Core.Exceptions;
810
using DurableTask.Core.History;
911
using DurableTask.Core.Query;
1012
using Google.Protobuf.WellKnownTypes;
1113
using Grpc.Core;
14+
using Microsoft.DurableTask.Client.Grpc;
1215
using Microsoft.DurableTask.Testing.Sidecar.Dispatcher;
1316
using Microsoft.Extensions.Hosting;
1417
using Microsoft.Extensions.Logging;
@@ -202,6 +205,18 @@ async Task WaitForWorkItemClientConnection()
202205

203206
try
204207
{
208+
// Convert OrchestrationIdReusePolicy to dedupeStatuses
209+
// The policy uses "replaceableStatus" - these are statuses that CAN be replaced
210+
// dedupeStatuses are statuses that should NOT be replaced (should throw exception)
211+
// So dedupeStatuses = all terminal statuses MINUS replaceableStatus
212+
OrchestrationStatus[]? dedupeStatuses = null;
213+
P.OrchestrationStatus[]? dedupeStatusesProto = ProtoUtils.ConvertReusePolicyToDedupeStatuses(request.OrchestrationIdReusePolicy);
214+
if (dedupeStatusesProto != null)
215+
{
216+
// Convert protobuf statuses to Core.OrchestrationStatus
217+
dedupeStatuses = dedupeStatusesProto.Select(s => (OrchestrationStatus)s).ToArray();
218+
}
219+
205220
await this.client.CreateTaskOrchestrationAsync(
206221
new TaskMessage
207222
{
@@ -216,7 +231,14 @@ await this.client.CreateTaskOrchestrationAsync(
216231
: null
217232
},
218233
OrchestrationInstance = instance,
219-
});
234+
},
235+
dedupeStatuses);
236+
}
237+
catch (OrchestrationAlreadyExistsException e)
238+
{
239+
// Convert to gRPC exception
240+
this.log.LogWarning(e, "Orchestration with ID {InstanceId} already exists", instance.InstanceId);
241+
throw new RpcException(new Status(StatusCode.AlreadyExists, e.Message));
220242
}
221243
catch (Exception e)
222244
{

test/Abstractions.Tests/Abstractions.Tests.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
<ItemGroup>
88
<ProjectReference Include="$(SrcRoot)Abstractions/Abstractions.csproj" />
9+
<ProjectReference Include="$(SrcRoot)Client/Core/Client.csproj" />
910
</ItemGroup>
1011

1112
</Project>

0 commit comments

Comments
 (0)