Skip to content

Commit 38fc852

Browse files
authored
Merge pull request #3095 from Azure/halspang/worker_versioning
Introduce worker versioning into durable functions
2 parents 95a5235 + 64ccbb9 commit 38fc852

File tree

11 files changed

+270
-9
lines changed

11 files changed

+270
-9
lines changed

src/WebJobs.Extensions.DurableTask/Bindings/OrchestrationTriggerAttributeBindingProvider.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
using Newtonsoft.Json;
1414
using Newtonsoft.Json.Linq;
1515
using static Microsoft.Azure.WebJobs.Extensions.DurableTask.OutOfProcOrchestrationShim;
16+
using proto = Google.Protobuf.WellKnownTypes;
1617

1718
namespace Microsoft.Azure.WebJobs.Extensions.DurableTask
1819
{
@@ -172,6 +173,12 @@ public Task<ITriggerData> BindAsync(object? value, ValueBindingContext context)
172173
EntityParameters = remoteContext.EntityParameters.ToProtobuf(),
173174
};
174175

176+
// We only do a null check as an empty string is a valid version.
177+
if (this.config.Options.DefaultVersion != null)
178+
{
179+
orchestratorRequest.Properties.Add("defaultVersion", proto.Value.ForString(this.config.Options.DefaultVersion));
180+
}
181+
175182
// We convert the binary payload into a base64 string because that seems to be the most commonly supported
176183
// format for Azure Functions language workers. Attempts to send unencoded byte[] payloads were unsuccessful.
177184
string encodedRequest = ProtobufUtils.Base64Encode(orchestratorRequest);

src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
using DurableTask.Core.Exceptions;
1818
using DurableTask.Core.History;
1919
using DurableTask.Core.Middleware;
20+
using DurableTask.Core.Settings;
2021
using Microsoft.Azure.WebJobs.Description;
2122
using Microsoft.Azure.WebJobs.Extensions.DurableTask.Correlation;
2223
using Microsoft.Azure.WebJobs.Extensions.DurableTask.Listener;
@@ -356,7 +357,12 @@ void IExtensionConfigProvider.Initialize(ExtensionConfigContext context)
356357
context.AddBindingRule<EntityTriggerAttribute>()
357358
.BindToTrigger(new EntityTriggerAttributeBindingProvider(this, connectionName));
358359

359-
this.taskHubWorker = new TaskHubWorker(this.defaultDurabilityProvider, this, this, this.loggerFactory);
360+
this.taskHubWorker = new TaskHubWorker(this.defaultDurabilityProvider, this, this, loggerFactory: this.loggerFactory, versioningSettings: new VersioningSettings
361+
{
362+
Version = this.Options.DefaultVersion, // A null (or empty) version is valid as it signifies non-versioned case.
363+
MatchStrategy = this.Options.VersionMatchStrategy, // The default value for this is to no-op on versioning.
364+
FailureStrategy = this.Options.VersionFailureStrategy, // The default value for this is to ignore work if there is a mismatch.
365+
});
360366

361367
// Add middleware to the DTFx dispatcher so that we can inject our own logic
362368
// into and customize the orchestration execution pipeline.
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
11
# The following files were downloaded from branch main at 2025-06-05 21:25:17 UTC
2-
https://raw.githubusercontent.com/microsoft/durabletask-protobuf/fd9369c6a03d6af4e95285e432b7c4e943c06970/protos/orchestrator_service.proto
2+
https://raw.githubusercontent.com/microsoft/durabletask-protobuf/fd9369c6a03d6af4e95285e432b7c4e943c06970/protos/orchestrator_service.proto

src/WebJobs.Extensions.DurableTask/LocalGrpcListener.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ public override Task<Empty> Hello(Empty request, ServerCallContext context)
225225
ExecutionStartedEvent executionStartedEvent = new ExecutionStartedEvent(-1, request.Input)
226226
{
227227
Name = request.Name,
228-
Version = request.Version,
228+
Version = request.Version != null ? request.Version : this.extension.Options.DefaultVersion,
229229
OrchestrationInstance = instance,
230230
ScheduledStartTime = request.ScheduledStartTimestamp?.ToDateTime(),
231231
};

