Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/Abstractions/TaskOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ public StartOrchestrationOptions(StartOrchestrationOptions options)
this.Tags = options.Tags;
this.Version = options.Version;
this.DedupeStatuses = options.DedupeStatuses;
this.IdReusePolicy = options.IdReusePolicy;
}

/// <summary>
Expand Down Expand Up @@ -204,6 +205,16 @@ public StartOrchestrationOptions(StartOrchestrationOptions options)
/// </summary>
/// <remarks>
/// For type-safe usage, use the WithDedupeStatuses extension method.
/// This property is mutually exclusive with IdReusePolicy. If both are set, IdReusePolicy takes precedence.
/// </remarks>
public IReadOnlyList<string>? DedupeStatuses { get; init; }

/// <summary>
/// Gets the orchestration ID reuse policy.
/// </summary>
/// <remarks>
/// This is an internal property. For type-safe usage, use the WithIdReusePolicy extension method.
/// This property takes precedence over DedupeStatuses if both are set.
/// </remarks>
public object? IdReusePolicy { get; init; }
}
28 changes: 28 additions & 0 deletions src/Client/Core/CreateOrchestrationAction.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

namespace Microsoft.DurableTask.Client;

/// <summary>
/// Defines actions for handling orchestration instance ID conflicts.
/// </summary>
public enum CreateOrchestrationAction
{
/// <summary>
/// Throws an exception if an orchestration instance with the specified ID already exists in one of the operation statuses.
/// This is the default behavior.
/// </summary>
Error = 0,

/// <summary>
/// Ignores the request to create a new orchestration instance if one already exists in one of the operation statuses.
/// No exception is thrown and no new instance is created.
/// </summary>
Ignore = 1,

/// <summary>
/// Terminates any existing orchestration instance with the same ID that is in one of the operation statuses,
/// and then creates a new instance as an atomic operation. This is similar to an on-demand ContinueAsNew.
/// </summary>
Terminate = 2,
}
69 changes: 69 additions & 0 deletions src/Client/Core/OrchestrationIdReusePolicy.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

namespace Microsoft.DurableTask.Client;

/// <summary>
/// Defines a policy for reusing orchestration instance IDs.
/// </summary>
/// <remarks>
/// This policy determines what happens when a client attempts to create a new orchestration instance
/// with an ID that already exists. The policy consists of an action (Error, Ignore, or Terminate)
/// and a set of orchestration runtime statuses to which the action applies.
/// </remarks>
public sealed class OrchestrationIdReusePolicy
{
/// <summary>
/// Initializes a new instance of the <see cref="OrchestrationIdReusePolicy"/> class.
/// </summary>
/// <param name="operationStatuses">The orchestration runtime statuses to which the action applies.</param>
/// <param name="action">The action to take when an orchestration instance with a matching status exists.</param>
public OrchestrationIdReusePolicy(
IEnumerable<OrchestrationRuntimeStatus> operationStatuses,
CreateOrchestrationAction action)
{
Check.NotNull(operationStatuses);
this.OperationStatuses = operationStatuses.ToArray();
this.Action = action;
}

/// <summary>
/// Gets the orchestration runtime statuses to which the action applies.
/// </summary>
/// <remarks>
/// When an orchestration instance exists with one of these statuses, the specified action will be taken.
/// For example, if the action is <see cref="CreateOrchestrationAction.Terminate"/> and the operation statuses
/// include <see cref="OrchestrationRuntimeStatus.Running"/>, then any running instance with the same ID
/// will be terminated before creating a new instance.
/// </remarks>
public IReadOnlyList<OrchestrationRuntimeStatus> OperationStatuses { get; }

/// <summary>
/// Gets the action to take when an orchestration instance with a matching status exists.
/// </summary>
public CreateOrchestrationAction Action { get; }

/// <summary>
/// Creates a policy that throws an error if an orchestration instance with the specified statuses already exists.
/// </summary>
/// <param name="statuses">The orchestration runtime statuses that should cause an error.</param>
/// <returns>A new <see cref="OrchestrationIdReusePolicy"/> with the Error action.</returns>
public static OrchestrationIdReusePolicy Error(params OrchestrationRuntimeStatus[] statuses)
=> new(statuses, CreateOrchestrationAction.Error);

/// <summary>
/// Creates a policy that ignores the request if an orchestration instance with the specified statuses already exists.
/// </summary>
/// <param name="statuses">The orchestration runtime statuses that should cause the request to be ignored.</param>
/// <returns>A new <see cref="OrchestrationIdReusePolicy"/> with the Ignore action.</returns>
public static OrchestrationIdReusePolicy Ignore(params OrchestrationRuntimeStatus[] statuses)
=> new(statuses, CreateOrchestrationAction.Ignore);

/// <summary>
/// Creates a policy that terminates any existing orchestration instance with the specified statuses and creates a new one.
/// </summary>
/// <param name="statuses">The orchestration runtime statuses that should be terminated before creating a new instance.</param>
/// <returns>A new <see cref="OrchestrationIdReusePolicy"/> with the Terminate action.</returns>
public static OrchestrationIdReusePolicy Terminate(params OrchestrationRuntimeStatus[] statuses)
=> new(statuses, CreateOrchestrationAction.Terminate);
}
20 changes: 20 additions & 0 deletions src/Client/Core/StartOrchestrationOptionsExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ namespace Microsoft.DurableTask.Client;
/// </summary>
public static class StartOrchestrationOptionsExtensions
{
/// <summary>
/// Gets the valid terminal orchestration statuses that can be used for deduplication and ID reuse policies.
/// </summary>
public static readonly OrchestrationRuntimeStatus[] ValidDedupeStatuses = new[]
{
OrchestrationRuntimeStatus.Completed,
Expand All @@ -33,4 +36,21 @@ public static StartOrchestrationOptions WithDedupeStatuses(
DedupeStatuses = dedupeStatuses.Select(s => s.ToString()).ToList(),
};
}

/// <summary>
/// Creates a new <see cref="StartOrchestrationOptions"/> with the specified orchestration ID reuse policy.
/// </summary>
/// <param name="options">The base options to extend.</param>
/// <param name="policy">The orchestration ID reuse policy.</param>
/// <returns>A new <see cref="StartOrchestrationOptions"/> instance with the ID reuse policy set.</returns>
public static StartOrchestrationOptions WithIdReusePolicy(
this StartOrchestrationOptions options,
OrchestrationIdReusePolicy policy)
{
Check.NotNull(policy);
return options with
{
IdReusePolicy = policy,
};
}
}
17 changes: 13 additions & 4 deletions src/Client/Grpc/GrpcDurableTaskClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,20 @@ public override async Task<string> ScheduleNewOrchestrationInstanceAsync(
request.ScheduledStartTimestamp = Timestamp.FromDateTimeOffset(startAt.Value.ToUniversalTime());
}

