Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Diagnostics;
using System.Linq;
using System.Net.Http;
Expand Down Expand Up @@ -174,13 +175,20 @@ async Task<IActionResult> IDurableOrchestrationClient.WaitForCompletionOrCreateC
}

/// <inheritdoc />
async Task<string> IDurableOrchestrationClient.StartNewAsync<T>(string orchestratorFunctionName, string instanceId, T input)
async Task<string> IDurableOrchestrationClient.StartNewAsync(DurableOrchestrationOptions options)
{
if (options == null)
{
throw new ArgumentNullException(nameof(options), "Orchestration options cannot be null.");
}

if (this.ClientReferencesCurrentApp(this))
{
this.config?.ThrowIfFunctionDoesNotExist(orchestratorFunctionName, FunctionType.Orchestrator);
this.config?.ThrowIfFunctionDoesNotExist(options.OrchestratorFunctionName, FunctionType.Orchestrator);
}

string instanceId = options.InstanceId;

if (string.IsNullOrEmpty(instanceId))
{
instanceId = Guid.NewGuid().ToString("N");
Expand All @@ -201,11 +209,11 @@ async Task<string> IDurableOrchestrationClient.StartNewAsync<T>(string orchestra

OrchestrationStatus[] dedupeStatuses = this.GetStatusesNotToOverride();
Task<OrchestrationInstance> createTask = this.client.CreateOrchestrationInstanceAsync(
orchestratorFunctionName, this.durableTaskOptions.DefaultVersion, instanceId, input, null, dedupeStatuses);
options.OrchestratorFunctionName, this.durableTaskOptions.DefaultVersion, instanceId, options?.Input, options.Tags?.ToDictionary(kvp => kvp.Key, kvp => kvp.Value), dedupeStatuses);

this.traceHelper.FunctionScheduled(
this.TaskHubName,
orchestratorFunctionName,
options.OrchestratorFunctionName,
instanceId,
reason: "NewInstance",
functionType: FunctionType.Orchestrator,
Expand All @@ -215,6 +223,12 @@ async Task<string> IDurableOrchestrationClient.StartNewAsync<T>(string orchestra
return instance.InstanceId;
}

/// <inheritdoc />
Task<string> IDurableOrchestrationClient.StartNewAsync<T>(string orchestratorFunctionName, string instanceId, T input)
{
return ((IDurableOrchestrationClient)this).StartNewAsync(new DurableOrchestrationOptions(orchestratorFunctionName) { Input = input, InstanceId = instanceId });
}

private OrchestrationStatus[] GetStatusesNotToOverride()
{
OverridableStates overridableStates = this.durableTaskOptions.OverridableExistingInstanceStates;
Expand Down Expand Up @@ -1067,7 +1081,9 @@ internal HttpResponseMessage CreateCheckStatusResponse(

private static void TrackNameAndScheduledTime(JObject historyItem, EventType eventType, int index, Dictionary<string, EventIndexDateMapping> eventMapper)
{
eventMapper.Add($"{eventType}_{historyItem["EventId"]}", new EventIndexDateMapping { Index = index, Name = (string)historyItem["Name"], Date = (DateTime)historyItem["Timestamp"], Input = (string)historyItem["Input"] });
JObject tags = historyItem["Tags"] as JObject;

eventMapper.Add($"{eventType}_{historyItem["EventId"]}", new EventIndexDateMapping { Index = index, Name = (string)historyItem["Name"], Date = (DateTime)historyItem["Timestamp"], Input = (string)historyItem["Input"], Tags = tags });
}

private static void AddScheduledEventDataAndAggregate(ref Dictionary<string, EventIndexDateMapping> eventMapper, string prefix, JToken historyItem, List<int> indexList, bool showInput)
Expand All @@ -1081,6 +1097,11 @@ private static void AddScheduledEventDataAndAggregate(ref Dictionary<string, Eve
historyItem["Input"] = taskScheduledData.Input;
}

if (taskScheduledData.Tags is not null)
{
historyItem["Tags"] = taskScheduledData.Tags;
}

indexList.Add(taskScheduledData.Index);
}
}
Expand All @@ -1098,6 +1119,9 @@ internal static DurableOrchestrationStatus ConvertOrchestrationStateToStatus(Orc
Input = ParseToJToken(orchestrationState.Input),
Output = ParseToJToken(orchestrationState.Output),
History = historyArray,
Tags = orchestrationState.Tags is not null
? new Dictionary<string, string>(orchestrationState.Tags)
: ImmutableDictionary<string, string>.Empty,
};
}

Expand Down Expand Up @@ -1139,13 +1163,13 @@ private static void ConvertOutputToJToken(JObject jsonObject, bool showHistoryOu
/// <inheritdoc/>
Task<string> IDurableOrchestrationClient.StartNewAsync(string orchestratorFunctionName, string instanceId)
{
return ((IDurableOrchestrationClient)this).StartNewAsync<object>(orchestratorFunctionName, instanceId, null);
return ((IDurableOrchestrationClient)this).StartNewAsync(new DurableOrchestrationOptions(orchestratorFunctionName) { InstanceId = instanceId });
}

/// <inheritdoc/>
Task<string> IDurableOrchestrationClient.StartNewAsync<T>(string orchestratorFunctionName, T input)
{
return ((IDurableOrchestrationClient)this).StartNewAsync<T>(orchestratorFunctionName, string.Empty, input);
return ((IDurableOrchestrationClient)this).StartNewAsync(new DurableOrchestrationOptions(orchestratorFunctionName) { Input = input });
}

async Task<string> IDurableOrchestrationClient.RestartAsync(string instanceId, bool restartWithNewInstanceId)
Expand Down Expand Up @@ -1235,6 +1259,8 @@ private class EventIndexDateMapping
public string Name { get; set; }

public string Input { get; set; }

public JObject Tags { get; set; }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ Task<TResult> IDurableOrchestrationContext.CallSubOrchestratorAsync<TResult>(str
/// <inheritdoc />
Task<TResult> IDurableOrchestrationContext.CallSubOrchestratorAsync<TResult>(string functionName, string instanceId, object input)
{
return this.CallDurableTaskFunctionAsync<TResult>(functionName, FunctionType.Orchestrator, false, instanceId, null, null, input, null);
return this.CallDurableTaskFunctionAsync<TResult>(functionName, FunctionType.Orchestrator, false, instanceId, null, null, input, null, new Dictionary<string, string>());
}

/// <inheritdoc />
Expand All @@ -257,7 +257,7 @@ Task<TResult> IDurableOrchestrationContext.CallSubOrchestratorWithRetryAsync<TRe
throw new ArgumentNullException(nameof(retryOptions));
}

return this.CallDurableTaskFunctionAsync<TResult>(functionName, FunctionType.Orchestrator, false, instanceId, null, retryOptions, input, null);
return this.CallDurableTaskFunctionAsync<TResult>(functionName, FunctionType.Orchestrator, false, instanceId, null, retryOptions, input, null, new Dictionary<string, string>());
}

Task<DurableHttpResponse> IDurableOrchestrationContext.CallHttpAsync(HttpMethod method, Uri uri, string content, HttpRetryOptions retryOptions)
Expand Down Expand Up @@ -317,7 +317,8 @@ private async Task<DurableHttpResponse> ScheduleDurableHttpActivityAsync(Durable
operation: null,
retryOptions: req.HttpRetryOptions?.GetRetryOptions(),
input: req,
scheduledTimeUtc: null);
scheduledTimeUtc: null,
tags: new Dictionary<string, string>());

return durableHttpResponse;
}
Expand Down Expand Up @@ -449,10 +450,16 @@ Task<T> IDurableOrchestrationContext.WaitForExternalEvent<T>(string name, TimeSp
return this.WaitForExternalEvent(name, timeout, timedOutAction, cancelToken);
}

