Skip to content

Commit da2d895

Browse files
committed
refactor
1 parent 6182e72 commit da2d895

File tree

3 files changed

+96
-31
lines changed

3 files changed

+96
-31
lines changed

src/Client/Grpc/GrpcDurableTaskClient.cs

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -143,17 +143,11 @@ public override async Task<string> ScheduleNewOrchestrationInstanceAsync(
143143
return status;
144144
}).ToImmutableHashSet();
145145

146-
P.OrchestrationIdReusePolicy policy = new();
146+
// Convert dedupe statuses to protobuf statuses and create reuse policy
147+
IEnumerable<P.OrchestrationStatus> dedupeStatusesProto = dedupeStatuses.Select(s => s.ToGrpcStatus());
148+
P.OrchestrationIdReusePolicy? policy = ProtoUtils.ConvertDedupeStatusesToReusePolicy(dedupeStatusesProto);
147149

148-
// The policy uses "replaceableStatus" - these are statuses that CAN be replaced
149-
// dedupeStatuses are statuses that should NOT be replaced
150-
foreach (OrchestrationRuntimeStatus terminalStatus in StartOrchestrationOptionsExtensions.ValidDedupeStatuses.Where(ts => !dedupeStatuses.Contains(ts)))
151-
{
152-
policy.ReplaceableStatus.Add(terminalStatus.ToGrpcStatus());
153-
}
154-
155-
// Only set if we have replaceable statuses
156-
if (policy.ReplaceableStatus.Count > 0)
150+
if (policy != null)
157151
{
158152
request.OrchestrationIdReusePolicy = policy;
159153
}

src/Client/Grpc/ProtoUtils.cs

Lines changed: 86 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,100 @@
11
// Copyright (c) Microsoft Corporation.
22
// Licensed under the MIT License.
33

4+
using System.Collections.Immutable;
45
using P = Microsoft.DurableTask.Protobuf;
56

67
namespace Microsoft.DurableTask.Client.Grpc;
78