// Set orchestration ID reuse policy for deduplication support
// Note: This requires the protobuf to support OrchestrationIdReusePolicy field
// If the protobuf doesn't support it yet, this will need to be updated when the protobuf is updated
if (options?.DedupeStatuses != null && options.DedupeStatuses.Count > 0)
// Set orchestration ID reuse policy
// Priority: IdReusePolicy > DedupeStatuses
if (options?.IdReusePolicy is OrchestrationIdReusePolicy idReusePolicy)
{
// Use the new explicit ID reuse policy
P.OrchestrationIdReusePolicy? policy = ProtoUtils.ConvertToProtoReusePolicy(idReusePolicy);
if (policy != null)
{
request.OrchestrationIdReusePolicy = policy;
}
}
else if (options?.DedupeStatuses != null && options.DedupeStatuses.Count > 0)
{
// Fall back to legacy dedupe statuses for backward compatibility
// Parse and validate all status strings to enum first
ImmutableHashSet<OrchestrationRuntimeStatus> dedupeStatuses = options.DedupeStatuses
.Select(s =>
Expand Down
129 changes: 87 additions & 42 deletions src/Client/Grpc/ProtoUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,42 @@ namespace Microsoft.DurableTask.Client.Grpc;
/// </summary>
public static class ProtoUtils
{
/// <summary>
/// Converts a public CreateOrchestrationAction to a protobuf CreateOrchestrationAction.
/// </summary>
/// <param name="action">The public action.</param>
/// <returns>A protobuf CreateOrchestrationAction.</returns>
internal static P.CreateOrchestrationAction ConvertToProtoAction(
Microsoft.DurableTask.Client.CreateOrchestrationAction action)
=> action switch
{
Microsoft.DurableTask.Client.CreateOrchestrationAction.Error => P.CreateOrchestrationAction.Error,
Microsoft.DurableTask.Client.CreateOrchestrationAction.Ignore => P.CreateOrchestrationAction.Ignore,
Microsoft.DurableTask.Client.CreateOrchestrationAction.Terminate => P.CreateOrchestrationAction.Terminate,
_ => throw new ArgumentOutOfRangeException(nameof(action), "Unexpected value"),
};

#pragma warning disable 0618 // Referencing Obsolete member. This is intention as we are only converting it.
/// <summary>
/// Converts <see cref="OrchestrationRuntimeStatus" /> to <see cref="P.OrchestrationStatus" />.
/// </summary>
/// <param name="status">The orchestration status.</param>
/// <returns>A <see cref="P.OrchestrationStatus" />.</returns>
internal static P.OrchestrationStatus ToGrpcStatus(this OrchestrationRuntimeStatus status)
=> status switch
{
OrchestrationRuntimeStatus.Canceled => P.OrchestrationStatus.Canceled,
OrchestrationRuntimeStatus.Completed => P.OrchestrationStatus.Completed,
OrchestrationRuntimeStatus.ContinuedAsNew => P.OrchestrationStatus.ContinuedAsNew,
OrchestrationRuntimeStatus.Failed => P.OrchestrationStatus.Failed,
OrchestrationRuntimeStatus.Pending => P.OrchestrationStatus.Pending,
OrchestrationRuntimeStatus.Running => P.OrchestrationStatus.Running,
OrchestrationRuntimeStatus.Terminated => P.OrchestrationStatus.Terminated,
OrchestrationRuntimeStatus.Suspended => P.OrchestrationStatus.Suspended,
_ => throw new ArgumentOutOfRangeException(nameof(status), "Unexpected value"),
};
#pragma warning restore 0618 // Referencing Obsolete member.

/// <summary>
/// Gets the terminal orchestration statuses that are commonly used for deduplication.
/// These are the statuses that can be used in OrchestrationIdReusePolicy.
Expand All @@ -30,82 +66,91 @@ public static class ProtoUtils

/// <summary>
/// Converts dedupe statuses (statuses that should NOT be replaced) to an OrchestrationIdReusePolicy
/// with replaceable statuses (statuses that CAN be replaced).
/// with TERMINATE action for terminal statuses that CAN be replaced.
/// </summary>
/// <param name="dedupeStatuses">The orchestration statuses that should NOT be replaced. These are statuses for which an exception should be thrown if an orchestration already exists.</param>
/// <returns>An OrchestrationIdReusePolicy with replaceable statuses set, or null if all terminal statuses are dedupe statuses.</returns>
/// <returns>An OrchestrationIdReusePolicy with TERMINATE action and operation statuses set, or null if all terminal statuses are dedupe statuses.</returns>
/// <remarks>
/// The policy uses "replaceableStatus" - these are statuses that CAN be replaced.
/// dedupeStatuses are statuses that should NOT be replaced.
/// So replaceableStatus = all terminal statuses MINUS dedupeStatuses.
/// This method maintains backward compatibility by converting dedupe statuses to the new policy format.
/// The policy will have action = TERMINATE and operationStatus = terminal statuses that can be replaced.
/// dedupeStatuses are statuses that should NOT be replaced (ERROR action).
/// So operationStatus = all terminal statuses MINUS dedupeStatuses.
/// </remarks>
public static P.OrchestrationIdReusePolicy? ConvertDedupeStatusesToReusePolicy(
IEnumerable<P.OrchestrationStatus>? dedupeStatuses)
{
ImmutableArray<P.OrchestrationStatus> terminalStatuses = GetTerminalStatuses();
ImmutableHashSet<P.OrchestrationStatus> dedupeStatusSet = dedupeStatuses?.ToImmutableHashSet() ?? ImmutableHashSet<P.OrchestrationStatus>.Empty;

P.OrchestrationIdReusePolicy policy = new();
P.OrchestrationIdReusePolicy policy = new()
{
Action = P.CreateOrchestrationAction.Terminate,
};

// Add terminal statuses that are NOT in dedupeStatuses as replaceable
// Add terminal statuses that are NOT in dedupeStatuses to operation status (these can be terminated and replaced)
foreach (P.OrchestrationStatus terminalStatus in terminalStatuses.Where(status => !dedupeStatusSet.Contains(status)))
{
policy.ReplaceableStatus.Add(terminalStatus);
policy.OperationStatus.Add(terminalStatus);
}

// Only return policy if we have operation statuses
return policy.OperationStatus.Count > 0 ? policy : null;
}

/// <summary>
/// Converts a public OrchestrationIdReusePolicy to a protobuf OrchestrationIdReusePolicy.
/// </summary>
/// <param name="policy">The public orchestration ID reuse policy.</param>
/// <returns>A protobuf OrchestrationIdReusePolicy.</returns>
public static P.OrchestrationIdReusePolicy? ConvertToProtoReusePolicy(
Microsoft.DurableTask.Client.OrchestrationIdReusePolicy? policy)
{
if (policy == null)
{
return null;
}

P.OrchestrationIdReusePolicy protoPolicy = new()
{
Action = ConvertToProtoAction(policy.Action),
};

foreach (OrchestrationRuntimeStatus status in policy.OperationStatuses)
{
protoPolicy.OperationStatus.Add(status.ToGrpcStatus());
}

// Only return policy if we have replaceable statuses
return policy.ReplaceableStatus.Count > 0 ? policy : null;
return protoPolicy;
}

/// <summary>
/// Converts an OrchestrationIdReusePolicy with replaceable statuses to dedupe statuses
/// (statuses that should NOT be replaced).
/// Converts an OrchestrationIdReusePolicy to dedupe statuses (statuses that should NOT be replaced).
/// </summary>
/// <param name="policy">The OrchestrationIdReusePolicy containing replaceable statuses.</param>
/// <returns>An array of orchestration statuses that should NOT be replaced, or null if all terminal statuses are replaceable.</returns>
/// <param name="policy">The OrchestrationIdReusePolicy containing action and operation statuses.</param>
/// <returns>An array of orchestration statuses that should NOT be replaced, or null if all terminal statuses can be replaced.</returns>
/// <remarks>
/// The policy uses "replaceableStatus" - these are statuses that CAN be replaced.
/// dedupeStatuses are statuses that should NOT be replaced (should throw exception).
/// So dedupeStatuses = all terminal statuses MINUS replaceableStatus.
/// This method maintains backward compatibility by converting the new policy format to dedupe statuses.
/// For TERMINATE action: dedupeStatuses = all terminal statuses MINUS operationStatus.
/// For ERROR or IGNORE action: the behavior depends on the action semantics.
/// </remarks>
public static P.OrchestrationStatus[]? ConvertReusePolicyToDedupeStatuses(
P.OrchestrationIdReusePolicy? policy)
{
if (policy == null || policy.ReplaceableStatus.Count == 0)
if (policy == null || policy.OperationStatus.Count == 0)
{
return null;
}

ImmutableArray<P.OrchestrationStatus> terminalStatuses = GetTerminalStatuses();
ImmutableHashSet<P.OrchestrationStatus> replaceableStatusSet = policy.ReplaceableStatus.ToImmutableHashSet();
ImmutableHashSet<P.OrchestrationStatus> operationStatusSet = policy.OperationStatus.ToImmutableHashSet();

// Calculate dedupe statuses = terminal statuses - replaceable statuses
// For TERMINATE action: dedupe statuses = terminal statuses - operation status
// For other actions, the conversion may not be straightforward
P.OrchestrationStatus[] dedupeStatuses = terminalStatuses
.Where(terminalStatus => !replaceableStatusSet.Contains(terminalStatus))
.Where(terminalStatus => !operationStatusSet.Contains(terminalStatus))
.ToArray();

// Only return if there are dedupe statuses
return dedupeStatuses.Length > 0 ? dedupeStatuses : null;
}

#pragma warning disable 0618 // Referencing Obsolete member. This is intention as we are only converting it.
/// <summary>
/// Converts <see cref="OrchestrationRuntimeStatus" /> to <see cref="P.OrchestrationStatus" />.
/// </summary>
/// <param name="status">The orchestration status.</param>
/// <returns>A <see cref="P.OrchestrationStatus" />.</returns>
internal static P.OrchestrationStatus ToGrpcStatus(this OrchestrationRuntimeStatus status)
=> status switch
{
OrchestrationRuntimeStatus.Canceled => P.OrchestrationStatus.Canceled,
OrchestrationRuntimeStatus.Completed => P.OrchestrationStatus.Completed,
OrchestrationRuntimeStatus.ContinuedAsNew => P.OrchestrationStatus.ContinuedAsNew,
OrchestrationRuntimeStatus.Failed => P.OrchestrationStatus.Failed,
OrchestrationRuntimeStatus.Pending => P.OrchestrationStatus.Pending,
OrchestrationRuntimeStatus.Running => P.OrchestrationStatus.Running,
OrchestrationRuntimeStatus.Terminated => P.OrchestrationStatus.Terminated,
OrchestrationRuntimeStatus.Suspended => P.OrchestrationStatus.Suspended,
_ => throw new ArgumentOutOfRangeException(nameof(status), "Unexpected value"),
};
#pragma warning restore 0618 // Referencing Obsolete member.
}
10 changes: 8 additions & 2 deletions src/Grpc/orchestrator_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ enum OrchestrationStatus {
ORCHESTRATION_STATUS_SUSPENDED = 7;
}

enum CreateOrchestrationAction {
CREATE_ORCHESTRATION_ACTION_ERROR = 0;
CREATE_ORCHESTRATION_ACTION_IGNORE = 1;
CREATE_ORCHESTRATION_ACTION_TERMINATE = 2;
}

message ParentInstanceInfo {
int32 taskScheduledId = 1;
google.protobuf.StringValue name = 2;
Expand Down Expand Up @@ -380,8 +386,8 @@ message CreateInstanceRequest {
}

message OrchestrationIdReusePolicy {
repeated OrchestrationStatus replaceableStatus = 1;
reserved 2;
repeated OrchestrationStatus operationStatus = 1;
CreateOrchestrationAction action = 2;
}

message CreateInstanceResponse {
Expand Down
Loading
Loading