src/WebJobs.Extensions.DurableTask/Options/DurableTaskOptions.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
using System.Collections.Generic;
66
using System.Net.Http;
77
using DurableTask.AzureStorage.Partitioning;
8+
using DurableTask.Core.Settings;
89
using Microsoft.Azure.WebJobs.Host;
910
using Newtonsoft.Json;
1011
using Newtonsoft.Json.Converters;
@@ -30,6 +31,16 @@ public class DurableTaskOptions
3031
/// </summary>
3132
public string DefaultVersion { get; set; }
3233

34+
/// <summary>
35+
/// The strategy that will be used for matching versions when running an orchestration. See <see cref="VersioningSettings.VersionMatchStrategy"/> for more information.
36+
/// </summary>
37+
public VersioningSettings.VersionMatchStrategy VersionMatchStrategy { get; set; } = VersioningSettings.VersionMatchStrategy.CurrentOrOlder;
38+
39+
/// <summary>
40+
/// The strategy that will be used if a versioning failure is detected. See <see cref="VersioningSettings.VersionFailureStrategy"/> for more information.
41+
/// </summary>
42+
public VersioningSettings.VersionFailureStrategy VersionFailureStrategy { get; set; } = VersioningSettings.VersionFailureStrategy.Reject;
43+
3344
/// <summary>
3445
/// Settings used for Durable HTTP functionality.
3546
/// </summary>

src/WebJobs.Extensions.DurableTask/WebJobs.Extensions.DurableTask.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@
5050
</ItemGroup>
5151

5252
<ItemGroup>
53-
<Protobuf Include="**/*.proto" GrpcServices="Both" Access="Internal" />
53+
<Protobuf Include="**/*.proto" GrpcServices="Both" Access="Internal" />
5454
</ItemGroup>
5555

5656
<ItemGroup>