/// <inheritdoc />
Task<TResult> IDurableOrchestrationContext.CallActivityAsync<TResult>(DurableActivityOptions options)
{
return this.CallDurableTaskFunctionAsync<TResult>(options.FunctionName, FunctionType.Activity, false, null, null, options.RetryOptions, options.Input, null, options.Tags);
}

/// <inheritdoc />
Task<TResult> IDurableOrchestrationContext.CallActivityAsync<TResult>(string functionName, object input)
{
return this.CallDurableTaskFunctionAsync<TResult>(functionName, FunctionType.Activity, false, null, null, null, input, null);
return this.CallDurableTaskFunctionAsync<TResult>(functionName, FunctionType.Activity, false, null, null, null, input, null, new Dictionary<string, string>());
}

/// <inheritdoc />
Expand All @@ -463,7 +470,7 @@ Task<TResult> IDurableOrchestrationContext.CallActivityWithRetryAsync<TResult>(s
throw new ArgumentNullException(nameof(retryOptions));
}

return this.CallDurableTaskFunctionAsync<TResult>(functionName, FunctionType.Activity, false, null, null, retryOptions, input, null);
return this.CallDurableTaskFunctionAsync<TResult>(functionName, FunctionType.Activity, false, null, null, retryOptions, input, null, new Dictionary<string, string>());
}

