Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using static Microsoft.Azure.WebJobs.Extensions.DurableTask.OutOfProcOrchestrationShim;
using proto = Google.Protobuf.WellKnownTypes;

namespace Microsoft.Azure.WebJobs.Extensions.DurableTask
{
Expand Down Expand Up @@ -172,6 +173,12 @@ public Task<ITriggerData> BindAsync(object? value, ValueBindingContext context)
EntityParameters = remoteContext.EntityParameters.ToProtobuf(),
};

// We only do a null check as an empty string is a valid version.
if (this.config.Options.DefaultVersion != null)
{
orchestratorRequest.Properties.Add("defaultVersion", proto.Value.ForString(this.config.Options.DefaultVersion));
}

// We convert the binary payload into a base64 string because that seems to be the most commonly supported
// format for Azure Functions language workers. Attempts to send unencoded byte[] payloads were unsuccessful.
string encodedRequest = ProtobufUtils.Base64Encode(orchestratorRequest);
Expand Down
8 changes: 7 additions & 1 deletion src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
using DurableTask.Core.Exceptions;
using DurableTask.Core.History;
using DurableTask.Core.Middleware;
using DurableTask.Core.Settings;
using Microsoft.Azure.WebJobs.Description;
using Microsoft.Azure.WebJobs.Extensions.DurableTask.Correlation;
using Microsoft.Azure.WebJobs.Extensions.DurableTask.Listener;
Expand Down Expand Up @@ -356,7 +357,12 @@ void IExtensionConfigProvider.Initialize(ExtensionConfigContext context)
context.AddBindingRule<EntityTriggerAttribute>()
.BindToTrigger(new EntityTriggerAttributeBindingProvider(this, connectionName));

this.taskHubWorker = new TaskHubWorker(this.defaultDurabilityProvider, this, this, this.loggerFactory);
this.taskHubWorker = new TaskHubWorker(this.defaultDurabilityProvider, this, this, loggerFactory: this.loggerFactory, versioningSettings: new VersioningSettings
{
Version = this.Options.DefaultVersion, // A null (or empty) version is valid as it signifies non-versioned case.
MatchStrategy = this.Options.VersionMatchStrategy, // The default value for this is to no-op on versioning.
FailureStrategy = this.Options.VersionFailureStrategy, // The default value for this is to ignore work if there is a mismatch.
});

// Add middleware to the DTFx dispatcher so that we can inject our own logic
// into and customize the orchestration execution pipeline.
Expand Down
2 changes: 1 addition & 1 deletion src/WebJobs.Extensions.DurableTask/Grpc/versions.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
# The following files were downloaded from branch main at 2025-06-05 21:25:17 UTC
https://raw.githubusercontent.com/microsoft/durabletask-protobuf/fd9369c6a03d6af4e95285e432b7c4e943c06970/protos/orchestrator_service.proto
https://raw.githubusercontent.com/microsoft/durabletask-protobuf/fd9369c6a03d6af4e95285e432b7c4e943c06970/protos/orchestrator_service.proto
2 changes: 1 addition & 1 deletion src/WebJobs.Extensions.DurableTask/LocalGrpcListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ public override Task<Empty> Hello(Empty request, ServerCallContext context)
ExecutionStartedEvent executionStartedEvent = new ExecutionStartedEvent(-1, request.Input)
{
Name = request.Name,
Version = request.Version,
Version = request.Version != null ? request.Version : this.extension.Options.DefaultVersion,
OrchestrationInstance = instance,
ScheduledStartTime = request.ScheduledStartTimestamp?.ToDateTime(),
};
Expand Down
11 changes: 11 additions & 0 deletions src/WebJobs.Extensions.DurableTask/Options/DurableTaskOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Collections.Generic;
using System.Net.Http;
using DurableTask.AzureStorage.Partitioning;
using DurableTask.Core.Settings;
using Microsoft.Azure.WebJobs.Host;
using Newtonsoft.Json;
using Newtonsoft.Json.Converters;
Expand All @@ -30,6 +31,16 @@ public class DurableTaskOptions
/// </summary>
public string DefaultVersion { get; set; }

/// <summary>
/// The strategy that will be used for matching versions when running an orchestration. See <see cref="VersioningSettings.VersionMatchStrategy"/> for more information.
/// </summary>
public VersioningSettings.VersionMatchStrategy VersionMatchStrategy { get; set; } = VersioningSettings.VersionMatchStrategy.CurrentOrOlder;

