diff --git a/CHANGELOG.md b/CHANGELOG.md
index f3ec402b8..02449a9e2 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,6 +5,7 @@
- Introduce default version setting to DurableTaskClient and expose to orchestrator ([#393](https://github.com/microsoft/durabletask-dotnet/pull/393))
- Add support for local credential types in DTS libraries ([#396](https://github.com/microsoft/durabletask-dotnet/pull/396))
- Add utility for easier version comparison in orchestration context ([#394](https://github.com/microsoft/durabletask-dotnet/pull/394))
+- Add tags support for orchestrations ([#397])(https://github.com/microsoft/durabletask-dotnet/pull/397)
## v1.8.1
diff --git a/Directory.Packages.props b/Directory.Packages.props
index a1e78379e..ee4e690d3 100644
--- a/Directory.Packages.props
+++ b/Directory.Packages.props
@@ -30,7 +30,6 @@
-
@@ -77,6 +76,7 @@
+
diff --git a/src/Abstractions/Abstractions.csproj b/src/Abstractions/Abstractions.csproj
index e4c127b9e..db8be76ab 100644
--- a/src/Abstractions/Abstractions.csproj
+++ b/src/Abstractions/Abstractions.csproj
@@ -12,6 +12,7 @@
+
diff --git a/src/Abstractions/TaskOptions.cs b/src/Abstractions/TaskOptions.cs
index 63f943825..d3e06e5ca 100644
--- a/src/Abstractions/TaskOptions.cs
+++ b/src/Abstractions/TaskOptions.cs
@@ -1,103 +1,111 @@
-// Copyright (c) Microsoft Corporation.
-// Licensed under the MIT License.
-
-namespace Microsoft.DurableTask;
-
-///
-/// Options that can be used to control the behavior of orchestrator task execution.
-///
-public record TaskOptions
-{
- ///
- /// Initializes a new instance of the class.
- ///
- /// The task retry options.
- public TaskOptions(TaskRetryOptions? retry = null)
- {
- this.Retry = retry;
- }
-
- ///
- /// Gets the task retry options.
- ///
- public TaskRetryOptions? Retry { get; init; }
-
- ///
- /// Returns a new from the provided .
- ///
- /// The policy to convert from.
- /// A built from the policy.
- public static TaskOptions FromRetryPolicy(RetryPolicy policy) => new(policy);
-
- ///
- /// Returns a new from the provided .
- ///
- /// The handler to convert from.
- /// A built from the handler.
- public static TaskOptions FromRetryHandler(AsyncRetryHandler handler) => new(handler);
-
- ///
- /// Returns a new from the provided .
- ///
- /// The handler to convert from.
- /// A built from the handler.
- public static TaskOptions FromRetryHandler(RetryHandler handler) => new(handler);
-
- ///
- /// Returns a new with the provided instance ID. This can be used when
- /// starting a new sub-orchestration to specify the instance ID.
- ///
- /// The instance ID to use.
- /// A new .
- public SubOrchestrationOptions WithInstanceId(string instanceId) => new(this, instanceId);
-}
-
-///
-/// Options that can be used to control the behavior of orchestrator task execution. This derived type can be used to
-/// supply extra options for orchestrations.
-///
-public record SubOrchestrationOptions : TaskOptions
-{
- ///
- /// Initializes a new instance of the class.
- ///
- /// The task retry options.
- /// The orchestration instance ID.
- public SubOrchestrationOptions(TaskRetryOptions? retry = null, string? instanceId = null)
- : base(retry)
- {
- this.InstanceId = instanceId;
- }
-
- ///
- /// Initializes a new instance of the class.
- ///
- /// The task options to wrap.
- /// The orchestration instance ID.
- public SubOrchestrationOptions(TaskOptions options, string? instanceId = null)
- : base(options)
- {
- this.InstanceId = instanceId;
- if (instanceId is null && options is SubOrchestrationOptions derived)
- {
- this.InstanceId = derived.InstanceId;
- }
- }
-
- ///
- /// Gets the orchestration instance ID.
- ///
- public string? InstanceId { get; init; }
-}
-
-///
-/// Options for submitting new orchestrations via the client.
-///
-///
-/// The unique ID of the orchestration instance to schedule. If not specified, a new GUID value is used.
-///
-///
-/// The time when the orchestration instance should start executing. If not specified or if a date-time in the past
-/// is specified, the orchestration instance will be scheduled immediately.
-///
-public record StartOrchestrationOptions(string? InstanceId = null, DateTimeOffset? StartAt = null);
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+using System.Collections.Immutable;
+
+namespace Microsoft.DurableTask;
+
+///
+/// Options that can be used to control the behavior of orchestrator task execution.
+///
+public record TaskOptions
+{
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// The task retry options.
+ public TaskOptions(TaskRetryOptions? retry = null)
+ {
+ this.Retry = retry;
+ }
+
+ ///
+ /// Gets the task retry options.
+ ///
+ public TaskRetryOptions? Retry { get; init; }
+
+ ///
+ /// Returns a new from the provided .
+ ///
+ /// The policy to convert from.
+ /// A built from the policy.
+ public static TaskOptions FromRetryPolicy(RetryPolicy policy) => new(policy);
+
+ ///
+ /// Returns a new from the provided .
+ ///
+ /// The handler to convert from.
+ /// A built from the handler.
+ public static TaskOptions FromRetryHandler(AsyncRetryHandler handler) => new(handler);
+
+ ///
+ /// Returns a new from the provided .
+ ///
+ /// The handler to convert from.
+ /// A built from the handler.
+ public static TaskOptions FromRetryHandler(RetryHandler handler) => new(handler);
+
+ ///
+ /// Returns a new with the provided instance ID. This can be used when
+ /// starting a new sub-orchestration to specify the instance ID.
+ ///
+ /// The instance ID to use.
+ /// A new .
+ public SubOrchestrationOptions WithInstanceId(string instanceId) => new(this, instanceId);
+}
+
+///
+/// Options that can be used to control the behavior of orchestrator task execution. This derived type can be used to
+/// supply extra options for orchestrations.
+///
+public record SubOrchestrationOptions : TaskOptions
+{
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// The task retry options.
+ /// The orchestration instance ID.
+ public SubOrchestrationOptions(TaskRetryOptions? retry = null, string? instanceId = null)
+ : base(retry)
+ {
+ this.InstanceId = instanceId;
+ }
+
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// The task options to wrap.
+ /// The orchestration instance ID.
+ public SubOrchestrationOptions(TaskOptions options, string? instanceId = null)
+ : base(options)
+ {
+ this.InstanceId = instanceId;
+ if (instanceId is null && options is SubOrchestrationOptions derived)
+ {
+ this.InstanceId = derived.InstanceId;
+ }
+ }
+
+ ///
+ /// Gets the orchestration instance ID.
+ ///
+ public string? InstanceId { get; init; }
+}
+
+///
+/// Options for submitting new orchestrations via the client.
+///
+///
+/// The unique ID of the orchestration instance to schedule. If not specified, a new GUID value is used.
+///
+///
+/// The time when the orchestration instance should start executing. If not specified or if a date-time in the past
+/// is specified, the orchestration instance will be scheduled immediately.
+///
+public record StartOrchestrationOptions(string? InstanceId = null, DateTimeOffset? StartAt = null)
+{
+ ///
+ /// Gets the tags to associate with the orchestration instance.
+ ///
+ public IReadOnlyDictionary Tags { get; init; } = ImmutableDictionary.Create();
+}
diff --git a/src/Client/Core/OrchestrationMetadata.cs b/src/Client/Core/OrchestrationMetadata.cs
index 3fc43b0a3..a1cf3d9fa 100644
--- a/src/Client/Core/OrchestrationMetadata.cs
+++ b/src/Client/Core/OrchestrationMetadata.cs
@@ -1,233 +1,239 @@
-// Copyright (c) Microsoft Corporation.
-// Licensed under the MIT License.
-
-using System.Diagnostics.CodeAnalysis;
-using System.Text;
-
-namespace Microsoft.DurableTask.Client;
-
-///
-/// Represents a snapshot of an orchestration instance's current state, including metadata.
-///
-///
-/// Instances of this class are produced by methods in the class, such as
-/// ,
-/// and
-/// .
-///
-public sealed class OrchestrationMetadata
-{
- ///
- /// Initializes a new instance of the class.
- ///
- /// The name of the orchestration.
- /// The instance ID of the orchestration.
- public OrchestrationMetadata(string name, string instanceId)
- {
- this.Name = name;
- this.InstanceId = instanceId;
- }
-
- /// Gets the name of the orchestration.
- /// The name of the orchestration.
- public string Name { get; }
-
- /// Gets the unique ID of the orchestration instance.
- /// The unique ID of the orchestration instance.
- public string InstanceId { get; }
-
- ///
- /// Gets the data converter used to deserialized the serialized data on this instance.
- /// This will only be present when inputs and outputs are requested, null otherwise.
- ///
- /// The optional data converter.
- public DataConverter? DataConverter { get; init; }
-
- ///
- /// Gets the current runtime status of the orchestration instance at the time this object was fetched.
- ///
- /// The runtime status of the orchestration instance at the time this object was fetched.
- public OrchestrationRuntimeStatus RuntimeStatus { get; init; }
-
- ///
- /// Gets the orchestration instance's creation time in UTC.
- ///
- /// The orchestration instance's creation time in UTC.
- public DateTimeOffset CreatedAt { get; init; }
-
- ///
- /// Gets the orchestration instance's last updated time in UTC.
- ///
- /// The orchestration instance's last updated time in UTC.
- public DateTimeOffset LastUpdatedAt { get; init; }
-
- ///
- /// Gets the orchestration instance's serialized input, if any, as a string value.
- ///
- /// The serialized orchestration input or null.
- public string? SerializedInput { get; init; }
-
- ///
- /// Gets the orchestration instance's serialized output, if any, as a string value.
- ///
- /// The serialized orchestration output or null.
- public string? SerializedOutput { get; init; }
-
- ///
- /// Gets the orchestration instance's serialized custom status, if any, as a string value.
- ///
- /// The serialized custom status or null.
- public string? SerializedCustomStatus { get; init; }
-
- ///
- /// Gets the failure details, if any, for the orchestration instance.
- ///
- ///
- /// This property contains data only if the orchestration is in the
- /// state, and only if this instance metadata was fetched with the option to include output data.
- ///
- /// The failure details if the orchestration was in a failed state; null otherwise.
- public TaskFailureDetails? FailureDetails { get; init; }
-
- ///
- /// Gets a value indicating whether the orchestration instance was running at the time this object was fetched.
- ///
- /// true if the orchestration was in a running state; false otherwise.
- public bool IsRunning => this.RuntimeStatus == OrchestrationRuntimeStatus.Running;
-
- ///
- /// Gets a value indicating whether the orchestration instance was completed at the time this object was fetched.
- ///
- ///
- /// An orchestration instance is considered completed when its value is
- /// , ,
- /// or .
- ///
- /// true if the orchestration was in a terminal state; false otherwise.
- public bool IsCompleted =>
- this.RuntimeStatus == OrchestrationRuntimeStatus.Completed ||
- this.RuntimeStatus == OrchestrationRuntimeStatus.Failed ||
- this.RuntimeStatus == OrchestrationRuntimeStatus.Terminated;
-
- [MemberNotNullWhen(true, nameof(DataConverter))]
- bool RequestedInputsAndOutputs => this.DataConverter is not null;
-
- ///
- /// Deserializes the orchestration's input into an object of the specified type.
- ///
- ///
- /// This method can only be used when inputs and outputs are explicitly requested from the
- /// or
- /// method that produced
- /// this object.
- ///
- /// The type to deserialize the orchestration input into.
- /// Returns the deserialized input value.
- ///
- /// Thrown if this metadata object was fetched without the option to read inputs and outputs.
- ///
- public T? ReadInputAs()
- {
- if (!this.RequestedInputsAndOutputs)
- {
- throw new InvalidOperationException(
- $"The {nameof(this.ReadInputAs)} method can only be used on {nameof(OrchestrationMetadata)} objects " +
- "that are fetched with the option to include input data.");
- }
-
- return this.DataConverter.Deserialize(this.SerializedInput);
- }
-
- ///
- /// Deserializes the orchestration's output into an object of the specified type.
- ///
- ///
- /// This method can only be used when inputs and outputs are explicitly requested from the
- /// or
- /// method that produced
- /// this object.
- ///
- /// The type to deserialize the orchestration output into.
- /// Returns the deserialized output value.
- ///
- /// Thrown if this metadata object was fetched without the option to read inputs and outputs.
- ///
- public T? ReadOutputAs()
- {
- if (!this.RequestedInputsAndOutputs)
- {
- throw new InvalidOperationException(
- $"The {nameof(this.ReadOutputAs)} method can only be used on {nameof(OrchestrationMetadata)} objects " +
- "that are fetched with the option to include output data.");
- }
-
- return this.DataConverter.Deserialize(this.SerializedOutput);
- }
-
- ///
- /// Deserializes the orchestration's custom status value into an object of the specified type.
- ///
- ///
- /// This method can only be used when inputs and outputs are explicitly requested from the
- /// or
- /// method that produced
- /// this object.
- ///
- /// The type to deserialize the orchestration' custom status into.
- /// Returns the deserialized custom status value.
- ///
- /// Thrown if this metadata object was fetched without the option to read inputs and outputs.
- ///
- public T? ReadCustomStatusAs()
- {
- if (!this.RequestedInputsAndOutputs)
- {
- throw new InvalidOperationException(
- $"The {nameof(this.ReadCustomStatusAs)} method can only be used on {nameof(OrchestrationMetadata)}"
- + " objects that are fetched with the option to include input and output data.");
- }
-
- return this.DataConverter.Deserialize(this.SerializedCustomStatus);
- }
-
- ///
- /// Generates a user-friendly string representation of the current metadata object.
- ///
- /// A user-friendly string representation of the current metadata object.
- public override string ToString()
- {
- StringBuilder sb = new($"[Name: '{this.Name}', ID: '{this.InstanceId}', RuntimeStatus: {this.RuntimeStatus},"
- + $" CreatedAt: {this.CreatedAt:s}, LastUpdatedAt: {this.LastUpdatedAt:s}");
- if (this.SerializedInput != null)
- {
- sb.Append(", Input: '").Append(GetTrimmedPayload(this.SerializedInput)).Append('\'');
- }
-
- if (this.SerializedOutput != null)
- {
- sb.Append(", Output: '").Append(GetTrimmedPayload(this.SerializedOutput)).Append('\'');
- }
-
- if (this.FailureDetails != null)
- {
- sb.Append(", FailureDetails: '")
- .Append(this.FailureDetails.ErrorType)
- .Append(" - ")
- .Append(GetTrimmedPayload(this.FailureDetails.ErrorMessage))
- .Append('\'');
- }
-
- return sb.Append(']').ToString();
- }
-
- static string GetTrimmedPayload(string payload)
- {
- const int MaxLength = 50;
- if (payload.Length > MaxLength)
- {
- return string.Concat(payload[..MaxLength], "...");
- }
-
- return payload;
- }
-}
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+using System.Collections.Immutable;
+using System.Diagnostics.CodeAnalysis;
+using System.Text;
+
+namespace Microsoft.DurableTask.Client;
+
+///
+/// Represents a snapshot of an orchestration instance's current state, including metadata.
+///
+///
+/// Instances of this class are produced by methods in the class, such as
+/// ,
+/// and
+/// .
+///
+public sealed class OrchestrationMetadata
+{
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// The name of the orchestration.
+ /// The instance ID of the orchestration.
+ public OrchestrationMetadata(string name, string instanceId)
+ {
+ this.Name = name;
+ this.InstanceId = instanceId;
+ }
+
+ /// Gets the name of the orchestration.
+ /// The name of the orchestration.
+ public string Name { get; }
+
+ /// Gets the unique ID of the orchestration instance.
+ /// The unique ID of the orchestration instance.
+ public string InstanceId { get; }
+
+ ///
+ /// Gets the data converter used to deserialized the serialized data on this instance.
+ /// This will only be present when inputs and outputs are requested, null otherwise.
+ ///
+ /// The optional data converter.
+ public DataConverter? DataConverter { get; init; }
+
+ ///
+ /// Gets the current runtime status of the orchestration instance at the time this object was fetched.
+ ///
+ /// The runtime status of the orchestration instance at the time this object was fetched.
+ public OrchestrationRuntimeStatus RuntimeStatus { get; init; }
+
+ ///
+ /// Gets the orchestration instance's creation time in UTC.
+ ///
+ /// The orchestration instance's creation time in UTC.
+ public DateTimeOffset CreatedAt { get; init; }
+
+ ///
+ /// Gets the orchestration instance's last updated time in UTC.
+ ///
+ /// The orchestration instance's last updated time in UTC.
+ public DateTimeOffset LastUpdatedAt { get; init; }
+
+ ///
+ /// Gets the orchestration instance's serialized input, if any, as a string value.
+ ///
+ /// The serialized orchestration input or null.
+ public string? SerializedInput { get; init; }
+
+ ///
+ /// Gets the orchestration instance's serialized output, if any, as a string value.
+ ///
+ /// The serialized orchestration output or null.
+ public string? SerializedOutput { get; init; }
+
+ ///
+ /// Gets the orchestration instance's serialized custom status, if any, as a string value.
+ ///
+ /// The serialized custom status or null.
+ public string? SerializedCustomStatus { get; init; }
+
+ ///
+ /// Gets the tags associated with the orchestration instance.
+ ///
+ public IReadOnlyDictionary Tags { get; init; } = ImmutableDictionary.Create();
+
+ ///
+ /// Gets the failure details, if any, for the orchestration instance.
+ ///
+ ///
+ /// This property contains data only if the orchestration is in the
+ /// state, and only if this instance metadata was fetched with the option to include output data.
+ ///
+ /// The failure details if the orchestration was in a failed state; null otherwise.
+ public TaskFailureDetails? FailureDetails { get; init; }
+
+ ///
+ /// Gets a value indicating whether the orchestration instance was running at the time this object was fetched.
+ ///
+ /// true if the orchestration was in a running state; false otherwise.
+ public bool IsRunning => this.RuntimeStatus == OrchestrationRuntimeStatus.Running;
+
+ ///
+ /// Gets a value indicating whether the orchestration instance was completed at the time this object was fetched.
+ ///
+ ///
+ /// An orchestration instance is considered completed when its value is
+ /// , ,
+ /// or .
+ ///
+ /// true if the orchestration was in a terminal state; false otherwise.
+ public bool IsCompleted =>
+ this.RuntimeStatus == OrchestrationRuntimeStatus.Completed ||
+ this.RuntimeStatus == OrchestrationRuntimeStatus.Failed ||
+ this.RuntimeStatus == OrchestrationRuntimeStatus.Terminated;
+
+ [MemberNotNullWhen(true, nameof(DataConverter))]
+ bool RequestedInputsAndOutputs => this.DataConverter is not null;
+
+ ///
+ /// Deserializes the orchestration's input into an object of the specified type.
+ ///
+ ///
+ /// This method can only be used when inputs and outputs are explicitly requested from the
+ /// or
+ /// method that produced
+ /// this object.
+ ///
+ /// The type to deserialize the orchestration input into.
+ /// Returns the deserialized input value.
+ ///
+ /// Thrown if this metadata object was fetched without the option to read inputs and outputs.
+ ///
+ public T? ReadInputAs()
+ {
+ if (!this.RequestedInputsAndOutputs)
+ {
+ throw new InvalidOperationException(
+ $"The {nameof(this.ReadInputAs)} method can only be used on {nameof(OrchestrationMetadata)} objects " +
+ "that are fetched with the option to include input data.");
+ }
+
+ return this.DataConverter.Deserialize(this.SerializedInput);
+ }
+
+ ///
+ /// Deserializes the orchestration's output into an object of the specified type.
+ ///
+ ///
+ /// This method can only be used when inputs and outputs are explicitly requested from the
+ /// or
+ /// method that produced
+ /// this object.
+ ///
+ /// The type to deserialize the orchestration output into.
+ /// Returns the deserialized output value.
+ ///
+ /// Thrown if this metadata object was fetched without the option to read inputs and outputs.
+ ///
+ public T? ReadOutputAs()
+ {
+ if (!this.RequestedInputsAndOutputs)
+ {
+ throw new InvalidOperationException(
+ $"The {nameof(this.ReadOutputAs)} method can only be used on {nameof(OrchestrationMetadata)} objects " +
+ "that are fetched with the option to include output data.");
+ }
+
+ return this.DataConverter.Deserialize(this.SerializedOutput);
+ }
+
+ ///
+ /// Deserializes the orchestration's custom status value into an object of the specified type.
+ ///
+ ///
+ /// This method can only be used when inputs and outputs are explicitly requested from the
+ /// or
+ /// method that produced
+ /// this object.
+ ///
+ /// The type to deserialize the orchestration' custom status into.
+ /// Returns the deserialized custom status value.
+ ///
+ /// Thrown if this metadata object was fetched without the option to read inputs and outputs.
+ ///
+ public T? ReadCustomStatusAs()
+ {
+ if (!this.RequestedInputsAndOutputs)
+ {
+ throw new InvalidOperationException(
+ $"The {nameof(this.ReadCustomStatusAs)} method can only be used on {nameof(OrchestrationMetadata)}"
+ + " objects that are fetched with the option to include input and output data.");
+ }
+
+ return this.DataConverter.Deserialize(this.SerializedCustomStatus);
+ }
+
+ ///
+ /// Generates a user-friendly string representation of the current metadata object.
+ ///
+ /// A user-friendly string representation of the current metadata object.
+ public override string ToString()
+ {
+ StringBuilder sb = new($"[Name: '{this.Name}', ID: '{this.InstanceId}', RuntimeStatus: {this.RuntimeStatus},"
+ + $" CreatedAt: {this.CreatedAt:s}, LastUpdatedAt: {this.LastUpdatedAt:s}");
+ if (this.SerializedInput != null)
+ {
+ sb.Append(", Input: '").Append(GetTrimmedPayload(this.SerializedInput)).Append('\'');
+ }
+
+ if (this.SerializedOutput != null)
+ {
+ sb.Append(", Output: '").Append(GetTrimmedPayload(this.SerializedOutput)).Append('\'');
+ }
+
+ if (this.FailureDetails != null)
+ {
+ sb.Append(", FailureDetails: '")
+ .Append(this.FailureDetails.ErrorType)
+ .Append(" - ")
+ .Append(GetTrimmedPayload(this.FailureDetails.ErrorMessage))
+ .Append('\'');
+ }
+
+ return sb.Append(']').ToString();
+ }
+
+ static string GetTrimmedPayload(string payload)
+ {
+ const int MaxLength = 50;
+ if (payload.Length > MaxLength)
+ {
+ return string.Concat(payload[..MaxLength], "...");
+ }
+
+ return payload;
+ }
+}
diff --git a/src/Client/Grpc/GrpcDurableTaskClient.cs b/src/Client/Grpc/GrpcDurableTaskClient.cs
index e7fd1da92..32c95b64b 100644
--- a/src/Client/Grpc/GrpcDurableTaskClient.cs
+++ b/src/Client/Grpc/GrpcDurableTaskClient.cs
@@ -1,463 +1,475 @@
-// Copyright (c) Microsoft Corporation.
-// Licensed under the MIT License.
-
-using System.Diagnostics;
-using System.Text;
-using Google.Protobuf.WellKnownTypes;
-using Microsoft.DurableTask.Client.Entities;
-using Microsoft.Extensions.DependencyInjection;
-using Microsoft.Extensions.Logging;
-using Microsoft.Extensions.Options;
-using static Microsoft.DurableTask.Protobuf.TaskHubSidecarService;
-using P = Microsoft.DurableTask.Protobuf;
-
-namespace Microsoft.DurableTask.Client.Grpc;
-
-///
-/// Durable Task client implementation that uses gRPC to connect to a remote "sidecar" process.
-///
-public sealed class GrpcDurableTaskClient : DurableTaskClient
-{
- readonly ILogger logger;
- readonly TaskHubSidecarServiceClient sidecarClient;
- readonly GrpcDurableTaskClientOptions options;
- readonly DurableEntityClient? entityClient;
- AsyncDisposable asyncDisposable;
-
- ///
- /// Initializes a new instance of the class.
- ///
- /// The name of the client.
- /// The gRPC client options.
- /// The logger.
- [ActivatorUtilitiesConstructor]
- public GrpcDurableTaskClient(
- string name, IOptionsMonitor options, ILogger logger)
- : this(name, Check.NotNull(options).Get(name), logger)
- {
- }
-
- ///
- /// Initializes a new instance of the class.
- ///
- /// The name of the client.
- /// The gRPC client options.
- /// The logger.
- public GrpcDurableTaskClient(string name, GrpcDurableTaskClientOptions options, ILogger logger)
- : base(name)
- {
- this.logger = Check.NotNull(logger);
- this.options = Check.NotNull(options);
- this.asyncDisposable = GetCallInvoker(options, out CallInvoker callInvoker);
- this.sidecarClient = new TaskHubSidecarServiceClient(callInvoker);
-
- if (this.options.EnableEntitySupport)
- {
- this.entityClient = new GrpcDurableEntityClient(this.Name, this.DataConverter, this.sidecarClient, logger);
- }
- }
-
- ///
- public override DurableEntityClient Entities => this.entityClient
- ?? throw new NotSupportedException($"Durable entities are disabled because {nameof(DurableTaskClientOptions)}.{nameof(DurableTaskClientOptions.EnableEntitySupport)}=false");
-
- DataConverter DataConverter => this.options.DataConverter;
-
- ///
- public override ValueTask DisposeAsync()
- {
- return this.asyncDisposable.DisposeAsync();
- }
-
- ///
- public override async Task ScheduleNewOrchestrationInstanceAsync(
- TaskName orchestratorName,
- object? input = null,
- StartOrchestrationOptions? options = null,
- CancellationToken cancellation = default)
- {
- Check.NotEntity(this.options.EnableEntitySupport, options?.InstanceId);
-
- string version = string.Empty;
- if (!string.IsNullOrEmpty(orchestratorName.Version))
- {
- version = orchestratorName.Version;
- }
- else if (!string.IsNullOrEmpty(this.options.DefaultVersion))
- {
- version = this.options.DefaultVersion;
- }
-
- var request = new P.CreateInstanceRequest
- {
- Name = orchestratorName.Name,
- Version = version,
- InstanceId = options?.InstanceId ?? Guid.NewGuid().ToString("N"),
- Input = this.DataConverter.Serialize(input),
- };
-
- if (Activity.Current?.Id != null || Activity.Current?.TraceStateString != null)
- {
- if (request.ParentTraceContext == null)
- {
- request.ParentTraceContext = new P.TraceContext();
- }
-
- if (Activity.Current?.Id != null)
- {
- request.ParentTraceContext.TraceParent = Activity.Current?.Id;
- }
-
- if (Activity.Current?.TraceStateString != null)
- {
- request.ParentTraceContext.TraceState = Activity.Current?.TraceStateString;
- }
- }
-
- DateTimeOffset? startAt = options?.StartAt;
- this.logger.SchedulingOrchestration(
- request.InstanceId,
- orchestratorName,
- sizeInBytes: request.Input != null ? Encoding.UTF8.GetByteCount(request.Input) : 0,
- startAt.GetValueOrDefault(DateTimeOffset.UtcNow));
-
- if (startAt.HasValue)
- {
- // Convert timestamps to UTC if not already UTC
- request.ScheduledStartTimestamp = Timestamp.FromDateTimeOffset(startAt.Value.ToUniversalTime());
- }
-
- P.CreateInstanceResponse? result = await this.sidecarClient.StartInstanceAsync(
- request, cancellationToken: cancellation);
- return result.InstanceId;
- }
-
- ///
- public override async Task RaiseEventAsync(
- string instanceId, string eventName, object? eventPayload = null, CancellationToken cancellation = default)
- {
- Check.NotNullOrEmpty(instanceId);
- Check.NotNullOrEmpty(eventName);
-
- Check.NotEntity(this.options.EnableEntitySupport, instanceId);
-
- P.RaiseEventRequest request = new()
- {
- InstanceId = instanceId,
- Name = eventName,
- Input = this.DataConverter.Serialize(eventPayload),
- };
-
- await this.sidecarClient.RaiseEventAsync(request, cancellationToken: cancellation);
- }
-
- ///
- public override async Task TerminateInstanceAsync(
- string instanceId, TerminateInstanceOptions? options = null, CancellationToken cancellation = default)
- {
- object? output = options?.Output;
- bool recursive = options?.Recursive ?? false;
-
- Check.NotNullOrEmpty(instanceId);
- Check.NotEntity(this.options.EnableEntitySupport, instanceId);
-
- this.logger.TerminatingInstance(instanceId);
-
- string? serializedOutput = this.DataConverter.Serialize(output);
- await this.sidecarClient.TerminateInstanceAsync(
- new P.TerminateRequest
- {
- InstanceId = instanceId,
- Output = serializedOutput,
- Recursive = recursive,
- },
- cancellationToken: cancellation);
- }
-
- ///
- public override async Task SuspendInstanceAsync(
- string instanceId, string? reason = null, CancellationToken cancellation = default)
- {
- Check.NotEntity(this.options.EnableEntitySupport, instanceId);
-
- P.SuspendRequest request = new()
- {
- InstanceId = instanceId,
- Reason = reason,
- };
-
- try
- {
- await this.sidecarClient.SuspendInstanceAsync(request, cancellationToken: cancellation);
- }
- catch (RpcException e) when (e.StatusCode == StatusCode.Cancelled)
- {
- throw new OperationCanceledException(
- $"The {nameof(this.SuspendInstanceAsync)} operation was canceled.", e, cancellation);
- }
- }
-
- ///
- public override async Task ResumeInstanceAsync(
- string instanceId, string? reason = null, CancellationToken cancellation = default)
- {
- Check.NotEntity(this.options.EnableEntitySupport, instanceId);
-
- P.ResumeRequest request = new()
- {
- InstanceId = instanceId,
- Reason = reason,
- };
-
- try
- {
- await this.sidecarClient.ResumeInstanceAsync(request, cancellationToken: cancellation);
- }
- catch (RpcException e) when (e.StatusCode == StatusCode.Cancelled)
- {
- throw new OperationCanceledException(
- $"The {nameof(this.ResumeInstanceAsync)} operation was canceled.", e, cancellation);
- }
- }
-
- ///
- public override async Task GetInstancesAsync(
- string instanceId, bool getInputsAndOutputs = false, CancellationToken cancellation = default)
- {
- Check.NotEntity(this.options.EnableEntitySupport, instanceId);
-
- if (string.IsNullOrEmpty(instanceId))
- {
- throw new ArgumentNullException(nameof(instanceId));
- }
-
- P.GetInstanceResponse response = await this.sidecarClient.GetInstanceAsync(
- new P.GetInstanceRequest
- {
- InstanceId = instanceId,
- GetInputsAndOutputs = getInputsAndOutputs,
- },
- cancellationToken: cancellation);
-
- // REVIEW: Should we return a non-null value instead of !exists?
- if (!response.Exists)
- {
- return null;
- }
-
- return this.CreateMetadata(response.OrchestrationState, getInputsAndOutputs);
- }
-
- ///
- public override AsyncPageable GetAllInstancesAsync(OrchestrationQuery? filter = null)
- {
- Check.NotEntity(this.options.EnableEntitySupport, filter?.InstanceIdPrefix);
-
- return Pageable.Create(async (continuation, pageSize, cancellation) =>
- {
- P.QueryInstancesRequest request = new()
- {
- Query = new P.InstanceQuery
- {
- CreatedTimeFrom = filter?.CreatedFrom?.ToTimestamp(),
- CreatedTimeTo = filter?.CreatedTo?.ToTimestamp(),
- FetchInputsAndOutputs = filter?.FetchInputsAndOutputs ?? false,
- InstanceIdPrefix = filter?.InstanceIdPrefix,
- MaxInstanceCount = pageSize ?? filter?.PageSize ?? OrchestrationQuery.DefaultPageSize,
- ContinuationToken = continuation ?? filter?.ContinuationToken,
- },
- };
-
- if (filter?.Statuses is not null)
- {
- request.Query.RuntimeStatus.AddRange(filter.Statuses.Select(x => x.ToGrpcStatus()));
- }
-
- if (filter?.TaskHubNames is not null)
- {
- request.Query.TaskHubNames.AddRange(filter.TaskHubNames);
- }
-
- try
- {
- P.QueryInstancesResponse response = await this.sidecarClient.QueryInstancesAsync(
- request, cancellationToken: cancellation);
-
- bool getInputsAndOutputs = filter?.FetchInputsAndOutputs ?? false;
- IReadOnlyList values = response.OrchestrationState
- .Select(x => this.CreateMetadata(x, getInputsAndOutputs))
- .ToList();
-
- return new Page(values, response.ContinuationToken);
- }
- catch (RpcException e) when (e.StatusCode == StatusCode.Cancelled)
- {
- throw new OperationCanceledException(
- $"The {nameof(this.GetInstancesAsync)} operation was canceled.", e, cancellation);
- }
- });
- }
-
- ///
- public override async Task WaitForInstanceStartAsync(
- string instanceId, bool getInputsAndOutputs = false, CancellationToken cancellation = default)
- {
- Check.NotEntity(this.options.EnableEntitySupport, instanceId);
-
- this.logger.WaitingForInstanceStart(instanceId, getInputsAndOutputs);
-
- P.GetInstanceRequest request = new()
- {
- InstanceId = instanceId,
- GetInputsAndOutputs = getInputsAndOutputs,
- };
-
- try
- {
- P.GetInstanceResponse response = await this.sidecarClient.WaitForInstanceStartAsync(
- request, cancellationToken: cancellation);
- return this.CreateMetadata(response.OrchestrationState, getInputsAndOutputs);
- }
- catch (RpcException e) when (e.StatusCode == StatusCode.Cancelled)
- {
- throw new OperationCanceledException(
- $"The {nameof(this.WaitForInstanceStartAsync)} operation was canceled.", e, cancellation);
- }
- }
-
- ///
- public override async Task WaitForInstanceCompletionAsync(
- string instanceId, bool getInputsAndOutputs = false, CancellationToken cancellation = default)
- {
- Check.NotEntity(this.options.EnableEntitySupport, instanceId);
-
- this.logger.WaitingForInstanceCompletion(instanceId, getInputsAndOutputs);
-
- P.GetInstanceRequest request = new()
- {
- InstanceId = instanceId,
- GetInputsAndOutputs = getInputsAndOutputs,
- };
-
- try
- {
- P.GetInstanceResponse response = await this.sidecarClient.WaitForInstanceCompletionAsync(
- request, cancellationToken: cancellation);
- return this.CreateMetadata(response.OrchestrationState, getInputsAndOutputs);
- }
- catch (RpcException e) when (e.StatusCode == StatusCode.Cancelled)
- {
- throw new OperationCanceledException(
- $"The {nameof(this.WaitForInstanceCompletionAsync)} operation was canceled.", e, cancellation);
- }
- }
-
- ///
- public override Task PurgeInstanceAsync(
- string instanceId, PurgeInstanceOptions? options = null, CancellationToken cancellation = default)
- {
- bool recursive = options?.Recursive ?? false;
- this.logger.PurgingInstanceMetadata(instanceId);
-
- P.PurgeInstancesRequest request = new() { InstanceId = instanceId, Recursive = recursive };
- return this.PurgeInstancesCoreAsync(request, cancellation);
- }
-
- ///
- public override Task PurgeAllInstancesAsync(
- PurgeInstancesFilter filter, PurgeInstanceOptions? options = null, CancellationToken cancellation = default)
- {
- bool recursive = options?.Recursive ?? false;
- this.logger.PurgingInstances(filter);
- P.PurgeInstancesRequest request = new()
- {
- PurgeInstanceFilter = new()
- {
- CreatedTimeFrom = filter?.CreatedFrom.ToTimestamp(),
- CreatedTimeTo = filter?.CreatedTo.ToTimestamp(),
- },
- Recursive = recursive,
- };
-
- if (filter?.Statuses is not null)
- {
- request.PurgeInstanceFilter.RuntimeStatus.AddRange(filter.Statuses.Select(x => x.ToGrpcStatus()));
- }
-
- return this.PurgeInstancesCoreAsync(request, cancellation);
- }
-
- static AsyncDisposable GetCallInvoker(GrpcDurableTaskClientOptions options, out CallInvoker callInvoker)
- {
- if (options.Channel is GrpcChannel c)
- {
- callInvoker = c.CreateCallInvoker();
- return default;
- }
-
- if (options.CallInvoker is CallInvoker invoker)
- {
- callInvoker = invoker;
- return default;
- }
-
- c = GetChannel(options.Address);
- callInvoker = c.CreateCallInvoker();
- return new AsyncDisposable(() => new(c.ShutdownAsync()));
- }
-
-#if NET6_0_OR_GREATER
- static GrpcChannel GetChannel(string? address)
- {
- if (string.IsNullOrEmpty(address))
- {
- address = "http://localhost:4001";
- }
-
- return GrpcChannel.ForAddress(address);
- }
-#endif
-
-#if NETSTANDARD2_0
- static GrpcChannel GetChannel(string? address)
- {
- if (string.IsNullOrEmpty(address))
- {
- address = "localhost:4001";
- }
-
- return new(address, ChannelCredentials.Insecure);
- }
-#endif
-
- async Task PurgeInstancesCoreAsync(
- P.PurgeInstancesRequest request, CancellationToken cancellation = default)
- {
- try
- {
- P.PurgeInstancesResponse response = await this.sidecarClient.PurgeInstancesAsync(
- request, cancellationToken: cancellation);
- return new PurgeResult(response.DeletedInstanceCount);
- }
- catch (RpcException e) when (e.StatusCode == StatusCode.Cancelled)
- {
- throw new OperationCanceledException(
- $"The {nameof(this.PurgeAllInstancesAsync)} operation was canceled.", e, cancellation);
- }
- }
-
- OrchestrationMetadata CreateMetadata(P.OrchestrationState state, bool includeInputsAndOutputs)
- {
- return new(state.Name, state.InstanceId)
- {
- CreatedAt = state.CreatedTimestamp.ToDateTimeOffset(),
- LastUpdatedAt = state.LastUpdatedTimestamp.ToDateTimeOffset(),
- RuntimeStatus = (OrchestrationRuntimeStatus)state.OrchestrationStatus,
- SerializedInput = state.Input,
- SerializedOutput = state.Output,
- SerializedCustomStatus = state.CustomStatus,
- FailureDetails = state.FailureDetails.ToTaskFailureDetails(),
- DataConverter = includeInputsAndOutputs ? this.DataConverter : null,
- };
- }
-}
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+using System.Diagnostics;
+using System.Text;
+using Google.Protobuf.WellKnownTypes;
+using Microsoft.DurableTask.Client.Entities;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
+using static Microsoft.DurableTask.Protobuf.TaskHubSidecarService;
+using P = Microsoft.DurableTask.Protobuf;
+
+namespace Microsoft.DurableTask.Client.Grpc;
+
+///
+/// Durable Task client implementation that uses gRPC to connect to a remote "sidecar" process.
+///
+public sealed class GrpcDurableTaskClient : DurableTaskClient
+{
+ readonly ILogger logger;
+ readonly TaskHubSidecarServiceClient sidecarClient;
+ readonly GrpcDurableTaskClientOptions options;
+ readonly DurableEntityClient? entityClient;
+ AsyncDisposable asyncDisposable;
+
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// The name of the client.
+ /// The gRPC client options.
+ /// The logger.
+ [ActivatorUtilitiesConstructor]
+ public GrpcDurableTaskClient(
+ string name, IOptionsMonitor options, ILogger logger)
+ : this(name, Check.NotNull(options).Get(name), logger)
+ {
+ }
+
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// The name of the client.
+ /// The gRPC client options.
+ /// The logger.
+ public GrpcDurableTaskClient(string name, GrpcDurableTaskClientOptions options, ILogger logger)
+ : base(name)
+ {
+ this.logger = Check.NotNull(logger);
+ this.options = Check.NotNull(options);
+ this.asyncDisposable = GetCallInvoker(options, out CallInvoker callInvoker);
+ this.sidecarClient = new TaskHubSidecarServiceClient(callInvoker);
+
+ if (this.options.EnableEntitySupport)
+ {
+ this.entityClient = new GrpcDurableEntityClient(this.Name, this.DataConverter, this.sidecarClient, logger);
+ }
+ }
+
+ ///
+ public override DurableEntityClient Entities => this.entityClient
+ ?? throw new NotSupportedException($"Durable entities are disabled because {nameof(DurableTaskClientOptions)}.{nameof(DurableTaskClientOptions.EnableEntitySupport)}=false");
+
+ DataConverter DataConverter => this.options.DataConverter;
+
+ ///
+ public override ValueTask DisposeAsync()
+ {
+ return this.asyncDisposable.DisposeAsync();
+ }
+
+ ///
+ public override async Task ScheduleNewOrchestrationInstanceAsync(
+ TaskName orchestratorName,
+ object? input = null,
+ StartOrchestrationOptions? options = null,
+ CancellationToken cancellation = default)
+ {
+ Check.NotEntity(this.options.EnableEntitySupport, options?.InstanceId);
+
+ string version = string.Empty;
+ if (!string.IsNullOrEmpty(orchestratorName.Version))
+ {
+ version = orchestratorName.Version;
+ }
+ else if (!string.IsNullOrEmpty(this.options.DefaultVersion))
+ {
+ version = this.options.DefaultVersion;
+ }
+
+ var request = new P.CreateInstanceRequest
+ {
+ Name = orchestratorName.Name,
+ Version = version,
+ InstanceId = options?.InstanceId ?? Guid.NewGuid().ToString("N"),
+ Input = this.DataConverter.Serialize(input),
+ };
+
+ // Add tags to the collection
+ if (request?.Tags != null && options?.Tags != null)
+ {
+ foreach (KeyValuePair tag in options.Tags)
+ {
+ request.Tags.Add(tag.Key, tag.Value);
+ }
+ }
+
+ if (Activity.Current?.Id != null || Activity.Current?.TraceStateString != null)
+ {
+ if (request.ParentTraceContext == null)
+ {
+ request.ParentTraceContext = new P.TraceContext();
+ }
+
+ if (Activity.Current?.Id != null)
+ {
+ request.ParentTraceContext.TraceParent = Activity.Current?.Id;
+ }
+
+ if (Activity.Current?.TraceStateString != null)
+ {
+ request.ParentTraceContext.TraceState = Activity.Current?.TraceStateString;
+ }
+ }
+
+ DateTimeOffset? startAt = options?.StartAt;
+ this.logger.SchedulingOrchestration(
+ request.InstanceId,
+ orchestratorName,
+ sizeInBytes: request.Input != null ? Encoding.UTF8.GetByteCount(request.Input) : 0,
+ startAt.GetValueOrDefault(DateTimeOffset.UtcNow));
+
+ if (startAt.HasValue)
+ {
+ // Convert timestamps to UTC if not already UTC
+ request.ScheduledStartTimestamp = Timestamp.FromDateTimeOffset(startAt.Value.ToUniversalTime());
+ }
+
+ P.CreateInstanceResponse? result = await this.sidecarClient.StartInstanceAsync(
+ request, cancellationToken: cancellation);
+ return result.InstanceId;
+ }
+
+ ///
+ public override async Task RaiseEventAsync(
+ string instanceId, string eventName, object? eventPayload = null, CancellationToken cancellation = default)
+ {
+ Check.NotNullOrEmpty(instanceId);
+ Check.NotNullOrEmpty(eventName);
+
+ Check.NotEntity(this.options.EnableEntitySupport, instanceId);
+
+ P.RaiseEventRequest request = new()
+ {
+ InstanceId = instanceId,
+ Name = eventName,
+ Input = this.DataConverter.Serialize(eventPayload),
+ };
+
+ await this.sidecarClient.RaiseEventAsync(request, cancellationToken: cancellation);
+ }
+
+ ///
+ public override async Task TerminateInstanceAsync(
+ string instanceId, TerminateInstanceOptions? options = null, CancellationToken cancellation = default)
+ {
+ object? output = options?.Output;
+ bool recursive = options?.Recursive ?? false;
+
+ Check.NotNullOrEmpty(instanceId);
+ Check.NotEntity(this.options.EnableEntitySupport, instanceId);
+
+ this.logger.TerminatingInstance(instanceId);
+
+ string? serializedOutput = this.DataConverter.Serialize(output);
+ await this.sidecarClient.TerminateInstanceAsync(
+ new P.TerminateRequest
+ {
+ InstanceId = instanceId,
+ Output = serializedOutput,
+ Recursive = recursive,
+ },
+ cancellationToken: cancellation);
+ }
+
+ ///
+ public override async Task SuspendInstanceAsync(
+ string instanceId, string? reason = null, CancellationToken cancellation = default)
+ {
+ Check.NotEntity(this.options.EnableEntitySupport, instanceId);
+
+ P.SuspendRequest request = new()
+ {
+ InstanceId = instanceId,
+ Reason = reason,
+ };
+
+ try
+ {
+ await this.sidecarClient.SuspendInstanceAsync(request, cancellationToken: cancellation);
+ }
+ catch (RpcException e) when (e.StatusCode == StatusCode.Cancelled)
+ {
+ throw new OperationCanceledException(
+ $"The {nameof(this.SuspendInstanceAsync)} operation was canceled.", e, cancellation);
+ }
+ }
+
+ ///
+ public override async Task ResumeInstanceAsync(
+ string instanceId, string? reason = null, CancellationToken cancellation = default)
+ {
+ Check.NotEntity(this.options.EnableEntitySupport, instanceId);
+
+ P.ResumeRequest request = new()
+ {
+ InstanceId = instanceId,
+ Reason = reason,
+ };
+
+ try
+ {
+ await this.sidecarClient.ResumeInstanceAsync(request, cancellationToken: cancellation);
+ }
+ catch (RpcException e) when (e.StatusCode == StatusCode.Cancelled)
+ {
+ throw new OperationCanceledException(
+ $"The {nameof(this.ResumeInstanceAsync)} operation was canceled.", e, cancellation);
+ }
+ }
+
+ ///
+ public override async Task GetInstancesAsync(
+ string instanceId, bool getInputsAndOutputs = false, CancellationToken cancellation = default)
+ {
+ Check.NotEntity(this.options.EnableEntitySupport, instanceId);
+
+ if (string.IsNullOrEmpty(instanceId))
+ {
+ throw new ArgumentNullException(nameof(instanceId));
+ }
+
+ P.GetInstanceResponse response = await this.sidecarClient.GetInstanceAsync(
+ new P.GetInstanceRequest
+ {
+ InstanceId = instanceId,
+ GetInputsAndOutputs = getInputsAndOutputs,
+ },
+ cancellationToken: cancellation);
+
+ // REVIEW: Should we return a non-null value instead of !exists?
+ if (!response.Exists)
+ {
+ return null;
+ }
+
+ return this.CreateMetadata(response.OrchestrationState, getInputsAndOutputs);
+ }
+
+ ///
+ public override AsyncPageable GetAllInstancesAsync(OrchestrationQuery? filter = null)
+ {
+ Check.NotEntity(this.options.EnableEntitySupport, filter?.InstanceIdPrefix);
+
+ return Pageable.Create(async (continuation, pageSize, cancellation) =>
+ {
+ P.QueryInstancesRequest request = new()
+ {
+ Query = new P.InstanceQuery
+ {
+ CreatedTimeFrom = filter?.CreatedFrom?.ToTimestamp(),
+ CreatedTimeTo = filter?.CreatedTo?.ToTimestamp(),
+ FetchInputsAndOutputs = filter?.FetchInputsAndOutputs ?? false,
+ InstanceIdPrefix = filter?.InstanceIdPrefix,
+ MaxInstanceCount = pageSize ?? filter?.PageSize ?? OrchestrationQuery.DefaultPageSize,
+ ContinuationToken = continuation ?? filter?.ContinuationToken,
+ },
+ };
+
+ if (filter?.Statuses is not null)
+ {
+ request.Query.RuntimeStatus.AddRange(filter.Statuses.Select(x => x.ToGrpcStatus()));
+ }
+
+ if (filter?.TaskHubNames is not null)
+ {
+ request.Query.TaskHubNames.AddRange(filter.TaskHubNames);
+ }
+
+ try
+ {
+ P.QueryInstancesResponse response = await this.sidecarClient.QueryInstancesAsync(
+ request, cancellationToken: cancellation);
+
+ bool getInputsAndOutputs = filter?.FetchInputsAndOutputs ?? false;
+ IReadOnlyList values = response.OrchestrationState
+ .Select(x => this.CreateMetadata(x, getInputsAndOutputs))
+ .ToList();
+
+ return new Page(values, response.ContinuationToken);
+ }
+ catch (RpcException e) when (e.StatusCode == StatusCode.Cancelled)
+ {
+ throw new OperationCanceledException(
+ $"The {nameof(this.GetInstancesAsync)} operation was canceled.", e, cancellation);
+ }
+ });
+ }
+
+ ///
+ public override async Task WaitForInstanceStartAsync(
+ string instanceId, bool getInputsAndOutputs = false, CancellationToken cancellation = default)
+ {
+ Check.NotEntity(this.options.EnableEntitySupport, instanceId);
+
+ this.logger.WaitingForInstanceStart(instanceId, getInputsAndOutputs);
+
+ P.GetInstanceRequest request = new()
+ {
+ InstanceId = instanceId,
+ GetInputsAndOutputs = getInputsAndOutputs,
+ };
+
+ try
+ {
+ P.GetInstanceResponse response = await this.sidecarClient.WaitForInstanceStartAsync(
+ request, cancellationToken: cancellation);
+ return this.CreateMetadata(response.OrchestrationState, getInputsAndOutputs);
+ }
+ catch (RpcException e) when (e.StatusCode == StatusCode.Cancelled)
+ {
+ throw new OperationCanceledException(
+ $"The {nameof(this.WaitForInstanceStartAsync)} operation was canceled.", e, cancellation);
+ }
+ }
+
+ ///
+ public override async Task WaitForInstanceCompletionAsync(
+ string instanceId, bool getInputsAndOutputs = false, CancellationToken cancellation = default)
+ {
+ Check.NotEntity(this.options.EnableEntitySupport, instanceId);
+
+ this.logger.WaitingForInstanceCompletion(instanceId, getInputsAndOutputs);
+
+ P.GetInstanceRequest request = new()
+ {
+ InstanceId = instanceId,
+ GetInputsAndOutputs = getInputsAndOutputs,
+ };
+
+ try
+ {
+ P.GetInstanceResponse response = await this.sidecarClient.WaitForInstanceCompletionAsync(
+ request, cancellationToken: cancellation);
+ return this.CreateMetadata(response.OrchestrationState, getInputsAndOutputs);
+ }
+ catch (RpcException e) when (e.StatusCode == StatusCode.Cancelled)
+ {
+ throw new OperationCanceledException(
+ $"The {nameof(this.WaitForInstanceCompletionAsync)} operation was canceled.", e, cancellation);
+ }
+ }
+
+ ///
+ public override Task PurgeInstanceAsync(
+ string instanceId, PurgeInstanceOptions? options = null, CancellationToken cancellation = default)
+ {
+ bool recursive = options?.Recursive ?? false;
+ this.logger.PurgingInstanceMetadata(instanceId);
+
+ P.PurgeInstancesRequest request = new() { InstanceId = instanceId, Recursive = recursive };
+ return this.PurgeInstancesCoreAsync(request, cancellation);
+ }
+
+ ///
+ public override Task PurgeAllInstancesAsync(
+ PurgeInstancesFilter filter, PurgeInstanceOptions? options = null, CancellationToken cancellation = default)
+ {
+ bool recursive = options?.Recursive ?? false;
+ this.logger.PurgingInstances(filter);
+ P.PurgeInstancesRequest request = new()
+ {
+ PurgeInstanceFilter = new()
+ {
+ CreatedTimeFrom = filter?.CreatedFrom.ToTimestamp(),
+ CreatedTimeTo = filter?.CreatedTo.ToTimestamp(),
+ },
+ Recursive = recursive,
+ };
+
+ if (filter?.Statuses is not null)
+ {
+ request.PurgeInstanceFilter.RuntimeStatus.AddRange(filter.Statuses.Select(x => x.ToGrpcStatus()));
+ }
+
+ return this.PurgeInstancesCoreAsync(request, cancellation);
+ }
+
+ static AsyncDisposable GetCallInvoker(GrpcDurableTaskClientOptions options, out CallInvoker callInvoker)
+ {
+ if (options.Channel is GrpcChannel c)
+ {
+ callInvoker = c.CreateCallInvoker();
+ return default;
+ }
+
+ if (options.CallInvoker is CallInvoker invoker)
+ {
+ callInvoker = invoker;
+ return default;
+ }
+
+ c = GetChannel(options.Address);
+ callInvoker = c.CreateCallInvoker();
+ return new AsyncDisposable(() => new(c.ShutdownAsync()));
+ }
+
+#if NET6_0_OR_GREATER
+ static GrpcChannel GetChannel(string? address)
+ {
+ if (string.IsNullOrEmpty(address))
+ {
+ address = "http://localhost:4001";
+ }
+
+ return GrpcChannel.ForAddress(address);
+ }
+#endif
+
+#if NETSTANDARD2_0
+ static GrpcChannel GetChannel(string? address)
+ {
+ if (string.IsNullOrEmpty(address))
+ {
+ address = "localhost:4001";
+ }
+
+ return new(address, ChannelCredentials.Insecure);
+ }
+#endif
+
+ async Task PurgeInstancesCoreAsync(
+ P.PurgeInstancesRequest request, CancellationToken cancellation = default)
+ {
+ try
+ {
+ P.PurgeInstancesResponse response = await this.sidecarClient.PurgeInstancesAsync(
+ request, cancellationToken: cancellation);
+ return new PurgeResult(response.DeletedInstanceCount);
+ }
+ catch (RpcException e) when (e.StatusCode == StatusCode.Cancelled)
+ {
+ throw new OperationCanceledException(
+ $"The {nameof(this.PurgeAllInstancesAsync)} operation was canceled.", e, cancellation);
+ }
+ }
+
+ OrchestrationMetadata CreateMetadata(P.OrchestrationState state, bool includeInputsAndOutputs)
+ {
+ var metadata = new OrchestrationMetadata(state.Name, state.InstanceId)
+ {
+ CreatedAt = state.CreatedTimestamp.ToDateTimeOffset(),
+ LastUpdatedAt = state.LastUpdatedTimestamp.ToDateTimeOffset(),
+ RuntimeStatus = (OrchestrationRuntimeStatus)state.OrchestrationStatus,
+ SerializedInput = state.Input,
+ SerializedOutput = state.Output,
+ SerializedCustomStatus = state.CustomStatus,
+ FailureDetails = state.FailureDetails.ToTaskFailureDetails(),
+ DataConverter = includeInputsAndOutputs ? this.DataConverter : null,
+ Tags = new Dictionary(state.Tags),
+ };
+
+ return metadata;
+ }
+}
diff --git a/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs b/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs
index ce510fd0a..fe7625a88 100644
--- a/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs
+++ b/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs
@@ -1,290 +1,290 @@
-// Copyright (c) Microsoft Corporation.
-// Licensed under the MIT License.
-
-using System.Diagnostics.CodeAnalysis;
-using DurableTask.Core;
-using DurableTask.Core.Entities;
-using DurableTask.Core.History;
-using DurableTask.Core.Query;
-using Microsoft.DurableTask.Client.Entities;
-using Microsoft.Extensions.DependencyInjection;
-using Microsoft.Extensions.Options;
-using Core = DurableTask.Core;
-using CoreOrchestrationQuery = DurableTask.Core.Query.OrchestrationQuery;
-
-namespace Microsoft.DurableTask.Client.OrchestrationServiceClientShim;
-
-///
-/// A shim client for interacting with the backend via .
-///
-///
-/// Initializes a new instance of the class.
-///
-/// The name of the client.
-/// The client options.
-class ShimDurableTaskClient(string name, ShimDurableTaskClientOptions options) : DurableTaskClient(name)
-{
- readonly ShimDurableTaskClientOptions options = Check.NotNull(options);
- ShimDurableEntityClient? entities;
-
- ///
- /// Initializes a new instance of the class.
- ///
- /// The name of this client.
- /// The client options.
- [ActivatorUtilitiesConstructor]
- public ShimDurableTaskClient(
- string name, IOptionsMonitor options)
- : this(name, Check.NotNull(options).Get(name))
- {
- }
-
- ///
- public override DurableEntityClient Entities
- {
- get
- {
- if (!this.options.EnableEntitySupport)
- {
- throw new InvalidOperationException("Entity support is not enabled.");
- }
-
- if (this.entities is null)
- {
- if (this.options.Entities.Queries is null)
- {
- throw new NotSupportedException(
- "The configured IOrchestrationServiceClient does not support entities.");
- }
-
- this.entities = new(this.Name, this.options);
- }
-
- return this.entities;
- }
- }
-
- DataConverter DataConverter => this.options.DataConverter;
-
- IOrchestrationServiceClient Client => this.options.Client!;
-
- IOrchestrationServicePurgeClient PurgeClient => this.CastClient();
-
- ///
- public override ValueTask DisposeAsync() => default;
-
- ///
- public override async Task GetInstancesAsync(
- string instanceId, bool getInputsAndOutputs = false, CancellationToken cancellation = default)
- {
- cancellation.ThrowIfCancellationRequested();
- IList states = await this.Client.GetOrchestrationStateAsync(instanceId, false);
- if (states is null or { Count: 0 })
- {
- return null;
- }
-
- return this.ToMetadata(states.First(), getInputsAndOutputs);
- }
-
- ///
- public override AsyncPageable GetAllInstancesAsync(OrchestrationQuery? query = null)
- {
- // Get this early to force an exception if not supported.
- IOrchestrationServiceQueryClient queryClient = this.CastClient();
- return Pageable.Create(async (continuation, pageSize, cancellation) =>
- {
- CoreOrchestrationQuery coreQuery = new()
- {
- RuntimeStatus = query?.Statuses?.Select(x => x.ConvertToCore()).ToList(),
- CreatedTimeFrom = query?.CreatedFrom?.UtcDateTime,
- CreatedTimeTo = query?.CreatedTo?.UtcDateTime,
- TaskHubNames = query?.TaskHubNames?.ToList(),
- PageSize = pageSize ?? query?.PageSize ?? OrchestrationQuery.DefaultPageSize,
- ContinuationToken = continuation ?? query?.ContinuationToken,
- InstanceIdPrefix = query?.InstanceIdPrefix,
- FetchInputsAndOutputs = query?.FetchInputsAndOutputs ?? false,
- };
-
- OrchestrationQueryResult result = await queryClient.GetOrchestrationWithQueryAsync(
- coreQuery, cancellation);
-
- var metadata = result.OrchestrationState.Select(x => this.ToMetadata(x, coreQuery.FetchInputsAndOutputs))
- .ToList();
- return new Page(metadata, result.ContinuationToken);
- });
- }
-
- ///
- public override async Task PurgeInstanceAsync(
- string instanceId, PurgeInstanceOptions? options = null, CancellationToken cancellation = default)
- {
- Check.NotNullOrEmpty(instanceId);
- cancellation.ThrowIfCancellationRequested();
-
- // TODO: Support recursive purge of sub-orchestrations
- Core.PurgeResult result = await this.PurgeClient.PurgeInstanceStateAsync(instanceId);
- return result.ConvertFromCore();
- }
-
- ///
- public override async Task PurgeAllInstancesAsync(
- PurgeInstancesFilter filter, PurgeInstanceOptions? options = null, CancellationToken cancellation = default)
- {
- Check.NotNull(filter);
- cancellation.ThrowIfCancellationRequested();
-
- // TODO: Support recursive purge of sub-orchestrations
- Core.PurgeResult result = await this.PurgeClient.PurgeInstanceStateAsync(filter.ConvertToCore());
- return result.ConvertFromCore();
- }
-
- ///
- public override Task RaiseEventAsync(
- string instanceId, string eventName, object? eventPayload = null, CancellationToken cancellation = default)
- {
- Check.NotNullOrEmpty(instanceId);
- Check.NotNullOrEmpty(eventName);
-
- string? serializedInput = this.DataConverter.Serialize(eventPayload);
- return this.SendInstanceMessageAsync(
- instanceId, new EventRaisedEvent(-1, serializedInput) { Name = eventName }, cancellation);
- }
-
- ///
- public override async Task ScheduleNewOrchestrationInstanceAsync(
- TaskName orchestratorName,
- object? input = null,
- StartOrchestrationOptions? options = null,
- CancellationToken cancellation = default)
- {
- cancellation.ThrowIfCancellationRequested();
- string instanceId = options?.InstanceId ?? Guid.NewGuid().ToString("N");
- OrchestrationInstance instance = new()
- {
- InstanceId = instanceId,
- ExecutionId = Guid.NewGuid().ToString("N"),
- };
-
- string? serializedInput = this.DataConverter.Serialize(input);
- TaskMessage message = new()
- {
- OrchestrationInstance = instance,
- Event = new ExecutionStartedEvent(-1, serializedInput)
- {
- Name = orchestratorName.Name,
- Version = orchestratorName.Version,
- OrchestrationInstance = instance,
- ScheduledStartTime = options?.StartAt?.UtcDateTime,
- },
- };
-
- await this.Client.CreateTaskOrchestrationAsync(message);
- return instanceId;
- }
-
- ///
- public override Task SuspendInstanceAsync(
- string instanceId, string? reason = null, CancellationToken cancellation = default)
- => this.SendInstanceMessageAsync(instanceId, new ExecutionSuspendedEvent(-1, reason), cancellation);
-
- ///
- public override Task ResumeInstanceAsync(
- string instanceId, string? reason = null, CancellationToken cancellation = default)
- => this.SendInstanceMessageAsync(instanceId, new ExecutionResumedEvent(-1, reason), cancellation);
-
- ///
- public override Task TerminateInstanceAsync(
- string instanceId, TerminateInstanceOptions? options = null, CancellationToken cancellation = default)
- {
- object? output = options?.Output;
- Check.NotNullOrEmpty(instanceId);
- cancellation.ThrowIfCancellationRequested();
- string? reason = this.DataConverter.Serialize(output);
-
- // TODO: Support recursive termination of sub-orchestrations
- return this.Client.ForceTerminateTaskOrchestrationAsync(instanceId, reason);
- }
-
- ///
- public override async Task WaitForInstanceCompletionAsync(
- string instanceId, bool getInputsAndOutputs = false, CancellationToken cancellation = default)
- {
- Check.NotNullOrEmpty(instanceId);
- OrchestrationState state = await this.Client.WaitForOrchestrationAsync(
- instanceId, null, TimeSpan.MaxValue, cancellation);
- return this.ToMetadata(state, getInputsAndOutputs);
- }
-
- ///
- public override async Task WaitForInstanceStartAsync(
- string instanceId, bool getInputsAndOutputs = false, CancellationToken cancellation = default)
- {
- Check.NotNullOrEmpty(instanceId);
-
- while (true)
- {
- OrchestrationMetadata? metadata = await this.GetInstancesAsync(
- instanceId, getInputsAndOutputs, cancellation);
- if (metadata is null)
- {
- throw new InvalidOperationException($"Orchestration with instanceId '{instanceId}' does not exist");
- }
-
- if (metadata.RuntimeStatus != OrchestrationRuntimeStatus.Pending)
- {
- // TODO: Evaluate what to do with "Suspended" state. Do we wait on that?
- return metadata;
- }
-
- await Task.Delay(TimeSpan.FromSeconds(1), cancellation);
- }
- }
-
- [return: NotNullIfNotNull("state")]
- OrchestrationMetadata? ToMetadata(Core.OrchestrationState? state, bool getInputsAndOutputs)
- {
- if (state is null)
- {
- return null;
- }
-
- return new OrchestrationMetadata(state.Name, state.OrchestrationInstance.InstanceId)
- {
- DataConverter = getInputsAndOutputs ? this.DataConverter : null,
- RuntimeStatus = state.OrchestrationStatus.ConvertFromCore(),
- CreatedAt = state.CreatedTime,
- LastUpdatedAt = state.LastUpdatedTime,
- SerializedInput = state.Input,
- SerializedOutput = state.Output,
- SerializedCustomStatus = state.Status,
- FailureDetails = state.FailureDetails?.ConvertFromCore(),
- };
- }
-
- T CastClient()
- {
- if (this.Client is T t)
- {
- return t;
- }
-
- throw new NotSupportedException($"Provided IOrchestrationServiceClient does not implement {typeof(T)}.");
- }
-
- Task SendInstanceMessageAsync(string instanceId, HistoryEvent @event, CancellationToken cancellation)
- {
- Check.NotNullOrEmpty(instanceId);
- Check.NotNull(@event);
-
- cancellation.ThrowIfCancellationRequested();
-
- TaskMessage message = new()
- {
- OrchestrationInstance = new() { InstanceId = instanceId },
- Event = @event,
- };
-
- return this.Client.SendTaskOrchestrationMessageAsync(message);
- }
-}
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+using System.Diagnostics.CodeAnalysis;
+using DurableTask.Core;
+using DurableTask.Core.History;
+using DurableTask.Core.Query;
+using Microsoft.DurableTask.Client.Entities;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Options;
+using Core = DurableTask.Core;
+using CoreOrchestrationQuery = DurableTask.Core.Query.OrchestrationQuery;
+
+namespace Microsoft.DurableTask.Client.OrchestrationServiceClientShim;
+
+///
+/// A shim client for interacting with the backend via .
+///
+///
+/// Initializes a new instance of the class.
+///
+/// The name of the client.
+/// The client options.
+class ShimDurableTaskClient(string name, ShimDurableTaskClientOptions options) : DurableTaskClient(name)
+{
+ readonly ShimDurableTaskClientOptions options = Check.NotNull(options);
+ ShimDurableEntityClient? entities;
+
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// The name of this client.
+ /// The client options.
+ [ActivatorUtilitiesConstructor]
+ public ShimDurableTaskClient(
+ string name, IOptionsMonitor options)
+ : this(name, Check.NotNull(options).Get(name))
+ {
+ }
+
+ ///
+ public override DurableEntityClient Entities
+ {
+ get
+ {
+ if (!this.options.EnableEntitySupport)
+ {
+ throw new InvalidOperationException("Entity support is not enabled.");
+ }
+
+ if (this.entities is null)
+ {
+ if (this.options.Entities.Queries is null)
+ {
+ throw new NotSupportedException(
+ "The configured IOrchestrationServiceClient does not support entities.");
+ }
+
+ this.entities = new(this.Name, this.options);
+ }
+
+ return this.entities;
+ }
+ }
+
+ DataConverter DataConverter => this.options.DataConverter;
+
+ IOrchestrationServiceClient Client => this.options.Client!;
+
+ IOrchestrationServicePurgeClient PurgeClient => this.CastClient();
+
+ ///
+ public override ValueTask DisposeAsync() => default;
+
+ ///
+ public override async Task GetInstancesAsync(
+ string instanceId, bool getInputsAndOutputs = false, CancellationToken cancellation = default)
+ {
+ cancellation.ThrowIfCancellationRequested();
+ IList states = await this.Client.GetOrchestrationStateAsync(instanceId, false);
+ if (states is null or { Count: 0 })
+ {
+ return null;
+ }
+
+ return this.ToMetadata(states.First(), getInputsAndOutputs);
+ }
+
+ ///
+ public override AsyncPageable GetAllInstancesAsync(OrchestrationQuery? query = null)
+ {
+ // Get this early to force an exception if not supported.
+ IOrchestrationServiceQueryClient queryClient = this.CastClient();
+ return Pageable.Create(async (continuation, pageSize, cancellation) =>
+ {
+ CoreOrchestrationQuery coreQuery = new()
+ {
+ RuntimeStatus = query?.Statuses?.Select(x => x.ConvertToCore()).ToList(),
+ CreatedTimeFrom = query?.CreatedFrom?.UtcDateTime,
+ CreatedTimeTo = query?.CreatedTo?.UtcDateTime,
+ TaskHubNames = query?.TaskHubNames?.ToList(),
+ PageSize = pageSize ?? query?.PageSize ?? OrchestrationQuery.DefaultPageSize,
+ ContinuationToken = continuation ?? query?.ContinuationToken,
+ InstanceIdPrefix = query?.InstanceIdPrefix,
+ FetchInputsAndOutputs = query?.FetchInputsAndOutputs ?? false,
+ };
+
+ OrchestrationQueryResult result = await queryClient.GetOrchestrationWithQueryAsync(
+ coreQuery, cancellation);
+
+ var metadata = result.OrchestrationState.Select(x => this.ToMetadata(x, coreQuery.FetchInputsAndOutputs))
+ .ToList();
+ return new Page(metadata, result.ContinuationToken);
+ });
+ }
+
+ ///
+ public override async Task PurgeInstanceAsync(
+ string instanceId, PurgeInstanceOptions? options = null, CancellationToken cancellation = default)
+ {
+ Check.NotNullOrEmpty(instanceId);
+ cancellation.ThrowIfCancellationRequested();
+
+ // TODO: Support recursive purge of sub-orchestrations
+ Core.PurgeResult result = await this.PurgeClient.PurgeInstanceStateAsync(instanceId);
+ return result.ConvertFromCore();
+ }
+
+ ///
+ public override async Task PurgeAllInstancesAsync(
+ PurgeInstancesFilter filter, PurgeInstanceOptions? options = null, CancellationToken cancellation = default)
+ {
+ Check.NotNull(filter);
+ cancellation.ThrowIfCancellationRequested();
+
+ // TODO: Support recursive purge of sub-orchestrations
+ Core.PurgeResult result = await this.PurgeClient.PurgeInstanceStateAsync(filter.ConvertToCore());
+ return result.ConvertFromCore();
+ }
+
+ ///
+ public override Task RaiseEventAsync(
+ string instanceId, string eventName, object? eventPayload = null, CancellationToken cancellation = default)
+ {
+ Check.NotNullOrEmpty(instanceId);
+ Check.NotNullOrEmpty(eventName);
+
+ string? serializedInput = this.DataConverter.Serialize(eventPayload);
+ return this.SendInstanceMessageAsync(
+ instanceId, new EventRaisedEvent(-1, serializedInput) { Name = eventName }, cancellation);
+ }
+
+ ///
+ public override async Task ScheduleNewOrchestrationInstanceAsync(
+ TaskName orchestratorName,
+ object? input = null,
+ StartOrchestrationOptions? options = null,
+ CancellationToken cancellation = default)
+ {
+ cancellation.ThrowIfCancellationRequested();
+ string instanceId = options?.InstanceId ?? Guid.NewGuid().ToString("N");
+ OrchestrationInstance instance = new()
+ {
+ InstanceId = instanceId,
+ ExecutionId = Guid.NewGuid().ToString("N"),
+ };
+
+ string? serializedInput = this.DataConverter.Serialize(input);
+ TaskMessage message = new()
+ {
+ OrchestrationInstance = instance,
+ Event = new ExecutionStartedEvent(-1, serializedInput)
+ {
+ Name = orchestratorName.Name,
+ Version = orchestratorName.Version,
+ OrchestrationInstance = instance,
+ ScheduledStartTime = options?.StartAt?.UtcDateTime,
+ Tags = options?.Tags != null ? options.Tags.ToDictionary(kvp => kvp.Key, kvp => kvp.Value) : null,
+ },
+ };
+
+ await this.Client.CreateTaskOrchestrationAsync(message);
+ return instanceId;
+ }
+
+ ///
+ public override Task SuspendInstanceAsync(
+ string instanceId, string? reason = null, CancellationToken cancellation = default)
+ => this.SendInstanceMessageAsync(instanceId, new ExecutionSuspendedEvent(-1, reason), cancellation);
+
+ ///
+ public override Task ResumeInstanceAsync(
+ string instanceId, string? reason = null, CancellationToken cancellation = default)
+ => this.SendInstanceMessageAsync(instanceId, new ExecutionResumedEvent(-1, reason), cancellation);
+
+ ///
+ public override Task TerminateInstanceAsync(
+ string instanceId, TerminateInstanceOptions? options = null, CancellationToken cancellation = default)
+ {
+ object? output = options?.Output;
+ Check.NotNullOrEmpty(instanceId);
+ cancellation.ThrowIfCancellationRequested();
+ string? reason = this.DataConverter.Serialize(output);
+
+ // TODO: Support recursive termination of sub-orchestrations
+ return this.Client.ForceTerminateTaskOrchestrationAsync(instanceId, reason);
+ }
+
+ ///
+ public override async Task WaitForInstanceCompletionAsync(
+ string instanceId, bool getInputsAndOutputs = false, CancellationToken cancellation = default)
+ {
+ Check.NotNullOrEmpty(instanceId);
+ OrchestrationState state = await this.Client.WaitForOrchestrationAsync(
+ instanceId, null, TimeSpan.MaxValue, cancellation);
+ return this.ToMetadata(state, getInputsAndOutputs);
+ }
+
+ ///
+ public override async Task WaitForInstanceStartAsync(
+ string instanceId, bool getInputsAndOutputs = false, CancellationToken cancellation = default)
+ {
+ Check.NotNullOrEmpty(instanceId);
+
+ while (true)
+ {
+ OrchestrationMetadata? metadata = await this.GetInstancesAsync(
+ instanceId, getInputsAndOutputs, cancellation);
+ if (metadata is null)
+ {
+ throw new InvalidOperationException($"Orchestration with instanceId '{instanceId}' does not exist");
+ }
+
+ if (metadata.RuntimeStatus != OrchestrationRuntimeStatus.Pending)
+ {
+ // TODO: Evaluate what to do with "Suspended" state. Do we wait on that?
+ return metadata;
+ }
+
+ await Task.Delay(TimeSpan.FromSeconds(1), cancellation);
+ }
+ }
+
+ [return: NotNullIfNotNull("state")]
+ OrchestrationMetadata? ToMetadata(Core.OrchestrationState? state, bool getInputsAndOutputs)
+ {
+ if (state is null)
+ {
+ return null;
+ }
+
+ return new OrchestrationMetadata(state.Name, state.OrchestrationInstance.InstanceId)
+ {
+ DataConverter = getInputsAndOutputs ? this.DataConverter : null,
+ RuntimeStatus = state.OrchestrationStatus.ConvertFromCore(),
+ CreatedAt = state.CreatedTime,
+ LastUpdatedAt = state.LastUpdatedTime,
+ SerializedInput = state.Input,
+ SerializedOutput = state.Output,
+ SerializedCustomStatus = state.Status,
+ FailureDetails = state.FailureDetails?.ConvertFromCore(),
+ };
+ }
+
+ T CastClient()
+ {
+ if (this.Client is T t)
+ {
+ return t;
+ }
+
+ throw new NotSupportedException($"Provided IOrchestrationServiceClient does not implement {typeof(T)}.");
+ }
+
+ Task SendInstanceMessageAsync(string instanceId, HistoryEvent @event, CancellationToken cancellation)
+ {
+ Check.NotNullOrEmpty(instanceId);
+ Check.NotNull(@event);
+
+ cancellation.ThrowIfCancellationRequested();
+
+ TaskMessage message = new()
+ {
+ OrchestrationInstance = new() { InstanceId = instanceId },
+ Event = @event,
+ };
+
+ return this.Client.SendTaskOrchestrationMessageAsync(message);
+ }
+}
diff --git a/src/Grpc/orchestrator_service.proto b/src/Grpc/orchestrator_service.proto
index 0fa6b6595..64e752818 100644
--- a/src/Grpc/orchestrator_service.proto
+++ b/src/Grpc/orchestrator_service.proto
@@ -75,6 +75,7 @@ message ExecutionStartedEvent {
google.protobuf.Timestamp scheduledStartTimestamp = 6;
TraceContext parentTraceContext = 7;
google.protobuf.StringValue orchestrationSpanID = 8;
+ map tags = 9;
}
message ExecutionCompletedEvent {
@@ -343,14 +344,8 @@ message CreateInstanceRequest {
}
message OrchestrationIdReusePolicy {
- repeated OrchestrationStatus operationStatus = 1;
- CreateOrchestrationAction action = 2;
-}
-
-enum CreateOrchestrationAction {
- ERROR = 0;
- IGNORE = 1;
- TERMINATE = 2;
+ repeated OrchestrationStatus replaceableStatus = 1;
+ reserved 2;
}
message CreateInstanceResponse {
@@ -391,6 +386,7 @@ message OrchestrationState {
google.protobuf.StringValue executionId = 12;
google.protobuf.Timestamp completedTimestamp = 13;
google.protobuf.StringValue parentInstanceId = 14;
+ map tags = 15;
}
message RaiseEventRequest {
@@ -731,4 +727,4 @@ message StreamInstanceHistoryRequest {
message HistoryChunk {
repeated HistoryEvent events = 1;
-}
\ No newline at end of file
+}
diff --git a/src/Grpc/versions.txt b/src/Grpc/versions.txt
index ca514f29a..c016c8199 100644
--- a/src/Grpc/versions.txt
+++ b/src/Grpc/versions.txt
@@ -1,2 +1,2 @@
-# The following files were downloaded from branch main at 2025-02-19 06:25:02 UTC
-https://raw.githubusercontent.com/microsoft/durabletask-protobuf/589cb5ecd9dd4b1fe463750defa3e2c84276b079/protos/orchestrator_service.proto
+# The following files were downloaded from branch main at 2025-03-19 19:55:31 UTC
+https://raw.githubusercontent.com/microsoft/durabletask-protobuf/4792f47019ab2b3e9ea979fb4af72427a4144c51/protos/orchestrator_service.proto
diff --git a/src/Shared/Grpc/ProtoUtils.cs b/src/Shared/Grpc/ProtoUtils.cs
index c3c0e45f7..9a18c524c 100644
--- a/src/Shared/Grpc/ProtoUtils.cs
+++ b/src/Shared/Grpc/ProtoUtils.cs
@@ -1,1042 +1,1044 @@
-// Copyright (c) Microsoft Corporation.
-// Licensed under the MIT License.
-
-using System.Buffers;
-using System.Buffers.Text;
-using System.Diagnostics.CodeAnalysis;
-using System.Runtime.CompilerServices;
-using System.Text;
-using DurableTask.Core;
-using DurableTask.Core.Command;
-using DurableTask.Core.Entities;
-using DurableTask.Core.Entities.OperationFormat;
-using DurableTask.Core.History;
-using Google.Protobuf;
-using Google.Protobuf.WellKnownTypes;
-using DTCore = DurableTask.Core;
-using P = Microsoft.DurableTask.Protobuf;
-
-namespace Microsoft.DurableTask;
-
-///
-/// Protobuf utilities and helpers.
-///
-static class ProtoUtils
-{
- ///
- /// Converts a history event from to .
- ///
- /// The proto history event to converter.
- /// The converted history event.
- /// When the provided history event type is not supported.
- internal static HistoryEvent ConvertHistoryEvent(P.HistoryEvent proto)
- {
- return ConvertHistoryEvent(proto, conversionState: null);
- }
-
- ///
- /// Converts a history event from to , and performs
- /// stateful conversions of entity-related events.
- ///
- /// The proto history event to converter.
- /// State needed for converting entity-related history entries and actions.
- /// The converted history event.
- /// When the provided history event type is not supported.
- internal static HistoryEvent ConvertHistoryEvent(P.HistoryEvent proto, EntityConversionState? conversionState)
- {
- Check.NotNull(proto);
- HistoryEvent historyEvent;
- switch (proto.EventTypeCase)
- {
- case P.HistoryEvent.EventTypeOneofCase.ContinueAsNew:
- historyEvent = new ContinueAsNewEvent(proto.EventId, proto.ContinueAsNew.Input);
- break;
- case P.HistoryEvent.EventTypeOneofCase.ExecutionStarted:
- OrchestrationInstance instance = proto.ExecutionStarted.OrchestrationInstance.ToCore();
- conversionState?.SetOrchestrationInstance(instance);
- historyEvent = new ExecutionStartedEvent(proto.EventId, proto.ExecutionStarted.Input)
- {
- Name = proto.ExecutionStarted.Name,
- Version = proto.ExecutionStarted.Version,
- OrchestrationInstance = instance,
- ParentInstance = proto.ExecutionStarted.ParentInstance == null ? null : new ParentInstance
- {
- Name = proto.ExecutionStarted.ParentInstance.Name,
- Version = proto.ExecutionStarted.ParentInstance.Version,
- OrchestrationInstance = proto.ExecutionStarted.ParentInstance.OrchestrationInstance.ToCore(),
- TaskScheduleId = proto.ExecutionStarted.ParentInstance.TaskScheduledId,
- },
- ScheduledStartTime = proto.ExecutionStarted.ScheduledStartTimestamp?.ToDateTime(),
- };
- break;
- case P.HistoryEvent.EventTypeOneofCase.ExecutionCompleted:
- historyEvent = new ExecutionCompletedEvent(
- proto.EventId,
- proto.ExecutionCompleted.Result,
- proto.ExecutionCompleted.OrchestrationStatus.ToCore());
- break;
- case P.HistoryEvent.EventTypeOneofCase.ExecutionTerminated:
- historyEvent = new ExecutionTerminatedEvent(proto.EventId, proto.ExecutionTerminated.Input);
- break;
- case P.HistoryEvent.EventTypeOneofCase.ExecutionSuspended:
- historyEvent = new ExecutionSuspendedEvent(proto.EventId, proto.ExecutionSuspended.Input);
- break;
- case P.HistoryEvent.EventTypeOneofCase.ExecutionResumed:
- historyEvent = new ExecutionResumedEvent(proto.EventId, proto.ExecutionResumed.Input);
- break;
- case P.HistoryEvent.EventTypeOneofCase.TaskScheduled:
- historyEvent = new TaskScheduledEvent(
- proto.EventId,
- proto.TaskScheduled.Name,
- proto.TaskScheduled.Version,
- proto.TaskScheduled.Input);
- break;
- case P.HistoryEvent.EventTypeOneofCase.TaskCompleted:
- historyEvent = new TaskCompletedEvent(
- proto.EventId,
- proto.TaskCompleted.TaskScheduledId,
- proto.TaskCompleted.Result);
- break;
- case P.HistoryEvent.EventTypeOneofCase.TaskFailed:
- historyEvent = new TaskFailedEvent(
- proto.EventId,
- proto.TaskFailed.TaskScheduledId,
- reason: null, /* not supported */
- details: null, /* not supported */
- proto.TaskFailed.FailureDetails.ToCore());
- break;
- case P.HistoryEvent.EventTypeOneofCase.SubOrchestrationInstanceCreated:
- historyEvent = new SubOrchestrationInstanceCreatedEvent(proto.EventId)
- {
- Input = proto.SubOrchestrationInstanceCreated.Input,
- InstanceId = proto.SubOrchestrationInstanceCreated.InstanceId,
- Name = proto.SubOrchestrationInstanceCreated.Name,
- Version = proto.SubOrchestrationInstanceCreated.Version,
- };
- break;
- case P.HistoryEvent.EventTypeOneofCase.SubOrchestrationInstanceCompleted:
- historyEvent = new SubOrchestrationInstanceCompletedEvent(
- proto.EventId,
- proto.SubOrchestrationInstanceCompleted.TaskScheduledId,
- proto.SubOrchestrationInstanceCompleted.Result);
- break;
- case P.HistoryEvent.EventTypeOneofCase.SubOrchestrationInstanceFailed:
- historyEvent = new SubOrchestrationInstanceFailedEvent(
- proto.EventId,
- proto.SubOrchestrationInstanceFailed.TaskScheduledId,
- reason: null /* not supported */,
- details: null /* not supported */,
- proto.SubOrchestrationInstanceFailed.FailureDetails.ToCore());
- break;
- case P.HistoryEvent.EventTypeOneofCase.TimerCreated:
- historyEvent = new TimerCreatedEvent(
- proto.EventId,
- proto.TimerCreated.FireAt.ToDateTime());
- break;
- case P.HistoryEvent.EventTypeOneofCase.TimerFired:
- historyEvent = new TimerFiredEvent(
- eventId: -1,
- proto.TimerFired.FireAt.ToDateTime())
- {
- TimerId = proto.TimerFired.TimerId,
- };
- break;
- case P.HistoryEvent.EventTypeOneofCase.OrchestratorStarted:
- historyEvent = new OrchestratorStartedEvent(proto.EventId);
- break;
- case P.HistoryEvent.EventTypeOneofCase.OrchestratorCompleted:
- historyEvent = new OrchestratorCompletedEvent(proto.EventId);
- break;
- case P.HistoryEvent.EventTypeOneofCase.EventSent:
- historyEvent = new EventSentEvent(proto.EventId)
- {
- InstanceId = proto.EventSent.InstanceId,
- Name = proto.EventSent.Name,
- Input = proto.EventSent.Input,
- };
- break;
- case P.HistoryEvent.EventTypeOneofCase.EventRaised:
- historyEvent = new EventRaisedEvent(proto.EventId, proto.EventRaised.Input)
- {
- Name = proto.EventRaised.Name,
- };
- break;
- case P.HistoryEvent.EventTypeOneofCase.EntityOperationCalled:
- historyEvent = EntityConversions.EncodeOperationCalled(proto, conversionState!.CurrentInstance);
- conversionState?.EntityRequestIds.Add(proto.EntityOperationCalled.RequestId);
- break;
- case P.HistoryEvent.EventTypeOneofCase.EntityOperationSignaled:
- historyEvent = EntityConversions.EncodeOperationSignaled(proto);
- conversionState?.EntityRequestIds.Add(proto.EntityOperationSignaled.RequestId);
- break;
- case P.HistoryEvent.EventTypeOneofCase.EntityLockRequested:
- historyEvent = EntityConversions.EncodeLockRequested(proto, conversionState!.CurrentInstance);
- conversionState?.AddUnlockObligations(proto.EntityLockRequested);
- break;
- case P.HistoryEvent.EventTypeOneofCase.EntityUnlockSent:
- historyEvent = EntityConversions.EncodeUnlockSent(proto, conversionState!.CurrentInstance);
- conversionState?.RemoveUnlockObligation(proto.EntityUnlockSent.TargetInstanceId);
- break;
- case P.HistoryEvent.EventTypeOneofCase.EntityLockGranted:
- historyEvent = EntityConversions.EncodeLockGranted(proto);
- break;
- case P.HistoryEvent.EventTypeOneofCase.EntityOperationCompleted:
- historyEvent = EntityConversions.EncodeOperationCompleted(proto);
- break;
- case P.HistoryEvent.EventTypeOneofCase.EntityOperationFailed:
- historyEvent = EntityConversions.EncodeOperationFailed(proto);
- break;
- case P.HistoryEvent.EventTypeOneofCase.GenericEvent:
- historyEvent = new GenericEvent(proto.EventId, proto.GenericEvent.Data);
- break;
- case P.HistoryEvent.EventTypeOneofCase.HistoryState:
- historyEvent = new HistoryStateEvent(
- proto.EventId,
- new OrchestrationState
- {
- OrchestrationInstance = new OrchestrationInstance
- {
- InstanceId = proto.HistoryState.OrchestrationState.InstanceId,
- },
- Name = proto.HistoryState.OrchestrationState.Name,
- Version = proto.HistoryState.OrchestrationState.Version,
- ScheduledStartTime = proto.HistoryState.OrchestrationState.ScheduledStartTimestamp.ToDateTime(),
- CreatedTime = proto.HistoryState.OrchestrationState.CreatedTimestamp.ToDateTime(),
- LastUpdatedTime = proto.HistoryState.OrchestrationState.LastUpdatedTimestamp.ToDateTime(),
- Input = proto.HistoryState.OrchestrationState.Input,
- Output = proto.HistoryState.OrchestrationState.Output,
- Status = proto.HistoryState.OrchestrationState.CustomStatus,
- });
- break;
- default:
- throw new NotSupportedException($"Deserialization of {proto.EventTypeCase} is not supported.");
- }
-
- historyEvent.Timestamp = proto.Timestamp.ToDateTime();
- return historyEvent;
- }
-
- ///
- /// Converts a to a gRPC .
- ///
- /// The date-time to convert.
- /// The gRPC timestamp.
- internal static Timestamp ToTimestamp(this DateTime dateTime)
- {
- // The protobuf libraries require timestamps to be in UTC
- if (dateTime.Kind == DateTimeKind.Unspecified)
- {
- dateTime = DateTime.SpecifyKind(dateTime, DateTimeKind.Utc);
- }
- else if (dateTime.Kind == DateTimeKind.Local)
- {
- dateTime = dateTime.ToUniversalTime();
- }
-
- return Timestamp.FromDateTime(dateTime);
- }
-
- ///
- /// Converts a to a gRPC .
- ///
- /// The date-time to convert.
- /// The gRPC timestamp.
- internal static Timestamp? ToTimestamp(this DateTime? dateTime)
- => dateTime.HasValue ? dateTime.Value.ToTimestamp() : null;
-
- ///
- /// Converts a to a gRPC .
- ///
- /// The date-time to convert.
- /// The gRPC timestamp.
- internal static Timestamp ToTimestamp(this DateTimeOffset dateTime) => Timestamp.FromDateTimeOffset(dateTime);
-
- ///
- /// Converts a to a gRPC .
- ///
- /// The date-time to convert.
- /// The gRPC timestamp.
- internal static Timestamp? ToTimestamp(this DateTimeOffset? dateTime)
- => dateTime.HasValue ? dateTime.Value.ToTimestamp() : null;
-
- ///
- /// Constructs a .
- ///
- /// The orchestrator instance ID.
- /// The orchestrator customer status or null if no custom status.
- /// The orchestrator actions.
- ///
- /// The completion token for the work item. It must be the exact same
- /// value that was provided by the corresponding that triggered the orchestrator execution.
- ///
- /// The entity conversion state, or null if no conversion is required.
- /// The orchestrator response.
- /// When an orchestrator action is unknown.
- internal static P.OrchestratorResponse ConstructOrchestratorResponse(
- string instanceId,
- string? customStatus,
- IEnumerable actions,
- string completionToken,
- EntityConversionState? entityConversionState)
- {
- Check.NotNull(actions);
- var response = new P.OrchestratorResponse
- {
- InstanceId = instanceId,
- CustomStatus = customStatus,
- CompletionToken = completionToken,
- };
-
- foreach (OrchestratorAction action in actions)
- {
- var protoAction = new P.OrchestratorAction { Id = action.Id };
-
- switch (action.OrchestratorActionType)
- {
- case OrchestratorActionType.ScheduleOrchestrator:
- var scheduleTaskAction = (ScheduleTaskOrchestratorAction)action;
- protoAction.ScheduleTask = new P.ScheduleTaskAction
- {
- Name = scheduleTaskAction.Name,
- Version = scheduleTaskAction.Version,
- Input = scheduleTaskAction.Input,
- };
- break;
- case OrchestratorActionType.CreateSubOrchestration:
- var subOrchestrationAction = (CreateSubOrchestrationAction)action;
- protoAction.CreateSubOrchestration = new P.CreateSubOrchestrationAction
- {
- Input = subOrchestrationAction.Input,
- InstanceId = subOrchestrationAction.InstanceId,
- Name = subOrchestrationAction.Name,
- Version = subOrchestrationAction.Version,
- };
- break;
- case OrchestratorActionType.CreateTimer:
- var createTimerAction = (CreateTimerOrchestratorAction)action;
- protoAction.CreateTimer = new P.CreateTimerAction
- {
- FireAt = createTimerAction.FireAt.ToTimestamp(),
- };
- break;
- case OrchestratorActionType.SendEvent:
- var sendEventAction = (SendEventOrchestratorAction)action;
- if (sendEventAction.Instance == null)
- {
- throw new ArgumentException(
- $"{nameof(SendEventOrchestratorAction)} cannot have a null Instance property!");
- }
-
- if (entityConversionState is not null
- && DTCore.Common.Entities.IsEntityInstance(sendEventAction.Instance.InstanceId)
- && sendEventAction.EventName is not null
- && sendEventAction.EventData is not null)
- {
- P.SendEntityMessageAction sendAction = new P.SendEntityMessageAction();
- protoAction.SendEntityMessage = sendAction;
-
- EntityConversions.DecodeEntityMessageAction(
- sendEventAction.EventName,
- sendEventAction.EventData,
- sendEventAction.Instance.InstanceId,
- sendAction,
- out string requestId);
-
- entityConversionState.EntityRequestIds.Add(requestId);
-
- switch (sendAction.EntityMessageTypeCase)
- {
- case P.SendEntityMessageAction.EntityMessageTypeOneofCase.EntityLockRequested:
- entityConversionState.AddUnlockObligations(sendAction.EntityLockRequested);
- break;
- case P.SendEntityMessageAction.EntityMessageTypeOneofCase.EntityUnlockSent:
- entityConversionState.RemoveUnlockObligation(sendAction.EntityUnlockSent.TargetInstanceId);
- break;
- default:
- break;
- }
- }
- else
- {
- protoAction.SendEvent = new P.SendEventAction
- {
- Instance = sendEventAction.Instance.ToProtobuf(),
- Name = sendEventAction.EventName,
- Data = sendEventAction.EventData,
- };
- }
-
- break;
- case OrchestratorActionType.OrchestrationComplete:
-
- if (entityConversionState is not null)
- {
- // as a precaution, unlock any entities that were not unlocked for some reason, before
- // completing the orchestration.
- foreach ((string target, string criticalSectionId) in entityConversionState.ResetObligations())
- {
- response.Actions.Add(new P.OrchestratorAction
- {
- Id = action.Id,
- SendEntityMessage = new P.SendEntityMessageAction
- {
- EntityUnlockSent = new P.EntityUnlockSentEvent
- {
- CriticalSectionId = criticalSectionId,
- TargetInstanceId = target,
- ParentInstanceId = entityConversionState.CurrentInstance?.InstanceId,
- },
- },
- });
- }
- }
-
- var completeAction = (OrchestrationCompleteOrchestratorAction)action;
- protoAction.CompleteOrchestration = new P.CompleteOrchestrationAction
- {
- CarryoverEvents =
- {
- // TODO
- },
- Details = completeAction.Details,
- NewVersion = completeAction.NewVersion,
- OrchestrationStatus = completeAction.OrchestrationStatus.ToProtobuf(),
- Result = completeAction.Result,
- };
-
- if (completeAction.OrchestrationStatus == OrchestrationStatus.Failed)
- {
- protoAction.CompleteOrchestration.FailureDetails = completeAction.FailureDetails.ToProtobuf();
- }
-
- break;
- default:
- throw new NotSupportedException($"Unknown orchestrator action: {action.OrchestratorActionType}");
- }
-
- response.Actions.Add(protoAction);
- }
-
- return response;
- }
-
- ///
- /// Converts a to a .
- ///
- /// The status to convert.
- /// The converted status.
- internal static OrchestrationStatus ToCore(this P.OrchestrationStatus status)
- {
- return (OrchestrationStatus)status;
- }
-
- ///
- /// Converts a to a .
- ///
- /// The status to convert.
- /// The converted status.
- [return: NotNullIfNotNull(nameof(status))]
- internal static OrchestrationInstance? ToCore(this P.OrchestrationInstance? status)
- {
- if (status == null)
- {
- return null;
- }
-
- return new OrchestrationInstance
- {
- InstanceId = status.InstanceId,
- ExecutionId = status.ExecutionId,
- };
- }
-
- ///
- /// Converts a to a .
- ///
- /// The failure details to convert.
- /// The converted failure details.
- [return: NotNullIfNotNull(nameof(failureDetails))]
- internal static TaskFailureDetails? ToTaskFailureDetails(this P.TaskFailureDetails? failureDetails)
- {
- if (failureDetails == null)
- {
- return null;
- }
-
- return new TaskFailureDetails(
- failureDetails.ErrorType,
- failureDetails.ErrorMessage,
- failureDetails.StackTrace,
- failureDetails.InnerFailure.ToTaskFailureDetails());
- }
-
- ///
- /// Converts a to .
- ///
- /// The exception to convert.
- /// The task failure details.
- [return: NotNullIfNotNull(nameof(e))]
- internal static P.TaskFailureDetails? ToTaskFailureDetails(this Exception? e)
- {
- if (e == null)
- {
- return null;
- }
-
- return new P.TaskFailureDetails
- {
- ErrorType = e.GetType().FullName,
- ErrorMessage = e.Message,
- StackTrace = e.StackTrace,
- InnerFailure = e.InnerException.ToTaskFailureDetails(),
- };
- }
-
- ///
- /// Converts a to a .
- ///
- /// The entity batch request to convert.
- /// The converted entity batch request.
- [return: NotNullIfNotNull(nameof(entityBatchRequest))]
- internal static EntityBatchRequest? ToEntityBatchRequest(this P.EntityBatchRequest? entityBatchRequest)
- {
- if (entityBatchRequest == null)
- {
- return null;
- }
-
- return new EntityBatchRequest()
- {
- EntityState = entityBatchRequest.EntityState,
- InstanceId = entityBatchRequest.InstanceId,
- Operations = entityBatchRequest.Operations.Select(r => r.ToOperationRequest()).ToList(),
- };
- }
-
- ///
- /// Converts a to a .
- ///
- /// The entity request to convert.
- /// The converted request.
- /// Additional info about each operation, required by DTS.
- internal static void ToEntityBatchRequest(
- this P.EntityRequest entityRequest,
- out EntityBatchRequest batchRequest,
- out List operationInfos)
- {
- batchRequest = new EntityBatchRequest()
- {
- EntityState = entityRequest.EntityState,
- InstanceId = entityRequest.InstanceId,
- Operations = [], // operations are added to this collection below
- };
-
- operationInfos = new(entityRequest.OperationRequests.Count);
-
- foreach (P.HistoryEvent? op in entityRequest.OperationRequests)
- {
- if (op.EntityOperationSignaled is not null)
- {
- batchRequest.Operations.Add(new OperationRequest
- {
- Id = Guid.Parse(op.EntityOperationSignaled.RequestId),
- Operation = op.EntityOperationSignaled.Operation,
- Input = op.EntityOperationSignaled.Input,
- });
- operationInfos.Add(new P.OperationInfo
- {
- RequestId = op.EntityOperationSignaled.RequestId,
- ResponseDestination = null, // means we don't send back a response to the caller
- });
- }
- else if (op.EntityOperationCalled is not null)
- {
- batchRequest.Operations.Add(new OperationRequest
- {
- Id = Guid.Parse(op.EntityOperationCalled.RequestId),
- Operation = op.EntityOperationCalled.Operation,
- Input = op.EntityOperationCalled.Input,
- });
- operationInfos.Add(new P.OperationInfo
- {
- RequestId = op.EntityOperationCalled.RequestId,
- ResponseDestination = new P.OrchestrationInstance
- {
- InstanceId = op.EntityOperationCalled.ParentInstanceId,
- ExecutionId = op.EntityOperationCalled.ParentExecutionId,
- },
- });
- }
- }
- }
-
- ///
- /// Converts a to a .
- ///
- /// The operation request to convert.
- /// The converted operation request.
- [return: NotNullIfNotNull(nameof(operationRequest))]
- internal static OperationRequest? ToOperationRequest(this P.OperationRequest? operationRequest)
- {
- if (operationRequest == null)
- {
- return null;
- }
-
- return new OperationRequest()
- {
- Operation = operationRequest.Operation,
- Input = operationRequest.Input,
- Id = Guid.Parse(operationRequest.RequestId),
- };
- }
-
- ///
- /// Converts a to a .
- ///
- /// The operation result to convert.
- /// The converted operation result.
- [return: NotNullIfNotNull(nameof(operationResult))]
- internal static OperationResult? ToOperationResult(this P.OperationResult? operationResult)
- {
- if (operationResult == null)
- {
- return null;
- }
-
- switch (operationResult.ResultTypeCase)
- {
- case P.OperationResult.ResultTypeOneofCase.Success:
- return new OperationResult()
- {
- Result = operationResult.Success.Result,
- };
-
- case P.OperationResult.ResultTypeOneofCase.Failure:
- return new OperationResult()
- {
- FailureDetails = operationResult.Failure.FailureDetails.ToCore(),
- };
-
- default:
- throw new NotSupportedException($"Deserialization of {operationResult.ResultTypeCase} is not supported.");
- }
- }
-
- ///
- /// Converts a to .
- ///
- /// The operation result to convert.
- /// The converted operation result.
- [return: NotNullIfNotNull(nameof(operationResult))]
- internal static P.OperationResult? ToOperationResult(this OperationResult? operationResult)
- {
- if (operationResult == null)
- {
- return null;
- }
-
- if (operationResult.FailureDetails == null)
- {
- return new P.OperationResult()
- {
- Success = new P.OperationResultSuccess()
- {
- Result = operationResult.Result,
- },
- };
- }
- else
- {
- return new P.OperationResult()
- {
- Failure = new P.OperationResultFailure()
- {
- FailureDetails = ToProtobuf(operationResult.FailureDetails),
- },
- };
- }
- }
-
- ///
- /// Converts a to a .
- ///
- /// The operation action to convert.
- /// The converted operation action.
- [return: NotNullIfNotNull(nameof(operationAction))]
- internal static OperationAction? ToOperationAction(this P.OperationAction? operationAction)
- {
- if (operationAction == null)
- {
- return null;
- }
-
- switch (operationAction.OperationActionTypeCase)
- {
- case P.OperationAction.OperationActionTypeOneofCase.SendSignal:
-
- return new SendSignalOperationAction()
- {
- Name = operationAction.SendSignal.Name,
- Input = operationAction.SendSignal.Input,
- InstanceId = operationAction.SendSignal.InstanceId,
- ScheduledTime = operationAction.SendSignal.ScheduledTime?.ToDateTime(),
- };
-
- case P.OperationAction.OperationActionTypeOneofCase.StartNewOrchestration:
-
- return new StartNewOrchestrationOperationAction()
- {
- Name = operationAction.StartNewOrchestration.Name,
- Input = operationAction.StartNewOrchestration.Input,
- InstanceId = operationAction.StartNewOrchestration.InstanceId,
- Version = operationAction.StartNewOrchestration.Version,
- ScheduledStartTime = operationAction.StartNewOrchestration.ScheduledTime?.ToDateTime(),
- };
- default:
- throw new NotSupportedException($"Deserialization of {operationAction.OperationActionTypeCase} is not supported.");
- }
- }
-
- ///
- /// Converts a to .
- ///
- /// The operation action to convert.
- /// The converted operation action.
- [return: NotNullIfNotNull(nameof(operationAction))]
- internal static P.OperationAction? ToOperationAction(this OperationAction? operationAction)
- {
- if (operationAction == null)
- {
- return null;
- }
-
- var action = new P.OperationAction();
-
- switch (operationAction)
- {
- case SendSignalOperationAction sendSignalAction:
-
- action.SendSignal = new P.SendSignalAction()
- {
- Name = sendSignalAction.Name,
- Input = sendSignalAction.Input,
- InstanceId = sendSignalAction.InstanceId,
- ScheduledTime = sendSignalAction.ScheduledTime?.ToTimestamp(),
- };
- break;
-
- case StartNewOrchestrationOperationAction startNewOrchestrationAction:
-
- action.StartNewOrchestration = new P.StartNewOrchestrationAction()
- {
- Name = startNewOrchestrationAction.Name,
- Input = startNewOrchestrationAction.Input,
- Version = startNewOrchestrationAction.Version,
- InstanceId = startNewOrchestrationAction.InstanceId,
- ScheduledTime = startNewOrchestrationAction.ScheduledStartTime?.ToTimestamp(),
- };
- break;
- }
-
- return action;
- }
-
- ///
- /// Converts a to a .
- ///
- /// The operation result to convert.
- /// The converted operation result.
- [return: NotNullIfNotNull(nameof(entityBatchResult))]
- internal static EntityBatchResult? ToEntityBatchResult(this P.EntityBatchResult? entityBatchResult)
- {
- if (entityBatchResult == null)
- {
- return null;
- }
-
- return new EntityBatchResult()
- {
- Actions = entityBatchResult.Actions.Select(operationAction => operationAction!.ToOperationAction()).ToList(),
- EntityState = entityBatchResult.EntityState,
- Results = entityBatchResult.Results.Select(operationResult => operationResult!.ToOperationResult()).ToList(),
- FailureDetails = entityBatchResult.FailureDetails.ToCore(),
- };
- }
-
- ///
- /// Converts a to .
- ///
- /// The operation result to convert.
- /// The completion token, or null for the older protocol.
- /// Additional information about each operation, required by DTS.
- /// The converted operation result.
- [return: NotNullIfNotNull(nameof(entityBatchResult))]
- internal static P.EntityBatchResult? ToEntityBatchResult(
- this EntityBatchResult? entityBatchResult,
- string? completionToken = null,
- IEnumerable? operationInfos = null)
- {
- if (entityBatchResult == null)
- {
- return null;
- }
-
- return new P.EntityBatchResult()
- {
- EntityState = entityBatchResult.EntityState,
- FailureDetails = entityBatchResult.FailureDetails.ToProtobuf(),
- Actions = { entityBatchResult.Actions?.Select(a => a.ToOperationAction()) ?? [] },
- Results = { entityBatchResult.Results?.Select(a => a.ToOperationResult()) ?? [] },
- CompletionToken = completionToken ?? string.Empty,
- OperationInfos = { operationInfos ?? [] },
- };
- }
-
- ///
- /// Converts the gRPC representation of orchestrator entity parameters to the DT.Core representation.
- ///
- /// The DT.Core representation.
- /// The gRPC representation.
- [return: NotNullIfNotNull(nameof(parameters))]
- internal static TaskOrchestrationEntityParameters? ToCore(this P.OrchestratorEntityParameters? parameters)
- {
- if (parameters == null)
- {
- return null;
- }
-
- return new TaskOrchestrationEntityParameters()
- {
- EntityMessageReorderWindow = parameters.EntityMessageReorderWindow.ToTimeSpan(),
- };
- }
-
- ///
- /// Gets the approximate byte count for a .
- ///
- /// The failure details.
- /// The approximate byte count.
- internal static int GetApproximateByteCount(this P.TaskFailureDetails failureDetails)
- {
- // Protobuf strings are always UTF-8: https://developers.google.com/protocol-buffers/docs/proto3#scalar
- Encoding encoding = Encoding.UTF8;
-
- int byteCount = 0;
- if (failureDetails.ErrorType != null)
- {
- byteCount += encoding.GetByteCount(failureDetails.ErrorType);
- }
-
- if (failureDetails.ErrorMessage != null)
- {
- byteCount += encoding.GetByteCount(failureDetails.ErrorMessage);
- }
-
- if (failureDetails.StackTrace != null)
- {
- byteCount += encoding.GetByteCount(failureDetails.StackTrace);
- }
-
- if (failureDetails.InnerFailure != null)
- {
- byteCount += failureDetails.InnerFailure.GetApproximateByteCount();
- }
-
- return byteCount;
- }
-
- ///
- /// Decode a protobuf message from a base64 string.
- ///
- /// The type to decode to.
- /// The message parser.
- /// The base64 encoded message.
- /// The decoded message.
- /// If decoding fails.
- internal static T Base64Decode(this MessageParser parser, string encodedMessage) where T : IMessage
- {
- // Decode the base64 in a way that doesn't allocate a byte[] on each request
- int encodedByteCount = Encoding.UTF8.GetByteCount(encodedMessage);
- byte[] buffer = ArrayPool.Shared.Rent(encodedByteCount);
- try
- {
- // The Base64 APIs require first converting the string into UTF-8 bytes. We then
- // do an in-place conversion from base64 UTF-8 bytes to protobuf bytes so that
- // we can finally decode the protobuf request.
- Encoding.UTF8.GetBytes(encodedMessage, 0, encodedMessage.Length, buffer, 0);
- OperationStatus status = Base64.DecodeFromUtf8InPlace(
- buffer.AsSpan(0, encodedByteCount),
- out int bytesWritten);
- if (status != OperationStatus.Done)
- {
- throw new ArgumentException(
- $"Failed to base64-decode the '{typeof(T).Name}' payload: {status}", nameof(encodedMessage));
- }
-
- return (T)parser.ParseFrom(buffer, 0, bytesWritten);
- }
- finally
- {
- ArrayPool.Shared.Return(buffer);
- }
- }
-
- ///
- /// Converts a grpc to a .
- ///
- /// The failure details to convert.
- /// The converted failure details.
- internal static FailureDetails? ToCore(this P.TaskFailureDetails? failureDetails)
- {
- if (failureDetails == null)
- {
- return null;
- }
-
- return new FailureDetails(
- failureDetails.ErrorType,
- failureDetails.ErrorMessage,
- failureDetails.StackTrace,
- failureDetails.InnerFailure.ToCore(),
- failureDetails.IsNonRetriable);
- }
-
- ///
- /// Converts a to a grpc .
- ///
- /// The failure details to convert.
- /// The converted failure details.
- static P.TaskFailureDetails? ToProtobuf(this FailureDetails? failureDetails)
- {
- if (failureDetails == null)
- {
- return null;
- }
-
- return new P.TaskFailureDetails
- {
- ErrorType = failureDetails.ErrorType ?? "(unknown)",
- ErrorMessage = failureDetails.ErrorMessage ?? "(unknown)",
- StackTrace = failureDetails.StackTrace,
- IsNonRetriable = failureDetails.IsNonRetriable,
- InnerFailure = failureDetails.InnerFailure.ToProtobuf(),
- };
- }
-
- static P.OrchestrationStatus ToProtobuf(this OrchestrationStatus status)
- {
- return (P.OrchestrationStatus)status;
- }
-
- static P.OrchestrationInstance ToProtobuf(this OrchestrationInstance instance)
- {
- return new P.OrchestrationInstance
- {
- InstanceId = instance.InstanceId,
- ExecutionId = instance.ExecutionId,
- };
- }
-
- ///
- /// Tracks state required for converting orchestration histories containing entity-related events.
- ///
- internal class EntityConversionState
- {
- readonly bool insertMissingEntityUnlocks;
-
- OrchestrationInstance? instance;
- HashSet? entityRequestIds;
- Dictionary? unlockObligations;
-
- ///
- /// Initializes a new instance of the class.
- ///
- /// Whether to insert missing unlock events in to the history
- /// when the orchestration completes.
- public EntityConversionState(bool insertMissingEntityUnlocks)
- {
- this.ConvertFromProto = (P.HistoryEvent e) => ProtoUtils.ConvertHistoryEvent(e, this);
- this.insertMissingEntityUnlocks = insertMissingEntityUnlocks;
- }
-
- ///
- /// Gets a function that converts a history event in protobuf format to a core history event.
- ///
- public Func ConvertFromProto { get; }
-
- ///
- /// Gets the orchestration instance of this history.
- ///
- public OrchestrationInstance? CurrentInstance => this.instance;
-
- ///
- /// Gets the set of guids that have been used as entity request ids in this history.
- ///
- public HashSet EntityRequestIds => this.entityRequestIds ??= new();
-
- ///
- /// Records the orchestration instance, which may be needed for some conversions.
- ///
- /// The orchestration instance.
- public void SetOrchestrationInstance(OrchestrationInstance instance)
- {
- this.instance = instance;
- }
-
- ///
- /// Adds unlock obligations for all entities that are being locked by this request.
- ///
- /// The lock request.
- public void AddUnlockObligations(P.EntityLockRequestedEvent request)
- {
- if (!this.insertMissingEntityUnlocks)
- {
- return;
- }
-
- this.unlockObligations ??= new();
-
- foreach (string target in request.LockSet)
- {
- this.unlockObligations[target] = request.CriticalSectionId;
- }
- }
-
- ///
- /// Removes an unlock obligation.
- ///
- /// The target entity.
- public void RemoveUnlockObligation(string target)
- {
- if (!this.insertMissingEntityUnlocks)
- {
- return;
- }
-
- this.unlockObligations?.Remove(target);
- }
-
- ///
- /// Returns the remaining unlock obligations, and clears the list.
- ///
- /// The unlock obligations.
- public IEnumerable<(string Target, string CriticalSectionId)> ResetObligations()
- {
- if (!this.insertMissingEntityUnlocks)
- {
- yield break;
- }
-
- if (this.unlockObligations is not null)
- {
- foreach (var kvp in this.unlockObligations)
- {
- yield return (kvp.Key, kvp.Value);
- }
-
- this.unlockObligations = null;
- }
- }
- }
-}
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+using System.Buffers;
+using System.Buffers.Text;
+using System.Diagnostics.CodeAnalysis;
+using System.Runtime.CompilerServices;
+using System.Text;
+using DurableTask.Core;
+using DurableTask.Core.Command;
+using DurableTask.Core.Entities;
+using DurableTask.Core.Entities.OperationFormat;
+using DurableTask.Core.History;
+using Google.Protobuf;
+using Google.Protobuf.WellKnownTypes;
+using DTCore = DurableTask.Core;
+using P = Microsoft.DurableTask.Protobuf;
+
+namespace Microsoft.DurableTask;
+
+///
+/// Protobuf utilities and helpers.
+///
+static class ProtoUtils
+{
+ ///
+ /// Converts a history event from to .
+ ///
+ /// The proto history event to converter.
+ /// The converted history event.
+ /// When the provided history event type is not supported.
+ internal static HistoryEvent ConvertHistoryEvent(P.HistoryEvent proto)
+ {
+ return ConvertHistoryEvent(proto, conversionState: null);
+ }
+
+ ///
+ /// Converts a history event from to , and performs
+ /// stateful conversions of entity-related events.
+ ///
+ /// The proto history event to converter.
+ /// State needed for converting entity-related history entries and actions.
+ /// The converted history event.
+ /// When the provided history event type is not supported.
+ internal static HistoryEvent ConvertHistoryEvent(P.HistoryEvent proto, EntityConversionState? conversionState)
+ {
+ Check.NotNull(proto);
+ HistoryEvent historyEvent;
+ switch (proto.EventTypeCase)
+ {
+ case P.HistoryEvent.EventTypeOneofCase.ContinueAsNew:
+ historyEvent = new ContinueAsNewEvent(proto.EventId, proto.ContinueAsNew.Input);
+ break;
+ case P.HistoryEvent.EventTypeOneofCase.ExecutionStarted:
+ OrchestrationInstance instance = proto.ExecutionStarted.OrchestrationInstance.ToCore();
+ conversionState?.SetOrchestrationInstance(instance);
+ historyEvent = new ExecutionStartedEvent(proto.EventId, proto.ExecutionStarted.Input)
+ {
+ Name = proto.ExecutionStarted.Name,
+ Version = proto.ExecutionStarted.Version,
+ OrchestrationInstance = instance,
+ Tags = proto.ExecutionStarted.Tags,
+ ParentInstance = proto.ExecutionStarted.ParentInstance == null ? null : new ParentInstance
+ {
+ Name = proto.ExecutionStarted.ParentInstance.Name,
+ Version = proto.ExecutionStarted.ParentInstance.Version,
+ OrchestrationInstance = proto.ExecutionStarted.ParentInstance.OrchestrationInstance.ToCore(),
+ TaskScheduleId = proto.ExecutionStarted.ParentInstance.TaskScheduledId,
+ },
+ ScheduledStartTime = proto.ExecutionStarted.ScheduledStartTimestamp?.ToDateTime(),
+ };
+ break;
+ case P.HistoryEvent.EventTypeOneofCase.ExecutionCompleted:
+ historyEvent = new ExecutionCompletedEvent(
+ proto.EventId,
+ proto.ExecutionCompleted.Result,
+ proto.ExecutionCompleted.OrchestrationStatus.ToCore());
+ break;
+ case P.HistoryEvent.EventTypeOneofCase.ExecutionTerminated:
+ historyEvent = new ExecutionTerminatedEvent(proto.EventId, proto.ExecutionTerminated.Input);
+ break;
+ case P.HistoryEvent.EventTypeOneofCase.ExecutionSuspended:
+ historyEvent = new ExecutionSuspendedEvent(proto.EventId, proto.ExecutionSuspended.Input);
+ break;
+ case P.HistoryEvent.EventTypeOneofCase.ExecutionResumed:
+ historyEvent = new ExecutionResumedEvent(proto.EventId, proto.ExecutionResumed.Input);
+ break;
+ case P.HistoryEvent.EventTypeOneofCase.TaskScheduled:
+ historyEvent = new TaskScheduledEvent(
+ proto.EventId,
+ proto.TaskScheduled.Name,
+ proto.TaskScheduled.Version,
+ proto.TaskScheduled.Input);
+ break;
+ case P.HistoryEvent.EventTypeOneofCase.TaskCompleted:
+ historyEvent = new TaskCompletedEvent(
+ proto.EventId,
+ proto.TaskCompleted.TaskScheduledId,
+ proto.TaskCompleted.Result);
+ break;
+ case P.HistoryEvent.EventTypeOneofCase.TaskFailed:
+ historyEvent = new TaskFailedEvent(
+ proto.EventId,
+ proto.TaskFailed.TaskScheduledId,
+ reason: null, /* not supported */
+ details: null, /* not supported */
+ proto.TaskFailed.FailureDetails.ToCore());
+ break;
+ case P.HistoryEvent.EventTypeOneofCase.SubOrchestrationInstanceCreated:
+ historyEvent = new SubOrchestrationInstanceCreatedEvent(proto.EventId)
+ {
+ Input = proto.SubOrchestrationInstanceCreated.Input,
+ InstanceId = proto.SubOrchestrationInstanceCreated.InstanceId,
+ Name = proto.SubOrchestrationInstanceCreated.Name,
+ Version = proto.SubOrchestrationInstanceCreated.Version,
+ };
+ break;
+ case P.HistoryEvent.EventTypeOneofCase.SubOrchestrationInstanceCompleted:
+ historyEvent = new SubOrchestrationInstanceCompletedEvent(
+ proto.EventId,
+ proto.SubOrchestrationInstanceCompleted.TaskScheduledId,
+ proto.SubOrchestrationInstanceCompleted.Result);
+ break;
+ case P.HistoryEvent.EventTypeOneofCase.SubOrchestrationInstanceFailed:
+ historyEvent = new SubOrchestrationInstanceFailedEvent(
+ proto.EventId,
+ proto.SubOrchestrationInstanceFailed.TaskScheduledId,
+ reason: null /* not supported */,
+ details: null /* not supported */,
+ proto.SubOrchestrationInstanceFailed.FailureDetails.ToCore());
+ break;
+ case P.HistoryEvent.EventTypeOneofCase.TimerCreated:
+ historyEvent = new TimerCreatedEvent(
+ proto.EventId,
+ proto.TimerCreated.FireAt.ToDateTime());
+ break;
+ case P.HistoryEvent.EventTypeOneofCase.TimerFired:
+ historyEvent = new TimerFiredEvent(
+ eventId: -1,
+ proto.TimerFired.FireAt.ToDateTime())
+ {
+ TimerId = proto.TimerFired.TimerId,
+ };
+ break;
+ case P.HistoryEvent.EventTypeOneofCase.OrchestratorStarted:
+ historyEvent = new OrchestratorStartedEvent(proto.EventId);
+ break;
+ case P.HistoryEvent.EventTypeOneofCase.OrchestratorCompleted:
+ historyEvent = new OrchestratorCompletedEvent(proto.EventId);
+ break;
+ case P.HistoryEvent.EventTypeOneofCase.EventSent:
+ historyEvent = new EventSentEvent(proto.EventId)
+ {
+ InstanceId = proto.EventSent.InstanceId,
+ Name = proto.EventSent.Name,
+ Input = proto.EventSent.Input,
+ };
+ break;
+ case P.HistoryEvent.EventTypeOneofCase.EventRaised:
+ historyEvent = new EventRaisedEvent(proto.EventId, proto.EventRaised.Input)
+ {
+ Name = proto.EventRaised.Name,
+ };
+ break;
+ case P.HistoryEvent.EventTypeOneofCase.EntityOperationCalled:
+ historyEvent = EntityConversions.EncodeOperationCalled(proto, conversionState!.CurrentInstance);
+ conversionState?.EntityRequestIds.Add(proto.EntityOperationCalled.RequestId);
+ break;
+ case P.HistoryEvent.EventTypeOneofCase.EntityOperationSignaled:
+ historyEvent = EntityConversions.EncodeOperationSignaled(proto);
+ conversionState?.EntityRequestIds.Add(proto.EntityOperationSignaled.RequestId);
+ break;
+ case P.HistoryEvent.EventTypeOneofCase.EntityLockRequested:
+ historyEvent = EntityConversions.EncodeLockRequested(proto, conversionState!.CurrentInstance);
+ conversionState?.AddUnlockObligations(proto.EntityLockRequested);
+ break;
+ case P.HistoryEvent.EventTypeOneofCase.EntityUnlockSent:
+ historyEvent = EntityConversions.EncodeUnlockSent(proto, conversionState!.CurrentInstance);
+ conversionState?.RemoveUnlockObligation(proto.EntityUnlockSent.TargetInstanceId);
+ break;
+ case P.HistoryEvent.EventTypeOneofCase.EntityLockGranted:
+ historyEvent = EntityConversions.EncodeLockGranted(proto);
+ break;
+ case P.HistoryEvent.EventTypeOneofCase.EntityOperationCompleted:
+ historyEvent = EntityConversions.EncodeOperationCompleted(proto);
+ break;
+ case P.HistoryEvent.EventTypeOneofCase.EntityOperationFailed:
+ historyEvent = EntityConversions.EncodeOperationFailed(proto);
+ break;
+ case P.HistoryEvent.EventTypeOneofCase.GenericEvent:
+ historyEvent = new GenericEvent(proto.EventId, proto.GenericEvent.Data);
+ break;
+ case P.HistoryEvent.EventTypeOneofCase.HistoryState:
+ historyEvent = new HistoryStateEvent(
+ proto.EventId,
+ new OrchestrationState
+ {
+ OrchestrationInstance = new OrchestrationInstance
+ {
+ InstanceId = proto.HistoryState.OrchestrationState.InstanceId,
+ },
+ Name = proto.HistoryState.OrchestrationState.Name,
+ Version = proto.HistoryState.OrchestrationState.Version,
+ ScheduledStartTime = proto.HistoryState.OrchestrationState.ScheduledStartTimestamp.ToDateTime(),
+ CreatedTime = proto.HistoryState.OrchestrationState.CreatedTimestamp.ToDateTime(),
+ LastUpdatedTime = proto.HistoryState.OrchestrationState.LastUpdatedTimestamp.ToDateTime(),
+ Input = proto.HistoryState.OrchestrationState.Input,
+ Output = proto.HistoryState.OrchestrationState.Output,
+ Status = proto.HistoryState.OrchestrationState.CustomStatus,
+ Tags = proto.HistoryState.OrchestrationState.Tags,
+ });
+ break;
+ default:
+ throw new NotSupportedException($"Deserialization of {proto.EventTypeCase} is not supported.");
+ }
+
+ historyEvent.Timestamp = proto.Timestamp.ToDateTime();
+ return historyEvent;
+ }
+
+ ///
+ /// Converts a to a gRPC .
+ ///
+ /// The date-time to convert.
+ /// The gRPC timestamp.
+ internal static Timestamp ToTimestamp(this DateTime dateTime)
+ {
+ // The protobuf libraries require timestamps to be in UTC
+ if (dateTime.Kind == DateTimeKind.Unspecified)
+ {
+ dateTime = DateTime.SpecifyKind(dateTime, DateTimeKind.Utc);
+ }
+ else if (dateTime.Kind == DateTimeKind.Local)
+ {
+ dateTime = dateTime.ToUniversalTime();
+ }
+
+ return Timestamp.FromDateTime(dateTime);
+ }
+
+ ///
+ /// Converts a to a gRPC .
+ ///
+ /// The date-time to convert.
+ /// The gRPC timestamp.
+ internal static Timestamp? ToTimestamp(this DateTime? dateTime)
+ => dateTime.HasValue ? dateTime.Value.ToTimestamp() : null;
+
+ ///
+ /// Converts a to a gRPC .
+ ///
+ /// The date-time to convert.
+ /// The gRPC timestamp.
+ internal static Timestamp ToTimestamp(this DateTimeOffset dateTime) => Timestamp.FromDateTimeOffset(dateTime);
+
+ ///
+ /// Converts a to a gRPC .
+ ///
+ /// The date-time to convert.
+ /// The gRPC timestamp.
+ internal static Timestamp? ToTimestamp(this DateTimeOffset? dateTime)
+ => dateTime.HasValue ? dateTime.Value.ToTimestamp() : null;
+
+ ///
+ /// Constructs a .
+ ///
+ /// The orchestrator instance ID.
+ /// The orchestrator customer status or null if no custom status.
+ /// The orchestrator actions.
+ ///
+ /// The completion token for the work item. It must be the exact same
+ /// value that was provided by the corresponding that triggered the orchestrator execution.
+ ///
+ /// The entity conversion state, or null if no conversion is required.
+ /// The orchestrator response.
+ /// When an orchestrator action is unknown.
+ internal static P.OrchestratorResponse ConstructOrchestratorResponse(
+ string instanceId,
+ string? customStatus,
+ IEnumerable actions,
+ string completionToken,
+ EntityConversionState? entityConversionState)
+ {
+ Check.NotNull(actions);
+ var response = new P.OrchestratorResponse
+ {
+ InstanceId = instanceId,
+ CustomStatus = customStatus,
+ CompletionToken = completionToken,
+ };
+
+ foreach (OrchestratorAction action in actions)
+ {
+ var protoAction = new P.OrchestratorAction { Id = action.Id };
+
+ switch (action.OrchestratorActionType)
+ {
+ case OrchestratorActionType.ScheduleOrchestrator:
+ var scheduleTaskAction = (ScheduleTaskOrchestratorAction)action;
+ protoAction.ScheduleTask = new P.ScheduleTaskAction
+ {
+ Name = scheduleTaskAction.Name,
+ Version = scheduleTaskAction.Version,
+ Input = scheduleTaskAction.Input,
+ };
+ break;
+ case OrchestratorActionType.CreateSubOrchestration:
+ var subOrchestrationAction = (CreateSubOrchestrationAction)action;
+ protoAction.CreateSubOrchestration = new P.CreateSubOrchestrationAction
+ {
+ Input = subOrchestrationAction.Input,
+ InstanceId = subOrchestrationAction.InstanceId,
+ Name = subOrchestrationAction.Name,
+ Version = subOrchestrationAction.Version,
+ };
+ break;
+ case OrchestratorActionType.CreateTimer:
+ var createTimerAction = (CreateTimerOrchestratorAction)action;
+ protoAction.CreateTimer = new P.CreateTimerAction
+ {
+ FireAt = createTimerAction.FireAt.ToTimestamp(),
+ };
+ break;
+ case OrchestratorActionType.SendEvent:
+ var sendEventAction = (SendEventOrchestratorAction)action;
+ if (sendEventAction.Instance == null)
+ {
+ throw new ArgumentException(
+ $"{nameof(SendEventOrchestratorAction)} cannot have a null Instance property!");
+ }
+
+ if (entityConversionState is not null
+ && DTCore.Common.Entities.IsEntityInstance(sendEventAction.Instance.InstanceId)
+ && sendEventAction.EventName is not null
+ && sendEventAction.EventData is not null)
+ {
+ P.SendEntityMessageAction sendAction = new P.SendEntityMessageAction();
+ protoAction.SendEntityMessage = sendAction;
+
+ EntityConversions.DecodeEntityMessageAction(
+ sendEventAction.EventName,
+ sendEventAction.EventData,
+ sendEventAction.Instance.InstanceId,
+ sendAction,
+ out string requestId);
+
+ entityConversionState.EntityRequestIds.Add(requestId);
+
+ switch (sendAction.EntityMessageTypeCase)
+ {
+ case P.SendEntityMessageAction.EntityMessageTypeOneofCase.EntityLockRequested:
+ entityConversionState.AddUnlockObligations(sendAction.EntityLockRequested);
+ break;
+ case P.SendEntityMessageAction.EntityMessageTypeOneofCase.EntityUnlockSent:
+ entityConversionState.RemoveUnlockObligation(sendAction.EntityUnlockSent.TargetInstanceId);
+ break;
+ default:
+ break;
+ }
+ }
+ else
+ {
+ protoAction.SendEvent = new P.SendEventAction
+ {
+ Instance = sendEventAction.Instance.ToProtobuf(),
+ Name = sendEventAction.EventName,
+ Data = sendEventAction.EventData,
+ };
+ }
+
+ break;
+ case OrchestratorActionType.OrchestrationComplete:
+
+ if (entityConversionState is not null)
+ {
+ // as a precaution, unlock any entities that were not unlocked for some reason, before
+ // completing the orchestration.
+ foreach ((string target, string criticalSectionId) in entityConversionState.ResetObligations())
+ {
+ response.Actions.Add(new P.OrchestratorAction
+ {
+ Id = action.Id,
+ SendEntityMessage = new P.SendEntityMessageAction
+ {
+ EntityUnlockSent = new P.EntityUnlockSentEvent
+ {
+ CriticalSectionId = criticalSectionId,
+ TargetInstanceId = target,
+ ParentInstanceId = entityConversionState.CurrentInstance?.InstanceId,
+ },
+ },
+ });
+ }
+ }
+
+ var completeAction = (OrchestrationCompleteOrchestratorAction)action;
+ protoAction.CompleteOrchestration = new P.CompleteOrchestrationAction
+ {
+ CarryoverEvents =
+ {
+ // TODO
+ },
+ Details = completeAction.Details,
+ NewVersion = completeAction.NewVersion,
+ OrchestrationStatus = completeAction.OrchestrationStatus.ToProtobuf(),
+ Result = completeAction.Result,
+ };
+
+ if (completeAction.OrchestrationStatus == OrchestrationStatus.Failed)
+ {
+ protoAction.CompleteOrchestration.FailureDetails = completeAction.FailureDetails.ToProtobuf();
+ }
+
+ break;
+ default:
+ throw new NotSupportedException($"Unknown orchestrator action: {action.OrchestratorActionType}");
+ }
+
+ response.Actions.Add(protoAction);
+ }
+
+ return response;
+ }
+
+ ///
+ /// Converts a to a .
+ ///
+ /// The status to convert.
+ /// The converted status.
+ internal static OrchestrationStatus ToCore(this P.OrchestrationStatus status)
+ {
+ return (OrchestrationStatus)status;
+ }
+
+ ///
+ /// Converts a to a .
+ ///
+ /// The status to convert.
+ /// The converted status.
+ [return: NotNullIfNotNull(nameof(status))]
+ internal static OrchestrationInstance? ToCore(this P.OrchestrationInstance? status)
+ {
+ if (status == null)
+ {
+ return null;
+ }
+
+ return new OrchestrationInstance
+ {
+ InstanceId = status.InstanceId,
+ ExecutionId = status.ExecutionId,
+ };
+ }
+
+ ///
+ /// Converts a to a .
+ ///
+ /// The failure details to convert.
+ /// The converted failure details.
+ [return: NotNullIfNotNull(nameof(failureDetails))]
+ internal static TaskFailureDetails? ToTaskFailureDetails(this P.TaskFailureDetails? failureDetails)
+ {
+ if (failureDetails == null)
+ {
+ return null;
+ }
+
+ return new TaskFailureDetails(
+ failureDetails.ErrorType,
+ failureDetails.ErrorMessage,
+ failureDetails.StackTrace,
+ failureDetails.InnerFailure.ToTaskFailureDetails());
+ }
+
+ ///
+ /// Converts a to .
+ ///
+ /// The exception to convert.
+ /// The task failure details.
+ [return: NotNullIfNotNull(nameof(e))]
+ internal static P.TaskFailureDetails? ToTaskFailureDetails(this Exception? e)
+ {
+ if (e == null)
+ {
+ return null;
+ }
+
+ return new P.TaskFailureDetails
+ {
+ ErrorType = e.GetType().FullName,
+ ErrorMessage = e.Message,
+ StackTrace = e.StackTrace,
+ InnerFailure = e.InnerException.ToTaskFailureDetails(),
+ };
+ }
+
+ ///
+ /// Converts a to a .
+ ///
+ /// The entity batch request to convert.
+ /// The converted entity batch request.
+ [return: NotNullIfNotNull(nameof(entityBatchRequest))]
+ internal static EntityBatchRequest? ToEntityBatchRequest(this P.EntityBatchRequest? entityBatchRequest)
+ {
+ if (entityBatchRequest == null)
+ {
+ return null;
+ }
+
+ return new EntityBatchRequest()
+ {
+ EntityState = entityBatchRequest.EntityState,
+ InstanceId = entityBatchRequest.InstanceId,
+ Operations = entityBatchRequest.Operations.Select(r => r.ToOperationRequest()).ToList(),
+ };
+ }
+
+ ///
+ /// Converts a to a .
+ ///
+ /// The entity request to convert.
+ /// The converted request.
+ /// Additional info about each operation, required by DTS.
+ internal static void ToEntityBatchRequest(
+ this P.EntityRequest entityRequest,
+ out EntityBatchRequest batchRequest,
+ out List operationInfos)
+ {
+ batchRequest = new EntityBatchRequest()
+ {
+ EntityState = entityRequest.EntityState,
+ InstanceId = entityRequest.InstanceId,
+ Operations = [], // operations are added to this collection below
+ };
+
+ operationInfos = new(entityRequest.OperationRequests.Count);
+
+ foreach (P.HistoryEvent? op in entityRequest.OperationRequests)
+ {
+ if (op.EntityOperationSignaled is not null)
+ {
+ batchRequest.Operations.Add(new OperationRequest
+ {
+ Id = Guid.Parse(op.EntityOperationSignaled.RequestId),
+ Operation = op.EntityOperationSignaled.Operation,
+ Input = op.EntityOperationSignaled.Input,
+ });
+ operationInfos.Add(new P.OperationInfo
+ {
+ RequestId = op.EntityOperationSignaled.RequestId,
+ ResponseDestination = null, // means we don't send back a response to the caller
+ });
+ }
+ else if (op.EntityOperationCalled is not null)
+ {
+ batchRequest.Operations.Add(new OperationRequest
+ {
+ Id = Guid.Parse(op.EntityOperationCalled.RequestId),
+ Operation = op.EntityOperationCalled.Operation,
+ Input = op.EntityOperationCalled.Input,
+ });
+ operationInfos.Add(new P.OperationInfo
+ {
+ RequestId = op.EntityOperationCalled.RequestId,
+ ResponseDestination = new P.OrchestrationInstance
+ {
+ InstanceId = op.EntityOperationCalled.ParentInstanceId,
+ ExecutionId = op.EntityOperationCalled.ParentExecutionId,
+ },
+ });
+ }
+ }
+ }
+
+ ///
+ /// Converts a to a .
+ ///
+ /// The operation request to convert.
+ /// The converted operation request.
+ [return: NotNullIfNotNull(nameof(operationRequest))]
+ internal static OperationRequest? ToOperationRequest(this P.OperationRequest? operationRequest)
+ {
+ if (operationRequest == null)
+ {
+ return null;
+ }
+
+ return new OperationRequest()
+ {
+ Operation = operationRequest.Operation,
+ Input = operationRequest.Input,
+ Id = Guid.Parse(operationRequest.RequestId),
+ };
+ }
+
+ ///
+ /// Converts a to a .
+ ///
+ /// The operation result to convert.
+ /// The converted operation result.
+ [return: NotNullIfNotNull(nameof(operationResult))]
+ internal static OperationResult? ToOperationResult(this P.OperationResult? operationResult)
+ {
+ if (operationResult == null)
+ {
+ return null;
+ }
+
+ switch (operationResult.ResultTypeCase)
+ {
+ case P.OperationResult.ResultTypeOneofCase.Success:
+ return new OperationResult()
+ {
+ Result = operationResult.Success.Result,
+ };
+
+ case P.OperationResult.ResultTypeOneofCase.Failure:
+ return new OperationResult()
+ {
+ FailureDetails = operationResult.Failure.FailureDetails.ToCore(),
+ };
+
+ default:
+ throw new NotSupportedException($"Deserialization of {operationResult.ResultTypeCase} is not supported.");
+ }
+ }
+
+ ///
+ /// Converts a to .
+ ///
+ /// The operation result to convert.
+ /// The converted operation result.
+ [return: NotNullIfNotNull(nameof(operationResult))]
+ internal static P.OperationResult? ToOperationResult(this OperationResult? operationResult)
+ {
+ if (operationResult == null)
+ {
+ return null;
+ }
+
+ if (operationResult.FailureDetails == null)
+ {
+ return new P.OperationResult()
+ {
+ Success = new P.OperationResultSuccess()
+ {
+ Result = operationResult.Result,
+ },
+ };
+ }
+ else
+ {
+ return new P.OperationResult()
+ {
+ Failure = new P.OperationResultFailure()
+ {
+ FailureDetails = ToProtobuf(operationResult.FailureDetails),
+ },
+ };
+ }
+ }
+
+ ///
+ /// Converts a to a .
+ ///
+ /// The operation action to convert.
+ /// The converted operation action.
+ [return: NotNullIfNotNull(nameof(operationAction))]
+ internal static OperationAction? ToOperationAction(this P.OperationAction? operationAction)
+ {
+ if (operationAction == null)
+ {
+ return null;
+ }
+
+ switch (operationAction.OperationActionTypeCase)
+ {
+ case P.OperationAction.OperationActionTypeOneofCase.SendSignal:
+
+ return new SendSignalOperationAction()
+ {
+ Name = operationAction.SendSignal.Name,
+ Input = operationAction.SendSignal.Input,
+ InstanceId = operationAction.SendSignal.InstanceId,
+ ScheduledTime = operationAction.SendSignal.ScheduledTime?.ToDateTime(),
+ };
+
+ case P.OperationAction.OperationActionTypeOneofCase.StartNewOrchestration:
+
+ return new StartNewOrchestrationOperationAction()
+ {
+ Name = operationAction.StartNewOrchestration.Name,
+ Input = operationAction.StartNewOrchestration.Input,
+ InstanceId = operationAction.StartNewOrchestration.InstanceId,
+ Version = operationAction.StartNewOrchestration.Version,
+ ScheduledStartTime = operationAction.StartNewOrchestration.ScheduledTime?.ToDateTime(),
+ };
+ default:
+ throw new NotSupportedException($"Deserialization of {operationAction.OperationActionTypeCase} is not supported.");
+ }
+ }
+
+ ///
+ /// Converts a to .
+ ///
+ /// The operation action to convert.
+ /// The converted operation action.
+ [return: NotNullIfNotNull(nameof(operationAction))]
+ internal static P.OperationAction? ToOperationAction(this OperationAction? operationAction)
+ {
+ if (operationAction == null)
+ {
+ return null;
+ }
+
+ var action = new P.OperationAction();
+
+ switch (operationAction)
+ {
+ case SendSignalOperationAction sendSignalAction:
+
+ action.SendSignal = new P.SendSignalAction()
+ {
+ Name = sendSignalAction.Name,
+ Input = sendSignalAction.Input,
+ InstanceId = sendSignalAction.InstanceId,
+ ScheduledTime = sendSignalAction.ScheduledTime?.ToTimestamp(),
+ };
+ break;
+
+ case StartNewOrchestrationOperationAction startNewOrchestrationAction:
+
+ action.StartNewOrchestration = new P.StartNewOrchestrationAction()
+ {
+ Name = startNewOrchestrationAction.Name,
+ Input = startNewOrchestrationAction.Input,
+ Version = startNewOrchestrationAction.Version,
+ InstanceId = startNewOrchestrationAction.InstanceId,
+ ScheduledTime = startNewOrchestrationAction.ScheduledStartTime?.ToTimestamp(),
+ };
+ break;
+ }
+
+ return action;
+ }
+
+ ///
+ /// Converts a to a .
+ ///
+ /// The operation result to convert.
+ /// The converted operation result.
+ [return: NotNullIfNotNull(nameof(entityBatchResult))]
+ internal static EntityBatchResult? ToEntityBatchResult(this P.EntityBatchResult? entityBatchResult)
+ {
+ if (entityBatchResult == null)
+ {
+ return null;
+ }
+
+ return new EntityBatchResult()
+ {
+ Actions = entityBatchResult.Actions.Select(operationAction => operationAction!.ToOperationAction()).ToList(),
+ EntityState = entityBatchResult.EntityState,
+ Results = entityBatchResult.Results.Select(operationResult => operationResult!.ToOperationResult()).ToList(),
+ FailureDetails = entityBatchResult.FailureDetails.ToCore(),
+ };
+ }
+
+ ///
+ /// Converts a to .
+ ///
+ /// The operation result to convert.
+ /// The completion token, or null for the older protocol.
+ /// Additional information about each operation, required by DTS.
+ /// The converted operation result.
+ [return: NotNullIfNotNull(nameof(entityBatchResult))]
+ internal static P.EntityBatchResult? ToEntityBatchResult(
+ this EntityBatchResult? entityBatchResult,
+ string? completionToken = null,
+ IEnumerable? operationInfos = null)
+ {
+ if (entityBatchResult == null)
+ {
+ return null;
+ }
+
+ return new P.EntityBatchResult()
+ {
+ EntityState = entityBatchResult.EntityState,
+ FailureDetails = entityBatchResult.FailureDetails.ToProtobuf(),
+ Actions = { entityBatchResult.Actions?.Select(a => a.ToOperationAction()) ?? [] },
+ Results = { entityBatchResult.Results?.Select(a => a.ToOperationResult()) ?? [] },
+ CompletionToken = completionToken ?? string.Empty,
+ OperationInfos = { operationInfos ?? [] },
+ };
+ }
+
+ ///
+ /// Converts the gRPC representation of orchestrator entity parameters to the DT.Core representation.
+ ///
+ /// The DT.Core representation.
+ /// The gRPC representation.
+ [return: NotNullIfNotNull(nameof(parameters))]
+ internal static TaskOrchestrationEntityParameters? ToCore(this P.OrchestratorEntityParameters? parameters)
+ {
+ if (parameters == null)
+ {
+ return null;
+ }
+
+ return new TaskOrchestrationEntityParameters()
+ {
+ EntityMessageReorderWindow = parameters.EntityMessageReorderWindow.ToTimeSpan(),
+ };
+ }
+
+ ///
+ /// Gets the approximate byte count for a .
+ ///
+ /// The failure details.
+ /// The approximate byte count.
+ internal static int GetApproximateByteCount(this P.TaskFailureDetails failureDetails)
+ {
+ // Protobuf strings are always UTF-8: https://developers.google.com/protocol-buffers/docs/proto3#scalar
+ Encoding encoding = Encoding.UTF8;
+
+ int byteCount = 0;
+ if (failureDetails.ErrorType != null)
+ {
+ byteCount += encoding.GetByteCount(failureDetails.ErrorType);
+ }
+
+ if (failureDetails.ErrorMessage != null)
+ {
+ byteCount += encoding.GetByteCount(failureDetails.ErrorMessage);
+ }
+
+ if (failureDetails.StackTrace != null)
+ {
+ byteCount += encoding.GetByteCount(failureDetails.StackTrace);
+ }
+
+ if (failureDetails.InnerFailure != null)
+ {
+ byteCount += failureDetails.InnerFailure.GetApproximateByteCount();
+ }
+
+ return byteCount;
+ }
+
+ ///
+ /// Decode a protobuf message from a base64 string.
+ ///
+ /// The type to decode to.
+ /// The message parser.
+ /// The base64 encoded message.
+ /// The decoded message.
+ /// If decoding fails.
+ internal static T Base64Decode(this MessageParser parser, string encodedMessage) where T : IMessage
+ {
+ // Decode the base64 in a way that doesn't allocate a byte[] on each request
+ int encodedByteCount = Encoding.UTF8.GetByteCount(encodedMessage);
+ byte[] buffer = ArrayPool.Shared.Rent(encodedByteCount);
+ try
+ {
+ // The Base64 APIs require first converting the string into UTF-8 bytes. We then
+ // do an in-place conversion from base64 UTF-8 bytes to protobuf bytes so that
+ // we can finally decode the protobuf request.
+ Encoding.UTF8.GetBytes(encodedMessage, 0, encodedMessage.Length, buffer, 0);
+ OperationStatus status = Base64.DecodeFromUtf8InPlace(
+ buffer.AsSpan(0, encodedByteCount),
+ out int bytesWritten);
+ if (status != OperationStatus.Done)
+ {
+ throw new ArgumentException(
+ $"Failed to base64-decode the '{typeof(T).Name}' payload: {status}", nameof(encodedMessage));
+ }
+
+ return (T)parser.ParseFrom(buffer, 0, bytesWritten);
+ }
+ finally
+ {
+ ArrayPool.Shared.Return(buffer);
+ }
+ }
+
+ ///
+ /// Converts a grpc to a .
+ ///
+ /// The failure details to convert.
+ /// The converted failure details.
+ internal static FailureDetails? ToCore(this P.TaskFailureDetails? failureDetails)
+ {
+ if (failureDetails == null)
+ {
+ return null;
+ }
+
+ return new FailureDetails(
+ failureDetails.ErrorType,
+ failureDetails.ErrorMessage,
+ failureDetails.StackTrace,
+ failureDetails.InnerFailure.ToCore(),
+ failureDetails.IsNonRetriable);
+ }
+
+ ///
+ /// Converts a to a grpc .
+ ///
+ /// The failure details to convert.
+ /// The converted failure details.
+ static P.TaskFailureDetails? ToProtobuf(this FailureDetails? failureDetails)
+ {
+ if (failureDetails == null)
+ {
+ return null;
+ }
+
+ return new P.TaskFailureDetails
+ {
+ ErrorType = failureDetails.ErrorType ?? "(unknown)",
+ ErrorMessage = failureDetails.ErrorMessage ?? "(unknown)",
+ StackTrace = failureDetails.StackTrace,
+ IsNonRetriable = failureDetails.IsNonRetriable,
+ InnerFailure = failureDetails.InnerFailure.ToProtobuf(),
+ };
+ }
+
+ static P.OrchestrationStatus ToProtobuf(this OrchestrationStatus status)
+ {
+ return (P.OrchestrationStatus)status;
+ }
+
+ static P.OrchestrationInstance ToProtobuf(this OrchestrationInstance instance)
+ {
+ return new P.OrchestrationInstance
+ {
+ InstanceId = instance.InstanceId,
+ ExecutionId = instance.ExecutionId,
+ };
+ }
+
+ ///
+ /// Tracks state required for converting orchestration histories containing entity-related events.
+ ///
+ internal class EntityConversionState
+ {
+ readonly bool insertMissingEntityUnlocks;
+
+ OrchestrationInstance? instance;
+ HashSet? entityRequestIds;
+ Dictionary? unlockObligations;
+
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// Whether to insert missing unlock events in to the history
+ /// when the orchestration completes.
+ public EntityConversionState(bool insertMissingEntityUnlocks)
+ {
+ this.ConvertFromProto = (P.HistoryEvent e) => ProtoUtils.ConvertHistoryEvent(e, this);
+ this.insertMissingEntityUnlocks = insertMissingEntityUnlocks;
+ }
+
+ ///
+ /// Gets a function that converts a history event in protobuf format to a core history event.
+ ///
+ public Func ConvertFromProto { get; }
+
+ ///
+ /// Gets the orchestration instance of this history.
+ ///
+ public OrchestrationInstance? CurrentInstance => this.instance;
+
+ ///
+ /// Gets the set of guids that have been used as entity request ids in this history.
+ ///
+ public HashSet EntityRequestIds => this.entityRequestIds ??= new();
+
+ ///
+ /// Records the orchestration instance, which may be needed for some conversions.
+ ///
+ /// The orchestration instance.
+ public void SetOrchestrationInstance(OrchestrationInstance instance)
+ {
+ this.instance = instance;
+ }
+
+ ///
+ /// Adds unlock obligations for all entities that are being locked by this request.
+ ///
+ /// The lock request.
+ public void AddUnlockObligations(P.EntityLockRequestedEvent request)
+ {
+ if (!this.insertMissingEntityUnlocks)
+ {
+ return;
+ }
+
+ this.unlockObligations ??= new();
+
+ foreach (string target in request.LockSet)
+ {
+ this.unlockObligations[target] = request.CriticalSectionId;
+ }
+ }
+
+ ///
+ /// Removes an unlock obligation.
+ ///
+ /// The target entity.
+ public void RemoveUnlockObligation(string target)
+ {
+ if (!this.insertMissingEntityUnlocks)
+ {
+ return;
+ }
+
+ this.unlockObligations?.Remove(target);
+ }
+
+ ///
+ /// Returns the remaining unlock obligations, and clears the list.
+ ///
+ /// The unlock obligations.
+ public IEnumerable<(string Target, string CriticalSectionId)> ResetObligations()
+ {
+ if (!this.insertMissingEntityUnlocks)
+ {
+ yield break;
+ }
+
+ if (this.unlockObligations is not null)
+ {
+ foreach (var kvp in this.unlockObligations)
+ {
+ yield return (kvp.Key, kvp.Value);
+ }
+
+ this.unlockObligations = null;
+ }
+ }
+ }
+}
diff --git a/test/Client/OrchestrationServiceClientShim.Tests/ShimDurableTaskClientTests.cs b/test/Client/OrchestrationServiceClientShim.Tests/ShimDurableTaskClientTests.cs
index e8db1b574..cf67af96b 100644
--- a/test/Client/OrchestrationServiceClientShim.Tests/ShimDurableTaskClientTests.cs
+++ b/test/Client/OrchestrationServiceClientShim.Tests/ShimDurableTaskClientTests.cs
@@ -1,467 +1,483 @@
-// Copyright (c) Microsoft Corporation.
-// Licensed under the MIT License.
-
-using DurableTask.Core;
-using DurableTask.Core.Entities;
-using DurableTask.Core.History;
-using DurableTask.Core.Query;
-using FluentAssertions.Specialized;
-using Microsoft.DurableTask.Client.Entities;
-using Microsoft.DurableTask.Converters;
-using Microsoft.Extensions.Options;
-using Core = DurableTask.Core;
-using CoreOrchestrationQuery = DurableTask.Core.Query.OrchestrationQuery;
-using PurgeInstanceFilter = Microsoft.DurableTask.Client.PurgeInstancesFilter;
-
-namespace Microsoft.DurableTask.Client.OrchestrationServiceClientShim.Tests;
-
-public class ShimDurableTaskClientTests
-{
- readonly ShimDurableTaskClient client;
- readonly Mock orchestrationClient = new(MockBehavior.Strict);
- readonly Mock queryClient;
- readonly Mock purgeClient;
-
- public ShimDurableTaskClientTests()
- {
- this.queryClient = this.orchestrationClient.As();
- this.purgeClient = this.orchestrationClient.As();
- this.client = new("test", new ShimDurableTaskClientOptions { Client = this.orchestrationClient.Object });
- }
-
- [Fact]
- public void Ctor_NullOptions_Throws1()
- {
- IOptionsMonitor options = null!;
- Func act = () => new ShimDurableTaskClient("test", options);
- act.Should().ThrowExactly().WithParameterName("options");
-
- options = Mock.Of>();
- act = () => new ShimDurableTaskClient("test", options);
- act.Should().ThrowExactly().WithParameterName("options");
- }
-
- [Fact]
- public void Ctor_NullOptions_Throws2()
- {
- IOptionsMonitor options =
- Mock.Of>();
- Func act = () => new ShimDurableTaskClient("test", options);
- act.Should().ThrowExactly().WithParameterName("options");
- }
-
- [Fact]
- public void Ctor_NoEntitySupport_GetClientThrows()
- {
- IOrchestrationServiceClient client = Mock.Of();
- ShimDurableTaskClientOptions options = new() { Client = client };
- ShimDurableTaskClient shimClient = new("test", options);
-
- Func act = () => shimClient.Entities;
- act.Should().ThrowExactly().WithMessage("Entity support is not enabled.");
- }
-
- [Fact]
- public void Ctor_InvalidEntityOptions_GetClientThrows()
- {
- IOrchestrationServiceClient client = Mock.Of();
- ShimDurableTaskClientOptions options = new() { Client = client, EnableEntitySupport = true };
- ShimDurableTaskClient shimClient = new("test", options);
-
- Func act = () => shimClient.Entities;
- act.Should().ThrowExactly()
- .WithMessage("The configured IOrchestrationServiceClient does not support entities.");
- }
-
- [Fact]
- public void Ctor_EntitiesConfigured_GetClientSuccess()
- {
- IOrchestrationServiceClient client = Mock.Of();
- EntityBackendQueries queries = Mock.Of();
- ShimDurableTaskClientOptions options = new()
- {
- Client = client,
- EnableEntitySupport = true,
- Entities = { Queries = queries },
- };
-
- ShimDurableTaskClient shimClient = new("test", options);
- DurableEntityClient entityClient = shimClient.Entities;
-
- entityClient.Should().BeOfType();
- }
-
- [Theory]
- [InlineData(false)]
- [InlineData(true)]
- public async void GetInstanceMetadata_EmptyList_Null(bool isNull)
- {
- // arrange
- List? states = isNull ? null : new();
- string instanceId = Guid.NewGuid().ToString();
- this.orchestrationClient.Setup(m => m.GetOrchestrationStateAsync(instanceId, false)).ReturnsAsync(states);
-
- // act
- OrchestrationMetadata? result = await this.client.GetInstanceAsync(instanceId, false);
-
- // assert
- result.Should().BeNull();
- }
-
- [Theory]
- [InlineData(false)]
- [InlineData(true)]
- public async Task GetInstanceMetadata_Results(bool getInputs)
- {
- // arrange
- List states = new() { CreateState("input") };
- string instanceId = states.First().OrchestrationInstance.InstanceId;
- this.orchestrationClient.Setup(m => m.GetOrchestrationStateAsync(instanceId, false)).ReturnsAsync(states);
-
- // act
- OrchestrationMetadata? result = await this.client.GetInstanceAsync(instanceId, getInputs);
-
- // assert
- Validate(result, states.First(), getInputs);
- }
-
- [Theory]
- [InlineData(false)]
- [InlineData(true)]
- public async Task GetInstances_Results(bool getInputs)
- {
- // arrange
- DateTimeOffset utcNow = DateTimeOffset.UtcNow;
- List states =
- [
- CreateState("input", start: utcNow.AddMinutes(-1)),
- CreateState(10, "output", utcNow.AddMinutes(-5)),
- ];
-
- OrchestrationQueryResult queryResult = new(states, null);
- string instanceId = states.First().OrchestrationInstance.InstanceId;
- this.queryClient
- .Setup(m => m.GetOrchestrationWithQueryAsync(It.IsNotNull(), default))
- .ReturnsAsync(queryResult);
-
- OrchestrationQuery query = new()
- {
- CreatedFrom = utcNow.AddMinutes(-10),
- FetchInputsAndOutputs = getInputs,
- };
-
- // act
- List result = await this.client.GetAllInstancesAsync(query).ToListAsync();
-
- // assert
- this.orchestrationClient.VerifyAll();
- foreach ((OrchestrationMetadata left, Core.OrchestrationState right) in result.Zip(states))
- {
- Validate(left, right, getInputs);
- }
- }
-
- [Fact]
- public async Task PurgeInstanceMetadata()
- {
- // arrange
- string instanceId = Guid.NewGuid().ToString();
- this.purgeClient.Setup(m => m.PurgeInstanceStateAsync(instanceId)).ReturnsAsync(new Core.PurgeResult(1));
-
- // act
- PurgeResult result = await this.client.PurgeInstanceAsync(instanceId);
-
- // assert
- this.orchestrationClient.VerifyAll();
- result.PurgedInstanceCount.Should().Be(1);
- }
-
- [Fact]
- public async Task PurgeInstances()
- {
- // arrange
- PurgeInstanceFilter filter = new(CreatedTo: DateTimeOffset.UtcNow);
- this.purgeClient.Setup(m => m.PurgeInstanceStateAsync(It.IsNotNull()))
- .ReturnsAsync(new Core.PurgeResult(10));
-
- // act
- PurgeResult result = await this.client.PurgeAllInstancesAsync(filter);
-
- // assert
- this.orchestrationClient.VerifyAll();
- result.PurgedInstanceCount.Should().Be(10);
- }
-
- [Fact]
- public async Task RaiseEvent()
- {
- // arrange
- string instanceId = Guid.NewGuid().ToString();
- this.SetupClientTaskMessage(instanceId);
-
- // act
- await this.client.RaiseEventAsync(instanceId, "test-event", null, default);
-
- // assert
- this.orchestrationClient.VerifyAll();
- }
-
- [Fact]
- public async Task SuspendInstance()
- {
- // arrange
- string instanceId = Guid.NewGuid().ToString();
- this.SetupClientTaskMessage(instanceId);
-
- // act
- await this.client.SuspendInstanceAsync(instanceId, null, default);
-
- // assert
- this.orchestrationClient.VerifyAll();
- }
-
- [Fact]
- public async Task ResumeInstance()
- {
- // arrange
- string instanceId = Guid.NewGuid().ToString();
- this.SetupClientTaskMessage(instanceId);
-
- // act
- await this.client.ResumeInstanceAsync(instanceId, null, default);
-
- // assert
- this.orchestrationClient.VerifyAll();
- }
-
- [Fact]
- public async Task TerminateInstance()
- {
- // arrange
- string instanceId = Guid.NewGuid().ToString();
- this.orchestrationClient.Setup(m => m.ForceTerminateTaskOrchestrationAsync(instanceId, null))
- .Returns(Task.CompletedTask);
-
- // act
- await this.client.TerminateInstanceAsync(instanceId, null, default);
-
- // assert
- this.orchestrationClient.VerifyAll();
- }
-
- [Fact]
- public async Task WaitForInstanceCompletion()
- {
- // arrange
- Core.OrchestrationState state = CreateState("input", "output");
- this.orchestrationClient.Setup(
- m => m.WaitForOrchestrationAsync(state.OrchestrationInstance.InstanceId, null, TimeSpan.MaxValue, default))
- .ReturnsAsync(state);
-
- // act
- OrchestrationMetadata metadata = await this.client.WaitForInstanceCompletionAsync(
- state.OrchestrationInstance.InstanceId, false, default);
-
- // assert
- this.orchestrationClient.VerifyAll();
- Validate(metadata, state, false);
- }
-
- [Fact]
- public async Task WaitForInstanceStart()
- {
- // arrange
- DateTimeOffset start = DateTimeOffset.UtcNow;
- OrchestrationInstance instance = new()
- {
- InstanceId = Guid.NewGuid().ToString(),
- ExecutionId = Guid.NewGuid().ToString(),
- };
-
- Core.OrchestrationState state1 = CreateState("input", start: start);
- state1.OrchestrationInstance = instance;
- state1.OrchestrationStatus = Core.OrchestrationStatus.Pending;
- Core.OrchestrationState state2 = CreateState("input", start: start);
- state1.OrchestrationInstance = instance;
- this.orchestrationClient.SetupSequence(m => m.GetOrchestrationStateAsync(instance.InstanceId, false))
- .ReturnsAsync([state1])
- .ReturnsAsync([state2]);
-
- // act
- OrchestrationMetadata metadata = await this.client.WaitForInstanceStartAsync(
- instance.InstanceId, false, default);
-
- // assert
- this.orchestrationClient.Verify(
- m => m.GetOrchestrationStateAsync(instance.InstanceId, false), Times.Exactly(2));
- Validate(metadata, state2, false);
- }
-
- [Fact]
- public Task ScheduleNewOrchestrationInstance_IdGenerated_NoInput()
- => this.RunScheduleNewOrchestrationInstanceAsync("test", null, null);
-
- [Fact]
- public Task ScheduleNewOrchestrationInstance_IdProvided_InputProvided()
- => this.RunScheduleNewOrchestrationInstanceAsync("test", "input", new() { InstanceId = "test-id" });
-
- [Fact]
- public Task ScheduleNewOrchestrationInstance_StartAt()
- => this.RunScheduleNewOrchestrationInstanceAsync(
- "test", null, new() { StartAt = DateTimeOffset.UtcNow.AddHours(1) });
-
- static Core.OrchestrationState CreateState(
- object input, object? output = null, DateTimeOffset start = default)
- {
- string? serializedOutput = null;
- FailureDetails? failure = null;
- OrchestrationStatus status = OrchestrationStatus.Running;
-
- if (start == default)
- {
- start = DateTimeOffset.UtcNow.AddMinutes(-10);
- }
-
- switch (output)
- {
- case Exception ex:
- status = OrchestrationStatus.Failed;
- failure = new(ex.GetType().FullName!, ex.Message, ex.StackTrace, null, true);
- break;
- case not null:
- status = OrchestrationStatus.Completed;
- serializedOutput = JsonDataConverter.Default.Serialize(output);
- break;
- }
-
- return new()
- {
- CompletedTime = default,
- CreatedTime = start.UtcDateTime,
- LastUpdatedTime = start.AddMinutes(10).UtcDateTime,
- Input = JsonDataConverter.Default.Serialize(input),
- Output = serializedOutput,
- FailureDetails = failure,
- Name = "test-orchestration",
- OrchestrationInstance = new()
- {
- InstanceId = Guid.NewGuid().ToString(),
- ExecutionId = Guid.NewGuid().ToString(),
- },
- OrchestrationStatus = status,
- Status = JsonDataConverter.Default.Serialize("custom-status"),
- Version = string.Empty,
- };
- }
-
- static TaskMessage MatchStartExecutionMessage(TaskName name, object? input, StartOrchestrationOptions? options)
- {
- return Match.Create(m =>
- {
- if (m.Event is not ExecutionStartedEvent @event)
- {
- return false;
- }
-
-
- if (options?.InstanceId is string str && m.OrchestrationInstance.InstanceId != str)
- {
- return false;
- }
- else if (options?.InstanceId is null && !Guid.TryParse(m.OrchestrationInstance.InstanceId, out _))
- {
- return false;
- }
-
- if (options?.StartAt is DateTimeOffset start && @event.ScheduledStartTime != start.UtcDateTime)
- {
- return false;
- }
- else if (options?.StartAt is null && @event.ScheduledStartTime is not null)
- {
- return false;
- }
-
- return Guid.TryParse(m.OrchestrationInstance.ExecutionId, out _)
- && @event.Name == name.Name && @event.Version == name.Version
- && @event.OrchestrationInstance == m.OrchestrationInstance
- && @event.EventId == -1
- && @event.Input == JsonDataConverter.Default.Serialize(input);
- });
- }
-
- static void Validate(OrchestrationMetadata? metadata, Core.OrchestrationState? state, bool getInputs)
- {
- if (state is null)
- {
- metadata.Should().BeNull();
- return;
- }
-
- metadata.Should().NotBeNull();
- metadata!.Name.Should().Be(state.Name);
- metadata.InstanceId.Should().Be(state.OrchestrationInstance.InstanceId);
- metadata.RuntimeStatus.Should().Be(state.OrchestrationStatus.ConvertFromCore());
- metadata.CreatedAt.Should().Be(new DateTimeOffset(state.CreatedTime));
- metadata.LastUpdatedAt.Should().Be(new DateTimeOffset(state.LastUpdatedTime));
- metadata.SerializedInput.Should().Be(state.Input);
- metadata.SerializedOutput.Should().Be(state.Output);
- metadata.SerializedCustomStatus.Should().Be(state.Status);
-
- if (getInputs)
- {
- metadata.Invoking(m => m.ReadInputAs