/// <inheritdoc/>
Expand All @@ -488,7 +495,7 @@ void IDurableOrchestrationContext.SignalEntity(EntityId entity, string operation
throw new ArgumentNullException(nameof(operationName));
}

var alreadyCompletedTask = this.CallDurableTaskFunctionAsync<object>(entity.EntityName, FunctionType.Entity, true, EntityId.GetSchedulerIdFromEntityId(entity), operationName, null, operationInput, null);
var alreadyCompletedTask = this.CallDurableTaskFunctionAsync<object>(entity.EntityName, FunctionType.Entity, true, EntityId.GetSchedulerIdFromEntityId(entity), operationName, null, operationInput, null, new Dictionary<string, string>());
System.Diagnostics.Debug.Assert(alreadyCompletedTask.IsCompleted, "signaling entities is synchronous");

try
Expand All @@ -510,7 +517,7 @@ void IDurableOrchestrationContext.SignalEntity(EntityId entity, DateTime startTi
throw new ArgumentNullException(nameof(operationName));
}

var alreadyCompletedTask = this.CallDurableTaskFunctionAsync<object>(entity.EntityName, FunctionType.Entity, true, EntityId.GetSchedulerIdFromEntityId(entity), operationName, null, operationInput, startTime);
var alreadyCompletedTask = this.CallDurableTaskFunctionAsync<object>(entity.EntityName, FunctionType.Entity, true, EntityId.GetSchedulerIdFromEntityId(entity), operationName, null, operationInput, startTime, new Dictionary<string, string>());
System.Diagnostics.Debug.Assert(alreadyCompletedTask.IsCompleted, "scheduling operations on entities is synchronous");

