diff --git a/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableClient.cs b/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableClient.cs index a2fcc298c..990f7df5f 100644 --- a/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableClient.cs +++ b/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableClient.cs @@ -3,6 +3,7 @@ using System; using System.Collections.Generic; +using System.Collections.Immutable; using System.Diagnostics; using System.Linq; using System.Net.Http; @@ -174,13 +175,20 @@ async Task IDurableOrchestrationClient.WaitForCompletionOrCreateC } /// - async Task IDurableOrchestrationClient.StartNewAsync(string orchestratorFunctionName, string instanceId, T input) + async Task IDurableOrchestrationClient.StartNewAsync(DurableOrchestrationOptions options) { + if (options == null) + { + throw new ArgumentNullException(nameof(options), "Orchestration options cannot be null."); + } + if (this.ClientReferencesCurrentApp(this)) { - this.config?.ThrowIfFunctionDoesNotExist(orchestratorFunctionName, FunctionType.Orchestrator); + this.config?.ThrowIfFunctionDoesNotExist(options.OrchestratorFunctionName, FunctionType.Orchestrator); } + string instanceId = options.InstanceId; + if (string.IsNullOrEmpty(instanceId)) { instanceId = Guid.NewGuid().ToString("N"); @@ -201,11 +209,11 @@ async Task IDurableOrchestrationClient.StartNewAsync(string orchestra OrchestrationStatus[] dedupeStatuses = this.GetStatusesNotToOverride(); Task createTask = this.client.CreateOrchestrationInstanceAsync( - orchestratorFunctionName, this.durableTaskOptions.DefaultVersion, instanceId, input, null, dedupeStatuses); + options.OrchestratorFunctionName, this.durableTaskOptions.DefaultVersion, instanceId, options?.Input, options.Tags?.ToDictionary(kvp => kvp.Key, kvp => kvp.Value), dedupeStatuses); this.traceHelper.FunctionScheduled( this.TaskHubName, - orchestratorFunctionName, + options.OrchestratorFunctionName, instanceId, reason: "NewInstance", functionType: FunctionType.Orchestrator, @@ -215,6 +223,12 @@ async Task IDurableOrchestrationClient.StartNewAsync(string orchestra return instance.InstanceId; } + /// + Task IDurableOrchestrationClient.StartNewAsync(string orchestratorFunctionName, string instanceId, T input) + { + return ((IDurableOrchestrationClient)this).StartNewAsync(new DurableOrchestrationOptions(orchestratorFunctionName) { Input = input, InstanceId = instanceId }); + } + private OrchestrationStatus[] GetStatusesNotToOverride() { OverridableStates overridableStates = this.durableTaskOptions.OverridableExistingInstanceStates; @@ -1067,7 +1081,9 @@ internal HttpResponseMessage CreateCheckStatusResponse( private static void TrackNameAndScheduledTime(JObject historyItem, EventType eventType, int index, Dictionary eventMapper) { - eventMapper.Add($"{eventType}_{historyItem["EventId"]}", new EventIndexDateMapping { Index = index, Name = (string)historyItem["Name"], Date = (DateTime)historyItem["Timestamp"], Input = (string)historyItem["Input"] }); + JObject tags = historyItem["Tags"] as JObject; + + eventMapper.Add($"{eventType}_{historyItem["EventId"]}", new EventIndexDateMapping { Index = index, Name = (string)historyItem["Name"], Date = (DateTime)historyItem["Timestamp"], Input = (string)historyItem["Input"], Tags = tags }); } private static void AddScheduledEventDataAndAggregate(ref Dictionary eventMapper, string prefix, JToken historyItem, List indexList, bool showInput) @@ -1081,6 +1097,11 @@ private static void AddScheduledEventDataAndAggregate(ref Dictionary(orchestrationState.Tags) + : ImmutableDictionary.Empty, }; } @@ -1139,13 +1163,13 @@ private static void ConvertOutputToJToken(JObject jsonObject, bool showHistoryOu /// Task IDurableOrchestrationClient.StartNewAsync(string orchestratorFunctionName, string instanceId) { - return ((IDurableOrchestrationClient)this).StartNewAsync(orchestratorFunctionName, instanceId, null); + return ((IDurableOrchestrationClient)this).StartNewAsync(new DurableOrchestrationOptions(orchestratorFunctionName) { InstanceId = instanceId }); } /// Task IDurableOrchestrationClient.StartNewAsync(string orchestratorFunctionName, T input) { - return ((IDurableOrchestrationClient)this).StartNewAsync(orchestratorFunctionName, string.Empty, input); + return ((IDurableOrchestrationClient)this).StartNewAsync(new DurableOrchestrationOptions(orchestratorFunctionName) { Input = input }); } async Task IDurableOrchestrationClient.RestartAsync(string instanceId, bool restartWithNewInstanceId) @@ -1235,6 +1259,8 @@ private class EventIndexDateMapping public string Name { get; set; } public string Input { get; set; } + + public JObject Tags { get; set; } } } } diff --git a/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableOrchestrationContext.cs b/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableOrchestrationContext.cs index 485c7210b..482822d03 100644 --- a/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableOrchestrationContext.cs +++ b/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableOrchestrationContext.cs @@ -246,7 +246,7 @@ Task IDurableOrchestrationContext.CallSubOrchestratorAsync(str /// Task IDurableOrchestrationContext.CallSubOrchestratorAsync(string functionName, string instanceId, object input) { - return this.CallDurableTaskFunctionAsync(functionName, FunctionType.Orchestrator, false, instanceId, null, null, input, null); + return this.CallDurableTaskFunctionAsync(functionName, FunctionType.Orchestrator, false, instanceId, null, null, input, null, new Dictionary()); } /// @@ -257,7 +257,7 @@ Task IDurableOrchestrationContext.CallSubOrchestratorWithRetryAsync(functionName, FunctionType.Orchestrator, false, instanceId, null, retryOptions, input, null); + return this.CallDurableTaskFunctionAsync(functionName, FunctionType.Orchestrator, false, instanceId, null, retryOptions, input, null, new Dictionary()); } Task IDurableOrchestrationContext.CallHttpAsync(HttpMethod method, Uri uri, string content, HttpRetryOptions retryOptions) @@ -317,7 +317,8 @@ private async Task ScheduleDurableHttpActivityAsync(Durable operation: null, retryOptions: req.HttpRetryOptions?.GetRetryOptions(), input: req, - scheduledTimeUtc: null); + scheduledTimeUtc: null, + tags: new Dictionary()); return durableHttpResponse; } @@ -449,10 +450,16 @@ Task IDurableOrchestrationContext.WaitForExternalEvent(string name, TimeSp return this.WaitForExternalEvent(name, timeout, timedOutAction, cancelToken); } + /// + Task IDurableOrchestrationContext.CallActivityAsync(DurableActivityOptions options) + { + return this.CallDurableTaskFunctionAsync(options.FunctionName, FunctionType.Activity, false, null, null, options.RetryOptions, options.Input, null, options.Tags); + } + /// Task IDurableOrchestrationContext.CallActivityAsync(string functionName, object input) { - return this.CallDurableTaskFunctionAsync(functionName, FunctionType.Activity, false, null, null, null, input, null); + return this.CallDurableTaskFunctionAsync(functionName, FunctionType.Activity, false, null, null, null, input, null, new Dictionary()); } /// @@ -463,7 +470,7 @@ Task IDurableOrchestrationContext.CallActivityWithRetryAsync(s throw new ArgumentNullException(nameof(retryOptions)); } - return this.CallDurableTaskFunctionAsync(functionName, FunctionType.Activity, false, null, null, retryOptions, input, null); + return this.CallDurableTaskFunctionAsync(functionName, FunctionType.Activity, false, null, null, retryOptions, input, null, new Dictionary()); } /// @@ -488,7 +495,7 @@ void IDurableOrchestrationContext.SignalEntity(EntityId entity, string operation throw new ArgumentNullException(nameof(operationName)); } - var alreadyCompletedTask = this.CallDurableTaskFunctionAsync(entity.EntityName, FunctionType.Entity, true, EntityId.GetSchedulerIdFromEntityId(entity), operationName, null, operationInput, null); + var alreadyCompletedTask = this.CallDurableTaskFunctionAsync(entity.EntityName, FunctionType.Entity, true, EntityId.GetSchedulerIdFromEntityId(entity), operationName, null, operationInput, null, new Dictionary()); System.Diagnostics.Debug.Assert(alreadyCompletedTask.IsCompleted, "signaling entities is synchronous"); try @@ -510,7 +517,7 @@ void IDurableOrchestrationContext.SignalEntity(EntityId entity, DateTime startTi throw new ArgumentNullException(nameof(operationName)); } - var alreadyCompletedTask = this.CallDurableTaskFunctionAsync(entity.EntityName, FunctionType.Entity, true, EntityId.GetSchedulerIdFromEntityId(entity), operationName, null, operationInput, startTime); + var alreadyCompletedTask = this.CallDurableTaskFunctionAsync(entity.EntityName, FunctionType.Entity, true, EntityId.GetSchedulerIdFromEntityId(entity), operationName, null, operationInput, startTime, new Dictionary()); System.Diagnostics.Debug.Assert(alreadyCompletedTask.IsCompleted, "scheduling operations on entities is synchronous"); try @@ -532,7 +539,7 @@ string IDurableOrchestrationContext.StartNewOrchestration(string functionName, o #endif this.ThrowIfInvalidAccess(); var actualInstanceId = string.IsNullOrEmpty(instanceId) ? this.NewGuid().ToString() : instanceId; - var alreadyCompletedTask = this.CallDurableTaskFunctionAsync(functionName, FunctionType.Orchestrator, true, actualInstanceId, null, null, input, null); + var alreadyCompletedTask = this.CallDurableTaskFunctionAsync(functionName, FunctionType.Orchestrator, true, actualInstanceId, null, null, input, null, new Dictionary()); System.Diagnostics.Debug.Assert(alreadyCompletedTask.IsCompleted, "starting orchestrations is synchronous"); return actualInstanceId; } @@ -545,7 +552,8 @@ internal async Task CallDurableTaskFunctionAsync( string operation, RetryOptions retryOptions, object input, - DateTime? scheduledTimeUtc) + DateTime? scheduledTimeUtc, + IReadOnlyDictionary tags) { this.ThrowIfInvalidAccess(); @@ -581,19 +589,22 @@ internal async Task CallDurableTaskFunctionAsync( System.Diagnostics.Debug.Assert(instanceId == null, "The instanceId parameter should not be used for activity functions."); System.Diagnostics.Debug.Assert(operation == null, "The operation parameter should not be used for activity functions."); System.Diagnostics.Debug.Assert(!oneWay, "The oneWay parameter should not be used for activity functions."); + this.IncrementActionsOrThrowException(); + Task RetryCall() => + this.InnerContext + .ScheduleTask( + functionName, + version, + ScheduleTaskOptions.CreateBuilder().WithTags(new Dictionary(tags)).Build(), + input); if (retryOptions == null) { - this.IncrementActionsOrThrowException(); - callTask = this.InnerContext.ScheduleTask(functionName, version, input); + callTask = RetryCall(); } else { - this.IncrementActionsOrThrowException(); - callTask = this.InnerContext.ScheduleWithRetry( - functionName, - version, - retryOptions.GetRetryOptions(), - input); + var retryInterceptor = new RetryInterceptor(this.InnerContext, retryOptions.GetRetryOptions(), RetryCall); + callTask = retryInterceptor.Invoke(); } break; @@ -1071,14 +1082,14 @@ void IDurableOrchestrationContext.ContinueAsNew(object input, bool preserveUnpro Task IDurableOrchestrationContext.CallEntityAsync(EntityId entityId, string operationName, object operationInput) { this.ThrowIfInvalidAccess(); - return this.CallDurableTaskFunctionAsync(entityId.EntityName, FunctionType.Entity, false, EntityId.GetSchedulerIdFromEntityId(entityId), operationName, null, operationInput, null); + return this.CallDurableTaskFunctionAsync(entityId.EntityName, FunctionType.Entity, false, EntityId.GetSchedulerIdFromEntityId(entityId), operationName, null, operationInput, null, new Dictionary()); } /// Task IDurableOrchestrationContext.CallEntityAsync(EntityId entityId, string operationName, object operationInput) { this.ThrowIfInvalidAccess(); - return this.CallDurableTaskFunctionAsync(entityId.EntityName, FunctionType.Entity, false, EntityId.GetSchedulerIdFromEntityId(entityId), operationName, null, operationInput, null); + return this.CallDurableTaskFunctionAsync(entityId.EntityName, FunctionType.Entity, false, EntityId.GetSchedulerIdFromEntityId(entityId), operationName, null, operationInput, null, new Dictionary()); } /// @@ -1411,5 +1422,12 @@ private interface IEventTaskCompletionSource /// The result. void TrySetResult(object result); } + + private Task ScheduleWithRetry(string name, string version, ScheduleTaskOptions options, params object[] parameters) + { + Task RetryCall() => this.InnerContext.ScheduleTask(name, version, options, parameters); + var retryInterceptor = new RetryInterceptor(this.InnerContext, options.RetryOptions, RetryCall); + return retryInterceptor.Invoke(); + } } } diff --git a/src/WebJobs.Extensions.DurableTask/ContextInterfaces/DurableActivityOptions.cs b/src/WebJobs.Extensions.DurableTask/ContextInterfaces/DurableActivityOptions.cs new file mode 100644 index 000000000..75a87d03d --- /dev/null +++ b/src/WebJobs.Extensions.DurableTask/ContextInterfaces/DurableActivityOptions.cs @@ -0,0 +1,43 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See LICENSE in the project root for license information. + +using System.Collections.Generic; +using System.Collections.Immutable; + +namespace Microsoft.Azure.WebJobs.Extensions.DurableTask +{ + /// + /// Options for starting a durable activity. + /// + public sealed class DurableActivityOptions + { + /// + /// Initializes a new instance of the class. + /// + /// The name of the activity function to invoke. + public DurableActivityOptions(string functionName) + { + this.FunctionName = functionName; + } + + /// + /// Gets the name of the activity function to invoke. + /// + public string FunctionName { get; } + + /// + /// Gets or sets the input to the activity. + /// + public object Input { get; init; } + + /// + /// Gets or sets the retry options for the activity. + /// + public RetryOptions RetryOptions { get; init; } + + /// + /// Gets the tags associated with the activity. + /// + public IReadOnlyDictionary Tags { get; init; } = ImmutableDictionary.Empty; + } +} \ No newline at end of file diff --git a/src/WebJobs.Extensions.DurableTask/ContextInterfaces/DurableOrchestrationOptions.cs b/src/WebJobs.Extensions.DurableTask/ContextInterfaces/DurableOrchestrationOptions.cs new file mode 100644 index 000000000..b8f35e18c --- /dev/null +++ b/src/WebJobs.Extensions.DurableTask/ContextInterfaces/DurableOrchestrationOptions.cs @@ -0,0 +1,42 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See LICENSE in the project root for license information. + +using System.Collections.Generic; + +namespace Microsoft.Azure.WebJobs.Extensions.DurableTask +{ + /// + /// Options for starting a durable orchestration. + /// + public sealed class DurableOrchestrationOptions + { + /// + /// Initializes a new instance of the class. + /// + /// The name of the orchestrator function to start. + public DurableOrchestrationOptions(string orchestratorFunctionName) + { + this.OrchestratorFunctionName = orchestratorFunctionName; + } + + /// + /// JSON-serializeable input value for the orchestrator function. + /// + public object Input { get; init; } + + /// + /// Gets or sets the ID for the new orchestration. + /// + public string InstanceId { get; init; } + + /// + /// Gets the name of the orchestrator function to start. + /// + public string OrchestratorFunctionName { get; } + + /// + /// Gets or sets tags to associate with the new orchestration. + /// + public IReadOnlyDictionary Tags { get; init; } = new Dictionary(); + } +} diff --git a/src/WebJobs.Extensions.DurableTask/ContextInterfaces/IDurableOrchestrationClient.cs b/src/WebJobs.Extensions.DurableTask/ContextInterfaces/IDurableOrchestrationClient.cs index b3113f7a3..c9fdc190b 100644 --- a/src/WebJobs.Extensions.DurableTask/ContextInterfaces/IDurableOrchestrationClient.cs +++ b/src/WebJobs.Extensions.DurableTask/ContextInterfaces/IDurableOrchestrationClient.cs @@ -114,6 +114,21 @@ Task WaitForCompletionOrCreateCheckStatusResponseAsync( TimeSpan? retryInterval = null, bool returnInternalServerErrorOnFailure = false); + /// + /// Starts a new execution of the specified orchestrator function. + /// + /// The options with which to start the orchestration. + /// A task that completes when the orchestration is started. The task contains the instance id of the started + /// orchestratation instance. + /// + /// The specified function does not exist, is disabled, or is not an orchestrator function. + /// + /// + /// The parameter is null. + /// + Task StartNewAsync( + DurableOrchestrationOptions options); + /// /// Starts a new execution of the specified orchestrator function. /// @@ -160,7 +175,10 @@ Task StartNewAsync( /// /// The specified function does not exist, is disabled, or is not an orchestrator function. /// - Task StartNewAsync(string orchestratorFunctionName, string instanceId, T input); + Task StartNewAsync( + string orchestratorFunctionName, + string instanceId, + T input); /// /// Sends an event notification message to a waiting orchestration instance. diff --git a/src/WebJobs.Extensions.DurableTask/ContextInterfaces/IDurableOrchestrationContext.cs b/src/WebJobs.Extensions.DurableTask/ContextInterfaces/IDurableOrchestrationContext.cs index 4d9ee12a7..791f8f53f 100644 --- a/src/WebJobs.Extensions.DurableTask/ContextInterfaces/IDurableOrchestrationContext.cs +++ b/src/WebJobs.Extensions.DurableTask/ContextInterfaces/IDurableOrchestrationContext.cs @@ -462,6 +462,23 @@ public interface IDurableOrchestrationContext /// The new value. Guid NewGuid(); + /// + /// Schedules an activity function for execution with the specified options. + /// + /// The return type of the scheduled activity function. + /// The options with which to execute the activity function. + /// A durable task that completes when the called activity function completes or fails. + /// + /// The specified function does not exist, is disabled, or is not an orchestrator function. + /// + /// + /// The current thread is different than the thread which started the orchestrator execution. + /// + /// + /// The activity function failed with an unhandled exception. + /// + Task CallActivityAsync(DurableActivityOptions options); + /// /// Schedules an activity function named for execution. /// diff --git a/src/WebJobs.Extensions.DurableTask/DurableOrchestrationStatus.cs b/src/WebJobs.Extensions.DurableTask/DurableOrchestrationStatus.cs index d4963b5df..439157de1 100644 --- a/src/WebJobs.Extensions.DurableTask/DurableOrchestrationStatus.cs +++ b/src/WebJobs.Extensions.DurableTask/DurableOrchestrationStatus.cs @@ -2,6 +2,8 @@ // Licensed under the MIT License. See LICENSE in the project root for license information. using System; +using System.Collections.Generic; +using System.Collections.Immutable; using Newtonsoft.Json.Linq; namespace Microsoft.Azure.WebJobs.Extensions.DurableTask @@ -102,5 +104,13 @@ public class DurableOrchestrationStatus /// The output as a JArray object or null. /// public JArray History { get; set; } + + /// + /// Gets or sets the tags associated with the orchestration instance. + /// + /// + /// The tags as a read-only dictionary. + /// + public IReadOnlyDictionary Tags { get; set; } = ImmutableDictionary.Empty; } } diff --git a/src/WebJobs.Extensions.DurableTask/HttpApiHandler.cs b/src/WebJobs.Extensions.DurableTask/HttpApiHandler.cs index d9e586d78..10b0f6db9 100644 --- a/src/WebJobs.Extensions.DurableTask/HttpApiHandler.cs +++ b/src/WebJobs.Extensions.DurableTask/HttpApiHandler.cs @@ -3,6 +3,7 @@ using System; using System.Collections.Generic; +using System.Collections.Immutable; using System.Collections.Specialized; using System.Diagnostics; using System.Linq; @@ -716,6 +717,7 @@ private static StatusResponsePayload ConvertFrom(DurableOrchestrationStatus stat CreatedTime = status.CreatedTime.ToString("s") + "Z", LastUpdatedTime = status.LastUpdatedTime.ToString("s") + "Z", HistoryEvents = status.History, + Tags = status.Tags ?? ImmutableDictionary.Empty, }; } diff --git a/src/WebJobs.Extensions.DurableTask/StatusResponsePayload.cs b/src/WebJobs.Extensions.DurableTask/StatusResponsePayload.cs index 32ef7854f..a4e939c8f 100644 --- a/src/WebJobs.Extensions.DurableTask/StatusResponsePayload.cs +++ b/src/WebJobs.Extensions.DurableTask/StatusResponsePayload.cs @@ -1,6 +1,8 @@ // Copyright (c) .NET Foundation. All rights reserved. // Licensed under the MIT License. See LICENSE in the project root for license information. +using System.Collections.Generic; +using System.Collections.Immutable; using System.Runtime.Serialization; using Newtonsoft.Json.Linq; @@ -65,5 +67,11 @@ internal class StatusResponsePayload /// [DataMember(Name = "historyEvents", EmitDefaultValue = false)] public JArray HistoryEvents { get; set; } + + /// + /// Tags associated with the orchestration. + /// + [DataMember(Name = "tags")] + public IReadOnlyDictionary Tags { get; set; } = ImmutableDictionary.Empty; } } diff --git a/src/WebJobs.Extensions.DurableTask/TaskHubGrpcServer.cs b/src/WebJobs.Extensions.DurableTask/TaskHubGrpcServer.cs index 1487de8a8..d11c4e5cf 100644 --- a/src/WebJobs.Extensions.DurableTask/TaskHubGrpcServer.cs +++ b/src/WebJobs.Extensions.DurableTask/TaskHubGrpcServer.cs @@ -66,6 +66,11 @@ public override Task Hello(Empty request, ServerCallContext context) ScheduledStartTime = request.ScheduledStartTimestamp?.ToDateTime(), }; + if (request.Tags?.Count > 0) + { + executionStartedEvent.Tags = request.Tags; + } + // Get the parent trace context from CreateInstanceRequest string? traceParent = request.ParentTraceContext?.TraceParent; string? traceState = request.ParentTraceContext?.TraceState; diff --git a/test/Common/ClientFunctions.cs b/test/Common/ClientFunctions.cs index db221d133..1972ac7a0 100644 --- a/test/Common/ClientFunctions.cs +++ b/test/Common/ClientFunctions.cs @@ -2,6 +2,7 @@ // Licensed under the MIT License. See LICENSE in the project root for license information. using System; +using System.Collections.Generic; using System.Threading.Tasks; namespace Microsoft.Azure.WebJobs.Extensions.DurableTask.Tests @@ -14,11 +15,25 @@ public static async Task StartFunction( string functionName, string instanceId, object input, - TestDurableClient[] clientRef) + TestDurableClient[] clientRef, + IReadOnlyDictionary tags) { DateTime instanceCreationTime = DateTime.UtcNow; - instanceId = await client.StartNewAsync(functionName, instanceId, input); + if (tags == null) + { + instanceId = await client.StartNewAsync(functionName, instanceId, input); + } + else + { + instanceId = await client.StartNewAsync(new DurableOrchestrationOptions(functionName) + { + Input = input, + InstanceId = instanceId, + Tags = tags, + }); + } + clientRef[0] = new TestDurableClient( client, functionName, @@ -33,11 +48,25 @@ public static async Task StartFunctionWithTaskHub( string functionName, string instanceId, object input, - TestDurableClient[] clientRef) + TestDurableClient[] clientRef, + IReadOnlyDictionary tags) { DateTime instanceCreationTime = DateTime.UtcNow; - instanceId = await client.StartNewAsync(functionName, instanceId, input); + if (tags == null) + { + instanceId = await client.StartNewAsync(functionName, instanceId, input); + } + else + { + instanceId = await client.StartNewAsync(new DurableOrchestrationOptions(functionName) + { + Input = input, + InstanceId = instanceId, + Tags = tags, + }); + } + clientRef[0] = new TestDurableClient( client, functionName, diff --git a/test/Common/DurableTaskHostExtensions.cs b/test/Common/DurableTaskHostExtensions.cs index 969f02e71..0e006a93e 100644 --- a/test/Common/DurableTaskHostExtensions.cs +++ b/test/Common/DurableTaskHostExtensions.cs @@ -15,7 +15,8 @@ public static async Task StartOrchestratorAsync( object input, ITestOutputHelper output, string instanceId = null, - bool useTaskHubFromAppSettings = false) + bool useTaskHubFromAppSettings = false, + IReadOnlyDictionary tags = null) { var startFunction = useTaskHubFromAppSettings ? typeof(ClientFunctions).GetMethod(nameof(ClientFunctions.StartFunctionWithTaskHub)) : @@ -28,6 +29,7 @@ public static async Task StartOrchestratorAsync( { "instanceId", instanceId }, { "input", input }, { "clientRef", clientRef }, + { "tags", tags }, }; await host.CallAsync(startFunction, args); diff --git a/test/Common/TagsTests.cs b/test/Common/TagsTests.cs new file mode 100644 index 000000000..fd77c3aa4 --- /dev/null +++ b/test/Common/TagsTests.cs @@ -0,0 +1,85 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See LICENSE in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Microsoft.Azure.WebJobs.Host.TestCommon; +using Newtonsoft.Json.Linq; +using Xunit; +using Xunit.Abstractions; + +namespace Microsoft.Azure.WebJobs.Extensions.DurableTask.Tests +{ + public class TagsTests : IDisposable + { + private readonly ITestOutputHelper output; + + private readonly TestLoggerProvider loggerProvider; + + public TagsTests(ITestOutputHelper output) + { + this.output = output; + this.loggerProvider = new TestLoggerProvider(output); + } + + public void Dispose() + { + } + + [Theory] + [Trait("Category", PlatformSpecificHelpers.TestCategory)] + [InlineData(true, TestHelpers.AzureStorageProviderType)] + [InlineData(false, TestHelpers.AzureStorageProviderType)] + [InlineData(true, TestHelpers.EmulatorProviderType)] + [InlineData(false, TestHelpers.EmulatorProviderType)] + public async Task TestWithTags(bool extendedSessions, string storageProviderType) + { + string[] orchestratorFunctionNames = + { + nameof(TestOrchestrations.OrchestrationWithTags), + }; + + using (var host = TestHelpers.GetJobHost( + this.loggerProvider, + nameof(this.TestWithTags), + extendedSessions, + storageProviderType: storageProviderType)) + { + await host.StartAsync(); + + var client = await host.StartOrchestratorAsync(orchestratorFunctionNames[0], "World", this.output, tags: new Dictionary { { "key1", "value1" } }); + var status = await client.WaitForCompletionAsync(this.output); + + Assert.Equal(OrchestrationRuntimeStatus.Completed, status?.RuntimeStatus); + Assert.Equal("World", status?.Input); + Assert.Equal(true, status?.Output); + + var historyStatus = await client.GetStatusAsync( + showHistory: true, + showHistoryOutput: true, + showInput: true); + + Assert.NotNull(historyStatus.Tags); + Assert.Contains(historyStatus.Tags, kvp => kvp.Key == "key1" && kvp.Value == "value1"); + + var taskCompletedEvent = + historyStatus + .History + .OfType() + .FirstOrDefault(j => (string)j["EventType"] == "TaskCompleted" && (string)j["FunctionName"] == nameof(TestActivities.ActivityWithTags)); + + Assert.NotNull(taskCompletedEvent); + + var tags = taskCompletedEvent["Tags"] as JObject; + + Assert.NotNull(tags); + + Assert.Equal("activityValue1", tags["activityKey1"].ToString()); + + await host.StopAsync(); + } + } + } +} \ No newline at end of file diff --git a/test/Common/TestActivities.cs b/test/Common/TestActivities.cs index 006bebcd2..e034ca9f1 100644 --- a/test/Common/TestActivities.cs +++ b/test/Common/TestActivities.cs @@ -24,6 +24,11 @@ public static string ActivityWithNoInput([ActivityTrigger] IDurableActivityConte return $"Hello!"; } + public static bool ActivityWithTags([ActivityTrigger] IDurableActivityContext ctx) + { + return true; + } + public static string Hello([ActivityTrigger] IDurableActivityContext ctx) { string input = ctx.GetInput(); diff --git a/test/Common/TestOrchestrations.cs b/test/Common/TestOrchestrations.cs index 0580a34f8..07edc9b2f 100644 --- a/test/Common/TestOrchestrations.cs +++ b/test/Common/TestOrchestrations.cs @@ -8,6 +8,7 @@ using System.Text.Json; using System.Threading; using System.Threading.Tasks; +using Microsoft.AspNetCore.Components.Forms; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Primitives; using Xunit; @@ -177,6 +178,18 @@ public static async Task Factorial([OrchestrationTrigger] IDurableOrchestr return result; } + public static async Task OrchestrationWithTags([OrchestrationTrigger] IDurableOrchestrationContext ctx) + { + await ctx.CallActivityAsync( + new DurableActivityOptions(nameof(TestActivities.ActivityWithTags)) + { + Input = "Hello", + Tags = new Dictionary { { "activityKey1", "activityValue1" } }, + }); + + return true; + } + public static async Task DiskUsage([OrchestrationTrigger] IDurableOrchestrationContext ctx) { string directory = ctx.GetInput(); diff --git a/test/e2e/Apps/BasicDotNetIsolated/HelloCities.cs b/test/e2e/Apps/BasicDotNetIsolated/HelloCities.cs index 70b490646..97ecc35fc 100644 --- a/test/e2e/Apps/BasicDotNetIsolated/HelloCities.cs +++ b/test/e2e/Apps/BasicDotNetIsolated/HelloCities.cs @@ -41,14 +41,21 @@ public static async Task StartOrchestration( [HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")] HttpRequestData req, [DurableClient] DurableTaskClient client, FunctionContext executionContext, - string orchestrationName) + string orchestrationName, + string? tagKey, + string? tagValue) { ILogger logger = executionContext.GetLogger(nameof(StartOrchestration)); + StartOrchestrationOptions options = + !String.IsNullOrEmpty(tagKey) && !String.IsNullOrEmpty(tagValue) + ? new() { Tags = new Dictionary { { tagKey, tagValue } } } + : new(); + // Function input comes from the request content. - string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(orchestrationName); + string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(orchestrationName, options); - logger.LogInformation("Started orchestration with ID = '{instanceId}'.", instanceId); + logger.LogInformation("Started orchestration with ID = '{instanceId}' and options '{options}'.", instanceId, options); // Returns an HTTP 202 response with an instance management payload. // See https://learn.microsoft.com/azure/azure-functions/durable/durable-functions-http-api#start-orchestration diff --git a/test/e2e/Apps/BasicDotNetIsolated/TaskTags.cs b/test/e2e/Apps/BasicDotNetIsolated/TaskTags.cs new file mode 100644 index 000000000..d2fbdb11f --- /dev/null +++ b/test/e2e/Apps/BasicDotNetIsolated/TaskTags.cs @@ -0,0 +1,35 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information.using System.Diagnostics; + +using Microsoft.Azure.Functions.Worker; +using Microsoft.DurableTask; +using Microsoft.Extensions.Logging; + +namespace Microsoft.Azure.Durable.Tests.E2E; + +public static class TaskTags +{ + [Function(nameof(TaskTags))] + public static async Task RunOrchestrator( + [OrchestrationTrigger] TaskOrchestrationContext context) + { + ILogger logger = context.CreateReplaySafeLogger(nameof(TaskTags)); + logger.LogInformation("Calling activities..."); + + string output1 = await context.CallActivityAsync(nameof(ActivityWithTags), "No Tags"); + string output2 = await context.CallActivityAsync(nameof(ActivityWithTags), "With Tags", new TaskOptions{ Tags = new Dictionary { { "key1", "value1" } } }); + + logger.LogInformation("Activities called."); + + return $"{output1}\n{output2}"; + } + + [Function(nameof(ActivityWithTags))] + public static string? ActivityWithTags([ActivityTrigger] string input, FunctionContext executionContext) + { + ILogger logger = executionContext.GetLogger("SayHello"); + logger.LogInformation("Echoing {input}.", nameof(ActivityWithTags)); + + return $"Echo: {input}"; + } +} diff --git a/test/e2e/Tests/Helpers/DurableHelpers.cs b/test/e2e/Tests/Helpers/DurableHelpers.cs index f5007fcba..63b7e4dc2 100644 --- a/test/e2e/Tests/Helpers/DurableHelpers.cs +++ b/test/e2e/Tests/Helpers/DurableHelpers.cs @@ -1,6 +1,7 @@ // Copyright (c) .NET Foundation. All rights reserved. // Licensed under the MIT License. See License.txt in the project root for license information. +using System.Collections.Immutable; using System.Text.Json.Nodes; namespace Microsoft.Azure.Durable.Tests.DotnetIsolatedE2E; @@ -23,6 +24,7 @@ internal class OrchestrationStatusDetails public string Output { get; set; } = string.Empty; public DateTime CreatedTime { get; set; } public DateTime LastUpdatedTime { get; set; } + public IReadOnlyDictionary Tags { get; set; } = ImmutableDictionary.Empty; public OrchestrationStatusDetails(string statusQueryResponse) { JsonNode? statusQueryJsonNode = JsonNode.Parse(statusQueryResponse); @@ -35,6 +37,13 @@ public OrchestrationStatusDetails(string statusQueryResponse) this.Output = statusQueryJsonNode["output"]?.ToString() ?? string.Empty; this.CreatedTime = DateTime.Parse(statusQueryJsonNode["createdTime"]?.GetValue() ?? string.Empty).ToUniversalTime(); this.LastUpdatedTime = DateTime.Parse(statusQueryJsonNode["lastUpdatedTime"]?.GetValue() ?? string.Empty).ToUniversalTime(); + + var tags = statusQueryJsonNode["tags"] as JsonObject; + + if (tags is not null) + { + this.Tags = tags.ToDictionary(kvp => kvp.Key, kvp => kvp.Value?.ToString() ?? string.Empty); + } } } diff --git a/test/e2e/Tests/Tests/TaskTagsTests.cs b/test/e2e/Tests/Tests/TaskTagsTests.cs new file mode 100644 index 000000000..5c4570519 --- /dev/null +++ b/test/e2e/Tests/Tests/TaskTagsTests.cs @@ -0,0 +1,55 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using Xunit.Abstractions; +using Xunit; +using System.Diagnostics; + +namespace Microsoft.Azure.Durable.Tests.DotnetIsolatedE2E; + +[Collection(Constants.FunctionAppCollectionName)] +public class TaskTagsTests +{ + private readonly FunctionAppFixture fixture; + private readonly ITestOutputHelper output; + + public TaskTagsTests(FunctionAppFixture fixture, ITestOutputHelper testOutputHelper) + { + this.fixture = fixture; + this.fixture.TestLogs.UseTestLogger(testOutputHelper); + this.output = testOutputHelper; + } + + // Due to some kind of asynchronous race condition in XUnit, when running these tests in pipelines, + // the output may be disposed before the message is written. Just ignore these types of errors for now. + private void WriteOutput(string message) + { + try + { + this.output.WriteLine(message); + } + catch + { + // Ignore + } + } + + [Fact] + [Trait("PowerShell", "Skip")] // Tags are not currently implemented in PowerShell + [Trait("Python", "Skip")] // Tags are not currently implemented in Python + [Trait("Node", "Skip")] // Tags are not currently implemented in Node + public async Task RunOrchestrationWithTags() + { + using HttpResponseMessage response = await HttpHelpers.InvokeHttpTrigger("StartOrchestration", "?orchestrationName=TaskTags&tagKey=key1&tagValue=value1"); + + string statusQueryGetUri = await DurableHelpers.ParseStatusQueryGetUriAsync(response); + await DurableHelpers.WaitForOrchestrationStateAsync(statusQueryGetUri, "Completed", 30); + var orchestrationDetails = await DurableHelpers.GetRunningOrchestrationDetailsAsync(statusQueryGetUri); + + Assert.NotNull(orchestrationDetails?.Tags); + Assert.Contains("key1", orchestrationDetails.Tags.Keys); + Assert.Equal("value1", orchestrationDetails.Tags["key1"]); + + // TODO: Verify activity has tags. + } +} \ No newline at end of file