/// <summary>
/// The strategy that will be used if a versioning failure is detected. See <see cref="VersioningSettings.VersionFailureStrategy"/> for more information.
/// </summary>
public VersioningSettings.VersionFailureStrategy VersionFailureStrategy { get; set; } = VersioningSettings.VersionFailureStrategy.Reject;

/// <summary>
/// Settings used for Durable HTTP functionality.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
</ItemGroup>

<ItemGroup>
<Protobuf Include="**/*.proto" GrpcServices="Both" Access="Internal" />
<Protobuf Include="**/*.proto" GrpcServices="Both" Access="Internal" />
</ItemGroup>

<ItemGroup>
Expand Down
25 changes: 25 additions & 0 deletions src/Worker.Extensions.DurableTask/FunctionsOrchestrationContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ public FunctionsOrchestrationContext(TaskOrchestrationContext innerContext, Func

public override ParentOrchestrationInstance? Parent => this.innerContext.Parent;

public override string Version => this.innerContext.Version;

protected override ILoggerFactory LoggerFactory { get; }

public override TaskOrchestrationEntityFeature Entities =>
Expand Down Expand Up @@ -87,6 +89,29 @@ public override Task<TResult> CallSubOrchestratorAsync<TResult>(
TaskName orchestratorName, object? input = null, TaskOptions? options = null)
{
this.EnsureLegalAccess();
// If we have a default version, add it to the TaskOptions and call the inner context with it.
if (this.innerContext.Properties.TryGetValue("defaultVersion", out var propVersion) && propVersion is string defaultVersion)
{
// TODO: Update to copy constructors when available - https://github.com/microsoft/durabletask-dotnet/issues/440
SubOrchestrationOptions subOptions;
if (options is SubOrchestrationOptions subOrchestrationOptions)
{
subOptions = new SubOrchestrationOptions(options)
{
InstanceId = subOrchestrationOptions.InstanceId,
Version = subOrchestrationOptions.Version?.Version ?? defaultVersion,
};
}
else
{
subOptions = new SubOrchestrationOptions(options ?? new TaskOptions())
{
Version = defaultVersion
};
}
return this.innerContext.CallSubOrchestratorAsync<TResult>(orchestratorName, input, subOptions);
}

return this.innerContext.CallSubOrchestratorAsync<TResult>(orchestratorName, input, options);
}

Expand Down
120 changes: 120 additions & 0 deletions test/e2e/Apps/BasicDotNetIsolated/VersionedOrchestration.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using Microsoft.Azure.Functions.Worker;
using Microsoft.Azure.Functions.Worker.Http;
using Microsoft.DurableTask;
using Microsoft.DurableTask.Client;
using Microsoft.Extensions.Logging;

namespace Microsoft.Azure.Durable.Tests.E2E;

public static class VersionedOrchestration
{
[Function(nameof(VersionedOrchestration))]
public static async Task<string> RunOrchestrator(
[OrchestrationTrigger] TaskOrchestrationContext context)
{
ILogger logger = context.CreateReplaySafeLogger(nameof(VersionedOrchestration));
logger.LogInformation($"Versioned orchestration! Version: '{context.Version}'");

return await context.CallActivityAsync<string>(nameof(SayVersion), context.Version);
}

[Function(nameof(RunWithSubOrchestrator))]
public static async Task<string> RunWithSubOrchestrator(
[OrchestrationTrigger] TaskOrchestrationContext context,
string? subVersion)
{
ILogger logger = context.CreateReplaySafeLogger(nameof(VersionedOrchestration));
logger.LogInformation($"Versioned orchestration! Version: '{context.Version}' Sub Version: '{subVersion}'");

string subOrchestrationResponse;
if (subVersion == null)
{
subOrchestrationResponse = await context.CallSubOrchestratorAsync<string>(nameof(VersionedSubOrchestration));
}
else
{
subOrchestrationResponse = await context.CallSubOrchestratorAsync<string>(nameof(VersionedSubOrchestration), new SubOrchestrationOptions
{
Version = subVersion,
});
}
return $"Parent Version: '{context.Version}' | Sub {subOrchestrationResponse}";
}

[Function(nameof(VersionedSubOrchestration))]
public static async Task<string> VersionedSubOrchestration([OrchestrationTrigger] TaskOrchestrationContext context)
{
ILogger logger = context.CreateReplaySafeLogger(nameof(VersionedOrchestration));
logger.LogInformation($"Versioned sub-orchestration! Version: '{context.Version}'");

return await context.CallActivityAsync<string>(nameof(SayVersion), context.Version);
}

[Function(nameof(SayVersion))]
public static string SayVersion([ActivityTrigger] string version, FunctionContext executionContext)
{
ILogger logger = executionContext.GetLogger("SayVersion");
logger.LogInformation("Activity running with version: {name}.", version);
return $"Version: '{version}'";
}

[Function("OrchestrationVersion_HttpStart")]
public static async Task<HttpResponseData> HttpStart(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")] HttpRequestData req,
[DurableClient] DurableTaskClient client,
FunctionContext executionContext,
string? version)
{
ILogger logger = executionContext.GetLogger("VersionedOrchestration_HttpStart");

// Function input comes from the request content.
string instanceId;
if (version != null)
{
instanceId = await client.ScheduleNewOrchestrationInstanceAsync(nameof(VersionedOrchestration), new StartOrchestrationOptions
{
Version = version,
});
}
else
{
instanceId = await client.ScheduleNewOrchestrationInstanceAsync(nameof(VersionedOrchestration));
}

logger.LogInformation("Started orchestration with ID = '{instanceId}' and Version = '{version}'.", instanceId, version);

// Returns an HTTP 202 response with an instance management payload.
// See https://learn.microsoft.com/azure/azure-functions/durable/durable-functions-http-api#start-orchestration
return await client.CreateCheckStatusResponseAsync(req, instanceId);
}

[Function("OrchestrationSubVersion_HttpStart")]
public static async Task<HttpResponseData> HttpSubStart(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")] HttpRequestData req,
[DurableClient] DurableTaskClient client,
FunctionContext executionContext,
string? subOrchestrationVersion)
{
ILogger logger = executionContext.GetLogger("OrchestrationSubVersion_HttpStart");

// Function input comes from the request content.
string instanceId;
if (subOrchestrationVersion != null)
{
instanceId = await client.ScheduleNewOrchestrationInstanceAsync(nameof(RunWithSubOrchestrator), input: subOrchestrationVersion);
}
else
{
instanceId = await client.ScheduleNewOrchestrationInstanceAsync(nameof(RunWithSubOrchestrator));
}

logger.LogInformation("Started orchestration with ID = '{instanceId}' and Version = '{subOrchestrationVersion}'.", instanceId, subOrchestrationVersion);

// Returns an HTTP 202 response with an instance management payload.
// See https://learn.microsoft.com/azure/azure-functions/durable/durable-functions-http-api#start-orchestration
return await client.CreateCheckStatusResponseAsync(req, instanceId);
}
}
11 changes: 7 additions & 4 deletions test/e2e/Apps/BasicDotNetIsolated/host.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,13 @@
},
"extensions": {
"durableTask": {
"tracing": {
"DistributedTracingEnabled": true,
"Version": "V2"
}
"tracing": {
"DistributedTracingEnabled": true,
"Version": "V2"
},
"defaultVersion": "2.0",
"versionMatchStrategy": "CurrentOrOlder",
"versionFailureStrategy": "Fail"
}
}
}
4 changes: 3 additions & 1 deletion test/e2e/Tests/Fixtures/FunctionAppFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ public class FunctionAppFixture : IAsyncLifetime
private readonly ILogger _logger;
private bool _disposed;
private Process? _funcProcess;
private string? _appName;