try
Expand All @@ -532,7 +539,7 @@ string IDurableOrchestrationContext.StartNewOrchestration(string functionName, o
#endif
this.ThrowIfInvalidAccess();
var actualInstanceId = string.IsNullOrEmpty(instanceId) ? this.NewGuid().ToString() : instanceId;
var alreadyCompletedTask = this.CallDurableTaskFunctionAsync<string>(functionName, FunctionType.Orchestrator, true, actualInstanceId, null, null, input, null);
var alreadyCompletedTask = this.CallDurableTaskFunctionAsync<string>(functionName, FunctionType.Orchestrator, true, actualInstanceId, null, null, input, null, new Dictionary<string, string>());
System.Diagnostics.Debug.Assert(alreadyCompletedTask.IsCompleted, "starting orchestrations is synchronous");
return actualInstanceId;
}
Expand All @@ -545,7 +552,8 @@ internal async Task<TResult> CallDurableTaskFunctionAsync<TResult>(
string operation,
RetryOptions retryOptions,
object input,
DateTime? scheduledTimeUtc)
DateTime? scheduledTimeUtc,
IReadOnlyDictionary<string, string> tags)
{
this.ThrowIfInvalidAccess();

Expand Down Expand Up @@ -581,19 +589,22 @@ internal async Task<TResult> CallDurableTaskFunctionAsync<TResult>(
System.Diagnostics.Debug.Assert(instanceId == null, "The instanceId parameter should not be used for activity functions.");
System.Diagnostics.Debug.Assert(operation == null, "The operation parameter should not be used for activity functions.");
System.Diagnostics.Debug.Assert(!oneWay, "The oneWay parameter should not be used for activity functions.");
this.IncrementActionsOrThrowException();
Task<TResult> RetryCall() =>
this.InnerContext
.ScheduleTask<TResult>(
functionName,
version,
ScheduleTaskOptions.CreateBuilder().WithTags(new Dictionary<string, string>(tags)).Build(),
input);
if (retryOptions == null)
{
this.IncrementActionsOrThrowException();
callTask = this.InnerContext.ScheduleTask<TResult>(functionName, version, input);
callTask = RetryCall();
}
else
{
this.IncrementActionsOrThrowException();
callTask = this.InnerContext.ScheduleWithRetry<TResult>(
functionName,
version,
retryOptions.GetRetryOptions(),
input);
var retryInterceptor = new RetryInterceptor<TResult>(this.InnerContext, retryOptions.GetRetryOptions(), RetryCall);
callTask = retryInterceptor.Invoke();
}

break;
Expand Down Expand Up @@ -1071,14 +1082,14 @@ void IDurableOrchestrationContext.ContinueAsNew(object input, bool preserveUnpro
Task<TResult> IDurableOrchestrationContext.CallEntityAsync<TResult>(EntityId entityId, string operationName, object operationInput)
{
this.ThrowIfInvalidAccess();
return this.CallDurableTaskFunctionAsync<TResult>(entityId.EntityName, FunctionType.Entity, false, EntityId.GetSchedulerIdFromEntityId(entityId), operationName, null, operationInput, null);
return this.CallDurableTaskFunctionAsync<TResult>(entityId.EntityName, FunctionType.Entity, false, EntityId.GetSchedulerIdFromEntityId(entityId), operationName, null, operationInput, null, new Dictionary<string, string>());
}

/// <inheritdoc/>
Task IDurableOrchestrationContext.CallEntityAsync(EntityId entityId, string operationName, object operationInput)
{
this.ThrowIfInvalidAccess();
return this.CallDurableTaskFunctionAsync<object>(entityId.EntityName, FunctionType.Entity, false, EntityId.GetSchedulerIdFromEntityId(entityId), operationName, null, operationInput, null);
return this.CallDurableTaskFunctionAsync<object>(entityId.EntityName, FunctionType.Entity, false, EntityId.GetSchedulerIdFromEntityId(entityId), operationName, null, operationInput, null, new Dictionary<string, string>());
}

/// <inheritdoc/>
Expand Down Expand Up @@ -1411,5 +1422,12 @@ private interface IEventTaskCompletionSource
/// <param name="result">The result.</param>
void TrySetResult(object result);
}

private Task<T> ScheduleWithRetry<T>(string name, string version, ScheduleTaskOptions options, params object[] parameters)
{
Task<T> RetryCall() => this.InnerContext.ScheduleTask<T>(name, version, options, parameters);
var retryInterceptor = new RetryInterceptor<T>(this.InnerContext, options.RetryOptions, RetryCall);
return retryInterceptor.Invoke();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See LICENSE in the project root for license information.

using System.Collections.Generic;
using System.Collections.Immutable;

namespace Microsoft.Azure.WebJobs.Extensions.DurableTask
{
/// <summary>
/// Options for starting a durable activity.
/// </summary>
public sealed class DurableActivityOptions
{
/// <summary>
/// Initializes a new instance of the <see cref="DurableActivityOptions"/> class.
/// </summary>
/// <param name="functionName">The name of the activity function to invoke.</param>
public DurableActivityOptions(string functionName)
{
this.FunctionName = functionName;
}

/// <summary>
/// Gets the name of the activity function to invoke.
/// </summary>
public string FunctionName { get; }

/// <summary>
/// Gets or sets the input to the activity.
/// </summary>
public object Input { get; init; }

/// <summary>
/// Gets or sets the retry options for the activity.
/// </summary>
public RetryOptions RetryOptions { get; init; }

/// <summary>
/// Gets the tags associated with the activity.
/// </summary>
public IReadOnlyDictionary<string, string> Tags { get; init; } = ImmutableDictionary<string, string>.Empty;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See LICENSE in the project root for license information.

using System.Collections.Generic;

namespace Microsoft.Azure.WebJobs.Extensions.DurableTask
{
/// <summary>
/// Options for starting a durable orchestration.
/// </summary>
public sealed class DurableOrchestrationOptions
{
/// <summary>
/// Initializes a new instance of the <see cref="DurableOrchestrationOptions"/> class.
/// </summary>
/// <param name="orchestratorFunctionName">The name of the orchestrator function to start.</param>
public DurableOrchestrationOptions(string orchestratorFunctionName)
{
this.OrchestratorFunctionName = orchestratorFunctionName;
}

/// <summary>
/// JSON-serializeable input value for the orchestrator function.
/// </summary>
public object Input { get; init; }

/// <summary>
/// Gets or sets the ID for the new orchestration.
/// </summary>
public string InstanceId { get; init; }

/// <summary>
/// Gets the name of the orchestrator function to start.
/// </summary>
public string OrchestratorFunctionName { get; }

/// <summary>
/// Gets or sets tags to associate with the new orchestration.
/// </summary>
public IReadOnlyDictionary<string, string> Tags { get; init; } = new Dictionary<string, string>();
}
}
Loading
Loading