From 3bd76fa4e614353a684cb67874a397a7128863b7 Mon Sep 17 00:00:00 2001 From: Jacob Viau Date: Mon, 27 Jan 2025 08:41:15 -0800 Subject: [PATCH 1/6] Add OrchestrationServiceShim for in-app hub hosting --- Microsoft.DurableTask.sln | 7 + .../Core/Shims/DurableTaskShimFactory.cs | 24 ++-- .../Core/ShimActivityManager.cs | 43 +++++++ .../Core/ShimOrchestrationManager.cs | 18 +++ .../DurableTaskWorkerBuilderExtensions.cs | 73 +++++++++++ .../ShimDurableTaskClientOptions.cs | 19 +++ .../ShimDurableTaskWorker.cs | 121 ++++++++++++++++++ .../Worker.OrchestrationServiceShim.csproj | 19 +++ 8 files changed, 309 insertions(+), 15 deletions(-) create mode 100644 src/Worker/OrchestrationServiceShim/Core/ShimActivityManager.cs create mode 100644 src/Worker/OrchestrationServiceShim/Core/ShimOrchestrationManager.cs create mode 100644 src/Worker/OrchestrationServiceShim/DependencyInjection/DurableTaskWorkerBuilderExtensions.cs create mode 100644 src/Worker/OrchestrationServiceShim/ShimDurableTaskClientOptions.cs create mode 100644 src/Worker/OrchestrationServiceShim/ShimDurableTaskWorker.cs create mode 100644 src/Worker/OrchestrationServiceShim/Worker.OrchestrationServiceShim.csproj diff --git a/Microsoft.DurableTask.sln b/Microsoft.DurableTask.sln index 1feeda15..fdf3688f 100644 --- a/Microsoft.DurableTask.sln +++ b/Microsoft.DurableTask.sln @@ -85,6 +85,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Shared.AzureManaged.Tests", EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ConsoleAppMinimal", "samples\ConsoleAppMinimal\ConsoleAppMinimal.csproj", "{B48FACA9-A328-452A-BFAE-C4F60F9C7024}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Worker.OrchestrationServiceShim", "src\Worker\OrchestrationServiceShim\Worker.OrchestrationServiceShim.csproj", "{361E87D2-5CF6-4BB2-8B11-0BE736F88EB8}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -223,6 +225,10 @@ Global {B48FACA9-A328-452A-BFAE-C4F60F9C7024}.Debug|Any CPU.Build.0 = Debug|Any CPU {B48FACA9-A328-452A-BFAE-C4F60F9C7024}.Release|Any CPU.ActiveCfg = Release|Any CPU {B48FACA9-A328-452A-BFAE-C4F60F9C7024}.Release|Any CPU.Build.0 = Release|Any CPU + {361E87D2-5CF6-4BB2-8B11-0BE736F88EB8}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {361E87D2-5CF6-4BB2-8B11-0BE736F88EB8}.Debug|Any CPU.Build.0 = Debug|Any CPU + {361E87D2-5CF6-4BB2-8B11-0BE736F88EB8}.Release|Any CPU.ActiveCfg = Release|Any CPU + {361E87D2-5CF6-4BB2-8B11-0BE736F88EB8}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -265,6 +271,7 @@ Global {CECADDB5-E30A-4CE2-8604-9AC596D4A2DC} = {E5637F81-2FB9-4CD7-900D-455363B142A7} {3272C041-F81D-4C85-A4FB-2A700B5A7A9D} = {CECADDB5-E30A-4CE2-8604-9AC596D4A2DC} {B48FACA9-A328-452A-BFAE-C4F60F9C7024} = {EFF7632B-821E-4CFC-B4A0-ED4B24296B17} + {361E87D2-5CF6-4BB2-8B11-0BE736F88EB8} = {5B448FF6-EC42-491D-A22E-1DC8B618E6D5} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {AB41CB55-35EA-4986-A522-387AB3402E71} diff --git a/src/Worker/Core/Shims/DurableTaskShimFactory.cs b/src/Worker/Core/Shims/DurableTaskShimFactory.cs index c8182be3..229e71c5 100644 --- a/src/Worker/Core/Shims/DurableTaskShimFactory.cs +++ b/src/Worker/Core/Shims/DurableTaskShimFactory.cs @@ -16,22 +16,16 @@ namespace Microsoft.DurableTask.Worker.Shims; /// This class is intended for use with alternate .NET-based durable task runtimes. It's not intended for use /// in application code. /// -public class DurableTaskShimFactory +/// +/// Initializes a new instance of the class. +/// +/// The data converter. +/// The logger factory. +public class DurableTaskShimFactory( + DurableTaskWorkerOptions? options = null, ILoggerFactory? loggerFactory = null) { - readonly DurableTaskWorkerOptions options; - readonly ILoggerFactory loggerFactory; - - /// - /// Initializes a new instance of the class. - /// - /// The data converter. - /// The logger factory. - public DurableTaskShimFactory( - DurableTaskWorkerOptions? options = null, ILoggerFactory? loggerFactory = null) - { - this.options = options ?? new(); - this.loggerFactory = loggerFactory ?? NullLoggerFactory.Instance; - } + readonly DurableTaskWorkerOptions options = options ?? new(); + readonly ILoggerFactory loggerFactory = loggerFactory ?? NullLoggerFactory.Instance; /// /// Gets the default with default values. diff --git a/src/Worker/OrchestrationServiceShim/Core/ShimActivityManager.cs b/src/Worker/OrchestrationServiceShim/Core/ShimActivityManager.cs new file mode 100644 index 00000000..3a50b7e3 --- /dev/null +++ b/src/Worker/OrchestrationServiceShim/Core/ShimActivityManager.cs @@ -0,0 +1,43 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using DurableTask.Core; +using CoreTaskActivity = DurableTask.Core.TaskActivity; + +namespace Microsoft.DurableTask.Worker.OrchestrationServiceShim.Core; + +/// +/// A shim activity manager which allows for creating the actual activity in the middleware. +/// +sealed class ShimActivityManager : INameVersionObjectManager +{ + /// + public void Add(ObjectCreator creator) => throw new NotSupportedException(); + + /// + public CoreTaskActivity? GetObject(string name, string? version) => new ShimTaskActivity(); +} + +/// +/// A shim task activity which allows for creating the actual activity in the middleware. +/// +sealed class ShimTaskActivity : CoreTaskActivity +{ + CoreTaskActivity? activity; + + /// + public override string Run(TaskContext context, string input) => throw new NotImplementedException(); + + /// + public override Task RunAsync(TaskContext context, string input) + { + Verify.NotNull(this.activity); + return this.activity.RunAsync(context, input); + } + + /// + /// Sets the inner activity. + /// + /// The activity to set. + internal void SetInnerActivity(CoreTaskActivity activity) => this.activity = Check.NotNull(activity); +} diff --git a/src/Worker/OrchestrationServiceShim/Core/ShimOrchestrationManager.cs b/src/Worker/OrchestrationServiceShim/Core/ShimOrchestrationManager.cs new file mode 100644 index 00000000..d42b440c --- /dev/null +++ b/src/Worker/OrchestrationServiceShim/Core/ShimOrchestrationManager.cs @@ -0,0 +1,18 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using DurableTask.Core; + +namespace Microsoft.DurableTask.Worker.OrchestrationServiceShim.Core; + +/// +/// A shim activity manager which allows for creating the actual activity in the middleware. +/// +sealed class ShimOrchestrationManager : INameVersionObjectManager +{ + /// + public void Add(ObjectCreator creator) => throw new NotSupportedException(); + + /// + public TaskOrchestration? GetObject(string name, string? version) => null; +} diff --git a/src/Worker/OrchestrationServiceShim/DependencyInjection/DurableTaskWorkerBuilderExtensions.cs b/src/Worker/OrchestrationServiceShim/DependencyInjection/DurableTaskWorkerBuilderExtensions.cs new file mode 100644 index 00000000..c3549889 --- /dev/null +++ b/src/Worker/OrchestrationServiceShim/DependencyInjection/DurableTaskWorkerBuilderExtensions.cs @@ -0,0 +1,73 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using DurableTask.Core; +using Microsoft.DurableTask.Worker.OrchestrationServiceShim; +using Microsoft.Extensions.DependencyInjection; + +namespace Microsoft.DurableTask.Worker; + +/// +/// Extension methods for adding Durable Task support to .NET hosted services, such as ASP.NET Core hosts. +/// +public static class DurableTaskWorkerBuilderExtensions +{ + /// Configures the to be a corker backed by a + /// . + /// + /// + /// This must be called independently of worker registration. + /// + /// The builder to configure. + /// The original builder, for call chaining. + public static IDurableTaskWorkerBuilder UseOrchestrationService(this IDurableTaskWorkerBuilder builder) + => builder.UseOrchestrationService(opt => { }); + + /// + /// Configures the to be a Worker backed by a + /// . + /// + /// + /// This must be called independently of worker registration. + /// + /// The builder to configure. + /// The orchestration service to use. + /// The original builder, for call chaining. + public static IDurableTaskWorkerBuilder UseOrchestrationService( + this IDurableTaskWorkerBuilder builder, IOrchestrationService orchestrationService) + => builder.UseOrchestrationService(opt => + { + opt.Service = orchestrationService; + }); + + /// Configures the to be a Worker backed by a + /// . + /// + /// + /// This must be called independently of worker registration. + /// + /// The builder to configure. + /// The action to configure the Worker options. + /// The original builder, for call chaining. + public static IDurableTaskWorkerBuilder UseOrchestrationService( + this IDurableTaskWorkerBuilder builder, Action configure) + { + Check.NotNull(builder); + Check.NotNull(configure); + builder.Services.Configure(builder.Name, configure); + builder.Services.AddOptions(builder.Name) + .PostConfigure((opt, sp) => + { + if (opt.Service is not null) + { + return; + } + + // Try to resolve from service container. + opt.Service = sp.GetService(); + }) + .Validate(x => x.Service is not null, "ShimDurableTaskWorkerOptions.Service must not be null."); + + return builder.UseBuildTarget(); + } +} diff --git a/src/Worker/OrchestrationServiceShim/ShimDurableTaskClientOptions.cs b/src/Worker/OrchestrationServiceShim/ShimDurableTaskClientOptions.cs new file mode 100644 index 00000000..377de2b7 --- /dev/null +++ b/src/Worker/OrchestrationServiceShim/ShimDurableTaskClientOptions.cs @@ -0,0 +1,19 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using DurableTask.Core; +using Microsoft.DurableTask.Worker.Hosting; + +namespace Microsoft.DurableTask.Worker.OrchestrationServiceShim; + +/// +/// The shim client options. +/// +public sealed class ShimDurableTaskWorkerOptions : DurableTaskWorkerOptions +{ + /// + /// Gets or sets the to use in the . + /// If not manually set, this will be resolved from the , if available. + /// + public IOrchestrationService? Service { get; set; } +} diff --git a/src/Worker/OrchestrationServiceShim/ShimDurableTaskWorker.cs b/src/Worker/OrchestrationServiceShim/ShimDurableTaskWorker.cs new file mode 100644 index 00000000..24116c54 --- /dev/null +++ b/src/Worker/OrchestrationServiceShim/ShimDurableTaskWorker.cs @@ -0,0 +1,121 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using DurableTask.Core; +using DurableTask.Core.History; +using DurableTask.Core.Middleware; +using Microsoft.DurableTask.Worker.Hosting; +using Microsoft.DurableTask.Worker.OrchestrationServiceShim.Core; +using Microsoft.DurableTask.Worker.Shims; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using IOrchestrationService = DurableTask.Core.IOrchestrationService; + +namespace Microsoft.DurableTask.Worker.OrchestrationServiceShim; + +/// +/// A which uses a . +/// +class ShimDurableTaskWorker : DurableTaskWorker +{ + readonly ShimDurableTaskWorkerOptions options; + readonly IServiceProvider services; + readonly TaskHubWorker worker; + readonly DurableTaskShimFactory shimFactory; + + /// + /// Initializes a new instance of the class. + /// /// The name of this worker. + /// The . + /// The options for this worker. + /// The service provider. + /// The logger factory. + public ShimDurableTaskWorker( + string? name, + IDurableTaskFactory factory, + IOptionsMonitor options, + IServiceProvider services, + ILoggerFactory loggerFactory) + : base(name, factory) + { + this.options = Check.NotNull(options).Get(name); + this.services = Check.NotNull(services); + this.shimFactory = new(this.options, loggerFactory); + + // This should already be validated by options. + IOrchestrationService service = Verify.NotNull(this.options.Service); + this.worker = new TaskHubWorker( + service, new ShimOrchestrationManager(), new ShimActivityManager(), loggerFactory); + this.worker.AddActivityDispatcherMiddleware(this.InvokeActivityAsync); + this.worker.AddOrchestrationDispatcherMiddleware(this.InvokeOrchestrationAsync); + } + + /// + public override async Task StopAsync(CancellationToken cancellationToken) + { + await base.StopAsync(cancellationToken); + await this.worker.StopAsync(); + } + + /// + protected override Task ExecuteAsync(CancellationToken stoppingToken) + { + return this.worker.StartAsync().WaitAsync(stoppingToken); + } + + async Task InvokeActivityAsync(DispatchMiddlewareContext context, Func next) + { + Check.NotNull(context); + Check.NotNull(next); + + TaskScheduledEvent scheduled = context.GetProperty(); + if (scheduled.Name is null) + { + throw new InvalidOperationException("TaskScheduledEvent.Name is not set."); + } + + TaskName name = new(scheduled.Name); + TaskActivity coreActivity = context.GetProperty(); + if (coreActivity is not ShimTaskActivity shimActivity) + { + throw new InvalidOperationException("TaskActivity is not a ShimTaskActivity."); + } + + await using AsyncServiceScope scope = this.services.CreateAsyncScope(); + if (!this.Factory.TryCreateActivity(name, scope.ServiceProvider, out ITaskActivity? activity)) + { + throw new InvalidOperationException($"Activity not found: {name}"); + } + + shimActivity.SetInnerActivity(this.shimFactory.CreateActivity(name, activity)); + } + + async Task InvokeOrchestrationAsync(DispatchMiddlewareContext context, Func next) + { + Check.NotNull(context); + Check.NotNull(next); + + OrchestrationRuntimeState runtimeState = context.GetProperty(); + await using AsyncServiceScope scope = this.services.CreateAsyncScope(); + + TaskName name = new(runtimeState.Name); + if (!this.Factory.TryCreateOrchestrator(name, scope.ServiceProvider, out ITaskOrchestrator? orchestrator)) + { + throw new InvalidOperationException($"Orchestrator not found: {name}"); + } + + ParentOrchestrationInstance? parent = runtimeState.ParentInstance is { } p ? + new ParentOrchestrationInstance(p.Name, p.OrchestrationInstance.InstanceId) : null; + + TaskOrchestrationExecutor executor = new( + runtimeState, + this.shimFactory.CreateOrchestration(name, orchestrator, parent), + BehaviorOnContinueAsNew.Carryover, + ErrorPropagationMode.UseFailureDetails); + + OrchestratorExecutionResult result = executor.Execute(); + context.SetProperty(result); + await next(); + } +} diff --git a/src/Worker/OrchestrationServiceShim/Worker.OrchestrationServiceShim.csproj b/src/Worker/OrchestrationServiceShim/Worker.OrchestrationServiceShim.csproj new file mode 100644 index 00000000..035d6d75 --- /dev/null +++ b/src/Worker/OrchestrationServiceShim/Worker.OrchestrationServiceShim.csproj @@ -0,0 +1,19 @@ + + + + netstandard2.0 + A DurableTaskWorker implementation using IOrchestrationService from DurableTask.Core. + true + + + + + + + + + + + + + From 2c46debebb11426b24523a61b9e3060f068f8000 Mon Sep 17 00:00:00 2001 From: Jacob Viau Date: Mon, 27 Jan 2025 09:41:05 -0800 Subject: [PATCH 2/6] Add entity support to taskhub shim --- .../DefaultDurableTaskWorkerBuilder.cs | 22 ++-- src/Worker/Core/Hosting/DurableTaskWorker.cs | 22 ++-- .../Core/OrchestrationServiceNoEntities.cs | 118 ++++++++++++++++++ .../Core/ShimEntityManager.cs | 19 +++ src/Worker/OrchestrationServiceShim/Logs.cs | 20 +++ .../ShimDurableTaskWorker.cs | 78 +++++++++++- 6 files changed, 245 insertions(+), 34 deletions(-) create mode 100644 src/Worker/OrchestrationServiceShim/Core/OrchestrationServiceNoEntities.cs create mode 100644 src/Worker/OrchestrationServiceShim/Core/ShimEntityManager.cs create mode 100644 src/Worker/OrchestrationServiceShim/Logs.cs diff --git a/src/Worker/Core/DependencyInjection/DefaultDurableTaskWorkerBuilder.cs b/src/Worker/Core/DependencyInjection/DefaultDurableTaskWorkerBuilder.cs index 7e70990b..75b380c4 100644 --- a/src/Worker/Core/DependencyInjection/DefaultDurableTaskWorkerBuilder.cs +++ b/src/Worker/Core/DependencyInjection/DefaultDurableTaskWorkerBuilder.cs @@ -10,26 +10,20 @@ namespace Microsoft.DurableTask.Worker; /// /// The default builder for durable task. /// -public class DefaultDurableTaskWorkerBuilder : IDurableTaskWorkerBuilder +/// +/// Initializes a new instance of the class. +/// +/// The service collection for this builder. +/// The name for this builder. +public class DefaultDurableTaskWorkerBuilder(string? name, IServiceCollection services) : IDurableTaskWorkerBuilder { Type? buildTarget; - /// - /// Initializes a new instance of the class. - /// - /// The service collection for this builder. - /// The name for this builder. - public DefaultDurableTaskWorkerBuilder(string? name, IServiceCollection services) - { - this.Name = name ?? Extensions.Options.Options.DefaultName; - this.Services = Check.NotNull(services); - } - /// - public string Name { get; } + public string Name { get; } = name ?? Extensions.Options.Options.DefaultName; /// - public IServiceCollection Services { get; } + public IServiceCollection Services { get; } = Check.NotNull(services); /// public Type? BuildTarget diff --git a/src/Worker/Core/Hosting/DurableTaskWorker.cs b/src/Worker/Core/Hosting/DurableTaskWorker.cs index b788a527..ee0cf4f8 100644 --- a/src/Worker/Core/Hosting/DurableTaskWorker.cs +++ b/src/Worker/Core/Hosting/DurableTaskWorker.cs @@ -8,27 +8,21 @@ namespace Microsoft.DurableTask.Worker.Hosting; /// /// Base class for durable workers. /// -public abstract class DurableTaskWorker : BackgroundService +/// +/// Initializes a new instance of the class. +/// +/// The name of the worker. +/// The durable factory. +public abstract class DurableTaskWorker(string? name, IDurableTaskFactory factory) : BackgroundService { - /// - /// Initializes a new instance of the class. - /// - /// The name of the worker. - /// The durable factory. - protected DurableTaskWorker(string? name, IDurableTaskFactory factory) - { - this.Name = name ?? Microsoft.Extensions.Options.Options.DefaultName; - this.Factory = Check.NotNull(factory); - } - /// /// Gets the name of this worker. /// - protected virtual string Name { get; } + protected virtual string Name { get; } = name ?? Microsoft.Extensions.Options.Options.DefaultName; /// /// Gets the which has been initialized from /// the configured tasks during host construction. /// - protected virtual IDurableTaskFactory Factory { get; } + protected virtual IDurableTaskFactory Factory { get; } = Check.NotNull(factory); } diff --git a/src/Worker/OrchestrationServiceShim/Core/OrchestrationServiceNoEntities.cs b/src/Worker/OrchestrationServiceShim/Core/OrchestrationServiceNoEntities.cs new file mode 100644 index 00000000..f93ddb31 --- /dev/null +++ b/src/Worker/OrchestrationServiceShim/Core/OrchestrationServiceNoEntities.cs @@ -0,0 +1,118 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using DurableTask.Core; + +namespace Microsoft.DurableTask.Worker.OrchestrationServiceShim.Core; + +/// +/// An that does not support entities. +/// +/// +/// This is used to suppress entity support when is false. +/// +sealed class OrchestrationServiceNoEntities(IOrchestrationService service) : IOrchestrationService +{ + /// + public int TaskOrchestrationDispatcherCount => service.TaskOrchestrationDispatcherCount; + + /// + public int MaxConcurrentTaskOrchestrationWorkItems => service.MaxConcurrentTaskOrchestrationWorkItems; + + /// + public BehaviorOnContinueAsNew EventBehaviourForContinueAsNew => service.EventBehaviourForContinueAsNew; + + /// + public int TaskActivityDispatcherCount => service.TaskActivityDispatcherCount; + + /// + public int MaxConcurrentTaskActivityWorkItems => service.MaxConcurrentTaskActivityWorkItems; + + /// + public Task AbandonTaskActivityWorkItemAsync(TaskActivityWorkItem workItem) + => service.AbandonTaskActivityWorkItemAsync(workItem); + + /// + public Task AbandonTaskOrchestrationWorkItemAsync(TaskOrchestrationWorkItem workItem) + => service.AbandonTaskOrchestrationWorkItemAsync(workItem); + + /// + public Task CompleteTaskActivityWorkItemAsync(TaskActivityWorkItem workItem, TaskMessage responseMessage) + => service.CompleteTaskActivityWorkItemAsync(workItem, responseMessage); + + /// + public Task CompleteTaskOrchestrationWorkItemAsync( + TaskOrchestrationWorkItem workItem, + OrchestrationRuntimeState newOrchestrationRuntimeState, + IList outboundMessages, + IList orchestratorMessages, + IList timerMessages, + TaskMessage continuedAsNewMessage, + OrchestrationState orchestrationState) + => service.CompleteTaskOrchestrationWorkItemAsync( + workItem, + newOrchestrationRuntimeState, + outboundMessages, + orchestratorMessages, + timerMessages, + continuedAsNewMessage, + orchestrationState); + + /// + public Task CreateAsync() => service.CreateAsync(); + + /// + public Task CreateAsync(bool recreateInstanceStore) => service.CreateAsync(recreateInstanceStore); + + /// + public Task CreateIfNotExistsAsync() => service.CreateIfNotExistsAsync(); + + /// + public Task DeleteAsync() => service.DeleteAsync(); + + /// + public Task DeleteAsync(bool deleteInstanceStore) => service.DeleteAsync(deleteInstanceStore); + + /// + public int GetDelayInSecondsAfterOnFetchException(Exception exception) + => service.GetDelayInSecondsAfterOnFetchException(exception); + + /// + public int GetDelayInSecondsAfterOnProcessException(Exception exception) + => service.GetDelayInSecondsAfterOnProcessException(exception); + + /// + public bool IsMaxMessageCountExceeded(int currentMessageCount, OrchestrationRuntimeState runtimeState) + => service.IsMaxMessageCountExceeded(currentMessageCount, runtimeState); + + /// + public Task LockNextTaskActivityWorkItem( + TimeSpan receiveTimeout, CancellationToken cancellationToken) + => service.LockNextTaskActivityWorkItem(receiveTimeout, cancellationToken); + + /// + public Task LockNextTaskOrchestrationWorkItemAsync( + TimeSpan receiveTimeout, CancellationToken cancellationToken) + => service.LockNextTaskOrchestrationWorkItemAsync(receiveTimeout, cancellationToken); + + /// + public Task ReleaseTaskOrchestrationWorkItemAsync(TaskOrchestrationWorkItem workItem) + => service.ReleaseTaskOrchestrationWorkItemAsync(workItem); + + /// + public Task RenewTaskActivityWorkItemLockAsync(TaskActivityWorkItem workItem) + => service.RenewTaskActivityWorkItemLockAsync(workItem); + + /// + public Task RenewTaskOrchestrationWorkItemLockAsync(TaskOrchestrationWorkItem workItem) + => service.RenewTaskOrchestrationWorkItemLockAsync(workItem); + + /// + public Task StartAsync() => service.StartAsync(); + + /// + public Task StopAsync() => service.StopAsync(); + + /// + public Task StopAsync(bool isForced) => service.StopAsync(isForced); +} diff --git a/src/Worker/OrchestrationServiceShim/Core/ShimEntityManager.cs b/src/Worker/OrchestrationServiceShim/Core/ShimEntityManager.cs new file mode 100644 index 00000000..f4c92175 --- /dev/null +++ b/src/Worker/OrchestrationServiceShim/Core/ShimEntityManager.cs @@ -0,0 +1,19 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using DurableTask.Core; +using DurableTask.Core.Entities; + +namespace Microsoft.DurableTask.Worker.OrchestrationServiceShim.Core; + +/// +/// A shim activity manager which allows for creating the actual activity in the middleware. +/// +sealed class ShimEntityManager : INameVersionObjectManager +{ + /// + public void Add(ObjectCreator creator) => throw new NotSupportedException(); + + /// + public TaskEntity? GetObject(string name, string? version) => null; +} diff --git a/src/Worker/OrchestrationServiceShim/Logs.cs b/src/Worker/OrchestrationServiceShim/Logs.cs new file mode 100644 index 00000000..0d271a31 --- /dev/null +++ b/src/Worker/OrchestrationServiceShim/Logs.cs @@ -0,0 +1,20 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using Microsoft.DurableTask.Entities; +using Microsoft.Extensions.Logging; + +namespace Microsoft.DurableTask +{ + /// + /// Log messages. + /// + static partial class Logs + { + [LoggerMessage(EventId = 0, Level = LogLevel.Information, Message = "Entity support not enabled via options. Entities will be disabled.")] + public static partial void EntitiesDisabled(this ILogger logger); + + [LoggerMessage(EventId = 1, Level = LogLevel.Warning, Message = "Entity support is enabled, but the IDurableTaskFactory does not support entities.")] + public static partial void TaskFactoryDoesNotSupportEntities(this ILogger logger); + } +} diff --git a/src/Worker/OrchestrationServiceShim/ShimDurableTaskWorker.cs b/src/Worker/OrchestrationServiceShim/ShimDurableTaskWorker.cs index 24116c54..67d9b48e 100644 --- a/src/Worker/OrchestrationServiceShim/ShimDurableTaskWorker.cs +++ b/src/Worker/OrchestrationServiceShim/ShimDurableTaskWorker.cs @@ -1,9 +1,13 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. +using System.Diagnostics; using DurableTask.Core; +using DurableTask.Core.Entities; +using DurableTask.Core.Entities.OperationFormat; using DurableTask.Core.History; using DurableTask.Core.Middleware; +using Microsoft.DurableTask.Entities; using Microsoft.DurableTask.Worker.Hosting; using Microsoft.DurableTask.Worker.OrchestrationServiceShim.Core; using Microsoft.DurableTask.Worker.Shims; @@ -21,8 +25,9 @@ class ShimDurableTaskWorker : DurableTaskWorker { readonly ShimDurableTaskWorkerOptions options; readonly IServiceProvider services; - readonly TaskHubWorker worker; readonly DurableTaskShimFactory shimFactory; + readonly ILogger logger; + readonly TaskHubWorker worker; /// /// Initializes a new instance of the class. @@ -42,13 +47,12 @@ public ShimDurableTaskWorker( this.options = Check.NotNull(options).Get(name); this.services = Check.NotNull(services); this.shimFactory = new(this.options, loggerFactory); + this.logger = loggerFactory.CreateLogger(); // This should already be validated by options. IOrchestrationService service = Verify.NotNull(this.options.Service); - this.worker = new TaskHubWorker( - service, new ShimOrchestrationManager(), new ShimActivityManager(), loggerFactory); - this.worker.AddActivityDispatcherMiddleware(this.InvokeActivityAsync); - this.worker.AddOrchestrationDispatcherMiddleware(this.InvokeOrchestrationAsync); + this.worker = service is IEntityOrchestrationService entity + ? this.CreateWorker(entity, loggerFactory) : this.CreateWorker(service, loggerFactory); } /// @@ -64,6 +68,43 @@ protected override Task ExecuteAsync(CancellationToken stoppingToken) return this.worker.StartAsync().WaitAsync(stoppingToken); } + TaskHubWorker CreateWorker(IOrchestrationService service, ILoggerFactory loggerFactory) + { + TaskHubWorker worker = new( + service, new ShimOrchestrationManager(), new ShimActivityManager(), loggerFactory); + worker.AddActivityDispatcherMiddleware(this.InvokeActivityAsync); + worker.AddOrchestrationDispatcherMiddleware(this.InvokeOrchestrationAsync); + + return worker; + } + + TaskHubWorker CreateWorker(IEntityOrchestrationService service, ILoggerFactory loggerFactory) + { + if (!this.options.EnableEntitySupport) + { + this.logger.EntitiesDisabled(); + return this.CreateWorker(new OrchestrationServiceNoEntities(service), loggerFactory); + } + + if (this.Factory is not IDurableTaskFactory2) + { + this.logger.TaskFactoryDoesNotSupportEntities(); + return this.CreateWorker(new OrchestrationServiceNoEntities(service), loggerFactory); + } + + TaskHubWorker worker = new( + service, + new ShimOrchestrationManager(), + new ShimActivityManager(), + new ShimEntityManager(), + loggerFactory); + worker.AddActivityDispatcherMiddleware(this.InvokeActivityAsync); + worker.AddOrchestrationDispatcherMiddleware(this.InvokeOrchestrationAsync); + worker.AddEntityDispatcherMiddleware(this.InvokeEntityAsync); + + return worker; + } + async Task InvokeActivityAsync(DispatchMiddlewareContext context, Func next) { Check.NotNull(context); @@ -97,9 +138,9 @@ async Task InvokeOrchestrationAsync(DispatchMiddlewareContext context, Func(); - await using AsyncServiceScope scope = this.services.CreateAsyncScope(); TaskName name = new(runtimeState.Name); + await using AsyncServiceScope scope = this.services.CreateAsyncScope(); if (!this.Factory.TryCreateOrchestrator(name, scope.ServiceProvider, out ITaskOrchestrator? orchestrator)) { throw new InvalidOperationException($"Orchestrator not found: {name}"); @@ -118,4 +159,29 @@ async Task InvokeOrchestrationAsync(DispatchMiddlewareContext context, Func next) + { + Check.NotNull(context); + Check.NotNull(next); + + EntityBatchRequest request = context.GetProperty(); + if (request?.InstanceId is null) + { + throw new InvalidOperationException("EntityBatchRequest.InstanceId is not set."); + } + + EntityId entityId = EntityId.FromString(request.InstanceId); + IDurableTaskFactory2 factory = (IDurableTaskFactory2)this.Factory; // verified castable at startup. + await using AsyncServiceScope scope = this.services.CreateAsyncScope(); + if (!factory.TryCreateEntity(entityId.Name, this.services, out ITaskEntity? entity)) + { + throw new InvalidOperationException($"Entity not found: {entityId.Name}"); + } + + TaskEntity shim = this.shimFactory.CreateEntity(entityId.Name, entity, entityId); + EntityBatchResult result = await shim.ExecuteOperationBatchAsync(request); + context.SetProperty(result); + await next(); + } } From 742f91260cd44f494c1b674a8a139fda840403a4 Mon Sep 17 00:00:00 2001 From: Jacob Viau Date: Mon, 27 Jan 2025 09:59:06 -0800 Subject: [PATCH 3/6] Add README and releasenotes --- .../OrchestrationServiceClientShim/README.md | 13 ++++++++++++- src/Worker/Core/README.md | 2 +- src/Worker/OrchestrationServiceShim/README.md | 18 ++++++++++++++++++ .../OrchestrationServiceShim/RELEASENOTES.md | 1 + 4 files changed, 32 insertions(+), 2 deletions(-) create mode 100644 src/Worker/OrchestrationServiceShim/README.md create mode 100644 src/Worker/OrchestrationServiceShim/RELEASENOTES.md diff --git a/src/Client/OrchestrationServiceClientShim/README.md b/src/Client/OrchestrationServiceClientShim/README.md index ea4b9d18..ebc156ae 100644 --- a/src/Client/OrchestrationServiceClientShim/README.md +++ b/src/Client/OrchestrationServiceClientShim/README.md @@ -4,4 +4,15 @@ Commonly used types: - `ShimDurableTaskClient` - `ShimDurableTaskClientOptions` -For more information, see https://github.com/microsoft/durabletask-dotnet \ No newline at end of file +For more information, see https://github.com/microsoft/durabletask-dotnet + +## Getting Started + +``` CSharp +HostApplicationBuilder builder = Host.CreateApplicationBuilder(args); + +// instantiate this using existing Microsoft.Azure.DurableTask packages. +IOrchestrationServiceClient orchestrationServiceClient = new AzureStorageOrchestrationService(...); +builder.Services.AddDurableTaskClient() + .UseOrchestrationService(orchestrationService); +``` diff --git a/src/Worker/Core/README.md b/src/Worker/Core/README.md index 8cd4b666..1440b6ef 100644 --- a/src/Worker/Core/README.md +++ b/src/Worker/Core/README.md @@ -5,4 +5,4 @@ Commonly used types: - `DurableTaskWorkerOptions` - `IDurableTaskWorkerBuilder` -For more information, see https://github.com/microsoft/durabletask-dotnet \ No newline at end of file +For more information, see https://github.com/microsoft/durabletask-dotnet diff --git a/src/Worker/OrchestrationServiceShim/README.md b/src/Worker/OrchestrationServiceShim/README.md new file mode 100644 index 00000000..49d6e7c8 --- /dev/null +++ b/src/Worker/OrchestrationServiceShim/README.md @@ -0,0 +1,18 @@ +A client implementation for `Microsoft.DurableTask`. This package includes a `DurableTaskWorker` implementation for interacting with a task hub via a `DurableTask.Core.IOrchestrationService`. + +Commonly used types: +- `ShimDurableTaskWorker` +- `ShimDurableTaskWorkerOptions` + +For more information, see https://github.com/microsoft/durabletask-dotnet + +## Getting Started + +``` CSharp +HostApplicationBuilder builder = Host.CreateApplicationBuilder(args); + +// instantiate this using existing Microsoft.Azure.DurableTask packages. +IOrchestrationService orchestrationService = new AzureStorageOrchestrationService(...); +builder.Services.AddDurableTaskWorker() + .UseOrchestrationService(orchestrationService); +``` diff --git a/src/Worker/OrchestrationServiceShim/RELEASENOTES.md b/src/Worker/OrchestrationServiceShim/RELEASENOTES.md new file mode 100644 index 00000000..0b61b23b --- /dev/null +++ b/src/Worker/OrchestrationServiceShim/RELEASENOTES.md @@ -0,0 +1 @@ +- Initial implementation of `ShimDurableTaskWorker` From 8923128ff20303a5b7234fd9424aff37d93c68f8 Mon Sep 17 00:00:00 2001 From: Jacob Viau Date: Mon, 27 Jan 2025 13:58:45 -0800 Subject: [PATCH 4/6] Add OrchestrationServiceShim unit tests --- Microsoft.DurableTask.sln | 7 + .../ShimDurableTaskWorker.cs | 26 ++-- ...DurableTaskClientBuilderExtensionsTests.cs | 88 ++++++++++++ .../ShimDurableTaskWorkerTests.cs | 133 ++++++++++++++++++ .../OrchestrationServiceShim.Tests/Usings.cs | 6 + ...rker.OrchestrationServiceShim.Tests.csproj | 11 ++ 6 files changed, 260 insertions(+), 11 deletions(-) create mode 100644 test/Worker/OrchestrationServiceShim.Tests/DependencyInjection/DurableTaskClientBuilderExtensionsTests.cs create mode 100644 test/Worker/OrchestrationServiceShim.Tests/ShimDurableTaskWorkerTests.cs create mode 100644 test/Worker/OrchestrationServiceShim.Tests/Usings.cs create mode 100644 test/Worker/OrchestrationServiceShim.Tests/Worker.OrchestrationServiceShim.Tests.csproj diff --git a/Microsoft.DurableTask.sln b/Microsoft.DurableTask.sln index fdf3688f..f4769ba4 100644 --- a/Microsoft.DurableTask.sln +++ b/Microsoft.DurableTask.sln @@ -87,6 +87,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ConsoleAppMinimal", "sample EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Worker.OrchestrationServiceShim", "src\Worker\OrchestrationServiceShim\Worker.OrchestrationServiceShim.csproj", "{361E87D2-5CF6-4BB2-8B11-0BE736F88EB8}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Worker.OrchestrationServiceShim.Tests", "test\Worker\OrchestrationServiceShim.Tests\Worker.OrchestrationServiceShim.Tests.csproj", "{14822652-388B-4521-924A-2834B75F783C}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -229,6 +231,10 @@ Global {361E87D2-5CF6-4BB2-8B11-0BE736F88EB8}.Debug|Any CPU.Build.0 = Debug|Any CPU {361E87D2-5CF6-4BB2-8B11-0BE736F88EB8}.Release|Any CPU.ActiveCfg = Release|Any CPU {361E87D2-5CF6-4BB2-8B11-0BE736F88EB8}.Release|Any CPU.Build.0 = Release|Any CPU + {14822652-388B-4521-924A-2834B75F783C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {14822652-388B-4521-924A-2834B75F783C}.Debug|Any CPU.Build.0 = Debug|Any CPU + {14822652-388B-4521-924A-2834B75F783C}.Release|Any CPU.ActiveCfg = Release|Any CPU + {14822652-388B-4521-924A-2834B75F783C}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -272,6 +278,7 @@ Global {3272C041-F81D-4C85-A4FB-2A700B5A7A9D} = {CECADDB5-E30A-4CE2-8604-9AC596D4A2DC} {B48FACA9-A328-452A-BFAE-C4F60F9C7024} = {EFF7632B-821E-4CFC-B4A0-ED4B24296B17} {361E87D2-5CF6-4BB2-8B11-0BE736F88EB8} = {5B448FF6-EC42-491D-A22E-1DC8B618E6D5} + {14822652-388B-4521-924A-2834B75F783C} = {51DC98A3-0193-4C66-964B-C26C748E25B6} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {AB41CB55-35EA-4986-A522-387AB3402E71} diff --git a/src/Worker/OrchestrationServiceShim/ShimDurableTaskWorker.cs b/src/Worker/OrchestrationServiceShim/ShimDurableTaskWorker.cs index 67d9b48e..c95c8e88 100644 --- a/src/Worker/OrchestrationServiceShim/ShimDurableTaskWorker.cs +++ b/src/Worker/OrchestrationServiceShim/ShimDurableTaskWorker.cs @@ -27,7 +27,6 @@ class ShimDurableTaskWorker : DurableTaskWorker readonly IServiceProvider services; readonly DurableTaskShimFactory shimFactory; readonly ILogger logger; - readonly TaskHubWorker worker; /// /// Initializes a new instance of the class. @@ -51,22 +50,27 @@ public ShimDurableTaskWorker( // This should already be validated by options. IOrchestrationService service = Verify.NotNull(this.options.Service); - this.worker = service is IEntityOrchestrationService entity + this.Worker = service is IEntityOrchestrationService entity ? this.CreateWorker(entity, loggerFactory) : this.CreateWorker(service, loggerFactory); } + /// + /// Gets the inner . + /// + /// + /// For internal test verification. + /// + internal TaskHubWorker Worker { get; } + /// - public override async Task StopAsync(CancellationToken cancellationToken) - { - await base.StopAsync(cancellationToken); - await this.worker.StopAsync(); - } + public override Task StopAsync(CancellationToken cancellationToken) => this.Worker.StopAsync(); /// - protected override Task ExecuteAsync(CancellationToken stoppingToken) - { - return this.worker.StartAsync().WaitAsync(stoppingToken); - } + public override Task StartAsync(CancellationToken cancellationToken) => this.Worker.StartAsync(); + + /// + /// Not actually called. + protected override Task ExecuteAsync(CancellationToken stoppingToken) => Task.CompletedTask; TaskHubWorker CreateWorker(IOrchestrationService service, ILoggerFactory loggerFactory) { diff --git a/test/Worker/OrchestrationServiceShim.Tests/DependencyInjection/DurableTaskClientBuilderExtensionsTests.cs b/test/Worker/OrchestrationServiceShim.Tests/DependencyInjection/DurableTaskClientBuilderExtensionsTests.cs new file mode 100644 index 00000000..3c603e94 --- /dev/null +++ b/test/Worker/OrchestrationServiceShim.Tests/DependencyInjection/DurableTaskClientBuilderExtensionsTests.cs @@ -0,0 +1,88 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using DurableTask.Core; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Options; + +namespace Microsoft.DurableTask.Worker.OrchestrationServiceShim.Tests; + +public class DurableTaskWorkerBuilderExtensionsTests +{ + [Fact] + public void UseOrchestrationService_NotSet_Throws() + { + ServiceCollection services = new(); + DefaultDurableTaskWorkerBuilder builder = new(null, services); + Action act = () => builder.UseOrchestrationService(); + act.Should().NotThrow(); + builder.BuildTarget.Should().Be(typeof(ShimDurableTaskWorker)); + + IServiceProvider provider = services.BuildServiceProvider(); + act = () => provider.GetOptions(); + + act.Should().ThrowExactly() + .WithMessage("ShimDurableTaskWorkerOptions.Service must not be null."); + } + + [Fact] + public void UseOrchestrationService_Service_Sets() + { + ServiceCollection services = new(); + IOrchestrationService service = Mock.Of(); + DefaultDurableTaskWorkerBuilder builder = new(null, services); + builder.UseOrchestrationService(service); + + IServiceProvider provider = services.BuildServiceProvider(); + ShimDurableTaskWorkerOptions options = provider.GetOptions(); + + options.Service.Should().Be(service); + } + + [Fact] + public void UseOrchestrationService_FromServices1() + { + ServiceCollection services = new(); + IOrchestrationService service = Mock.Of(); + services.AddSingleton(service); + DefaultDurableTaskWorkerBuilder builder = new(null, services); + + builder.UseOrchestrationService(); + + IServiceProvider provider = services.BuildServiceProvider(); + ShimDurableTaskWorkerOptions options = provider.GetOptions(); + + options.Service.Should().Be(service); + } + + [Fact] + public void UseOrchestrationService_FromServices2() + { + ServiceCollection services = new(); + Mock mock = new(); + mock.As(); + services.AddSingleton(mock.As().Object); + DefaultDurableTaskWorkerBuilder builder = new(null, services); + + builder.UseOrchestrationService(); + + IServiceProvider provider = services.BuildServiceProvider(); + ShimDurableTaskWorkerOptions options = provider.GetOptions(); + + options.Service.Should().Be(mock.Object); + } + + [Fact] + public void UseOrchestrationService_Callback_Sets() + { + ServiceCollection services = new(); + IOrchestrationService service = Mock.Of(); + DefaultDurableTaskWorkerBuilder builder = new(null, services); + builder.UseOrchestrationService(opt => opt.Service = service); + + IServiceProvider provider = services.BuildServiceProvider(); + ShimDurableTaskWorkerOptions options = provider.GetOptions(); + + options.Service.Should().Be(service); + } +} diff --git a/test/Worker/OrchestrationServiceShim.Tests/ShimDurableTaskWorkerTests.cs b/test/Worker/OrchestrationServiceShim.Tests/ShimDurableTaskWorkerTests.cs new file mode 100644 index 00000000..2f2c5392 --- /dev/null +++ b/test/Worker/OrchestrationServiceShim.Tests/ShimDurableTaskWorkerTests.cs @@ -0,0 +1,133 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using DurableTask.Core; +using DurableTask.Core.Entities; +using Microsoft.DurableTask.Worker.OrchestrationServiceShim.Core; +using Microsoft.Extensions.Logging.Abstractions; +using Microsoft.Extensions.Options; + +namespace Microsoft.DurableTask.Worker.OrchestrationServiceShim.Tests; + +public class ShimDurableTaskWorkerTests +{ + readonly Mock orchestrationService = new(MockBehavior.Strict); + readonly Mock durableTaskFactory = new(MockBehavior.Strict); + readonly Mock serviceProvider = new(MockBehavior.Strict); + readonly ShimDurableTaskWorkerOptions options = new(); + + [Fact] + public void Ctor_WithEntityService_EntitiesEnabled() + { + // Arrange + this.durableTaskFactory.As(); + Mock entities = this.AddEntitiesToOrchestrationService(); + this.options.EnableEntitySupport = true; + + // Act + ShimDurableTaskWorker worker = this.CreateWorker(); + + // Assert + worker.Worker.orchestrationService.Should().BeSameAs(this.orchestrationService.Object); + entities.Verify(m => m.EntityBackendProperties, Times.Once); + } + + [Fact] + public void Ctor_WithEntityService_EntitiesDisabled() + { + // Arrange + this.durableTaskFactory.As(); + this.AddEntitiesToOrchestrationService(); + this.options.EnableEntitySupport = false; + + // Act + ShimDurableTaskWorker worker = this.CreateWorker(); + + // Assert + worker.Worker.orchestrationService.Should().BeOfType(); + } + + [Fact] + public void Ctor_FactoryNoEntitySupport_EntitiesDisabled() + { + // Arrange + this.AddEntitiesToOrchestrationService(); + this.options.EnableEntitySupport = true; + + // Act + ShimDurableTaskWorker worker = this.CreateWorker(); + + // Assert + worker.Worker.orchestrationService.Should().BeOfType(); + } + + [Fact] + public void Ctor_NoEntityService_EntitiesDisabled() + { + // Arrange + this.durableTaskFactory.As(); + this.options.EnableEntitySupport = true; + + // Act + ShimDurableTaskWorker worker = this.CreateWorker(); + + // Assert + worker.Worker.orchestrationService.Should().BeSameAs(this.orchestrationService.Object); + } + + [Fact] + public async Task Start_StartsInnerWorker() + { + // Arrange + this.orchestrationService.Setup(m => m.TaskOrchestrationDispatcherCount).Returns(1); + this.orchestrationService.Setup(m => m.MaxConcurrentTaskOrchestrationWorkItems).Returns(1); + this.orchestrationService.Setup(m => m.TaskActivityDispatcherCount).Returns(1); + this.orchestrationService.Setup(m => m.MaxConcurrentTaskActivityWorkItems).Returns(1); + this.orchestrationService.Setup(m => m.StartAsync()).Returns(Task.CompletedTask); + ShimDurableTaskWorker worker = this.CreateWorker(); + + // Act + await worker.StartAsync(default); + + // Assert + this.orchestrationService.Verify(m => m.StartAsync(), Times.Once); + } + + [Fact] + public async Task Stop_StopsInnerWorker() + { + // Arrange + this.orchestrationService.Setup(m => m.TaskOrchestrationDispatcherCount).Returns(1); + this.orchestrationService.Setup(m => m.MaxConcurrentTaskOrchestrationWorkItems).Returns(1); + this.orchestrationService.Setup(m => m.TaskActivityDispatcherCount).Returns(1); + this.orchestrationService.Setup(m => m.MaxConcurrentTaskActivityWorkItems).Returns(1); + this.orchestrationService.Setup(m => m.StartAsync()).Returns(Task.CompletedTask); + this.orchestrationService.Setup(m => m.StopAsync(false)).Returns(Task.CompletedTask); + ShimDurableTaskWorker worker = this.CreateWorker(); + await worker.StartAsync(default); + + // Act + await worker.StopAsync(default); + + // Assert + this.orchestrationService.Verify(m => m.StopAsync(false), Times.Once); + } + + Mock AddEntitiesToOrchestrationService() + { + Mock mock = this.orchestrationService.As(); + mock.Setup(m => m.EntityBackendProperties).Returns(new EntityBackendProperties()); + return mock; + } + + ShimDurableTaskWorker CreateWorker() + { + this.options.Service = this.orchestrationService.Object; + return new( + "test", + this.durableTaskFactory.Object, + Mock.Of>(m => m.Get("test") == this.options), + this.serviceProvider.Object, + NullLoggerFactory.Instance); + } +} diff --git a/test/Worker/OrchestrationServiceShim.Tests/Usings.cs b/test/Worker/OrchestrationServiceShim.Tests/Usings.cs new file mode 100644 index 00000000..39fe94e9 --- /dev/null +++ b/test/Worker/OrchestrationServiceShim.Tests/Usings.cs @@ -0,0 +1,6 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +global using FluentAssertions; +global using Moq; +global using Xunit; diff --git a/test/Worker/OrchestrationServiceShim.Tests/Worker.OrchestrationServiceShim.Tests.csproj b/test/Worker/OrchestrationServiceShim.Tests/Worker.OrchestrationServiceShim.Tests.csproj new file mode 100644 index 00000000..968d3e2e --- /dev/null +++ b/test/Worker/OrchestrationServiceShim.Tests/Worker.OrchestrationServiceShim.Tests.csproj @@ -0,0 +1,11 @@ + + + + + + + + + + + From 209bed48f097a007a86fe9af31ffe8decd330cb6 Mon Sep 17 00:00:00 2001 From: Jacob Viau Date: Fri, 31 Jan 2025 13:45:56 -0800 Subject: [PATCH 5/6] Fix DurableTaskWorkerOptions.ApplyTo --- src/Client/Core/DurableTaskClientOptions.cs | 56 +++++-------- src/Worker/Core/DurableTaskWorkerOptions.cs | 87 +++++++++++++++------ 2 files changed, 81 insertions(+), 62 deletions(-) diff --git a/src/Client/Core/DurableTaskClientOptions.cs b/src/Client/Core/DurableTaskClientOptions.cs index 296b750d..96daad1f 100644 --- a/src/Client/Core/DurableTaskClientOptions.cs +++ b/src/Client/Core/DurableTaskClientOptions.cs @@ -10,8 +10,8 @@ namespace Microsoft.DurableTask.Client; /// public class DurableTaskClientOptions { - DataConverter dataConverter = JsonDataConverter.Default; - bool enableEntitySupport; + DataConverter? dataConverter; + bool? enableEntitySupport; /// /// Gets or sets the data converter. Default value is . @@ -31,34 +31,18 @@ public class DurableTaskClientOptions /// public DataConverter DataConverter { - get => this.dataConverter; - set - { - if (value is null) - { - this.dataConverter = JsonDataConverter.Default; - this.DataConverterExplicitlySet = false; - } - else - { - this.dataConverter = value; - this.DataConverterExplicitlySet = true; - } - } + get => this.dataConverter ?? JsonDataConverter.Default; + set => this.dataConverter = value; } /// - /// Gets or sets a value indicating whether this client should support entities. If true, all instance ids starting with '@' are reserved for entities, - /// and validation checks are performed where appropriate. + /// Gets or sets a value indicating whether this client should support entities. If true, all instance ids starting + /// with '@' are reserved for entities, and validation checks are performed where appropriate. /// public bool EnableEntitySupport { - get => this.enableEntitySupport; - set - { - this.enableEntitySupport = value; - this.EntitySupportExplicitlySet = true; - } + get => this.enableEntitySupport ?? false; + set => this.enableEntitySupport = value; } /// @@ -70,12 +54,7 @@ public bool EnableEntitySupport /// will not resolve it. If not set, we will attempt to resolve it. This is so the /// behavior is consistently irrespective of option configuration ordering. /// - internal bool DataConverterExplicitlySet { get; private set; } - - /// - /// Gets a value indicating whether was explicitly set or not. - /// - internal bool EntitySupportExplicitlySet { get; private set; } + internal bool DataConverterExplicitlySet => this.dataConverter is not null; /// /// Applies these option values to another. @@ -86,15 +65,16 @@ internal void ApplyTo(DurableTaskClientOptions other) if (other is not null) { // Make sure to keep this up to date as values are added. - if (!other.DataConverterExplicitlySet) - { - other.DataConverter = this.DataConverter; - } + ApplyIfSet(this.dataConverter, ref other.dataConverter); + ApplyIfSet(this.enableEntitySupport, ref other.enableEntitySupport); + } + } - if (!other.EntitySupportExplicitlySet) - { - other.EnableEntitySupport = this.EnableEntitySupport; - } + static void ApplyIfSet(T? value, ref T? target) + { + if (value is not null && target is null) + { + target = value; } } } diff --git a/src/Worker/Core/DurableTaskWorkerOptions.cs b/src/Worker/Core/DurableTaskWorkerOptions.cs index 7233d604..9dcc65af 100644 --- a/src/Worker/Core/DurableTaskWorkerOptions.cs +++ b/src/Worker/Core/DurableTaskWorkerOptions.cs @@ -10,7 +10,9 @@ namespace Microsoft.DurableTask.Worker; /// public class DurableTaskWorkerOptions { - DataConverter dataConverter = JsonDataConverter.Default; + DataConverter? dataConverter; + bool? enableEntitySupport; + TimeSpan? maximumTimerInterval; /// /// Gets or sets the data converter. Default value is . @@ -28,27 +30,19 @@ public class DurableTaskWorkerOptions /// public DataConverter DataConverter { - get => this.dataConverter; - set - { - if (value is null) - { - this.dataConverter = JsonDataConverter.Default; - this.DataConverterExplicitlySet = false; - } - else - { - this.dataConverter = value; - this.DataConverterExplicitlySet = true; - } - } + get => this.dataConverter ?? JsonDataConverter.Default; + set => this.dataConverter = value; } /// /// Gets or sets a value indicating whether this client should support entities. If true, all instance ids starting /// with '@' are reserved for entities, and validation checks are performed where appropriate. /// - public bool EnableEntitySupport { get; set; } + public bool EnableEntitySupport + { + get => this.enableEntitySupport ?? false; + set => this.enableEntitySupport = value; + } /// /// Gets or sets the maximum timer interval for the @@ -81,7 +75,11 @@ public DataConverter DataConverter /// orchestrations. /// /// - public TimeSpan MaximumTimerInterval { get; set; } = TimeSpan.FromDays(3); + public TimeSpan MaximumTimerInterval + { + get => this.maximumTimerInterval ?? TimeSpan.FromDays(3); + set => this.maximumTimerInterval = value; + } /// /// Gets options for the Durable Task worker concurrency. @@ -102,7 +100,7 @@ public DataConverter DataConverter /// will not resolve it. If not set, we will attempt to resolve it. This is so the /// behavior is consistently irrespective of option configuration ordering. /// - internal bool DataConverterExplicitlySet { get; private set; } + internal bool DataConverterExplicitlySet => this.dataConverter is not null; /// /// Applies these option values to another. @@ -113,9 +111,18 @@ internal void ApplyTo(DurableTaskWorkerOptions other) if (other is not null) { // Make sure to keep this up to date as values are added. - other.DataConverter = this.DataConverter; - other.MaximumTimerInterval = this.MaximumTimerInterval; - other.EnableEntitySupport = this.EnableEntitySupport; + ApplyIfSet(this.dataConverter, ref other.dataConverter); + ApplyIfSet(this.enableEntitySupport, ref other.enableEntitySupport); + ApplyIfSet(this.maximumTimerInterval, ref other.maximumTimerInterval); + this.Concurrency.ApplyTo(other.Concurrency); + } + } + + static void ApplyIfSet(T? value, ref T? target) + { + if (value is not null && target is null) + { + target = value; } } @@ -124,19 +131,51 @@ internal void ApplyTo(DurableTaskWorkerOptions other) /// public class ConcurrencyOptions { + static readonly int DefaultMaxConcurrency = 100 * Environment.ProcessorCount; + + int? maxActivity; + int? maxOrchestration; + int? maxEntity; + /// /// Gets or sets the maximum number of concurrent activity work items that can be processed by the worker. /// - public int MaximumConcurrentActivityWorkItems { get; set; } = 100 * Environment.ProcessorCount; + public int MaximumConcurrentActivityWorkItems + { + get => this.maxActivity ?? DefaultMaxConcurrency; + set => this.maxActivity = value; + } /// /// Gets or sets the maximum number of concurrent orchestration work items that can be processed by the worker. /// - public int MaximumConcurrentOrchestrationWorkItems { get; set; } = 100 * Environment.ProcessorCount; + public int MaximumConcurrentOrchestrationWorkItems + { + get => this.maxOrchestration ?? DefaultMaxConcurrency; + set => this.maxOrchestration = value; + } /// /// Gets or sets the maximum number of concurrent entity work items that can be processed by the worker. /// - public int MaximumConcurrentEntityWorkItems { get; set; } = 100 * Environment.ProcessorCount; + public int MaximumConcurrentEntityWorkItems + { + get => this.maxEntity ?? DefaultMaxConcurrency; + set => this.maxEntity = value; + } + + /// + /// Applies these option values to another. + /// + /// The options to apply this options values to. + internal void ApplyTo(ConcurrencyOptions other) + { + if (other is not null) + { + ApplyIfSet(this.maxActivity, ref other.maxActivity); + ApplyIfSet(this.maxOrchestration, ref other.maxOrchestration); + ApplyIfSet(this.maxEntity, ref other.maxEntity); + } + } } } From f953ddd9211b4ec5c66609c5eee690e548d7ecc8 Mon Sep 17 00:00:00 2001 From: Jacob Viau Date: Fri, 31 Jan 2025 14:02:50 -0800 Subject: [PATCH 6/6] use upload-artifact@v4 --- .github/workflows/validate-build.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/validate-build.yml b/.github/workflows/validate-build.yml index 5f3694b4..3a61f47a 100644 --- a/.github/workflows/validate-build.yml +++ b/.github/workflows/validate-build.yml @@ -48,7 +48,7 @@ jobs: run: dotnet pack $solution --configuration $config --no-build - name: Upload - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 with: name: pkg path: out/pkg