Skip to content

Commit 395b4d8

Browse files
committed
Introduce orchestration versioning into worker
This commit gives the worker several ways to configure versioning. Included are, setting the version, how to match against incoming versions, and how to react to a failed match. Signed-off-by: halspang <[email protected]>
1 parent e32fa7a commit 395b4d8

File tree

9 files changed

+1073
-655
lines changed

9 files changed

+1073
-655
lines changed

CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@
55
- Introduce default version setting to DurableTaskClient and expose to orchestrator ([#393](https://github.com/microsoft/durabletask-dotnet/pull/393))
66
- Add support for local credential types in DTS libraries ([#396](https://github.com/microsoft/durabletask-dotnet/pull/396))
77
- Add utility for easier version comparison in orchestration context ([#394](https://github.com/microsoft/durabletask-dotnet/pull/394))
8-
- Add tags support for orchestrations ([#397])(https://github.com/microsoft/durabletask-dotnet/pull/397)
8+
- Add tags support for orchestrations ([#397](https://github.com/microsoft/durabletask-dotnet/pull/397))
9+
- Add support for versioning in the gRPC worker ([#401](https://github.com/microsoft/durabletask-dotnet/pull/401))
910

1011
## v1.8.1
1112

src/Abstractions/TaskOrchestrationContext.cs

Lines changed: 2 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// Copyright (c) Microsoft Corporation.
22
// Licensed under the MIT License.
33

4+
using Microsoft.DurableTask.Abstractions;
45
using Microsoft.DurableTask.Entities;
56
using Microsoft.Extensions.Logging;
67

@@ -421,32 +422,7 @@ public virtual ILogger CreateReplaySafeLogger<T>()
421422
/// <returns>True if the orchestration's version is greater than the provided version, false otherwise.</returns>
422423
public virtual int CompareVersionTo(string version)
423424
{
424-
// Both versions are empty, treat as equal.
425-
if (string.IsNullOrWhiteSpace(this.Version) && string.IsNullOrWhiteSpace(version))
426-
{
427-
return 0;
428-
}
429-
430-
// An empty version in the context is always less than a defined version in the parameter.
431-
if (string.IsNullOrWhiteSpace(this.Version))
432-
{
433-
return -1;
434-
}
435-
436-
// An empty version in the parameter is always less than a defined version in the context.
437-
if (string.IsNullOrWhiteSpace(version))
438-
{
439-
return 1;
440-
}
441-
442-
// If both versions use the .NET Version class, return that comparison.
443-
if (System.Version.TryParse(this.Version, out Version contextVersion) && System.Version.TryParse(version, out Version otherVersion))
444-
{
445-
return contextVersion.CompareTo(otherVersion);
446-
}
447-
448-
// If we have gotten to here, we don't know the syntax of the versions we are comparing, use a string comparison as a final check.
449-
return string.Compare(this.Version, version, StringComparison.OrdinalIgnoreCase);
425+
return TaskOrchestrationVersioningUtils.CompareVersions(this.Version, version);
450426
}
451427

452428
class ReplaySafeLogger : ILogger
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
// Copyright (c) Microsoft Corporation.
2+
// Licensed under the MIT License.
3+
4+
namespace Microsoft.DurableTask.Abstractions;
5+
6+
/// <summary>
7+
/// Utilities for handling Orchestration/Task versioning operations.
8+
/// </summary>
9+
public static class TaskOrchestrationVersioningUtils
10+
{
11+
/// <summary>
12+
/// Compare two versions to each other.
13+
/// </summary>
14+
/// <remarks>
15+
/// This method's comparison is handled in the following order:
16+
/// 1. The versions are checked if they are empty (non-versioned). Both being empty signifies equality.
17+
/// 2. If sourceVersion is empty but otherVersion is defined, this is treated as the source being less than the other.
18+
/// 3. If otherVersion is empty but sourceVersion is defined, this is treated as the source being greater than the other.
19+
/// 4. Both versions are attempted to be parsed into System.Version and compared as such.
20+
/// 5. If all else fails, a direct string comparison is done between the versions.
21+
/// </remarks>
22+
/// <param name="sourceVersion">The source version that will be compared against the other version.</param>
23+
/// <param name="otherVersion">The other version to compare against.</param>
24+
/// <returns>An int representing how sourceVersion compares to otherVersion.</returns>
25+
public static int CompareVersions(string sourceVersion, string otherVersion)
26+
{
27+
// Both versions are empty, treat as equal.
28+
if (string.IsNullOrWhiteSpace(sourceVersion) && string.IsNullOrWhiteSpace(otherVersion))
29+
{
30+
return 0;
31+
}
32+
33+
// An empty version in the context is always less than a defined version in the parameter.
34+
if (string.IsNullOrWhiteSpace(sourceVersion))
35+
{
36+
return -1;
37+
}
38+
39+
// An empty version in the parameter is always less than a defined version in the context.
40+
if (string.IsNullOrWhiteSpace(otherVersion))
41+
{
42+
return 1;
43+
}
44+
45+
// If both versions use the .NET Version class, return that comparison.
46+
if (System.Version.TryParse(sourceVersion, out Version parsedSourceVersion) && System.Version.TryParse(otherVersion, out Version parsedOtherVersion))
47+
{
48+
return parsedSourceVersion.CompareTo(parsedOtherVersion);
49+
}
50+
51+
// If we have gotten to here, we don't know the syntax of the versions we are comparing, use a string comparison as a final check.
52+
return string.Compare(sourceVersion, otherVersion, StringComparison.OrdinalIgnoreCase);
53+
}
54+
}

src/Client/Core/DurableTaskClient.cs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -399,6 +399,19 @@ public virtual Task<PurgeResult> PurgeAllInstancesAsync(
399399
throw new NotSupportedException($"{this.GetType()} does not support purging of orchestration instances.");
400400
}
401401

402+
/// <summary>
403+
/// Abandons a task orchestration work item to be re-enqueued at a later time.
404+
/// </summary>
405+
/// <param name="completionToken">The completion token that was given when the WorkItem was fetched.</param>
406+
/// <param name="cancellation">
407+
/// A <see cref="CancellationToken"/> that can be used to cancel the purge operation.
408+
/// </param>
409+
/// <returns>A task that completes when the WorkItem has been abandoned.</returns>
410+
public virtual Task AbandonOrchestrationTask(string completionToken, CancellationToken cancellation = default)
411+
{
412+
throw new NotSupportedException($"{this.GetType()} does not support abandoning task orchestration work items.");
413+
}
414+
402415
// TODO: Create task hub
403416

404417
// TODO: Delete task hub

src/Client/Grpc/GrpcDurableTaskClient.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -396,6 +396,17 @@ public override Task<PurgeResult> PurgeAllInstancesAsync(
396396
return this.PurgeInstancesCoreAsync(request, cancellation);
397397
}
398398

399+
/// <inheritdoc/>
400+
public override async Task AbandonOrchestrationTask(string completionToken, CancellationToken cancellation = default)
401+
{
402+
P.AbandonOrchestrationTaskRequest request = new()
403+
{
404+
CompletionToken = completionToken,
405+
};
406+
407+
await this.sidecarClient.AbandonTaskOrchestratorWorkItemAsync(request, cancellationToken: cancellation);
408+
}
409+
399410
static AsyncDisposable GetCallInvoker(GrpcDurableTaskClientOptions options, out CallInvoker callInvoker)
400411
{
401412
if (options.Channel is GrpcChannel c)

src/Worker/Core/DependencyInjection/DurableTaskWorkerBuilderExtensions.cs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using Microsoft.DurableTask.Worker.Hosting;
55
using Microsoft.Extensions.DependencyInjection;
66
using Microsoft.Extensions.Options;
7+
using static Microsoft.DurableTask.Worker.DurableTaskWorkerOptions;
78

89
namespace Microsoft.DurableTask.Worker;
910

@@ -86,4 +87,25 @@ public static IDurableTaskWorkerBuilder UseBuildTarget<TTarget, TOptions>(this I
8687
});
8788
return builder;
8889
}
90+
91+
/// <summary>
92+
/// Configures the versioning options for this builder.
93+
/// </summary>
94+
/// <param name="builder">The builder to set the builder target for.</param>
95+
/// <param name="versionOptions">The collection of options specified for versioning the worker.</param>
96+
/// <returns>The original builder, for call chaining.</returns>
97+
public static IDurableTaskWorkerBuilder UseVersioning(this IDurableTaskWorkerBuilder builder, VersioningOptions versionOptions)
98+
{
99+
Check.NotNull(builder);
100+
builder.Configure(options =>
101+
{
102+
options.Versioning = new VersioningOptions
103+
{
104+
Version = versionOptions.Version,
105+
MatchStrategy = versionOptions.MatchStrategy,
106+
FailureStrategy = versionOptions.FailureStrategy,
107+
};
108+
});
109+
return builder;
110+
}
89111
}

src/Worker/Core/DurableTaskWorkerOptions.cs

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,51 @@ namespace Microsoft.DurableTask.Worker;
1010
/// </summary>
1111
public class DurableTaskWorkerOptions
1212
{
13+
readonly VersioningOptions versioning = new();
1314
DataConverter dataConverter = JsonDataConverter.Default;
1415

16+
/// <summary>
17+
/// Defines the version matching strategy for the Durable Task worker.
18+
/// </summary>
19+
public enum VersionMatchStrategy
20+
{
21+
/// <summary>
22+
/// Ignore Orchestration version, all work received is processed.
23+
/// </summary>
24+
None = 0,
25+
26+
/// <summary>
27+
/// Worker will only process Tasks from Orchestrations with the same version as the worker.
28+
/// </summary>
29+
Strict = 1,
30+
31+
/// <summary>
32+
/// Worker will process Tasks from Orchestrations whose version is less than or equal to the worker.
33+
/// </summary>
34+
CurrentOrOlder = 2,
35+
}
36+
37+
/// <summary>
38+
/// Defines the versioning failure strategy for the Durable Task worker.
39+
/// </summary>
40+
public enum VersionFailureStrategy
41+
{
42+
/// <summary>
43+
/// Do not change the orchestration state if the version does not adhere to the matching strategy.
44+
/// </summary>
45+
Reject = 0,
46+
47+
/// <summary>
48+
/// Suspend the orchestration if the version does not adhere to the matching strategy.
49+
/// </summary>
50+
Suspend = 1,
51+
52+
/// <summary>
53+
/// Fail the orchestration if the version does not adhere to the matching strategy.
54+
/// </summary>
55+
Fail = 2,
56+
}
57+
1558
/// <summary>
1659
/// Gets or sets the data converter. Default value is <see cref="JsonDataConverter.Default" />.
1760
/// </summary>
@@ -93,6 +136,38 @@ public DataConverter DataConverter
93136
/// </remarks>
94137
public ConcurrencyOptions Concurrency { get; } = new();
95138

139+
/// <summary>
140+
/// Gets or sets the versioning options for the Durable Task worker.
141+
/// </summary>
142+
/// <remarks>
143+
/// Worker versioning controls how a worker will handle orchestrations of different versions. Defining both the
144+
/// version of the worker, the versions that can be worked on, and what to do in case a version does not comply
145+
/// with the given options.
146+
/// </remarks>
147+
public VersioningOptions Versioning
148+
{
149+
get => this.versioning;
150+
set
151+
{
152+
if (value is not null)
153+
{
154+
this.IsVersioningSet = true;
155+
this.Versioning.Version = value.Version;
156+
this.Versioning.MatchStrategy = value.MatchStrategy;
157+
this.Versioning.FailureStrategy = value.FailureStrategy;
158+
}
159+
else
160+
{
161+
this.IsVersioningSet = false;
162+
}
163+
}
164+
}
165+
166+
/// <summary>
167+
/// Gets a value indicating whether versioning is explicitly set or not.
168+
/// </summary>
169+
public bool IsVersioningSet { get; internal set; }
170+
96171
/// <summary>
97172
/// Gets a value indicating whether <see cref="DataConverter" /> was explicitly set or not.
98173
/// </summary>
@@ -116,6 +191,7 @@ internal void ApplyTo(DurableTaskWorkerOptions other)
116191
other.DataConverter = this.DataConverter;
117192
other.MaximumTimerInterval = this.MaximumTimerInterval;
118193
other.EnableEntitySupport = this.EnableEntitySupport;
194+
other.Versioning = this.Versioning;
119195
}
120196
}
121197

@@ -139,4 +215,28 @@ public class ConcurrencyOptions
139215
/// </summary>
140216
public int MaximumConcurrentEntityWorkItems { get; set; } = 100 * Environment.ProcessorCount;
141217
}
218+
219+
/// <summary>
220+
/// Options for the Durable Task worker versioning.
221+
/// </summary>
222+
public class VersioningOptions
223+
{
224+
/// <summary>
225+
/// Gets or sets the version of orchestrations that the worker can work on.
226+
/// </summary>
227+
public string Version { get; set; } = string.Empty;
228+
229+
/// <summary>
230+
/// Gets or sets the versioning strategy for the Durable Task worker.
231+
/// </summary>
232+
public VersionMatchStrategy MatchStrategy { get; set; } = VersionMatchStrategy.None;
233+
234+
/// <summary>
235+
/// Gets or sets the versioning failure strategy for the Durable Task worker.
236+
/// </summary>
237+
/// <remarks>
238+
/// If the version matching strategy is set to <see cref="VersionMatchStrategy.None"/>, this value has no effect.
239+
/// </remarks>
240+
public VersionFailureStrategy FailureStrategy { get; set; } = VersionFailureStrategy.Reject;
241+
}
142242
}

0 commit comments

Comments
 (0)