Skip to content

Commit 3bd76fa

Browse files
committed
Add OrchestrationServiceShim for in-app hub hosting
1 parent f8aeb69 commit 3bd76fa

File tree

8 files changed

+309
-15
lines changed

8 files changed

+309
-15
lines changed

Microsoft.DurableTask.sln

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Shared.AzureManaged.Tests",
8585
EndProject
8686
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ConsoleAppMinimal", "samples\ConsoleAppMinimal\ConsoleAppMinimal.csproj", "{B48FACA9-A328-452A-BFAE-C4F60F9C7024}"
8787
EndProject
88+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Worker.OrchestrationServiceShim", "src\Worker\OrchestrationServiceShim\Worker.OrchestrationServiceShim.csproj", "{361E87D2-5CF6-4BB2-8B11-0BE736F88EB8}"
89+
EndProject
8890
Global
8991
GlobalSection(SolutionConfigurationPlatforms) = preSolution
9092
Debug|Any CPU = Debug|Any CPU
@@ -223,6 +225,10 @@ Global
223225
{B48FACA9-A328-452A-BFAE-C4F60F9C7024}.Debug|Any CPU.Build.0 = Debug|Any CPU
224226
{B48FACA9-A328-452A-BFAE-C4F60F9C7024}.Release|Any CPU.ActiveCfg = Release|Any CPU
225227
{B48FACA9-A328-452A-BFAE-C4F60F9C7024}.Release|Any CPU.Build.0 = Release|Any CPU
228+
{361E87D2-5CF6-4BB2-8B11-0BE736F88EB8}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
229+
{361E87D2-5CF6-4BB2-8B11-0BE736F88EB8}.Debug|Any CPU.Build.0 = Debug|Any CPU
230+
{361E87D2-5CF6-4BB2-8B11-0BE736F88EB8}.Release|Any CPU.ActiveCfg = Release|Any CPU
231+
{361E87D2-5CF6-4BB2-8B11-0BE736F88EB8}.Release|Any CPU.Build.0 = Release|Any CPU
226232
EndGlobalSection
227233
GlobalSection(SolutionProperties) = preSolution
228234
HideSolutionNode = FALSE
@@ -265,6 +271,7 @@ Global
265271
{CECADDB5-E30A-4CE2-8604-9AC596D4A2DC} = {E5637F81-2FB9-4CD7-900D-455363B142A7}
266272
{3272C041-F81D-4C85-A4FB-2A700B5A7A9D} = {CECADDB5-E30A-4CE2-8604-9AC596D4A2DC}
267273
{B48FACA9-A328-452A-BFAE-C4F60F9C7024} = {EFF7632B-821E-4CFC-B4A0-ED4B24296B17}
274+
{361E87D2-5CF6-4BB2-8B11-0BE736F88EB8} = {5B448FF6-EC42-491D-A22E-1DC8B618E6D5}
268275
EndGlobalSection
269276
GlobalSection(ExtensibilityGlobals) = postSolution
270277
SolutionGuid = {AB41CB55-35EA-4986-A522-387AB3402E71}