private JobObjectRegistry? _jobObjectRegistry;

Expand All @@ -25,6 +26,7 @@ public FunctionAppFixture(IMessageSink messageSink)
this.TestLogs = new TestLoggerProvider(messageSink);
loggerFactory.AddProvider(this.TestLogs);
this._logger = loggerFactory.CreateLogger<FunctionAppFixture>();
this._appName = Environment.GetEnvironmentVariable("TEST_APP_NAME") ?? "BasicDotNetIsolated";
}

public async Task InitializeAsync()
Expand All @@ -40,7 +42,7 @@ public async Task InitializeAsync()
this._logger.LogInformation($"Starting functions host for {Constants.FunctionAppCollectionName}...");

string rootDir = Path.GetFullPath(@"../../../../../../");
string e2eAppBinPath = Path.Combine(rootDir, @"test/e2e/Apps/BasicDotNetIsolated/bin");
string e2eAppBinPath = Path.Combine(rootDir, @$"test/e2e/Apps/{this._appName}/bin");
string? e2eHostJson = Directory.GetFiles(e2eAppBinPath, "host.json", SearchOption.AllDirectories).FirstOrDefault();

if (e2eHostJson == null)
Expand Down
87 changes: 87 additions & 0 deletions test/e2e/Tests/Tests/VersioningTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System.Net;
using Xunit;
using Xunit.Abstractions;

