Skip to content

Commit 63d45d2

Browse files
authored
Add versioning to DTFx orchestration dispatch (#1201)
This commit adds versioning similar to the feature created in durabletask-dotnet. This portion of versioning allows for an orchestration to be failed or abandoned based on the configuration provided. Signed-off-by: halspang <[email protected]>
1 parent 1514129 commit 63d45d2

File tree

8 files changed

+447
-53
lines changed

8 files changed

+447
-53
lines changed
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
// ----------------------------------------------------------------------------------
2+
// Copyright Microsoft Corporation
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
// Unless required by applicable law or agreed to in writing, software
8+
// distributed under the License is distributed on an "AS IS" BASIS,
9+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
// See the License for the specific language governing permissions and
11+
// limitations under the License.
12+
// ----------------------------------------------------------------------------------
13+
14+
using DurableTask.Core.Settings;
15+
using Microsoft.VisualStudio.TestTools.UnitTesting;
16+
17+
namespace DurableTask.Core.Tests
18+
{
19+
[TestClass]
20+
public class VersionSettingsTests
21+
{
22+
[TestMethod]
23+
[DataRow("1.0.0", "1.0.0", 0)]
24+
[DataRow("1.1.0", "1.0.0", 1)]
25+
[DataRow("1.0.0", "1.1.0", -1)]
26+
[DataRow("1", "1", 0)]
27+
[DataRow("2", "1", 1)]
28+
[DataRow("1", "2", -1)]
29+
[DataRow("", "1", -1)]
30+
[DataRow("1", "", 1)]
31+
[DataRow("", "", 0)]
32+
public void TestVersionComparison(string orchVersion, string settingVersion, int expectedComparison)
33+
{
34+
int result = VersioningSettings.CompareVersions(orchVersion, settingVersion);
35+
36+
if (expectedComparison == 0)
37+
{
38+
Assert.AreEqual(0, result, $"Expected {orchVersion} to be equal to {settingVersion}");
39+
}
40+
else if (expectedComparison < 0)
41+
{
42+
Assert.IsTrue(result < 0, $"Expected {orchVersion} to be less than {settingVersion}");
43+
}
44+
else
45+
{
46+
Assert.IsTrue(result > 0, $"Expected {orchVersion} to be greater than {settingVersion}");
47+
}
48+
}
49+
}
50+
}
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
// ----------------------------------------------------------------------------------
2+
// Copyright Microsoft Corporation
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
// Unless required by applicable law or agreed to in writing, software
8+
// distributed under the License is distributed on an "AS IS" BASIS,
9+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
// See the License for the specific language governing permissions and
11+
// limitations under the License.
12+
// ----------------------------------------------------------------------------------
13+
14+
using System;
15+
16+
namespace DurableTask.Core.Settings
17+
{
18+
/// <summary>
19+
/// Collection of settings that define the overall versioning behavior.
20+
/// </summary>
21+
public class VersioningSettings
22+
{
23+
/// <summary>
24+
/// Defines the version matching strategy for the Durable Task worker.
25+
/// </summary>
26+
public enum VersionMatchStrategy
27+
{
28+
/// <summary>
29+
/// Ignore Orchestration version, all work received is processed.
30+
/// </summary>
31+
None = 0,
32+
33+
/// <summary>
34+
/// Worker will only process Tasks from Orchestrations with the same version as the worker.
35+
/// </summary>
36+
Strict = 1,
37+
38+
/// <summary>
39+
/// Worker will process Tasks from Orchestrations whose version is less than or equal to the worker.
40+
/// </summary>
41+
CurrentOrOlder = 2,
42+
}
43+
44+
/// <summary>
45+
/// Defines the versioning failure strategy for the Durable Task worker.
46+
/// </summary>
47+
public enum VersionFailureStrategy
48+
{
49+
/// <summary>
50+
/// Do not change the orchestration state if the version does not adhere to the matching strategy.
51+
/// </summary>
52+
Reject = 0,
53+
54+
/// <summary>
55+
/// Fail the orchestration if the version does not adhere to the matching strategy.
56+
/// </summary>
57+
Fail = 1,
58+
}
59+
60+
/// <summary>
61+
/// Gets or sets the version associated with the settings.
62+
/// </summary>
63+
public string Version { get; set; } = string.Empty;
64+
65+
/// <summary>
66+
/// Gets or sets the <see cref="VersionMatchStrategy"/> that is used for matching versions.
67+
/// </summary>
68+
public VersionMatchStrategy MatchStrategy { get; set; } = VersionMatchStrategy.None;
69+
70+
/// <summary>
71+
/// Gets or sets the <see cref="VersionFailureStrategy"/> that is used to determine what happens on a versioning failure.
72+
/// </summary>
73+
public VersionFailureStrategy FailureStrategy { get; set; } = VersionFailureStrategy.Reject;
74+
75+
/// <summary>
76+
/// Compare two versions to each other.
77+
/// </summary>
78+
/// <remarks>
79+
/// This method's comparison is handled in the following order:
80+
/// 1. The versions are checked if they are empty (non-versioned). Both being empty signifies equality.
81+
/// 2. If sourceVersion is empty but otherVersion is defined, this is treated as the source being less than the other.
82+
/// 3. If otherVersion is empty but sourceVersion is defined, this is treated as the source being greater than the other.
83+
/// 4. Both versions are attempted to be parsed into System.Version and compared as such.
84+
/// 5. If all else fails, a direct string comparison is done between the versions.
85+
/// </remarks>
86+
/// <param name="sourceVersion">The source version that will be compared against the other version.</param>
87+
/// <param name="otherVersion">The other version to compare against.</param>
88+
/// <returns>An int representing how sourceVersion compares to otherVersion.</returns>
89+
public static int CompareVersions(string sourceVersion, string otherVersion)
90+
{
91+
// Both versions are empty, treat as equal.
92+
if (string.IsNullOrWhiteSpace(sourceVersion) && string.IsNullOrWhiteSpace(otherVersion))
93+
{
94+
return 0;
95+
}
96+
97+
// An empty version in the context is always less than a defined version in the parameter.
98+
if (string.IsNullOrWhiteSpace(sourceVersion))
99+
{
100+
return -1;
101+
}
102+
103+
// An empty version in the parameter is always less than a defined version in the context.
104+
if (string.IsNullOrWhiteSpace(otherVersion))
105+
{
106+
return 1;
107+
}
108+
109+
// If both versions use the .NET Version class, return that comparison.
110+
if (System.Version.TryParse(sourceVersion, out Version parsedSourceVersion) && System.Version.TryParse(otherVersion, out Version parsedOtherVersion))
111+
{
112+
return parsedSourceVersion.CompareTo(parsedOtherVersion);
113+
}
114+
115+
// If we have gotten to here, we don't know the syntax of the versions we are comparing, use a string comparison as a final check.
116+
return string.Compare(sourceVersion, otherVersion, StringComparison.OrdinalIgnoreCase);
117+
}
118+
}
119+
}

src/DurableTask.Core/TaskHubWorker.cs

Lines changed: 80 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,12 @@
1313

1414
namespace DurableTask.Core
1515
{
16+
using DurableTask.Core.Entities;
17+
using DurableTask.Core.Exceptions;
18+
using DurableTask.Core.Logging;
19+
using DurableTask.Core.Middleware;
20+
using DurableTask.Core.Settings;
21+
using Microsoft.Extensions.Logging;
1622
using System;
1723
using System.Collections.Generic;
1824
using System.Diagnostics;
@@ -21,11 +27,6 @@ namespace DurableTask.Core
2127
using System.Runtime.CompilerServices;
2228
using System.Threading;
2329
using System.Threading.Tasks;
24-
using DurableTask.Core.Entities;
25-
using DurableTask.Core.Exceptions;
26-
using DurableTask.Core.Logging;
27-
using DurableTask.Core.Middleware;
28-
using Microsoft.Extensions.Logging;
2930

3031
/// <summary>
3132
/// Allows users to load the TaskOrchestration and TaskActivity classes and start
@@ -36,6 +37,7 @@ public sealed class TaskHubWorker : IDisposable
3637
readonly INameVersionObjectManager<TaskActivity> activityManager;
3738
readonly INameVersionObjectManager<TaskOrchestration> orchestrationManager;
3839
readonly INameVersionObjectManager<TaskEntity> entityManager;
40+
readonly VersioningSettings versioningSettings;
3941

4042
readonly DispatchMiddlewarePipeline orchestrationDispatchPipeline = new DispatchMiddlewarePipeline();
4143
readonly DispatchMiddlewarePipeline entityDispatchPipeline = new DispatchMiddlewarePipeline();
@@ -86,6 +88,23 @@ public TaskHubWorker(IOrchestrationService orchestrationService, ILoggerFactory
8688
{
8789
}
8890

91+
/// <summary>
92+
/// Create a new TaskHubWorker with given OrchestrationService
93+
/// </summary>
94+
/// <param name="orchestrationService">Reference the orchestration service implementation</param>
95+
/// <param name="versioningSettings">The <see cref="VersioningSettings"/> that define how orchestration versions are handled</param>
96+
/// <param name="loggerFactory">The <see cref="ILoggerFactory"/> to use for logging</param>
97+
public TaskHubWorker(IOrchestrationService orchestrationService, VersioningSettings versioningSettings, ILoggerFactory loggerFactory = null)
98+
: this(
99+
orchestrationService,
100+
new NameVersionObjectManager<TaskOrchestration>(),
101+
new NameVersionObjectManager<TaskActivity>(),
102+
new NameVersionObjectManager<TaskEntity>(),
103+
versioningSettings,
104+
loggerFactory)
105+
{
106+
}
107+
89108
/// <summary>
90109
/// Create a new TaskHubWorker with given OrchestrationService and name version managers
91110
/// </summary>
@@ -101,7 +120,29 @@ public TaskHubWorker(
101120
orchestrationObjectManager,
102121
activityObjectManager,
103122
new NameVersionObjectManager<TaskEntity>(),
104-
loggerFactory: null)
123+
loggerFactory: null,
124+
versioningSettings: null)
125+
{
126+
}
127+
128+
/// <summary>
129+
/// Create a new <see cref="TaskHubWorker"/> with given <see cref="IOrchestrationService"/> and name version managers
130+
/// </summary>
131+
/// <param name="orchestrationService">The orchestration service implementation</param>
132+
/// <param name="orchestrationObjectManager">The <see cref="INameVersionObjectManager{TaskOrchestration}"/> for orchestrations</param>
133+
/// <param name="activityObjectManager">The <see cref="INameVersionObjectManager{TaskActivity}"/> for activities</param>
134+
/// <param name="loggerFactory">The <see cref="ILoggerFactory"/> to use for logging</param>
135+
public TaskHubWorker(
136+
IOrchestrationService orchestrationService,
137+
INameVersionObjectManager<TaskOrchestration> orchestrationObjectManager,
138+
INameVersionObjectManager<TaskActivity> activityObjectManager,
139+
ILoggerFactory loggerFactory = null)
140+
: this(
141+
orchestrationService,
142+
orchestrationObjectManager,
143+
activityObjectManager,
144+
new NameVersionObjectManager<TaskEntity>(),
145+
loggerFactory)
105146
{
106147
}
107148

@@ -111,17 +152,20 @@ public TaskHubWorker(
111152
/// <param name="orchestrationService">The orchestration service implementation</param>
112153
/// <param name="orchestrationObjectManager">The <see cref="INameVersionObjectManager{TaskOrchestration}"/> for orchestrations</param>
113154
/// <param name="activityObjectManager">The <see cref="INameVersionObjectManager{TaskActivity}"/> for activities</param>
155+
/// <param name="versioningSettings">The <see cref="VersioningSettings"/> that define how orchestration versions are handled</param>
114156
/// <param name="loggerFactory">The <see cref="ILoggerFactory"/> to use for logging</param>
115157
public TaskHubWorker(
116158
IOrchestrationService orchestrationService,
117159
INameVersionObjectManager<TaskOrchestration> orchestrationObjectManager,
118160
INameVersionObjectManager<TaskActivity> activityObjectManager,
161+
VersioningSettings versioningSettings,
119162
ILoggerFactory loggerFactory = null)
120163
: this(
121164
orchestrationService,
122165
orchestrationObjectManager,
123166
activityObjectManager,
124167
new NameVersionObjectManager<TaskEntity>(),
168+
versioningSettings,
125169
loggerFactory)
126170
{
127171
}
@@ -140,13 +184,40 @@ public TaskHubWorker(
140184
INameVersionObjectManager<TaskActivity> activityObjectManager,
141185
INameVersionObjectManager<TaskEntity> entityObjectManager,
142186
ILoggerFactory loggerFactory = null)
187+
: this(
188+
orchestrationService,
189+
orchestrationObjectManager,
190+
activityObjectManager,
191+
entityObjectManager,
192+
null,
193+
loggerFactory)
194+
{
195+
}
196+
197+
/// <summary>
198+
/// Create a new TaskHubWorker with given OrchestrationService and name version managers
199+
/// </summary>
200+
/// <param name="orchestrationService">Reference the orchestration service implementation</param>
201+
/// <param name="orchestrationObjectManager">NameVersionObjectManager for Orchestrations</param>
202+
/// <param name="activityObjectManager">NameVersionObjectManager for Activities</param>
203+
/// <param name="entityObjectManager">The NameVersionObjectManager for entities. The version is the entity key.</param>
204+
/// <param name="versioningSettings">The <see cref="VersioningSettings"/> that define how orchestration versions are handled</param>
205+
/// <param name="loggerFactory">The <see cref="ILoggerFactory"/> to use for logging</param>
206+
public TaskHubWorker(
207+
IOrchestrationService orchestrationService,
208+
INameVersionObjectManager<TaskOrchestration> orchestrationObjectManager,
209+
INameVersionObjectManager<TaskActivity> activityObjectManager,
210+
INameVersionObjectManager<TaskEntity> entityObjectManager,
211+
VersioningSettings versioningSettings,
212+
ILoggerFactory loggerFactory = null)
143213
{
144214
this.orchestrationManager = orchestrationObjectManager ?? throw new ArgumentException("orchestrationObjectManager");
145215
this.activityManager = activityObjectManager ?? throw new ArgumentException("activityObjectManager");
146216
this.entityManager = entityObjectManager ?? throw new ArgumentException("entityObjectManager");
147217
this.orchestrationService = orchestrationService ?? throw new ArgumentException("orchestrationService");
148218
this.logHelper = new LogHelper(loggerFactory?.CreateLogger("DurableTask.Core"));
149219
this.dispatchEntitiesSeparately = (orchestrationService as IEntityOrchestrationService)?.EntityBackendProperties?.UseSeparateQueueForEntityWorkItems ?? false;
220+
this.versioningSettings = versioningSettings;
150221
}
151222

152223
/// <summary>
@@ -219,13 +290,13 @@ public async Task<TaskHubWorker> StartAsync()
219290

220291
this.logHelper.TaskHubWorkerStarting();
221292
var sw = Stopwatch.StartNew();
222-
223293
this.orchestrationDispatcher = new TaskOrchestrationDispatcher(
224294
this.orchestrationService,
225295
this.orchestrationManager,
226296
this.orchestrationDispatchPipeline,
227297
this.logHelper,
228-
this.ErrorPropagationMode);
298+
this.ErrorPropagationMode,
299+
this.versioningSettings);
229300
this.activityDispatcher = new TaskActivityDispatcher(
230301
this.orchestrationService,
231302
this.activityManager,
@@ -357,7 +428,7 @@ public TaskHubWorker AddTaskEntities(params Type[] taskEntityTypes)
357428
type.Name,
358429
string.Empty,
359430
type);
360-
431+
361432
this.entityManager.Add(creator);
362433
}
363434

0 commit comments

Comments
 (0)