src/Worker.Extensions.DurableTask/FunctionsOrchestrationContext.cs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ public FunctionsOrchestrationContext(TaskOrchestrationContext innerContext, Func
4848

4949
public override ParentOrchestrationInstance? Parent => this.innerContext.Parent;
5050

51+
public override string Version => this.innerContext.Version;
52+
5153
protected override ILoggerFactory LoggerFactory { get; }
5254

5355
public override TaskOrchestrationEntityFeature Entities =>
@@ -87,6 +89,29 @@ public override Task<TResult> CallSubOrchestratorAsync<TResult>(
8789
TaskName orchestratorName, object? input = null, TaskOptions? options = null)
8890
{
8991
this.EnsureLegalAccess();
92+
// If we have a default version, add it to the TaskOptions and call the inner context with it.
93+
if (this.innerContext.Properties.TryGetValue("defaultVersion", out var propVersion) && propVersion is string defaultVersion)
94+
{
95+
// TODO: Update to copy constructors when available - https://github.com/microsoft/durabletask-dotnet/issues/440
96+
SubOrchestrationOptions subOptions;
97+
if (options is SubOrchestrationOptions subOrchestrationOptions)
98+
{
99+
subOptions = new SubOrchestrationOptions(options)
100+
{
101+
InstanceId = subOrchestrationOptions.InstanceId,
102+
Version = subOrchestrationOptions.Version?.Version ?? defaultVersion,
103+
};
104+
}
105+
else
106+
{
107+
subOptions = new SubOrchestrationOptions(options ?? new TaskOptions())
108+
{
109+
Version = defaultVersion
110+
};
111+
}
112+
return this.innerContext.CallSubOrchestratorAsync<TResult>(orchestratorName, input, subOptions);
113+
}
114+
90115
return this.innerContext.CallSubOrchestratorAsync<TResult>(orchestratorName, input, options);
91116
}
92117

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the MIT License. See License.txt in the project root for license information.
3+
4+
using Microsoft.Azure.Functions.Worker;
5+
using Microsoft.Azure.Functions.Worker.Http;
6+
using Microsoft.DurableTask;
7+
using Microsoft.DurableTask.Client;
8+
using Microsoft.Extensions.Logging;
9+
10+
namespace Microsoft.Azure.Durable.Tests.E2E;
11+
12+
public static class VersionedOrchestration
13+
{
14+
[Function(nameof(VersionedOrchestration))]
15+
public static async Task<string> RunOrchestrator(
16+
[OrchestrationTrigger] TaskOrchestrationContext context)
17+
{
18+
ILogger logger = context.CreateReplaySafeLogger(nameof(VersionedOrchestration));
19+
logger.LogInformation($"Versioned orchestration! Version: '{context.Version}'");
20+
21+
return await context.CallActivityAsync<string>(nameof(SayVersion), context.Version);
22+
}
23+
24+
[Function(nameof(RunWithSubOrchestrator))]
25+
public static async Task<string> RunWithSubOrchestrator(
26+
[OrchestrationTrigger] TaskOrchestrationContext context,
27+
string? subVersion)
28+
{
29+
ILogger logger = context.CreateReplaySafeLogger(nameof(VersionedOrchestration));
30+
logger.LogInformation($"Versioned orchestration! Version: '{context.Version}' Sub Version: '{subVersion}'");
31+
32+
string subOrchestrationResponse;
33+
if (subVersion == null)
34+
{
35+
subOrchestrationResponse = await context.CallSubOrchestratorAsync<string>(nameof(VersionedSubOrchestration));
36+
}
37+
else
38+
{
39+
subOrchestrationResponse = await context.CallSubOrchestratorAsync<string>(nameof(VersionedSubOrchestration), new SubOrchestrationOptions
40+
{
41+
Version = subVersion,
42+
});
43+
}
44+
return $"Parent Version: '{context.Version}' | Sub {subOrchestrationResponse}";
45+
}
46+
47+
[Function(nameof(VersionedSubOrchestration))]
48+
public static async Task<string> VersionedSubOrchestration([OrchestrationTrigger] TaskOrchestrationContext context)
49+
{
50+
ILogger logger = context.CreateReplaySafeLogger(nameof(VersionedOrchestration));
51+
logger.LogInformation($"Versioned sub-orchestration! Version: '{context.Version}'");
52+
53+
return await context.CallActivityAsync<string>(nameof(SayVersion), context.Version);
54+
}
55+
56+
[Function(nameof(SayVersion))]
57+
public static string SayVersion([ActivityTrigger] string version, FunctionContext executionContext)
58+
{
59+
ILogger logger = executionContext.GetLogger("SayVersion");
60+
logger.LogInformation("Activity running with version: {name}.", version);
61+
return $"Version: '{version}'";
62+
}
63+
64+
[Function("OrchestrationVersion_HttpStart")]
65+
public static async Task<HttpResponseData> HttpStart(
66+
[HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")] HttpRequestData req,
67+
[DurableClient] DurableTaskClient client,
68+
FunctionContext executionContext,
69+
string? version)
70+
{
71+
ILogger logger = executionContext.GetLogger("VersionedOrchestration_HttpStart");
72+
73+
// Function input comes from the request content.
74+
string instanceId;
75+
if (version != null)
76+
{
77+
instanceId = await client.ScheduleNewOrchestrationInstanceAsync(nameof(VersionedOrchestration), new StartOrchestrationOptions
78+
{
79+
Version = version,
80+
});
81+
}
82+
else
83+
{
84+
instanceId = await client.ScheduleNewOrchestrationInstanceAsync(nameof(VersionedOrchestration));
85+
}
86+
87+
logger.LogInformation("Started orchestration with ID = '{instanceId}' and Version = '{version}'.", instanceId, version);
88+
89+
// Returns an HTTP 202 response with an instance management payload.
90+
// See https://learn.microsoft.com/azure/azure-functions/durable/durable-functions-http-api#start-orchestration
91+
return await client.CreateCheckStatusResponseAsync(req, instanceId);
92+
}
93+
94+
[Function("OrchestrationSubVersion_HttpStart")]
95+
public static async Task<HttpResponseData> HttpSubStart(
96+
[HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")] HttpRequestData req,
97+
[DurableClient] DurableTaskClient client,
98+
FunctionContext executionContext,
99+
string? subOrchestrationVersion)
100+
{
101+
ILogger logger = executionContext.GetLogger("OrchestrationSubVersion_HttpStart");
102+
103+
// Function input comes from the request content.
104+
string instanceId;
105+
if (subOrchestrationVersion != null)
106+
{
107+
instanceId = await client.ScheduleNewOrchestrationInstanceAsync(nameof(RunWithSubOrchestrator), input: subOrchestrationVersion);
108+
}
109+
else
110+
{
111+
instanceId = await client.ScheduleNewOrchestrationInstanceAsync(nameof(RunWithSubOrchestrator));
112+
}
113+
114+
logger.LogInformation("Started orchestration with ID = '{instanceId}' and Version = '{subOrchestrationVersion}'.", instanceId, subOrchestrationVersion);
115+
116+
// Returns an HTTP 202 response with an instance management payload.
117+
// See https://learn.microsoft.com/azure/azure-functions/durable/durable-functions-http-api#start-orchestration
118+
return await client.CreateCheckStatusResponseAsync(req, instanceId);
119+
}
120+
}

test/e2e/Apps/BasicDotNetIsolated/host.json

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,13 @@
1111
},
1212
"extensions": {
1313
"durableTask": {
14-
"tracing": {
15-
"DistributedTracingEnabled": true,
16-
"Version": "V2"
17-
}
14+
"tracing": {
15+
"DistributedTracingEnabled": true,
16+
"Version": "V2"
17+
},
18+
"defaultVersion": "2.0",
19+
"versionMatchStrategy": "CurrentOrOlder",
20+
"versionFailureStrategy": "Fail"
1821
}
1922
}
2023
}

test/e2e/Tests/Fixtures/FunctionAppFixture.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ public class FunctionAppFixture : IAsyncLifetime
1515
private readonly ILogger _logger;
1616
private bool _disposed;
1717
private Process? _funcProcess;
18+
private string? _appName;
1819

1920
private JobObjectRegistry? _jobObjectRegistry;
2021

@@ -25,6 +26,7 @@ public FunctionAppFixture(IMessageSink messageSink)
2526
this.TestLogs = new TestLoggerProvider(messageSink);
2627
loggerFactory.AddProvider(this.TestLogs);
2728
this._logger = loggerFactory.CreateLogger<FunctionAppFixture>();
29+
this._appName = Environment.GetEnvironmentVariable("TEST_APP_NAME") ?? "BasicDotNetIsolated";
2830
}
2931

3032
public async Task InitializeAsync()
@@ -40,7 +42,7 @@ public async Task InitializeAsync()
4042
this._logger.LogInformation($"Starting functions host for {Constants.FunctionAppCollectionName}...");
4143

4244
string rootDir = Path.GetFullPath(@"../../../../../../");
43-
string e2eAppBinPath = Path.Combine(rootDir, @"test/e2e/Apps/BasicDotNetIsolated/bin");
45+
string e2eAppBinPath = Path.Combine(rootDir, @$"test/e2e/Apps/{this._appName}/bin");
4446
string? e2eHostJson = Directory.GetFiles(e2eAppBinPath, "host.json", SearchOption.AllDirectories).FirstOrDefault();
4547

4648
if (e2eHostJson == null)

0 commit comments

Comments
 (0)