diff --git a/src/WebJobs.Extensions.DurableTask/Bindings/BindingHelper.cs b/src/WebJobs.Extensions.DurableTask/Bindings/BindingHelper.cs index a0c2fddde..9af738c44 100644 --- a/src/WebJobs.Extensions.DurableTask/Bindings/BindingHelper.cs +++ b/src/WebJobs.Extensions.DurableTask/Bindings/BindingHelper.cs @@ -31,18 +31,10 @@ public string DurableOrchestrationClientToString(IDurableOrchestrationClient cli if (this.config.OutOfProcProtocol == OutOfProcOrchestrationProtocol.MiddlewarePassthrough) { // Out-of-proc v2 (aka middleware passthrough) uses gRPC instead of vanilla HTTP + JSON as the RPC protocol. - string? localRpcAddress = this.config.GetLocalRpcAddress(); - if (localRpcAddress == null) - { - throw new InvalidOperationException("The local RPC address has not been configured!"); - } - return JsonConvert.SerializeObject(new OrchestrationClientInputData { TaskHubName = string.IsNullOrEmpty(attr.TaskHub) ? client.TaskHubName : attr.TaskHub, ConnectionName = attr.ConnectionName, - RpcBaseUrl = localRpcAddress, - RequiredQueryStringParameters = this.config.HttpApiHandler.GetUniversalQueryStrings(), }); } diff --git a/src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs b/src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs index cc1e63976..6a1152555 100644 --- a/src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs +++ b/src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs @@ -6,7 +6,6 @@ using System.Collections.Generic; using System.Diagnostics; using System.Linq; -using System.Net; using System.Net.Http; using System.Reflection; using System.Text; @@ -70,9 +69,6 @@ public class DurableTaskExtension : #pragma warning disable CS0169 private readonly ITelemetryActivator telemetryActivator; #pragma warning restore CS0169 -#endif -#if FUNCTIONS_V3_OR_GREATER - private readonly LocalGrpcListener localGrpcListener; #endif private readonly bool isOptionsConfigured; private readonly Guid extensionGuid; @@ -202,10 +198,6 @@ public DurableTaskExtension( runtimeType == WorkerRuntimeType.Custom) { this.OutOfProcProtocol = OutOfProcOrchestrationProtocol.MiddlewarePassthrough; -#if FUNCTIONS_V3_OR_GREATER - this.localGrpcListener = new LocalGrpcListener(this); - this.HostLifetimeService.OnStopped.Register(this.StopLocalGrpcServer); -#endif } else { @@ -464,12 +456,6 @@ void IExtensionConfigProvider.Initialize(ExtensionConfigContext context) // The RPC server needs to be started sometime before any functions can be triggered // and this is the latest point in the pipeline available to us. -#if FUNCTIONS_V3_OR_GREATER - if (this.OutOfProcProtocol == OutOfProcOrchestrationProtocol.MiddlewarePassthrough) - { - this.StartLocalGrpcServer(); - } -#endif #if FUNCTIONS_V2_OR_GREATER if (this.OutOfProcProtocol == OutOfProcOrchestrationProtocol.OrchestratorShim) { @@ -478,18 +464,6 @@ void IExtensionConfigProvider.Initialize(ExtensionConfigContext context) #endif } - internal string GetLocalRpcAddress() - { -#if FUNCTIONS_V3_OR_GREATER - if (this.OutOfProcProtocol == OutOfProcOrchestrationProtocol.MiddlewarePassthrough) - { - return this.localGrpcListener.ListenAddress; - } -#endif - - return this.HttpApiHandler.GetBaseUrl(); - } - internal DurabilityProvider GetDurabilityProvider(DurableClientAttribute attribute) { return this.durabilityProviderFactory.GetDurabilityProvider(attribute); @@ -566,18 +540,6 @@ private void StopLocalHttpServer() } #endif -#if FUNCTIONS_V3_OR_GREATER - private void StartLocalGrpcServer() - { - this.localGrpcListener.StartAsync().GetAwaiter().GetResult(); - } - - private void StopLocalGrpcServer() - { - this.localGrpcListener.StopAsync().GetAwaiter().GetResult(); - } -#endif - private void ResolveAppSettingOptions() { if (this.Options == null) @@ -1606,7 +1568,9 @@ private sealed class NoOpScaleMonitor : IScaleMonitor /// A descriptive name. public NoOpScaleMonitor(string name) { +#pragma warning disable CS0618 // Type or member is obsolete this.Descriptor = new ScaleMonitorDescriptor(name); +#pragma warning restore CS0618 // Type or member is obsolete } /// diff --git a/src/WebJobs.Extensions.DurableTask/DurableTaskJobHostConfigurationExtensions.cs b/src/WebJobs.Extensions.DurableTask/DurableTaskJobHostConfigurationExtensions.cs index ac98d8632..0fe33429f 100644 --- a/src/WebJobs.Extensions.DurableTask/DurableTaskJobHostConfigurationExtensions.cs +++ b/src/WebJobs.Extensions.DurableTask/DurableTaskJobHostConfigurationExtensions.cs @@ -9,10 +9,12 @@ using Microsoft.Azure.WebJobs.Extensions.DurableTask.ContextImplementations; using Microsoft.Azure.WebJobs.Extensions.DurableTask.Correlation; using Microsoft.Azure.WebJobs.Extensions.DurableTask.Options; +using Microsoft.Azure.WebJobs.Host.Config; using Microsoft.Extensions.Azure; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; using Microsoft.Extensions.Options; +using Microsoft.ApplicationInsights.Extensibility; #else using Microsoft.Azure.WebJobs.Extensions.DurableTask.ContextImplementations; using Microsoft.Azure.WebJobs.Extensions.DurableTask.Options; @@ -22,6 +24,10 @@ using Microsoft.Extensions.DependencyInjection.Extensions; #endif +#if FUNCTIONS_V3_OR_GREATER +using Microsoft.Azure.WebJobs.Extensions.Rpc; +#endif + namespace Microsoft.Azure.WebJobs.Extensions.DurableTask { /// @@ -85,7 +91,7 @@ public static IWebJobsBuilder AddDurableTask(this IWebJobsBuilder builder) throw new ArgumentNullException(nameof(builder)); } - builder + IWebJobsExtensionBuilder extension = builder .AddExtension() .BindOptions(); @@ -106,6 +112,23 @@ public static IWebJobsBuilder AddDurableTask(this IWebJobsBuilder builder) serviceCollection.AddSingleton(); #pragma warning restore CS0612, CS0618 // Type or member is obsolete +#if FUNCTIONS_V3_OR_GREATER + serviceCollection.AddSingleton(sp => + { + foreach (IExtensionConfigProvider cfg in sp.GetServices()) + { + if (cfg is DurableTaskExtension ext) + { + return ext; + } + } + + throw new InvalidOperationException($"Unable to resolve service {typeof(DurableTaskExtension)}."); + }); + + extension.MapWorkerGrpcService(); +#endif + return builder; } diff --git a/src/WebJobs.Extensions.DurableTask/LocalGrpcListener.cs b/src/WebJobs.Extensions.DurableTask/LocalGrpcListener.cs deleted file mode 100644 index 6d18376e7..000000000 --- a/src/WebJobs.Extensions.DurableTask/LocalGrpcListener.cs +++ /dev/null @@ -1,338 +0,0 @@ -// Copyright (c) .NET Foundation. All rights reserved. -// Licensed under the MIT License. See LICENSE in the project root for license information. -#nullable enable -#if FUNCTIONS_V3_OR_GREATER -using System; -using System.Collections.Generic; -using System.IO; -using System.Threading; -using System.Threading.Tasks; -using DurableTask.Core; -using DurableTask.Core.History; -using DurableTask.Core.Query; -using Google.Protobuf.WellKnownTypes; -using Grpc.Core; -using Microsoft.Extensions.Hosting; -using P = Microsoft.DurableTask.Protobuf; - -namespace Microsoft.Azure.WebJobs.Extensions.DurableTask -{ - internal class LocalGrpcListener : IHostedService - { - private const int DefaultPort = 4001; - - // Pick a large, fixed range of ports that are going to be valid in all environment. - // Avoiding ports below 1024 as those are blocked by app service sandbox. - // Ephemeral ports for most OS start well above 32768. See https://www.ncftp.com/ncftpd/doc/misc/ephemeral_ports.html - private const int MinPort = 30000; - private const int MaxPort = 31000; - - private readonly DurableTaskExtension extension; - - private readonly Random portGenerator; - private readonly HashSet attemptedPorts; - - private Server? grpcServer; - - public LocalGrpcListener(DurableTaskExtension extension) - { - this.extension = extension ?? throw new ArgumentNullException(nameof(extension)); - - this.portGenerator = new Random(); - this.attemptedPorts = new HashSet(); - } - - public string? ListenAddress { get; private set; } - - public Task StartAsync(CancellationToken cancelToken = default) - { - const int maxAttempts = 10; - int numAttempts = 1; - while (numAttempts <= maxAttempts) - { - this.grpcServer = new Server(); - this.grpcServer.Services.Add(P.TaskHubSidecarService.BindService(new TaskHubGrpcServer(this))); - - int listeningPort = numAttempts == 1 ? DefaultPort : this.GetRandomPort(); - int portBindingResult = this.grpcServer.Ports.Add("localhost", listeningPort, ServerCredentials.Insecure); - if (portBindingResult != 0) - { - try - { - this.grpcServer.Start(); - this.ListenAddress = $"http://localhost:{listeningPort}"; - - this.extension.TraceHelper.ExtensionInformationalEvent( - this.extension.Options.HubName, - instanceId: string.Empty, - functionName: string.Empty, - message: $"Opened local gRPC endpoint: {this.ListenAddress}", - writeToUserLogs: true); - - return Task.CompletedTask; - } - catch (IOException) - { - portBindingResult = 0; - } - } - - if (portBindingResult == 0) - { - this.extension.TraceHelper.ExtensionWarningEvent( - this.extension.Options.HubName, - functionName: string.Empty, - instanceId: string.Empty, - message: $"Failed to open local port {listeningPort}. This was attempt #{numAttempts} to open a local port."); - this.attemptedPorts.Add(listeningPort); - numAttempts++; - } - } - - throw new IOException($"Unable to find a port to open an RPC endpoint on after {maxAttempts} attempts"); - } - - public async Task StopAsync(CancellationToken cancelToken = default) - { - if (this.grpcServer != null) - { - await this.grpcServer.ShutdownAsync(); - } - } - - private int GetRandomPort() - { - // Get a random port that has not already been attempted so we don't waste time trying - // to listen to a port we know is not free. - int randomPort; - do - { - randomPort = this.portGenerator.Next(MinPort, MaxPort); - } - while (this.attemptedPorts.Contains(randomPort)); - - return randomPort; - } - - private class TaskHubGrpcServer : P.TaskHubSidecarService.TaskHubSidecarServiceBase - { - private readonly DurableTaskExtension extension; - - public TaskHubGrpcServer(LocalGrpcListener listener) - { - this.extension = listener.extension; - } - - public override Task Hello(Empty request, ServerCallContext context) - { - return Task.FromResult(new Empty()); - } - - public override Task CreateTaskHub(P.CreateTaskHubRequest request, ServerCallContext context) - { - this.GetDurabilityProvider(context).CreateAsync(request.RecreateIfExists); - return Task.FromResult(new P.CreateTaskHubResponse()); - } - - public override Task DeleteTaskHub(P.DeleteTaskHubRequest request, ServerCallContext context) - { - this.GetDurabilityProvider(context).DeleteAsync(); - return Task.FromResult(new P.DeleteTaskHubResponse()); - } - - public async override Task StartInstance(P.CreateInstanceRequest request, ServerCallContext context) - { - var instance = new OrchestrationInstance - { - InstanceId = request.InstanceId ?? Guid.NewGuid().ToString("N"), - ExecutionId = Guid.NewGuid().ToString(), - }; - - await this.GetDurabilityProvider(context).CreateTaskOrchestrationAsync( - new TaskMessage - { - Event = new ExecutionStartedEvent(-1, request.Input) - { - Name = request.Name, - Version = request.Version, - OrchestrationInstance = instance, - ScheduledStartTime = request.ScheduledStartTimestamp?.ToDateTime(), - }, - OrchestrationInstance = instance, - }); - - return new P.CreateInstanceResponse - { - InstanceId = instance.InstanceId, - }; - } - - public async override Task RaiseEvent(P.RaiseEventRequest request, ServerCallContext context) - { - await this.GetDurabilityProvider(context).SendTaskOrchestrationMessageAsync( - new TaskMessage - { - Event = new EventRaisedEvent(-1, request.Input) - { - Name = request.Name, - }, - OrchestrationInstance = new OrchestrationInstance - { - InstanceId = request.InstanceId, - }, - }); - - // No fields in the response - return new P.RaiseEventResponse(); - } - - public async override Task TerminateInstance(P.TerminateRequest request, ServerCallContext context) - { - await this.GetDurabilityProvider(context).ForceTerminateTaskOrchestrationAsync( - request.InstanceId, - request.Output); - - // No fields in the response - return new P.TerminateResponse(); - } - - public async override Task SuspendInstance(P.SuspendRequest request, ServerCallContext context) - { - await this.GetDurabilityProvider(context).SuspendTaskOrchestrationAsync(request.InstanceId, request.Reason); - return new P.SuspendResponse(); - } - - public async override Task ResumeInstance(P.ResumeRequest request, ServerCallContext context) - { - await this.GetDurabilityProvider(context).ResumeTaskOrchestrationAsync(request.InstanceId, request.Reason); - return new P.ResumeResponse(); - } - - public async override Task RewindInstance(P.RewindInstanceRequest request, ServerCallContext context) - { - await this.GetDurabilityProvider(context).RewindAsync(request.InstanceId, request.Reason); - return new P.RewindInstanceResponse(); - } - - public async override Task GetInstance(P.GetInstanceRequest request, ServerCallContext context) - { - OrchestrationState state = await this.GetDurabilityProvider(context) - .GetOrchestrationStateAsync(request.InstanceId, executionId: null); - if (state == null) - { - return new P.GetInstanceResponse() { Exists = false }; - } - - return CreateGetInstanceResponse(state, request); - } - - public async override Task QueryInstances(P.QueryInstancesRequest request, ServerCallContext context) - { - var query = ProtobufUtils.ToOrchestrationQuery(request); - var queryClient = (IOrchestrationServiceQueryClient)this.GetDurabilityProvider(context); - OrchestrationQueryResult result = await queryClient.GetOrchestrationWithQueryAsync(query, context.CancellationToken); - return ProtobufUtils.CreateQueryInstancesResponse(result, request); - } - - public async override Task PurgeInstances(P.PurgeInstancesRequest request, ServerCallContext context) - { - var purgeClient = (IOrchestrationServicePurgeClient)this.GetDurabilityProvider(context); - - PurgeResult result; - switch (request.RequestCase) - { - case P.PurgeInstancesRequest.RequestOneofCase.InstanceId: - result = await purgeClient.PurgeInstanceStateAsync(request.InstanceId); - break; - - case P.PurgeInstancesRequest.RequestOneofCase.PurgeInstanceFilter: - var purgeInstanceFilter = ProtobufUtils.ToPurgeInstanceFilter(request); - result = await purgeClient.PurgeInstanceStateAsync(purgeInstanceFilter); - break; - - default: - throw new ArgumentException($"Unknown purge request type '{request.RequestCase}'."); - } - - return ProtobufUtils.CreatePurgeInstancesResponse(result); - } - - public async override Task WaitForInstanceStart(P.GetInstanceRequest request, ServerCallContext context) - { - int retryCount = 0; - while (true) - { - // Keep fetching the status until we get one of the states we care about - OrchestrationState state = await this.GetDurabilityProvider(context) - .GetOrchestrationStateAsync(request.InstanceId, executionId: null); - if (state != null && state.OrchestrationStatus != OrchestrationStatus.Pending) - { - return CreateGetInstanceResponse(state, request); - } - - // Increase the delay time by 1 second every 10 seconds up to 10 seconds. - // The cancellation token is what will break us out of this loop if the orchestration - // never leaves the "Pending" state. - var delay = TimeSpan.FromSeconds(Math.Min(10, (retryCount / 10) + 1)); - await Task.Delay(delay, context.CancellationToken); - retryCount++; - } - } - - public async override Task WaitForInstanceCompletion(P.GetInstanceRequest request, ServerCallContext context) - { - OrchestrationState state = await this.GetDurabilityProvider(context).WaitForOrchestrationAsync( - request.InstanceId, - executionId: null, - timeout: Timeout.InfiniteTimeSpan, - context.CancellationToken); - - return CreateGetInstanceResponse(state, request); - } - - private static P.GetInstanceResponse CreateGetInstanceResponse(OrchestrationState state, P.GetInstanceRequest request) - { - return new P.GetInstanceResponse - { - Exists = true, - OrchestrationState = new P.OrchestrationState - { - InstanceId = state.OrchestrationInstance.InstanceId, - Name = state.Name, - OrchestrationStatus = (P.OrchestrationStatus)state.OrchestrationStatus, - CreatedTimestamp = Timestamp.FromDateTime(state.CreatedTime), - LastUpdatedTimestamp = Timestamp.FromDateTime(state.LastUpdatedTime), - Input = request.GetInputsAndOutputs ? state.Input : null, - Output = request.GetInputsAndOutputs ? state.Output : null, - CustomStatus = request.GetInputsAndOutputs ? state.Status : null, - FailureDetails = request.GetInputsAndOutputs ? GetFailureDetails(state.FailureDetails) : null, - }, - }; - } - - private static P.TaskFailureDetails? GetFailureDetails(FailureDetails? failureDetails) - { - if (failureDetails == null) - { - return null; - } - - return new P.TaskFailureDetails - { - ErrorType = failureDetails.ErrorType, - ErrorMessage = failureDetails.ErrorMessage, - StackTrace = failureDetails.StackTrace, - }; - } - - private DurabilityProvider GetDurabilityProvider(ServerCallContext context) - { - string? taskHub = context.RequestHeaders.GetValue("Durable-TaskHub"); - string? connectionName = context.RequestHeaders.GetValue("Durable-ConnectionName"); - var attribute = new DurableClientAttribute() { TaskHub = taskHub, ConnectionName = connectionName }; - return this.extension.GetDurabilityProvider(attribute); - } - } - } -} -#endif \ No newline at end of file diff --git a/src/WebJobs.Extensions.DurableTask/TaskHubGrpcServer.cs b/src/WebJobs.Extensions.DurableTask/TaskHubGrpcServer.cs new file mode 100644 index 000000000..82f5c14c7 --- /dev/null +++ b/src/WebJobs.Extensions.DurableTask/TaskHubGrpcServer.cs @@ -0,0 +1,237 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See LICENSE in the project root for license information. +#nullable enable +#if FUNCTIONS_V3_OR_GREATER +using System; +using System.Threading; +using System.Threading.Tasks; +using DurableTask.Core; +using DurableTask.Core.History; +using DurableTask.Core.Query; +using Google.Protobuf.WellKnownTypes; +using Grpc.Core; +using P = Microsoft.DurableTask.Protobuf; + +namespace Microsoft.Azure.WebJobs.Extensions.DurableTask +{ + internal class TaskHubGrpcServer : P.TaskHubSidecarService.TaskHubSidecarServiceBase + { + private readonly DurableTaskExtension extension; + + public TaskHubGrpcServer(DurableTaskExtension extension) + { + this.extension = extension; + } + + public override Task Hello(Empty request, ServerCallContext context) + { + return Task.FromResult(new Empty()); + } + + public override Task CreateTaskHub(P.CreateTaskHubRequest request, ServerCallContext context) + { + this.GetDurabilityProvider(context).CreateAsync(request.RecreateIfExists); + return Task.FromResult(new P.CreateTaskHubResponse()); + } + + public override Task DeleteTaskHub(P.DeleteTaskHubRequest request, ServerCallContext context) + { + this.GetDurabilityProvider(context).DeleteAsync(); + return Task.FromResult(new P.DeleteTaskHubResponse()); + } + + public async override Task StartInstance(P.CreateInstanceRequest request, ServerCallContext context) + { + var instance = new OrchestrationInstance + { + InstanceId = request.InstanceId ?? Guid.NewGuid().ToString("N"), + ExecutionId = Guid.NewGuid().ToString(), + }; + + await this.GetDurabilityProvider(context).CreateTaskOrchestrationAsync( + new TaskMessage + { + Event = new ExecutionStartedEvent(-1, request.Input) + { + Name = request.Name, + Version = request.Version, + OrchestrationInstance = instance, + ScheduledStartTime = request.ScheduledStartTimestamp?.ToDateTime(), + }, + OrchestrationInstance = instance, + }); + + return new P.CreateInstanceResponse + { + InstanceId = instance.InstanceId, + }; + } + + public async override Task RaiseEvent(P.RaiseEventRequest request, ServerCallContext context) + { + await this.GetDurabilityProvider(context).SendTaskOrchestrationMessageAsync( + new TaskMessage + { + Event = new EventRaisedEvent(-1, request.Input) + { + Name = request.Name, + }, + OrchestrationInstance = new OrchestrationInstance + { + InstanceId = request.InstanceId, + }, + }); + + // No fields in the response + return new P.RaiseEventResponse(); + } + + public async override Task TerminateInstance(P.TerminateRequest request, ServerCallContext context) + { + await this.GetDurabilityProvider(context).ForceTerminateTaskOrchestrationAsync( + request.InstanceId, + request.Output); + + // No fields in the response + return new P.TerminateResponse(); + } + + public async override Task SuspendInstance(P.SuspendRequest request, ServerCallContext context) + { + await this.GetDurabilityProvider(context).SuspendTaskOrchestrationAsync(request.InstanceId, request.Reason); + return new P.SuspendResponse(); + } + + public async override Task ResumeInstance(P.ResumeRequest request, ServerCallContext context) + { + await this.GetDurabilityProvider(context).ResumeTaskOrchestrationAsync(request.InstanceId, request.Reason); + return new P.ResumeResponse(); + } + + public async override Task RewindInstance(P.RewindInstanceRequest request, ServerCallContext context) + { + await this.GetDurabilityProvider(context).RewindAsync(request.InstanceId, request.Reason); + return new P.RewindInstanceResponse(); + } + + public async override Task GetInstance(P.GetInstanceRequest request, ServerCallContext context) + { + OrchestrationState state = await this.GetDurabilityProvider(context) + .GetOrchestrationStateAsync(request.InstanceId, executionId: null); + if (state == null) + { + return new P.GetInstanceResponse() { Exists = false }; + } + + return CreateGetInstanceResponse(state, request); + } + + public async override Task QueryInstances(P.QueryInstancesRequest request, ServerCallContext context) + { + var query = ProtobufUtils.ToOrchestrationQuery(request); + var queryClient = (IOrchestrationServiceQueryClient)this.GetDurabilityProvider(context); + OrchestrationQueryResult result = await queryClient.GetOrchestrationWithQueryAsync(query, context.CancellationToken); + return ProtobufUtils.CreateQueryInstancesResponse(result, request); + } + + public async override Task PurgeInstances(P.PurgeInstancesRequest request, ServerCallContext context) + { + var purgeClient = (IOrchestrationServicePurgeClient)this.GetDurabilityProvider(context); + + PurgeResult result; + switch (request.RequestCase) + { + case P.PurgeInstancesRequest.RequestOneofCase.InstanceId: + result = await purgeClient.PurgeInstanceStateAsync(request.InstanceId); + break; + + case P.PurgeInstancesRequest.RequestOneofCase.PurgeInstanceFilter: + var purgeInstanceFilter = ProtobufUtils.ToPurgeInstanceFilter(request); + result = await purgeClient.PurgeInstanceStateAsync(purgeInstanceFilter); + break; + + default: + throw new ArgumentException($"Unknown purge request type '{request.RequestCase}'."); + } + + return ProtobufUtils.CreatePurgeInstancesResponse(result); + } + + public async override Task WaitForInstanceStart(P.GetInstanceRequest request, ServerCallContext context) + { + int retryCount = 0; + while (true) + { + // Keep fetching the status until we get one of the states we care about + OrchestrationState state = await this.GetDurabilityProvider(context) + .GetOrchestrationStateAsync(request.InstanceId, executionId: null); + if (state != null && state.OrchestrationStatus != OrchestrationStatus.Pending) + { + return CreateGetInstanceResponse(state, request); + } + + // Increase the delay time by 1 second every 10 seconds up to 10 seconds. + // The cancellation token is what will break us out of this loop if the orchestration + // never leaves the "Pending" state. + var delay = TimeSpan.FromSeconds(Math.Min(10, (retryCount / 10) + 1)); + await Task.Delay(delay, context.CancellationToken); + retryCount++; + } + } + + public async override Task WaitForInstanceCompletion(P.GetInstanceRequest request, ServerCallContext context) + { + OrchestrationState state = await this.GetDurabilityProvider(context).WaitForOrchestrationAsync( + request.InstanceId, + executionId: null, + timeout: Timeout.InfiniteTimeSpan, + context.CancellationToken); + + return CreateGetInstanceResponse(state, request); + } + + private static P.GetInstanceResponse CreateGetInstanceResponse(OrchestrationState state, P.GetInstanceRequest request) + { + return new P.GetInstanceResponse + { + Exists = true, + OrchestrationState = new P.OrchestrationState + { + InstanceId = state.OrchestrationInstance.InstanceId, + Name = state.Name, + OrchestrationStatus = (P.OrchestrationStatus)state.OrchestrationStatus, + CreatedTimestamp = Timestamp.FromDateTime(state.CreatedTime), + LastUpdatedTimestamp = Timestamp.FromDateTime(state.LastUpdatedTime), + Input = request.GetInputsAndOutputs ? state.Input : null, + Output = request.GetInputsAndOutputs ? state.Output : null, + CustomStatus = request.GetInputsAndOutputs ? state.Status : null, + FailureDetails = request.GetInputsAndOutputs ? GetFailureDetails(state.FailureDetails) : null, + }, + }; + } + + private static P.TaskFailureDetails? GetFailureDetails(FailureDetails? failureDetails) + { + if (failureDetails == null) + { + return null; + } + + return new P.TaskFailureDetails + { + ErrorType = failureDetails.ErrorType, + ErrorMessage = failureDetails.ErrorMessage, + StackTrace = failureDetails.StackTrace, + }; + } + + private DurabilityProvider GetDurabilityProvider(ServerCallContext context) + { + string? taskHub = context.RequestHeaders.GetValue("Durable-TaskHub"); + string? connectionName = context.RequestHeaders.GetValue("Durable-ConnectionName"); + var attribute = new DurableClientAttribute() { TaskHub = taskHub, ConnectionName = connectionName }; + return this.extension.GetDurabilityProvider(attribute); + } + } +} +#endif \ No newline at end of file diff --git a/src/WebJobs.Extensions.DurableTask/WebJobs.Extensions.DurableTask.csproj b/src/WebJobs.Extensions.DurableTask/WebJobs.Extensions.DurableTask.csproj index badcf8dd6..4627da11a 100644 --- a/src/WebJobs.Extensions.DurableTask/WebJobs.Extensions.DurableTask.csproj +++ b/src/WebJobs.Extensions.DurableTask/WebJobs.Extensions.DurableTask.csproj @@ -1,7 +1,7 @@  - netcoreapp3.1;netstandard2.0;net462 + net6.0;netstandard2.0;net462 Microsoft.Azure.WebJobs.Extensions.DurableTask Microsoft.Azure.WebJobs.Extensions.DurableTask 2 @@ -84,22 +84,22 @@ .NET Core 3.1 settings: These are for Functions V3 and above. This is the target for extension bundles V4 and above. --> - + $(DefineConstants);FUNCTIONS_V2_OR_GREATER;FUNCTIONS_V3_OR_GREATER - + - + + - - + diff --git a/src/Worker.Extensions.DurableTask/DurableTaskClientConverter.cs b/src/Worker.Extensions.DurableTask/DurableTaskClientConverter.cs index 1d3da9003..247e5045c 100644 --- a/src/Worker.Extensions.DurableTask/DurableTaskClientConverter.cs +++ b/src/Worker.Extensions.DurableTask/DurableTaskClientConverter.cs @@ -13,7 +13,6 @@ internal sealed partial class DurableTaskClientConverter : IInputConverter { private readonly FunctionsDurableClientProvider clientProvider; - // Constructor parameters are optional DI-injected services. public DurableTaskClientConverter(FunctionsDurableClientProvider clientProvider) { this.clientProvider = clientProvider ?? throw new ArgumentNullException(nameof(clientProvider)); @@ -42,13 +41,7 @@ public ValueTask ConvertAsync(ConverterContext context) try { DurableClientInputData? inputData = JsonSerializer.Deserialize(clientConfigText); - if (!Uri.TryCreate(inputData?.rpcBaseUrl, UriKind.Absolute, out Uri? endpoint)) - { - return new ValueTask(ConversionResult.Failed( - new InvalidOperationException("Failed to parse the input binding payload data"))); - } - - DurableTaskClient client = this.clientProvider.GetClient(endpoint, inputData?.taskHubName, inputData?.connectionName); + DurableTaskClient client = this.clientProvider.GetClient(inputData?.taskHubName, inputData?.connectionName); client = new FunctionsDurableTaskClient(client, inputData!.requiredQueryStringParameters); return new ValueTask(ConversionResult.Success(client)); } @@ -62,5 +55,5 @@ public ValueTask ConvertAsync(ConverterContext context) } // Serializer is case-sensitive and incoming JSON properties are camel-cased. - private record DurableClientInputData(string rpcBaseUrl, string taskHubName, string connectionName, string requiredQueryStringParameters); + private record DurableClientInputData(string taskHubName, string connectionName, string requiredQueryStringParameters); } diff --git a/src/Worker.Extensions.DurableTask/DurableTaskExtensionStartup.cs b/src/Worker.Extensions.DurableTask/DurableTaskExtensionStartup.cs index 516fd41af..f9c9c49aa 100644 --- a/src/Worker.Extensions.DurableTask/DurableTaskExtensionStartup.cs +++ b/src/Worker.Extensions.DurableTask/DurableTaskExtensionStartup.cs @@ -2,12 +2,10 @@ // Licensed under the MIT License. See License.txt in the project root for license information. using System; -using Azure.Core.Serialization; using Microsoft.Azure.Functions.Worker.Core; using Microsoft.Azure.Functions.Worker.Extensions.DurableTask; using Microsoft.DurableTask; using Microsoft.DurableTask.Client; -using Microsoft.DurableTask.Converters; using Microsoft.DurableTask.Worker; using Microsoft.DurableTask.Worker.Shims; using Microsoft.Extensions.DependencyInjection; diff --git a/src/Worker.Extensions.DurableTask/FunctionsDurableClientProvider.cs b/src/Worker.Extensions.DurableTask/FunctionsDurableClientProvider.cs index 03edaf463..003081bd1 100644 --- a/src/Worker.Extensions.DurableTask/FunctionsDurableClientProvider.cs +++ b/src/Worker.Extensions.DurableTask/FunctionsDurableClientProvider.cs @@ -2,21 +2,13 @@ // Licensed under the MIT License. See LICENSE in the project root for license information. using System; -using System.Collections.Generic; -using System.Threading; -using System.Threading.Tasks; using Microsoft.DurableTask.Client; using Microsoft.DurableTask.Client.Grpc; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; - -#if NET6_0_OR_GREATER -using Grpc.Net.Client; -#endif - -#if NETSTANDARD -using GrpcChannel = Grpc.Core.Channel; -#endif +using Microsoft.Azure.Functions.Worker.Extensions.Rpc; +using Grpc.Core; +using Grpc.Core.Interceptors; namespace Microsoft.Azure.Functions.Worker.Extensions.DurableTask; @@ -26,188 +18,93 @@ namespace Microsoft.Azure.Functions.Worker.Extensions.DurableTask; /// /// This class does NOT provide is meant as a per-binding wrapper. /// -internal partial class FunctionsDurableClientProvider : IAsyncDisposable +internal partial class FunctionsDurableClientProvider { - private readonly ReaderWriterLockSlim sync = new(); private readonly ILoggerFactory loggerFactory; private readonly ILogger logger; private readonly DurableTaskClientOptions options; - private Dictionary? clients = new(); + private readonly FunctionsGrpcOptions grpcOptions; - private bool disposed; + private readonly DurableTaskClient defaultClient; /// /// Initializes a new instance of class. /// /// The service provider. /// The client options. - public FunctionsDurableClientProvider(ILoggerFactory loggerFactory, IOptions options) + /// The grpc options. + public FunctionsDurableClientProvider( + ILoggerFactory loggerFactory, + IOptions options, + IOptions grpcOptions) { this.loggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory)); this.logger = loggerFactory.CreateLogger(); this.options = options?.Value ?? throw new ArgumentNullException(nameof(options)); - } - - /// - public async ValueTask DisposeAsync() - { - if (this.disposed) - { - return; - } + this.grpcOptions = grpcOptions?.Value ?? throw new ArgumentNullException(nameof(grpcOptions)); - try - { - this.sync.EnterWriteLock(); - try - { - if (this.disposed) - { - return; - } - - foreach (ClientHolder holder in this.clients!.Values) - { - await holder.DisposeAsync(); - } - - this.clients = null; - this.disposed = true; - } - finally - { - this.sync.ExitWriteLock(); - } - } - catch (ObjectDisposedException) - { - // this can happen when 'this.sync' is disposed from concurrent DisposeAsync() calls. - } - - this.sync.Dispose(); + this.defaultClient = this.GetClientCore(null, null); } /// /// Gets a by name and gRPC endpoint. /// - /// The gRPC endpoint this client should connect to. /// The name of the task hub this client is for. /// The name of the connection to use for the task-hub. /// A . - public DurableTaskClient GetClient(Uri endpoint, string? taskHub, string? connectionName) + public DurableTaskClient GetClient(string? taskHub, string? connectionName) { - this.VerifyNotDisposed(); - this.sync.EnterReadLock(); - - taskHub ??= string.Empty; - connectionName ??= string.Empty; - ClientKey key = new(endpoint, taskHub, connectionName); - try - { - this.VerifyNotDisposed(); - if (this.clients!.TryGetValue(key, out ClientHolder? holder)) - { - this.logger.LogTrace("DurableTaskClient resolved from cache"); - return holder.Client; - } - } - finally + if (string.IsNullOrEmpty(taskHub) && string.IsNullOrEmpty(connectionName)) { - this.sync.ExitReadLock(); + // optimization for the most often used client. + return this.defaultClient; } - this.sync.EnterWriteLock(); - try - { - this.VerifyNotDisposed(); - if (this.clients!.TryGetValue(key, out ClientHolder? holder)) - { - this.logger.LogTrace("DurableTaskClient resolved from cache"); - return holder.Client; - } - - this.logger.LogTrace( - "DurableTaskClient cache miss, constructing for Endpoint: '{Endpoint}', TaskHub: '{TaskHub}', ConnectionName: '{ConnectionName}'", - endpoint, - taskHub, - connectionName); - GrpcChannel channel = CreateChannel(key); - GrpcDurableTaskClientOptions options = new() - { - Channel = channel, - DataConverter = this.options.DataConverter, - }; - - ILogger logger = this.loggerFactory.CreateLogger(); - GrpcDurableTaskClient client = new(taskHub, options, logger); - holder = new(client, channel); - this.clients[key] = holder; - return client; - } - finally - { - this.sync.ExitWriteLock(); - } + return this.GetClientCore(taskHub, connectionName); } - private void VerifyNotDisposed() + private DurableTaskClient GetClientCore(string? name, string? connection) { - if (this.disposed) + GrpcDurableTaskClientOptions options = new() { - throw new ObjectDisposedException(nameof(FunctionsDurableClientProvider)); - } + CallInvoker = this.CreateCallInvoker(name, connection), + DataConverter = this.options.DataConverter, + }; + + ILogger logger = this.loggerFactory.CreateLogger(); + return new GrpcDurableTaskClient(name ?? string.Empty, options, logger); } - // Wrapper class to conveniently dispose/shutdown the client and channel together. - private class ClientHolder : IAsyncDisposable + private CallInvoker CreateCallInvoker(string? name, string? connection) { - private readonly GrpcChannel channel; - - public ClientHolder(DurableTaskClient client, GrpcChannel channel) - { - this.Client = client; - this.channel = channel; - } - - public DurableTaskClient Client { get; } - - public async ValueTask DisposeAsync() + if (string.IsNullOrEmpty(name) && string.IsNullOrEmpty(connection)) { - try - { - await this.Client.DisposeAsync(); - await this.channel.ShutdownAsync(); - } - catch - { - // dispose should not throw and unsure how Channel multiple ShutdownAsync() calls behave. - } + return this.grpcOptions.CallInvoker; } - } - - private record ClientKey(Uri Address, string? Name, string? Connection) - { - private static readonly Dictionary EmptyHeaders = new(); - public IReadOnlyDictionary GetHeaders() + return this.grpcOptions.CallInvoker.Intercept(incoming => { - if (string.IsNullOrEmpty(this.Name) && string.IsNullOrEmpty(this.Connection)) + Metadata metadata = incoming; + if (incoming.IsReadOnly) { - return EmptyHeaders; + metadata = new(); + foreach (Metadata.Entry entry in incoming) + { + metadata.Add(entry); + } } - Dictionary headers = new(); - if (!string.IsNullOrEmpty(this.Name)) + if (!string.IsNullOrEmpty(name)) { - headers["Durable-TaskHub"] = this.Name!; + metadata.Add("Durable-TaskHub", name); } - if (!string.IsNullOrEmpty(this.Connection)) + if (!string.IsNullOrEmpty(connection)) { - headers["Durable-ConnectionName"] = this.Connection!; + metadata.Add("Durable-ConnectionName", connection); } - return headers; - } + return metadata; + }); } } diff --git a/src/Worker.Extensions.DurableTask/FunctionsDurableClientProvider.net6.cs b/src/Worker.Extensions.DurableTask/FunctionsDurableClientProvider.net6.cs deleted file mode 100644 index ac35f2dbb..000000000 --- a/src/Worker.Extensions.DurableTask/FunctionsDurableClientProvider.net6.cs +++ /dev/null @@ -1,37 +0,0 @@ -// Copyright (c) .NET Foundation. All rights reserved. -// Licensed under the MIT License. See License.txt in the project root for license information. - -#if NET6_0_OR_GREATER -using System.Collections.Generic; -using System.Net.Http; -using Grpc.Net.Client; - -namespace Microsoft.Azure.Functions.Worker.Extensions.DurableTask; - -internal partial class FunctionsDurableClientProvider -{ - private static GrpcChannel CreateChannel(ClientKey key) - { - IReadOnlyDictionary headers = key.GetHeaders(); - if (headers.Count == 0) - { - return GrpcChannel.ForAddress(key.Address); - } - - - HttpClient httpClient = new(); - foreach (KeyValuePair header in headers) - { - httpClient.DefaultRequestHeaders.Add(header.Key, header.Value); - } - - GrpcChannelOptions options = new() - { - HttpClient = httpClient, - DisposeHttpClient = true, - }; - - return GrpcChannel.ForAddress(key.Address, options); - } -} -#endif diff --git a/src/Worker.Extensions.DurableTask/FunctionsDurableClientProvider.netstandard.cs b/src/Worker.Extensions.DurableTask/FunctionsDurableClientProvider.netstandard.cs deleted file mode 100644 index ec4908654..000000000 --- a/src/Worker.Extensions.DurableTask/FunctionsDurableClientProvider.netstandard.cs +++ /dev/null @@ -1,46 +0,0 @@ -// Copyright (c) .NET Foundation. All rights reserved. -// Licensed under the MIT License. See License.txt in the project root for license information. - -#if NETSTANDARD -using System.Collections.Generic; -using Grpc.Core; -using Grpc.Core.Interceptors; - -namespace Microsoft.Azure.Functions.Worker.Extensions.DurableTask; - -internal partial class FunctionsDurableClientProvider -{ - private static Channel CreateChannel(ClientKey key) - { - IReadOnlyDictionary headers = key.GetHeaders(); - string address = $"{key.Address.Host}:{key.Address.Port}"; - return headers.Count > 0 - ? new ChannelWithHeaders(address, headers) - : new Channel(address, ChannelCredentials.Insecure); - } - - private class ChannelWithHeaders : Channel - { - private readonly IReadOnlyDictionary headers; - - public ChannelWithHeaders(string address, IReadOnlyDictionary headers) - : base(address, ChannelCredentials.Insecure) - { - this.headers = headers; - } - - public override CallInvoker CreateCallInvoker() - { - return base.CreateCallInvoker().Intercept(metadata => - { - foreach (KeyValuePair kvp in this.headers) - { - metadata.Add(kvp.Key, kvp.Value); - } - - return metadata; - }); - } - } -} -#endif diff --git a/src/Worker.Extensions.DurableTask/Worker.Extensions.DurableTask.csproj b/src/Worker.Extensions.DurableTask/Worker.Extensions.DurableTask.csproj index 9eba89f34..31abcb134 100644 --- a/src/Worker.Extensions.DurableTask/Worker.Extensions.DurableTask.csproj +++ b/src/Worker.Extensions.DurableTask/Worker.Extensions.DurableTask.csproj @@ -36,10 +36,11 @@ - - - - + + + + + diff --git a/test/FunctionsV2/WebJobs.Extensions.DurableTask.Tests.V2.csproj b/test/FunctionsV2/WebJobs.Extensions.DurableTask.Tests.V2.csproj index 4840ca88a..b589709c6 100644 --- a/test/FunctionsV2/WebJobs.Extensions.DurableTask.Tests.V2.csproj +++ b/test/FunctionsV2/WebJobs.Extensions.DurableTask.Tests.V2.csproj @@ -16,7 +16,6 @@ -