diff --git a/src/WebJobs.Extensions.DurableTask/Bindings/OrchestrationTriggerAttributeBindingProvider.cs b/src/WebJobs.Extensions.DurableTask/Bindings/OrchestrationTriggerAttributeBindingProvider.cs index a69e442cc..36b64a5ac 100644 --- a/src/WebJobs.Extensions.DurableTask/Bindings/OrchestrationTriggerAttributeBindingProvider.cs +++ b/src/WebJobs.Extensions.DurableTask/Bindings/OrchestrationTriggerAttributeBindingProvider.cs @@ -13,6 +13,7 @@ using Newtonsoft.Json; using Newtonsoft.Json.Linq; using static Microsoft.Azure.WebJobs.Extensions.DurableTask.OutOfProcOrchestrationShim; +using proto = Google.Protobuf.WellKnownTypes; namespace Microsoft.Azure.WebJobs.Extensions.DurableTask { @@ -172,6 +173,12 @@ public Task BindAsync(object? value, ValueBindingContext context) EntityParameters = remoteContext.EntityParameters.ToProtobuf(), }; + // We only do a null check as an empty string is a valid version. + if (this.config.Options.DefaultVersion != null) + { + orchestratorRequest.Properties.Add("defaultVersion", proto.Value.ForString(this.config.Options.DefaultVersion)); + } + // We convert the binary payload into a base64 string because that seems to be the most commonly supported // format for Azure Functions language workers. Attempts to send unencoded byte[] payloads were unsuccessful. string encodedRequest = ProtobufUtils.Base64Encode(orchestratorRequest); diff --git a/src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs b/src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs index 5640bbe67..d09230745 100644 --- a/src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs +++ b/src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs @@ -17,6 +17,7 @@ using DurableTask.Core.Exceptions; using DurableTask.Core.History; using DurableTask.Core.Middleware; +using DurableTask.Core.Settings; using Microsoft.Azure.WebJobs.Description; using Microsoft.Azure.WebJobs.Extensions.DurableTask.Correlation; using Microsoft.Azure.WebJobs.Extensions.DurableTask.Listener; @@ -356,7 +357,12 @@ void IExtensionConfigProvider.Initialize(ExtensionConfigContext context) context.AddBindingRule() .BindToTrigger(new EntityTriggerAttributeBindingProvider(this, connectionName)); - this.taskHubWorker = new TaskHubWorker(this.defaultDurabilityProvider, this, this, this.loggerFactory); + this.taskHubWorker = new TaskHubWorker(this.defaultDurabilityProvider, this, this, loggerFactory: this.loggerFactory, versioningSettings: new VersioningSettings + { + Version = this.Options.DefaultVersion, // A null (or empty) version is valid as it signifies non-versioned case. + MatchStrategy = this.Options.VersionMatchStrategy, // The default value for this is to no-op on versioning. + FailureStrategy = this.Options.VersionFailureStrategy, // The default value for this is to ignore work if there is a mismatch. + }); // Add middleware to the DTFx dispatcher so that we can inject our own logic // into and customize the orchestration execution pipeline. diff --git a/src/WebJobs.Extensions.DurableTask/Grpc/versions.txt b/src/WebJobs.Extensions.DurableTask/Grpc/versions.txt index 4953cb19d..f605a28ec 100644 --- a/src/WebJobs.Extensions.DurableTask/Grpc/versions.txt +++ b/src/WebJobs.Extensions.DurableTask/Grpc/versions.txt @@ -1,2 +1,2 @@ # The following files were downloaded from branch main at 2025-06-05 21:25:17 UTC -https://raw.githubusercontent.com/microsoft/durabletask-protobuf/fd9369c6a03d6af4e95285e432b7c4e943c06970/protos/orchestrator_service.proto +https://raw.githubusercontent.com/microsoft/durabletask-protobuf/fd9369c6a03d6af4e95285e432b7c4e943c06970/protos/orchestrator_service.proto \ No newline at end of file diff --git a/src/WebJobs.Extensions.DurableTask/LocalGrpcListener.cs b/src/WebJobs.Extensions.DurableTask/LocalGrpcListener.cs index 8d991bff8..2bc556601 100644 --- a/src/WebJobs.Extensions.DurableTask/LocalGrpcListener.cs +++ b/src/WebJobs.Extensions.DurableTask/LocalGrpcListener.cs @@ -225,7 +225,7 @@ public override Task Hello(Empty request, ServerCallContext context) ExecutionStartedEvent executionStartedEvent = new ExecutionStartedEvent(-1, request.Input) { Name = request.Name, - Version = request.Version, + Version = request.Version != null ? request.Version : this.extension.Options.DefaultVersion, OrchestrationInstance = instance, ScheduledStartTime = request.ScheduledStartTimestamp?.ToDateTime(), }; diff --git a/src/WebJobs.Extensions.DurableTask/Options/DurableTaskOptions.cs b/src/WebJobs.Extensions.DurableTask/Options/DurableTaskOptions.cs index a9eb0f4a7..de19f40ff 100644 --- a/src/WebJobs.Extensions.DurableTask/Options/DurableTaskOptions.cs +++ b/src/WebJobs.Extensions.DurableTask/Options/DurableTaskOptions.cs @@ -5,6 +5,7 @@ using System.Collections.Generic; using System.Net.Http; using DurableTask.AzureStorage.Partitioning; +using DurableTask.Core.Settings; using Microsoft.Azure.WebJobs.Host; using Newtonsoft.Json; using Newtonsoft.Json.Converters; @@ -30,6 +31,16 @@ public class DurableTaskOptions /// public string DefaultVersion { get; set; } + /// + /// The strategy that will be used for matching versions when running an orchestration. See for more information. + /// + public VersioningSettings.VersionMatchStrategy VersionMatchStrategy { get; set; } = VersioningSettings.VersionMatchStrategy.CurrentOrOlder; + + /// + /// The strategy that will be used if a versioning failure is detected. See for more information. + /// + public VersioningSettings.VersionFailureStrategy VersionFailureStrategy { get; set; } = VersioningSettings.VersionFailureStrategy.Reject; + /// /// Settings used for Durable HTTP functionality. /// diff --git a/src/WebJobs.Extensions.DurableTask/WebJobs.Extensions.DurableTask.csproj b/src/WebJobs.Extensions.DurableTask/WebJobs.Extensions.DurableTask.csproj index ad197fa00..bdeaf3f09 100644 --- a/src/WebJobs.Extensions.DurableTask/WebJobs.Extensions.DurableTask.csproj +++ b/src/WebJobs.Extensions.DurableTask/WebJobs.Extensions.DurableTask.csproj @@ -50,7 +50,7 @@ - + diff --git a/src/Worker.Extensions.DurableTask/FunctionsOrchestrationContext.cs b/src/Worker.Extensions.DurableTask/FunctionsOrchestrationContext.cs index 53b7cca9e..26918eb9c 100644 --- a/src/Worker.Extensions.DurableTask/FunctionsOrchestrationContext.cs +++ b/src/Worker.Extensions.DurableTask/FunctionsOrchestrationContext.cs @@ -48,6 +48,8 @@ public FunctionsOrchestrationContext(TaskOrchestrationContext innerContext, Func public override ParentOrchestrationInstance? Parent => this.innerContext.Parent; + public override string Version => this.innerContext.Version; + protected override ILoggerFactory LoggerFactory { get; } public override TaskOrchestrationEntityFeature Entities => @@ -87,6 +89,29 @@ public override Task CallSubOrchestratorAsync( TaskName orchestratorName, object? input = null, TaskOptions? options = null) { this.EnsureLegalAccess(); + // If we have a default version, add it to the TaskOptions and call the inner context with it. + if (this.innerContext.Properties.TryGetValue("defaultVersion", out var propVersion) && propVersion is string defaultVersion) + { + // TODO: Update to copy constructors when available - https://github.com/microsoft/durabletask-dotnet/issues/440 + SubOrchestrationOptions subOptions; + if (options is SubOrchestrationOptions subOrchestrationOptions) + { + subOptions = new SubOrchestrationOptions(options) + { + InstanceId = subOrchestrationOptions.InstanceId, + Version = subOrchestrationOptions.Version?.Version ?? defaultVersion, + }; + } + else + { + subOptions = new SubOrchestrationOptions(options ?? new TaskOptions()) + { + Version = defaultVersion + }; + } + return this.innerContext.CallSubOrchestratorAsync(orchestratorName, input, subOptions); + } + return this.innerContext.CallSubOrchestratorAsync(orchestratorName, input, options); } diff --git a/test/e2e/Apps/BasicDotNetIsolated/VersionedOrchestration.cs b/test/e2e/Apps/BasicDotNetIsolated/VersionedOrchestration.cs new file mode 100644 index 000000000..a57baaf4d --- /dev/null +++ b/test/e2e/Apps/BasicDotNetIsolated/VersionedOrchestration.cs @@ -0,0 +1,120 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using Microsoft.Azure.Functions.Worker; +using Microsoft.Azure.Functions.Worker.Http; +using Microsoft.DurableTask; +using Microsoft.DurableTask.Client; +using Microsoft.Extensions.Logging; + +namespace Microsoft.Azure.Durable.Tests.E2E; + +public static class VersionedOrchestration +{ + [Function(nameof(VersionedOrchestration))] + public static async Task RunOrchestrator( + [OrchestrationTrigger] TaskOrchestrationContext context) + { + ILogger logger = context.CreateReplaySafeLogger(nameof(VersionedOrchestration)); + logger.LogInformation($"Versioned orchestration! Version: '{context.Version}'"); + + return await context.CallActivityAsync(nameof(SayVersion), context.Version); + } + + [Function(nameof(RunWithSubOrchestrator))] + public static async Task RunWithSubOrchestrator( + [OrchestrationTrigger] TaskOrchestrationContext context, + string? subVersion) + { + ILogger logger = context.CreateReplaySafeLogger(nameof(VersionedOrchestration)); + logger.LogInformation($"Versioned orchestration! Version: '{context.Version}' Sub Version: '{subVersion}'"); + + string subOrchestrationResponse; + if (subVersion == null) + { + subOrchestrationResponse = await context.CallSubOrchestratorAsync(nameof(VersionedSubOrchestration)); + } + else + { + subOrchestrationResponse = await context.CallSubOrchestratorAsync(nameof(VersionedSubOrchestration), new SubOrchestrationOptions + { + Version = subVersion, + }); + } + return $"Parent Version: '{context.Version}' | Sub {subOrchestrationResponse}"; + } + + [Function(nameof(VersionedSubOrchestration))] + public static async Task VersionedSubOrchestration([OrchestrationTrigger] TaskOrchestrationContext context) + { + ILogger logger = context.CreateReplaySafeLogger(nameof(VersionedOrchestration)); + logger.LogInformation($"Versioned sub-orchestration! Version: '{context.Version}'"); + + return await context.CallActivityAsync(nameof(SayVersion), context.Version); + } + + [Function(nameof(SayVersion))] + public static string SayVersion([ActivityTrigger] string version, FunctionContext executionContext) + { + ILogger logger = executionContext.GetLogger("SayVersion"); + logger.LogInformation("Activity running with version: {name}.", version); + return $"Version: '{version}'"; + } + + [Function("OrchestrationVersion_HttpStart")] + public static async Task HttpStart( + [HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")] HttpRequestData req, + [DurableClient] DurableTaskClient client, + FunctionContext executionContext, + string? version) + { + ILogger logger = executionContext.GetLogger("VersionedOrchestration_HttpStart"); + + // Function input comes from the request content. + string instanceId; + if (version != null) + { + instanceId = await client.ScheduleNewOrchestrationInstanceAsync(nameof(VersionedOrchestration), new StartOrchestrationOptions + { + Version = version, + }); + } + else + { + instanceId = await client.ScheduleNewOrchestrationInstanceAsync(nameof(VersionedOrchestration)); + } + + logger.LogInformation("Started orchestration with ID = '{instanceId}' and Version = '{version}'.", instanceId, version); + + // Returns an HTTP 202 response with an instance management payload. + // See https://learn.microsoft.com/azure/azure-functions/durable/durable-functions-http-api#start-orchestration + return await client.CreateCheckStatusResponseAsync(req, instanceId); + } + + [Function("OrchestrationSubVersion_HttpStart")] + public static async Task HttpSubStart( + [HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")] HttpRequestData req, + [DurableClient] DurableTaskClient client, + FunctionContext executionContext, + string? subOrchestrationVersion) + { + ILogger logger = executionContext.GetLogger("OrchestrationSubVersion_HttpStart"); + + // Function input comes from the request content. + string instanceId; + if (subOrchestrationVersion != null) + { + instanceId = await client.ScheduleNewOrchestrationInstanceAsync(nameof(RunWithSubOrchestrator), input: subOrchestrationVersion); + } + else + { + instanceId = await client.ScheduleNewOrchestrationInstanceAsync(nameof(RunWithSubOrchestrator)); + } + + logger.LogInformation("Started orchestration with ID = '{instanceId}' and Version = '{subOrchestrationVersion}'.", instanceId, subOrchestrationVersion); + + // Returns an HTTP 202 response with an instance management payload. + // See https://learn.microsoft.com/azure/azure-functions/durable/durable-functions-http-api#start-orchestration + return await client.CreateCheckStatusResponseAsync(req, instanceId); + } +} diff --git a/test/e2e/Apps/BasicDotNetIsolated/host.json b/test/e2e/Apps/BasicDotNetIsolated/host.json index fa88b5d0b..e714c6342 100644 --- a/test/e2e/Apps/BasicDotNetIsolated/host.json +++ b/test/e2e/Apps/BasicDotNetIsolated/host.json @@ -11,10 +11,13 @@ }, "extensions": { "durableTask": { - "tracing": { - "DistributedTracingEnabled": true, - "Version": "V2" - } + "tracing": { + "DistributedTracingEnabled": true, + "Version": "V2" + }, + "defaultVersion": "2.0", + "versionMatchStrategy": "CurrentOrOlder", + "versionFailureStrategy": "Fail" } } } \ No newline at end of file diff --git a/test/e2e/Tests/Fixtures/FunctionAppFixture.cs b/test/e2e/Tests/Fixtures/FunctionAppFixture.cs index aa04f4396..7023b245a 100644 --- a/test/e2e/Tests/Fixtures/FunctionAppFixture.cs +++ b/test/e2e/Tests/Fixtures/FunctionAppFixture.cs @@ -15,6 +15,7 @@ public class FunctionAppFixture : IAsyncLifetime private readonly ILogger _logger; private bool _disposed; private Process? _funcProcess; + private string? _appName; private JobObjectRegistry? _jobObjectRegistry; @@ -25,6 +26,7 @@ public FunctionAppFixture(IMessageSink messageSink) this.TestLogs = new TestLoggerProvider(messageSink); loggerFactory.AddProvider(this.TestLogs); this._logger = loggerFactory.CreateLogger(); + this._appName = Environment.GetEnvironmentVariable("TEST_APP_NAME") ?? "BasicDotNetIsolated"; } public async Task InitializeAsync() @@ -40,7 +42,7 @@ public async Task InitializeAsync() this._logger.LogInformation($"Starting functions host for {Constants.FunctionAppCollectionName}..."); string rootDir = Path.GetFullPath(@"../../../../../../"); - string e2eAppBinPath = Path.Combine(rootDir, @"test/e2e/Apps/BasicDotNetIsolated/bin"); + string e2eAppBinPath = Path.Combine(rootDir, @$"test/e2e/Apps/{this._appName}/bin"); string? e2eHostJson = Directory.GetFiles(e2eAppBinPath, "host.json", SearchOption.AllDirectories).FirstOrDefault(); if (e2eHostJson == null) diff --git a/test/e2e/Tests/Tests/VersioningTests.cs b/test/e2e/Tests/Tests/VersioningTests.cs new file mode 100644 index 000000000..740fcbdc8 --- /dev/null +++ b/test/e2e/Tests/Tests/VersioningTests.cs @@ -0,0 +1,87 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System.Net; +using Xunit; +using Xunit.Abstractions; + +namespace Microsoft.Azure.Durable.Tests.DotnetIsolatedE2E; + +[Collection(Constants.FunctionAppCollectionName)] +public class VersioningTests +{ + private readonly FunctionAppFixture _fixture; + private readonly ITestOutputHelper _output; + + public VersioningTests(FunctionAppFixture fixture, ITestOutputHelper testOutputHelper) + { + _fixture = fixture; + _fixture.TestLogs.UseTestLogger(testOutputHelper); + _output = testOutputHelper; + } + + [Theory] + [InlineData(null)] // Represents a non-versioned case. + [InlineData("")] // Non-versioned/empty-versioned case. + [InlineData("1.0")] + [InlineData("2.0")] + public async Task TestVersionedOrchestration_OKWithMatchingVersion(string? version) + { + string queryString = version == null ? string.Empty : $"?version={version}"; + using HttpResponseMessage response = await HttpHelpers.InvokeHttpTrigger("OrchestrationVersion_HttpStart", queryString); + + Assert.Equal(HttpStatusCode.Accepted, response.StatusCode); + string statusQueryGetUri = await DurableHelpers.ParseStatusQueryGetUriAsync(response); + + await DurableHelpers.WaitForOrchestrationStateAsync(statusQueryGetUri, "Completed", 30); + + var orchestrationDetails = await DurableHelpers.GetRunningOrchestrationDetailsAsync(statusQueryGetUri); + if (version != null) + { + Assert.Equal($"Version: '{version}'", orchestrationDetails.Output); + } + else + { + // The default version (2.0) from the host.json file should've been used here. + Assert.Equal("Version: '2.0'", orchestrationDetails.Output); + } + } + + [Theory] + [InlineData(null)] // Represents a non-versioned case. + [InlineData("")] // Non-versioned/empty-versioned case. + [InlineData("1.0")] + [InlineData("2.0")] + public async Task TestVersionedSubOrchestration_OKWithMatchingVersion(string? subOrchestrationVersion) + { + string queryString = subOrchestrationVersion == null ? string.Empty : $"?subOrchestrationVersion={subOrchestrationVersion}"; + using HttpResponseMessage response = await HttpHelpers.InvokeHttpTrigger("OrchestrationSubVersion_HttpStart", queryString); + + Assert.Equal(HttpStatusCode.Accepted, response.StatusCode); + string statusQueryGetUri = await DurableHelpers.ParseStatusQueryGetUriAsync(response); + + await DurableHelpers.WaitForOrchestrationStateAsync(statusQueryGetUri, "Completed", 30); + + var orchestrationDetails = await DurableHelpers.GetRunningOrchestrationDetailsAsync(statusQueryGetUri); + if (subOrchestrationVersion != null) + { + Assert.Equal($"Parent Version: '2.0' | Sub Version: '{subOrchestrationVersion}'", orchestrationDetails.Output); + } + else + { + // The default version (2.0) from the host.json file should've been used here. + Assert.Equal("Parent Version: '2.0' | Sub Version: '2.0'", orchestrationDetails.Output); + } + } + + [Fact] + public async Task TestVersionedOrchestration_FailsWithNonMatchingVersion() + { + using HttpResponseMessage response = await HttpHelpers.InvokeHttpTrigger("OrchestrationVersion_HttpStart", $"?version=3.0"); + + Assert.Equal(HttpStatusCode.Accepted, response.StatusCode); + string statusQueryGetUri = await DurableHelpers.ParseStatusQueryGetUriAsync(response); + + await DurableHelpers.WaitForOrchestrationStateAsync(statusQueryGetUri, "Failed", 30); + } +}