namespace Microsoft.Azure.Durable.Tests.DotnetIsolatedE2E;

[Collection(Constants.FunctionAppCollectionName)]
public class VersioningTests
{
private readonly FunctionAppFixture _fixture;
private readonly ITestOutputHelper _output;

public VersioningTests(FunctionAppFixture fixture, ITestOutputHelper testOutputHelper)
{
_fixture = fixture;
_fixture.TestLogs.UseTestLogger(testOutputHelper);
_output = testOutputHelper;
}

[Theory]
[InlineData(null)] // Represents a non-versioned case.
[InlineData("")] // Non-versioned/empty-versioned case.
[InlineData("1.0")]
[InlineData("2.0")]
public async Task TestVersionedOrchestration_OKWithMatchingVersion(string? version)
{
string queryString = version == null ? string.Empty : $"?version={version}";
using HttpResponseMessage response = await HttpHelpers.InvokeHttpTrigger("OrchestrationVersion_HttpStart", queryString);

Assert.Equal(HttpStatusCode.Accepted, response.StatusCode);
string statusQueryGetUri = await DurableHelpers.ParseStatusQueryGetUriAsync(response);

await DurableHelpers.WaitForOrchestrationStateAsync(statusQueryGetUri, "Completed", 30);

var orchestrationDetails = await DurableHelpers.GetRunningOrchestrationDetailsAsync(statusQueryGetUri);
if (version != null)
{
Assert.Equal($"Version: '{version}'", orchestrationDetails.Output);
}
else
{
// The default version (2.0) from the host.json file should've been used here.
Assert.Equal("Version: '2.0'", orchestrationDetails.Output);
}
}

[Theory]
[InlineData(null)] // Represents a non-versioned case.
[InlineData("")] // Non-versioned/empty-versioned case.
[InlineData("1.0")]
[InlineData("2.0")]
public async Task TestVersionedSubOrchestration_OKWithMatchingVersion(string? subOrchestrationVersion)
{
string queryString = subOrchestrationVersion == null ? string.Empty : $"?subOrchestrationVersion={subOrchestrationVersion}";
using HttpResponseMessage response = await HttpHelpers.InvokeHttpTrigger("OrchestrationSubVersion_HttpStart", queryString);

Assert.Equal(HttpStatusCode.Accepted, response.StatusCode);
string statusQueryGetUri = await DurableHelpers.ParseStatusQueryGetUriAsync(response);

await DurableHelpers.WaitForOrchestrationStateAsync(statusQueryGetUri, "Completed", 30);

var orchestrationDetails = await DurableHelpers.GetRunningOrchestrationDetailsAsync(statusQueryGetUri);
if (subOrchestrationVersion != null)
{
Assert.Equal($"Parent Version: '2.0' | Sub Version: '{subOrchestrationVersion}'", orchestrationDetails.Output);
}
else
{
// The default version (2.0) from the host.json file should've been used here.
Assert.Equal("Parent Version: '2.0' | Sub Version: '2.0'", orchestrationDetails.Output);
}
}

[Fact]
public async Task TestVersionedOrchestration_FailsWithNonMatchingVersion()
{
using HttpResponseMessage response = await HttpHelpers.InvokeHttpTrigger("OrchestrationVersion_HttpStart", $"?version=3.0");

Assert.Equal(HttpStatusCode.Accepted, response.StatusCode);
string statusQueryGetUri = await DurableHelpers.ParseStatusQueryGetUriAsync(response);

await DurableHelpers.WaitForOrchestrationStateAsync(statusQueryGetUri, "Failed", 30);
}
}
Loading