src/Worker/Core/Shims/DurableTaskShimFactory.cs

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,22 +16,16 @@ namespace Microsoft.DurableTask.Worker.Shims;
1616
/// This class is intended for use with alternate .NET-based durable task runtimes. It's not intended for use
1717
/// in application code.
1818
/// </remarks>
19-
public class DurableTaskShimFactory
19+
/// <remarks>
20+
/// Initializes a new instance of the <see cref="DurableTaskShimFactory" /> class.
21+
/// </remarks>
22+
/// <param name="options">The data converter.</param>
23+
/// <param name="loggerFactory">The logger factory.</param>
24+
public class DurableTaskShimFactory(
25+
DurableTaskWorkerOptions? options = null, ILoggerFactory? loggerFactory = null)
2026
{
21-
readonly DurableTaskWorkerOptions options;
22-
readonly ILoggerFactory loggerFactory;
23-
24-
/// <summary>
25-
/// Initializes a new instance of the <see cref="DurableTaskShimFactory" /> class.
26-
/// </summary>
27-
/// <param name="options">The data converter.</param>
28-
/// <param name="loggerFactory">The logger factory.</param>
29-
public DurableTaskShimFactory(
30-
DurableTaskWorkerOptions? options = null, ILoggerFactory? loggerFactory = null)
31-
{
32-
this.options = options ?? new();
33-
this.loggerFactory = loggerFactory ?? NullLoggerFactory.Instance;
34-
}
27+
readonly DurableTaskWorkerOptions options = options ?? new();
28+
readonly ILoggerFactory loggerFactory = loggerFactory ?? NullLoggerFactory.Instance;
3529

3630
/// <summary>
3731
/// Gets the default <see cref="DurableTaskShimFactory" /> with default values.
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
// Copyright (c) Microsoft Corporation.
2+
// Licensed under the MIT License.
3+
4+
using DurableTask.Core;
5+
using CoreTaskActivity = DurableTask.Core.TaskActivity;
6+
7+
namespace Microsoft.DurableTask.Worker.OrchestrationServiceShim.Core;
8+
9+
/// <summary>
10+
/// A shim activity manager which allows for creating the actual activity in the middleware.
11+
/// </summary>
12+
sealed class ShimActivityManager : INameVersionObjectManager<CoreTaskActivity>
13+
{
14+
/// <inheritdoc/>
15+
public void Add(ObjectCreator<CoreTaskActivity> creator) => throw new NotSupportedException();
16+
17+
/// <inheritdoc/>
18+
public CoreTaskActivity? GetObject(string name, string? version) => new ShimTaskActivity();
19+
}
20+
21+
/// <summary>
22+
/// A shim task activity which allows for creating the actual activity in the middleware.
23+
/// </summary>
24+
sealed class ShimTaskActivity : CoreTaskActivity
25+
{
26+
CoreTaskActivity? activity;
27+
28+
/// <inheritdoc/>
29+
public override string Run(TaskContext context, string input) => throw new NotImplementedException();
30+
31+
/// <inheritdoc/>
32+
public override Task<string> RunAsync(TaskContext context, string input)
33+
{
34+
Verify.NotNull(this.activity);
35+
return this.activity.RunAsync(context, input);
36+
}
37+
38+
/// <summary>
39+
/// Sets the inner activity.
40+
/// </summary>
41+
/// <param name="activity">The activity to set.</param>
42+
internal void SetInnerActivity(CoreTaskActivity activity) => this.activity = Check.NotNull(activity);
43+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
// Copyright (c) Microsoft Corporation.
2+
// Licensed under the MIT License.
3+
4+
using DurableTask.Core;
5+
6+
namespace Microsoft.DurableTask.Worker.OrchestrationServiceShim.Core;
7+
8+
/// <summary>
9+
/// A shim activity manager which allows for creating the actual activity in the middleware.
10+
/// </summary>
11+
sealed class ShimOrchestrationManager : INameVersionObjectManager<TaskOrchestration>
12+
{
13+
/// <inheritdoc/>
14+
public void Add(ObjectCreator<TaskOrchestration> creator) => throw new NotSupportedException();
15+
16+
/// <inheritdoc/>
17+
public TaskOrchestration? GetObject(string name, string? version) => null;
18+
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
// Copyright (c) Microsoft Corporation.
2+
// Licensed under the MIT License.
3+
4+
using DurableTask.Core;
5+
using Microsoft.DurableTask.Worker.OrchestrationServiceShim;
6+
using Microsoft.Extensions.DependencyInjection;
7+
8+
namespace Microsoft.DurableTask.Worker;
9+
10+
/// <summary>
11+
/// Extension methods for adding Durable Task support to .NET hosted services, such as ASP.NET Core hosts.
12+
/// </summary>
13+
public static class DurableTaskWorkerBuilderExtensions
14+
{
15+
/// <summary>Configures the <see cref="IDurableTaskWorkerBuilder" /> to be a corker backed by a
16+
/// <see cref="IOrchestrationService" />.
17+
/// </summary>
18+
/// <remarks>
19+
/// This must be called independently of worker registration.
20+
/// </remarks>
21+
/// <param name="builder">The builder to configure.</param>
22+
/// <returns>The original builder, for call chaining.</returns>
23+
public static IDurableTaskWorkerBuilder UseOrchestrationService(this IDurableTaskWorkerBuilder builder)
24+
=> builder.UseOrchestrationService(opt => { });
25+
26+
/// <summary>
27+
/// Configures the <see cref="IDurableTaskWorkerBuilder" /> to be a Worker backed by a
28+
/// <see cref="IOrchestrationService" />.
29+
/// </summary>
30+
/// <remarks>
31+
/// This must be called independently of worker registration.
32+
/// </remarks>
33+
/// <param name="builder">The builder to configure.</param>
34+
/// <param name="orchestrationService">The orchestration service to use.</param>
35+
/// <returns>The original builder, for call chaining.</returns>
36+
public static IDurableTaskWorkerBuilder UseOrchestrationService(
37+
this IDurableTaskWorkerBuilder builder, IOrchestrationService orchestrationService)
38+
=> builder.UseOrchestrationService(opt =>
39+
{
40+
opt.Service = orchestrationService;
41+
});
42+
43+
/// <summary>Configures the <see cref="IDurableTaskWorkerBuilder" /> to be a Worker backed by a
44+
/// <see cref="IOrchestrationService" />.
45+
/// </summary>
46+
/// <remarks>
47+
/// This must be called independently of worker registration.
48+
/// </remarks>
49+
/// <param name="builder">The builder to configure.</param>
50+
/// <param name="configure">The action to configure the Worker options.</param>
51+
/// <returns>The original builder, for call chaining.</returns>
52+
public static IDurableTaskWorkerBuilder UseOrchestrationService(
53+
this IDurableTaskWorkerBuilder builder, Action<ShimDurableTaskWorkerOptions> configure)
54+
{
55+
Check.NotNull(builder);
56+
Check.NotNull(configure);
57+
builder.Services.Configure(builder.Name, configure);
58+
builder.Services.AddOptions<ShimDurableTaskWorkerOptions>(builder.Name)
59+
.PostConfigure<IServiceProvider>((opt, sp) =>
60+
{
61+
if (opt.Service is not null)
62+
{
63+
return;
64+
}
65+
66+
// Try to resolve from service container.
67+
opt.Service = sp.GetService<IOrchestrationService>();
68+
})
69+
.Validate(x => x.Service is not null, "ShimDurableTaskWorkerOptions.Service must not be null.");
70+
71+
return builder.UseBuildTarget<ShimDurableTaskWorker, ShimDurableTaskWorkerOptions>();
72+
}
73+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
// Copyright (c) Microsoft Corporation.
2+
// Licensed under the MIT License.
3+
4+
using DurableTask.Core;
5+
using Microsoft.DurableTask.Worker.Hosting;
6+
7+
namespace Microsoft.DurableTask.Worker.OrchestrationServiceShim;
8+
9+
/// <summary>
10+
/// The shim client options.
11+
/// </summary>
12+
public sealed class ShimDurableTaskWorkerOptions : DurableTaskWorkerOptions
13+
{
14+
/// <summary>
15+
/// Gets or sets the <see cref="IOrchestrationServiceClient" /> to use in the <see cref="DurableTaskWorker" />.
16+
/// If not manually set, this will be resolved from the <see cref="IServiceProvider" />, if available.
17+
/// </summary>
18+
public IOrchestrationService? Service { get; set; }
19+
}
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
// Copyright (c) Microsoft Corporation.
2+
// Licensed under the MIT License.
3+
4+
using DurableTask.Core;
5+
using DurableTask.Core.History;
6+
using DurableTask.Core.Middleware;
7+
using Microsoft.DurableTask.Worker.Hosting;
8+
using Microsoft.DurableTask.Worker.OrchestrationServiceShim.Core;
9+
using Microsoft.DurableTask.Worker.Shims;
10+
using Microsoft.Extensions.DependencyInjection;
11+
using Microsoft.Extensions.Logging;
12+
using Microsoft.Extensions.Options;
13+
using IOrchestrationService = DurableTask.Core.IOrchestrationService;
14+
15+
namespace Microsoft.DurableTask.Worker.OrchestrationServiceShim;
16+
17+
/// <summary>
18+
/// A <see cref="DurableTaskWorker" /> which uses a <see cref="IOrchestrationService"/>.
19+
/// </summary>
20+
class ShimDurableTaskWorker : DurableTaskWorker
21+
{
22+
readonly ShimDurableTaskWorkerOptions options;
23+
readonly IServiceProvider services;
24+
readonly TaskHubWorker worker;
25+
readonly DurableTaskShimFactory shimFactory;
26+
27+
/// <summary>
28+
/// Initializes a new instance of the <see cref="ShimDurableTaskWorker" /> class.
29+
/// </summary>/// <param name="name">The name of this worker.</param>
30+
/// <param name="factory">The <see cref="IDurableTaskFactory"/>.</param>
31+
/// <param name="options">The options for this worker.</param>
32+
/// <param name="services">The service provider.</param>
33+
/// <param name="loggerFactory">The logger factory.</param>
34+
public ShimDurableTaskWorker(
35+
string? name,
36+
IDurableTaskFactory factory,
37+
IOptionsMonitor<ShimDurableTaskWorkerOptions> options,
38+
IServiceProvider services,
39+
ILoggerFactory loggerFactory)
40+
: base(name, factory)
41+
{
42+
this.options = Check.NotNull(options).Get(name);
43+
this.services = Check.NotNull(services);
44+
this.shimFactory = new(this.options, loggerFactory);
45+
46+
// This should already be validated by options.
47+
IOrchestrationService service = Verify.NotNull(this.options.Service);
48+
this.worker = new TaskHubWorker(
49+
service, new ShimOrchestrationManager(), new ShimActivityManager(), loggerFactory);
50+
this.worker.AddActivityDispatcherMiddleware(this.InvokeActivityAsync);
51+
this.worker.AddOrchestrationDispatcherMiddleware(this.InvokeOrchestrationAsync);
52+
}
53+
54+
/// <inheritdoc/>
55+
public override async Task StopAsync(CancellationToken cancellationToken)
56+
{
57+
await base.StopAsync(cancellationToken);
58+
await this.worker.StopAsync();
59+
}
60+
61+
/// <inheritdoc/>
62+
protected override Task ExecuteAsync(CancellationToken stoppingToken)
63+
{
64+
return this.worker.StartAsync().WaitAsync(stoppingToken);
65+
}
66+
67+
async Task InvokeActivityAsync(DispatchMiddlewareContext context, Func<Task> next)
68+
{
69+
Check.NotNull(context);
70+
Check.NotNull(next);
71+
72+
TaskScheduledEvent scheduled = context.GetProperty<TaskScheduledEvent>();
73+
if (scheduled.Name is null)
74+
{
75+
throw new InvalidOperationException("TaskScheduledEvent.Name is not set.");
76+
}
77+
78+
TaskName name = new(scheduled.Name);
79+
TaskActivity coreActivity = context.GetProperty<TaskActivity>();
80+
if (coreActivity is not ShimTaskActivity shimActivity)
81+
{
82+
throw new InvalidOperationException("TaskActivity is not a ShimTaskActivity.");
83+
}
84+
85+
await using AsyncServiceScope scope = this.services.CreateAsyncScope();
86+
if (!this.Factory.TryCreateActivity(name, scope.ServiceProvider, out ITaskActivity? activity))
87+
{
88+
throw new InvalidOperationException($"Activity not found: {name}");
89+
}
90+
91+
shimActivity.SetInnerActivity(this.shimFactory.CreateActivity(name, activity));
92+
}
93+
94+
async Task InvokeOrchestrationAsync(DispatchMiddlewareContext context, Func<Task> next)
95+
{
96+
Check.NotNull(context);
97+
Check.NotNull(next);
98+
99+
OrchestrationRuntimeState runtimeState = context.GetProperty<OrchestrationRuntimeState>();
100+
await using AsyncServiceScope scope = this.services.CreateAsyncScope();
101+
102+
TaskName name = new(runtimeState.Name);
103+
if (!this.Factory.TryCreateOrchestrator(name, scope.ServiceProvider, out ITaskOrchestrator? orchestrator))
104+
{
105+
throw new InvalidOperationException($"Orchestrator not found: {name}");
106+
}
107+
108+
ParentOrchestrationInstance? parent = runtimeState.ParentInstance is { } p ?
109+
new ParentOrchestrationInstance(p.Name, p.OrchestrationInstance.InstanceId) : null;
110+
111+
TaskOrchestrationExecutor executor = new(
112+
runtimeState,
113+
this.shimFactory.CreateOrchestration(name, orchestrator, parent),
114+
BehaviorOnContinueAsNew.Carryover,
115+
ErrorPropagationMode.UseFailureDetails);
116+
117+
OrchestratorExecutionResult result = executor.Execute();
118+
context.SetProperty(result);
119+
await next();
120+
}
121+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<TargetFramework>netstandard2.0</TargetFramework>
5+
<PackageDescription>A DurableTaskWorker implementation using IOrchestrationService from DurableTask.Core.</PackageDescription>
6+
<EnableStyleCop>true</EnableStyleCop>
7+
</PropertyGroup>
8+
9+
<ItemGroup>
10+
<ProjectReference Include="../Core/Worker.csproj" />
11+
<ProjectReference Include="../../Abstractions/Abstractions.csproj" />
12+
</ItemGroup>
13+
14+
<ItemGroup>
15+
<SharedSection Include="Core" />
16+
<SharedSection Include="DependencyInjection" />
17+
</ItemGroup>
18+
19+
</Project>

0 commit comments

Comments
 (0)