89
/// <summary>
910
/// Protobuf helpers and utilities.
1011
/// </summary>
11-
static class ProtoUtils
12+
public static class ProtoUtils
1213
{
14+
/// <summary>
15+
/// Gets the terminal orchestration statuses that are commonly used for deduplication.
16+
/// These are the statuses that can be used in OrchestrationIdReusePolicy.
17+
/// </summary>
18+
/// <returns>An immutable array of terminal orchestration statuses.</returns>
19+
public static ImmutableArray<P.OrchestrationStatus> GetTerminalStatuses()
20+
{
21+
#pragma warning disable CS0618 // Type or member is obsolete - Canceled is intentionally included for compatibility
22+
return ImmutableArray.Create(
23+
P.OrchestrationStatus.Completed,
24+
P.OrchestrationStatus.Failed,
25+
P.OrchestrationStatus.Terminated,
26+
P.OrchestrationStatus.Canceled);
27+
#pragma warning restore CS0618
28+
}
29+
30+
/// <summary>
31+
/// Converts dedupe statuses (statuses that should NOT be replaced) to an OrchestrationIdReusePolicy
32+
/// with replaceable statuses (statuses that CAN be replaced).
33+
/// </summary>
34+
/// <param name="dedupeStatuses">The orchestration statuses that should NOT be replaced. These are statuses for which an exception should be thrown if an orchestration already exists.</param>
35+
/// <returns>An OrchestrationIdReusePolicy with replaceable statuses set, or null if all terminal statuses are dedupe statuses.</returns>
36+
/// <remarks>
37+
/// The policy uses "replaceableStatus" - these are statuses that CAN be replaced.
38+
/// dedupeStatuses are statuses that should NOT be replaced.
39+
/// So replaceableStatus = all terminal statuses MINUS dedupeStatuses.
40+
/// </remarks>
41+
public static P.OrchestrationIdReusePolicy? ConvertDedupeStatusesToReusePolicy(
42+
IEnumerable<P.OrchestrationStatus> dedupeStatuses)
43+
{
44+
ImmutableArray<P.OrchestrationStatus> terminalStatuses = GetTerminalStatuses();
45+
ImmutableHashSet<P.OrchestrationStatus> dedupeStatusSet = dedupeStatuses.ToImmutableHashSet();
46+
47+
P.OrchestrationIdReusePolicy policy = new();
48+
49+
// Add terminal statuses that are NOT in dedupeStatuses as replaceable
50+
foreach (P.OrchestrationStatus terminalStatus in terminalStatuses)
51+
{
52+
if (!dedupeStatusSet.Contains(terminalStatus))
53+
{
54+
policy.ReplaceableStatus.Add(terminalStatus);
55+
}
56+
}
57+
58+
// Only return policy if we have replaceable statuses
59+
return policy.ReplaceableStatus.Count > 0 ? policy : null;
60+
}
61+
62+
/// <summary>
63+
/// Converts an OrchestrationIdReusePolicy with replaceable statuses to dedupe statuses
64+
/// (statuses that should NOT be replaced).
65+
/// </summary>
66+
/// <param name="policy">The OrchestrationIdReusePolicy containing replaceable statuses.</param>
67+
/// <returns>An array of orchestration statuses that should NOT be replaced, or null if all terminal statuses are replaceable.</returns>
68+
/// <remarks>
69+
/// The policy uses "replaceableStatus" - these are statuses that CAN be replaced.
70+
/// dedupeStatuses are statuses that should NOT be replaced (should throw exception).
71+
/// So dedupeStatuses = all terminal statuses MINUS replaceableStatus.
72+
/// </remarks>
73+
public static P.OrchestrationStatus[]? ConvertReusePolicyToDedupeStatuses(
74+
P.OrchestrationIdReusePolicy? policy)
75+
{
76+
if (policy == null || policy.ReplaceableStatus.Count == 0)
77+
{
78+
return null;
79+
}
80+
81+
ImmutableArray<P.OrchestrationStatus> terminalStatuses = GetTerminalStatuses();
82+
ImmutableHashSet<P.OrchestrationStatus> replaceableStatusSet = policy.ReplaceableStatus.ToImmutableHashSet();
83+
84+
// Calculate dedupe statuses = terminal statuses - replaceable statuses
85+
List<P.OrchestrationStatus> dedupeStatuses = new();
86+
foreach (P.OrchestrationStatus terminalStatus in terminalStatuses)
87+
{
88+
if (!replaceableStatusSet.Contains(terminalStatus))
89+
{
90+
dedupeStatuses.Add(terminalStatus);
91+
}
92+
}
93+
94+
// Only return if there are dedupe statuses
95+
return dedupeStatuses.Count > 0 ? dedupeStatuses.ToArray() : null;
96+
}
97+
1398
#pragma warning disable 0618 // Referencing Obsolete member. This is intention as we are only converting it.
1499
/// <summary>
15100
/// Converts <see cref="OrchestrationRuntimeStatus" /> to <see cref="P.OrchestrationStatus" />.

src/InProcessTestHost/Sidecar/Grpc/TaskHubGrpcServer.cs

Lines changed: 6 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,14 @@
44
using System.Collections.Concurrent;
55
using System.Diagnostics;
66
using System.Globalization;
7+
using System.Linq;
78
using DurableTask.Core;
89
using DurableTask.Core.Exceptions;
910
using DurableTask.Core.History;
1011
using DurableTask.Core.Query;
1112
using Google.Protobuf.WellKnownTypes;
1213
using Grpc.Core;
14+
using Microsoft.DurableTask.Client.Grpc;
1315
using Microsoft.DurableTask.Testing.Sidecar.Dispatcher;
1416
using Microsoft.Extensions.Hosting;
1517
using Microsoft.Extensions.Logging;
@@ -208,27 +210,11 @@ async Task WaitForWorkItemClientConnection()
208210
// dedupeStatuses are statuses that should NOT be replaced (should throw exception)
209211
// So dedupeStatuses = all terminal statuses MINUS replaceableStatus
210212
OrchestrationStatus[]? dedupeStatuses = null;
211-
if (request.OrchestrationIdReusePolicy != null && request.OrchestrationIdReusePolicy.ReplaceableStatus.Count > 0)
213+
P.OrchestrationStatus[]? dedupeStatusesProto = ProtoUtils.ConvertReusePolicyToDedupeStatuses(request.OrchestrationIdReusePolicy);
214+
if (dedupeStatusesProto != null)
212215
{
213-
var terminalStatuses = new HashSet<OrchestrationStatus>
214-
{
215-
OrchestrationStatus.Completed,
216-
OrchestrationStatus.Failed,
217-
OrchestrationStatus.Terminated,
218-
OrchestrationStatus.Canceled,
219-
};
220-
221-
// Remove replaceable statuses from terminal statuses to get dedupe statuses
222-
foreach (P.OrchestrationStatus replaceableStatus in request.OrchestrationIdReusePolicy.ReplaceableStatus)
223-
{
224-
terminalStatuses.Remove((OrchestrationStatus)replaceableStatus);
225-
}
226-
227-
// Only set dedupeStatuses if there are any statuses that should not be replaced
228-
if (terminalStatuses.Count > 0)
229-
{
230-
dedupeStatuses = terminalStatuses.ToArray();
231-
}
216+
// Convert protobuf statuses to Core.OrchestrationStatus
217+
dedupeStatuses = dedupeStatusesProto.Select(s => (OrchestrationStatus)s).ToArray();
232218
}
233219

234220
await this.client.CreateTaskOrchestrationAsync(

0 commit comments

Comments
 (0)