Skip to content
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

## (Unreleased)

- - Add automatic retry on gateway timeout in `GrpcDurableTaskClient.WaitForInstanceCompletionAsync` in [#412](https://github.com/microsoft/durabletask-dotnet/pull/412))
- Add New Property Properties to TaskOrchestrationContext by @nytian in [#415](https://github.com/microsoft/durabletask-dotnet/pull/415)
- Add automatic retry on gateway timeout in `GrpcDurableTaskClient.WaitForInstanceCompletionAsync` in [#412](https://github.com/microsoft/durabletask-dotnet/pull/412))
- Add specific logging for NotFound error on worker connection by @halspang in ([#413](https://github.com/microsoft/durabletask-dotnet/pull/413))


Expand Down
5 changes: 5 additions & 0 deletions src/Abstractions/TaskOrchestrationContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ public abstract class TaskOrchestrationContext
/// </summary>
public virtual string Version => string.Empty;

/// <summary>
/// Gets the configuration settings for the orchestration context.
/// </summary>
public virtual IDictionary<string, object?> Properties { get; } = new Dictionary<string, object?>();

/// <summary>
/// Gets the entity feature, for interacting with entities.
/// </summary>
Expand Down
2 changes: 2 additions & 0 deletions src/Grpc/orchestrator_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import "google/protobuf/timestamp.proto";
import "google/protobuf/duration.proto";
import "google/protobuf/wrappers.proto";
import "google/protobuf/empty.proto";
import "google/protobuf/struct.proto";

message OrchestrationInstance {
string instanceId = 1;
Expand Down Expand Up @@ -318,6 +319,7 @@ message OrchestratorRequest {
repeated HistoryEvent newEvents = 4;
OrchestratorEntityParameters entityParameters = 5;
bool requiresHistoryStreaming = 6;
map<string, google.protobuf.Value> properties = 7;
}

message OrchestratorResponse {
Expand Down
4 changes: 2 additions & 2 deletions src/Grpc/versions.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
# The following files were downloaded from branch main at 2025-03-24 23:37:31 UTC
https://raw.githubusercontent.com/microsoft/durabletask-protobuf/c85ef11430ff8e10e21105abb545b0803bb86c66/protos/orchestrator_service.proto
# The following files were downloaded from branch main at 2025-04-23 23:27:00 UTC
https://raw.githubusercontent.com/microsoft/durabletask-protobuf/fbe5bb20835678099fc51a44993ed9b045dee5a6/protos/orchestrator_service.proto
31 changes: 31 additions & 0 deletions src/Shared/Grpc/ProtoUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -904,6 +904,37 @@ internal static T Base64Decode<T>(this MessageParser parser, string encodedMessa
failureDetails.IsNonRetriable);
}

/// <summary>
/// Converts a <see cref="Google.Protobuf.WellKnownTypes.Value"/> instance to a corresponding C# object.
/// </summary>
/// <param name="value">The Protobuf Value to convert.</param>
/// <returns>The corresponding C# object.</returns>
/// <exception cref="NotSupportedException">
/// Thrown when the Protobuf Value.KindCase is not one of the supported types.
/// </exception>
internal static object? ConvertValueToObject(Google.Protobuf.WellKnownTypes.Value value)
{
switch (value.KindCase)
{
case Google.Protobuf.WellKnownTypes.Value.KindOneofCase.NullValue:
return null;
case Google.Protobuf.WellKnownTypes.Value.KindOneofCase.NumberValue:
return value.NumberValue;
case Google.Protobuf.WellKnownTypes.Value.KindOneofCase.StringValue:
return value.StringValue;
case Google.Protobuf.WellKnownTypes.Value.KindOneofCase.BoolValue:
return value.BoolValue;
case Google.Protobuf.WellKnownTypes.Value.KindOneofCase.StructValue:
return value.StructValue.Fields.ToDictionary(
pair => pair.Key,
pair => ConvertValueToObject(pair.Value));
case Google.Protobuf.WellKnownTypes.Value.KindOneofCase.ListValue:
return value.ListValue.Values.Select(ConvertValueToObject).ToList();
default:
throw new NotSupportedException($"Unsupported Value kind: {value.KindCase}");
}
}

/// <summary>
/// Converts a <see cref="FailureDetails" /> to a grpc <see cref="P.TaskFailureDetails" />.
/// </summary>
Expand Down
23 changes: 23 additions & 0 deletions src/Worker/Core/Shims/DurableTaskShimFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,29 @@ public TaskOrchestration CreateOrchestration(
return new TaskOrchestrationShim(context, orchestrator);
}

/// <summary>
/// Creates a <see cref="TaskOrchestration" /> from a <see cref="ITaskOrchestrator" />.
/// </summary>
/// <param name="name">
/// The name of the orchestration. This should be the name the orchestration was invoked with.
/// </param>
/// <param name="orchestrator">The orchestration to wrap.</param>
/// <param name ="properties">Configuration for the orchestration.</param>
/// <param name="parent">The orchestration parent details or <c>null</c> if no parent.</param>
/// <returns>A new <see cref="TaskOrchestration" />.</returns>
public TaskOrchestration CreateOrchestration(
TaskName name,
ITaskOrchestrator orchestrator,
Dictionary<string, object?> properties,
ParentOrchestrationInstance? parent = null)
{
Check.NotDefault(name);
Check.NotNull(orchestrator);
Check.NotNull(properties);
OrchestrationInvocationContext context = new(name, this.options, this.loggerFactory, parent);
return new TaskOrchestrationShim(context, orchestrator, properties);
}

/// <summary>
/// Creates a <see cref="TaskOrchestration" /> from a <see cref="ITaskOrchestrator" />.
/// </summary>
Expand Down
27 changes: 27 additions & 0 deletions src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,31 @@ public TaskOrchestrationContextWrapper(
OrchestrationContext innerContext,
OrchestrationInvocationContext invocationContext,
object? deserializedInput)
: this(innerContext, invocationContext, deserializedInput, new Dictionary<string, object?>())
{
}

/// <summary>
/// Initializes a new instance of the <see cref="TaskOrchestrationContextWrapper"/> class.
/// </summary>
/// <param name="innerContext">The inner orchestration context.</param>
/// <param name="invocationContext">The invocation context.</param>
/// <param name="deserializedInput">The deserialized input.</param>
/// <param name="properties">The configuration for context.</param>
public TaskOrchestrationContextWrapper(
OrchestrationContext innerContext,
OrchestrationInvocationContext invocationContext,
object? deserializedInput,
IEnumerable<KeyValuePair<string, object?>> properties)
{
this.innerContext = Check.NotNull(innerContext);
this.invocationContext = Check.NotNull(invocationContext);
if (properties is null)
{
throw new ArgumentNullException(nameof(properties));
}

this.Properties = properties.ToDictionary(pair => pair.Key, pair => pair.Value);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Converting from IEnumerable<KVP> to a dictionary is dangerous because IEnumerable<KVP> allows duplicate keys whereas dictionary does not. We should change the properties parameter to be a dictionary instead of IEnumerable<KVP> to avoid these kinds of problems.

Copy link
Member

@jviau jviau Apr 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is safe if you use a foreach loop and set the key each time, so last key wins. My preference for IEnumerable is only to follow the "take in least derived, return most derived" principal. But I am not unmoving in that. If you disagree and want IDictionary, that is fine.

Copy link
Contributor Author

@nytian nytian Apr 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, Chris and Jacob. I updated this to be Dictionary. Because to write a foreach loop, I have to either grant Properties private set, or adding a new field to store this and then pass the value to Properties. Just feel like using Dictionary directly here seems like a more straightforward and cleaner way in this case. Let me know if you prefer others.

Copy link
Member

@jviau jviau Apr 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am okay with going back to dictionary, but I don't see why you need a private set. The following is valid without any setter on Properties:

this.Properties = new();
foreach ((string key, object? value) in properties)
{
    this.Properties[key] = value;
}

or if this.Properties is IReadOnlyDictionary<string, object?>, then you can do this:

Dictionary<string, object?> props = new();
foreach ((string key, object? value) in properties)
{
    props[key] = value;
}

this.Properties = props;

this.logger = this.CreateReplaySafeLogger("Microsoft.DurableTask");
this.deserializedInput = deserializedInput;
}
Expand All @@ -60,6 +82,11 @@ public TaskOrchestrationContextWrapper(
/// <inheritdoc/>
public override DateTime CurrentUtcDateTime => this.innerContext.CurrentUtcDateTime;

/// <summary>
/// Gets the configuration settings for the orchestration.
/// </summary>
public override IDictionary<string, object?> Properties { get; } = new Dictionary<string, object?>();

/// <inheritdoc/>
public override TaskOrchestrationEntityFeature Entities
{
Expand Down
18 changes: 17 additions & 1 deletion src/Worker/Core/Shims/TaskOrchestrationShim.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ partial class TaskOrchestrationShim : TaskOrchestration
readonly ITaskOrchestrator implementation;
readonly OrchestrationInvocationContext invocationContext;
readonly ILogger logger;
readonly Dictionary<string, object?> properties = new();

TaskOrchestrationContextWrapper? wrapperContext;

Expand All @@ -30,9 +31,24 @@ partial class TaskOrchestrationShim : TaskOrchestration
public TaskOrchestrationShim(
OrchestrationInvocationContext invocationContext,
ITaskOrchestrator implementation)
: this(invocationContext, implementation, new Dictionary<string, object?>())
{
}

/// <summary>
/// Initializes a new instance of the <see cref="TaskOrchestrationShim"/> class.
/// </summary>
/// <param name="invocationContext">The invocation context for this orchestration.</param>
/// <param name="implementation">The orchestration's implementation.</param>
/// <param name="properties">Configuration for the orchestration.</param>
public TaskOrchestrationShim(
OrchestrationInvocationContext invocationContext,
ITaskOrchestrator implementation,
Dictionary<string, object?> properties)
{
this.invocationContext = Check.NotNull(invocationContext);
this.implementation = Check.NotNull(implementation);
this.properties = Check.NotNull(properties);

this.logger = Logs.CreateWorkerLogger(this.invocationContext.LoggerFactory, "Orchestrations");
}
Expand All @@ -48,7 +64,7 @@ public TaskOrchestrationShim(
innerContext.ErrorDataConverter = converterShim;

object? input = this.DataConverter.Deserialize(rawInput, this.implementation.InputType);
this.wrapperContext = new(innerContext, this.invocationContext, input);
this.wrapperContext = new(innerContext, this.invocationContext, input, this.properties);

string instanceId = innerContext.OrchestrationInstance.InstanceId;
if (!innerContext.IsReplaying)
Expand Down
5 changes: 4 additions & 1 deletion src/Worker/Grpc/GrpcOrchestrationRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ public static string LoadAndRun(

List<HistoryEvent> pastEvents = request.PastEvents.Select(ProtoUtils.ConvertHistoryEvent).ToList();
IEnumerable<HistoryEvent> newEvents = request.NewEvents.Select(ProtoUtils.ConvertHistoryEvent);
Dictionary<string, object?> properties = request.Properties.ToDictionary(
pair => pair.Key,
pair => ProtoUtils.ConvertValueToObject(pair.Value));

// Re-construct the orchestration state from the history.
// New events must be added using the AddEvent method.
Expand All @@ -108,7 +111,7 @@ public static string LoadAndRun(
DurableTaskShimFactory factory = services is null
? DurableTaskShimFactory.Default
: ActivatorUtilities.GetServiceOrCreateInstance<DurableTaskShimFactory>(services);
TaskOrchestration shim = factory.CreateOrchestration(orchestratorName, implementation, parent);
TaskOrchestration shim = factory.CreateOrchestration(orchestratorName, implementation, properties, parent);
TaskOrchestrationExecutor executor = new(runtimeState, shim, BehaviorOnContinueAsNew.Carryover, request.EntityParameters.ToCore(), ErrorPropagationMode.UseFailureDetails);
OrchestratorExecutionResult result = executor.Execute();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ public TestTaskOrchestrationContext(string version)

protected override ILoggerFactory LoggerFactory => throw new NotImplementedException();

public override Dictionary<string, object?> Properties => throw new NotImplementedException();

public override Task<TResult> CallActivityAsync<TResult>(TaskName name, object? input = null, TaskOptions? options = null)
{
throw new NotImplementedException();
Expand Down
Loading