diff --git a/src/WebJobs.Extensions.DurableTask/AzureStorageDurabilityProviderFactory.cs b/src/WebJobs.Extensions.DurableTask/AzureStorageDurabilityProviderFactory.cs index f0041f403..702e53fed 100644 --- a/src/WebJobs.Extensions.DurableTask/AzureStorageDurabilityProviderFactory.cs +++ b/src/WebJobs.Extensions.DurableTask/AzureStorageDurabilityProviderFactory.cs @@ -77,6 +77,7 @@ public AzureStorageDurabilityProviderFactory( WorkerRuntimeType runtimeType = platformInfo.GetWorkerRuntimeType(); if (runtimeType == WorkerRuntimeType.DotNetIsolated || runtimeType == WorkerRuntimeType.Java || + (runtimeType == WorkerRuntimeType.Python && platformInfo.GrpcManuallyRequested) || runtimeType == WorkerRuntimeType.Custom) { this.useSeparateQueueForEntityWorkItems = true; diff --git a/src/WebJobs.Extensions.DurableTask/DefaultPlatformInformation.cs b/src/WebJobs.Extensions.DurableTask/DefaultPlatformInformation.cs index 807d5c885..f3521d913 100644 --- a/src/WebJobs.Extensions.DurableTask/DefaultPlatformInformation.cs +++ b/src/WebJobs.Extensions.DurableTask/DefaultPlatformInformation.cs @@ -22,6 +22,25 @@ internal class DefaultPlatformInformation : IPlatformInformation private WorkerRuntimeType? workerRuntimeType; + public bool GrpcManuallyRequested + { + get + { + string? grpcSetting = this.ReadEnviromentVariable("DURABLE_FUNCTIONS_GRPC_OPT_IN"); + if (string.IsNullOrEmpty(grpcSetting)) + { + return false; + } + + if (bool.TryParse(grpcSetting, out bool grpcManuallyRequested)) + { + return grpcManuallyRequested; + } + + return false; + } + } + public DefaultPlatformInformation(INameResolver nameResolver, ILoggerFactory loggerFactory) { this.nameResolver = nameResolver; diff --git a/src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs b/src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs index 2955f2440..28b582a7d 100644 --- a/src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs +++ b/src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs @@ -6,7 +6,6 @@ using System.Collections.Generic; using System.Diagnostics; using System.Linq; -using System.Net; using System.Net.Http; using System.Reflection; using System.Text; @@ -22,11 +21,8 @@ using Microsoft.Azure.WebJobs.Extensions.DurableTask.Correlation; using Microsoft.Azure.WebJobs.Extensions.DurableTask.Grpc; using Microsoft.Azure.WebJobs.Extensions.DurableTask.Listener; -using Microsoft.Azure.WebJobs.Extensions.DurableTask.Storage; -using Microsoft.Azure.WebJobs.Host; using Microsoft.Azure.WebJobs.Host.Config; using Microsoft.Azure.WebJobs.Host.Executors; -using Microsoft.Azure.WebJobs.Host.Scale; using Microsoft.Azure.WebJobs.Logging; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; @@ -171,9 +167,11 @@ public DurableTaskExtension( // Starting with .NET isolated and Java, we have a more efficient out-of-process // function invocation protocol. Other languages will use the existing protocol. WorkerRuntimeType runtimeType = this.PlatformInformationService.GetWorkerRuntimeType(); - if (runtimeType == WorkerRuntimeType.DotNetIsolated || + bool manuallyRequestedGrpc = this.PlatformInformationService.GrpcManuallyRequested; + if (manuallyRequestedGrpc || + (runtimeType == WorkerRuntimeType.DotNetIsolated || runtimeType == WorkerRuntimeType.Java || - runtimeType == WorkerRuntimeType.Custom) + runtimeType == WorkerRuntimeType.Custom)) { this.OutOfProcProtocol = OutOfProcOrchestrationProtocol.MiddlewarePassthrough; this.localGrpcListener = LocalGrpcListener.Create(this, this.Options.GrpcListenerMode); @@ -464,8 +462,11 @@ private void StartLocalHttpServer() WorkerRuntimeType.DotNetIsolated => false, WorkerRuntimeType.Java => false, + // Python only uses the HTTP server if gRPC was not manually requested + // If it was, we assume the user is using the durabletask-based SDK and doesn't need HTTP + WorkerRuntimeType.Python => !this.PlatformInformationService.GrpcManuallyRequested, + // everything else - assume the HTTP server - WorkerRuntimeType.Python => true, WorkerRuntimeType.Node => true, WorkerRuntimeType.PowerShell => true, WorkerRuntimeType.Unknown => true, diff --git a/src/WebJobs.Extensions.DurableTask/EntityScheduler/EntityId.cs b/src/WebJobs.Extensions.DurableTask/EntityScheduler/EntityId.cs index afc854181..68aa97ff9 100644 --- a/src/WebJobs.Extensions.DurableTask/EntityScheduler/EntityId.cs +++ b/src/WebJobs.Extensions.DurableTask/EntityScheduler/EntityId.cs @@ -77,6 +77,27 @@ public override string ToString() return this.schedulerId; } + /// + /// Returns the entity ID for a given instance ID. + /// + /// The instance ID. + /// the corresponding entity ID. + public static EntityId FromString(string instanceId) + { + if (string.IsNullOrEmpty(instanceId)) + { + throw new ArgumentException(nameof(instanceId)); + } + var pos = instanceId.IndexOf('@', 1); + if (pos <= 0 || instanceId[0] != '@') + { + throw new ArgumentException($"Instance ID '{instanceId}' is not a valid entity ID.", nameof(instanceId)); + } + var entityName = instanceId.Substring(1, pos - 1); + var entityKey = instanceId.Substring(pos + 1); + return new EntityId(entityName, entityKey); + } + /// public override bool Equals(object obj) { diff --git a/src/WebJobs.Extensions.DurableTask/IPlatformInformation.cs b/src/WebJobs.Extensions.DurableTask/IPlatformInformation.cs index 2c73d5cd6..289b22325 100644 --- a/src/WebJobs.Extensions.DurableTask/IPlatformInformation.cs +++ b/src/WebJobs.Extensions.DurableTask/IPlatformInformation.cs @@ -77,6 +77,14 @@ public enum WorkerRuntimeType [Obsolete] public interface IPlatformInformation { + + /// + /// Determines whether the user has manually requested the gRPC protocol for worker communications. + /// + /// Used in Python when using the new durabletask-based Functions SDK. + /// True if the user has requested gRPC, False otherwise. + bool GrpcManuallyRequested { get; } + /// /// Determine the underlying plan is Consumption or not. /// diff --git a/src/WebJobs.Extensions.DurableTask/ProtobufUtils.cs b/src/WebJobs.Extensions.DurableTask/ProtobufUtils.cs index 0975c5e98..e1bfe6ea9 100644 --- a/src/WebJobs.Extensions.DurableTask/ProtobufUtils.cs +++ b/src/WebJobs.Extensions.DurableTask/ProtobufUtils.cs @@ -6,7 +6,6 @@ using System.Collections; using System.Collections.Generic; using System.Diagnostics.CodeAnalysis; -using System.Globalization; using System.IO; using System.Linq; using System.Reflection; @@ -314,11 +313,83 @@ public static OrchestratorAction ToOrchestratorAction(P.OrchestratorAction a) } return action; + case P.OrchestratorAction.OrchestratorActionTypeOneofCase.SendEntityMessage: + RequestMessage? entityMessage = null; + string? eventName = null; + string? targetInstance = null; + switch (a.SendEntityMessage.EntityMessageTypeCase) + { + case P.SendEntityMessageAction.EntityMessageTypeOneofCase.EntityLockRequested: + entityMessage = new RequestMessage() + { + Operation = null, + Id = Guid.Parse(a.SendEntityMessage.EntityLockRequested.CriticalSectionId), + LockSet = a.SendEntityMessage.EntityLockRequested.LockSet.Skip(1).Select(s => EntityId.FromString(s)).ToArray(), + Position = a.SendEntityMessage.EntityLockRequested.Position, + ParentInstanceId = a.SendEntityMessage.EntityLockRequested.ParentInstanceId, + }; + targetInstance = a.SendEntityMessage.EntityLockRequested.LockSet.ElementAt(0); + eventName = EncodeEventName(null); + break; + case P.SendEntityMessageAction.EntityMessageTypeOneofCase.EntityUnlockSent: + entityMessage = new RequestMessage() + { + Id = Guid.Parse(a.SendEntityMessage.EntityUnlockSent.CriticalSectionId), + ParentInstanceId = a.SendEntityMessage.EntityUnlockSent.ParentInstanceId, + }; + targetInstance = a.SendEntityMessage.EntityUnlockSent.TargetInstanceId; + eventName = "release"; + break; + case P.SendEntityMessageAction.EntityMessageTypeOneofCase.EntityOperationCalled: + entityMessage = new RequestMessage() + { + Operation = a.SendEntityMessage.EntityOperationCalled.Operation, + IsSignal = false, + Input = a.SendEntityMessage.EntityOperationCalled.Input, + Id = Guid.Parse(a.SendEntityMessage.EntityOperationCalled.RequestId), + ScheduledTime = a.SendEntityMessage.EntityOperationCalled.ScheduledTime?.ToDateTime(), + ParentInstanceId = a.SendEntityMessage.EntityOperationCalled.ParentInstanceId, + ParentExecutionId = a.SendEntityMessage.EntityOperationCalled.ParentExecutionId, + }; + targetInstance = a.SendEntityMessage.EntityOperationCalled.TargetInstanceId; + eventName = EncodeEventName(a.SendEntityMessage.EntityOperationCalled.ScheduledTime?.ToDateTime()); + break; + case P.SendEntityMessageAction.EntityMessageTypeOneofCase.EntityOperationSignaled: + entityMessage = new RequestMessage() + { + Operation = a.SendEntityMessage.EntityOperationSignaled.Operation, + IsSignal = true, + Input = a.SendEntityMessage.EntityOperationSignaled.Input, + Id = Guid.Parse(a.SendEntityMessage.EntityOperationSignaled.RequestId), + ScheduledTime = a.SendEntityMessage.EntityOperationSignaled.ScheduledTime?.ToDateTime(), + }; + targetInstance = a.SendEntityMessage.EntityOperationSignaled.TargetInstanceId; + eventName = EncodeEventName(a.SendEntityMessage.EntityOperationSignaled.ScheduledTime?.ToDateTime()); + break; + default: + throw new NotSupportedException($"Deserialization of SendEntityMessage action type '{a.SendEntityMessage.EntityMessageTypeCase}' is not supported."); + } + + return new SendEventOrchestratorAction + { + Id = a.Id, + Instance = new OrchestrationInstance + { + InstanceId = targetInstance, + }, + EventName = eventName, // TODO: Determine event name for entity messages + EventData = JsonConvert.SerializeObject(entityMessage, new JsonSerializerSettings() { TypeNameHandling = TypeNameHandling.None }), + }; + + throw new NotSupportedException("Deserialization of SendEntityMessage action is not supported."); default: throw new NotSupportedException($"Received unsupported action type '{a.OrchestratorActionTypeCase}'."); } } + private static string EncodeEventName(DateTime? scheduledTime) + => scheduledTime.HasValue ? $"op@{scheduledTime.Value:o}" : "op"; + [return: NotNullIfNotNull("parameters")] public static P.OrchestratorEntityParameters? ToProtobuf(this TaskOrchestrationEntityParameters? parameters) { diff --git a/src/WebJobs.Extensions.DurableTask/WebJobs.Extensions.DurableTask.csproj b/src/WebJobs.Extensions.DurableTask/WebJobs.Extensions.DurableTask.csproj index a954508ba..ca05db7f8 100644 --- a/src/WebJobs.Extensions.DurableTask/WebJobs.Extensions.DurableTask.csproj +++ b/src/WebJobs.Extensions.DurableTask/WebJobs.Extensions.DurableTask.csproj @@ -5,8 +5,8 @@ Microsoft.Azure.WebJobs.Extensions.DurableTask Microsoft.Azure.WebJobs.Extensions.DurableTask 3 - 8 - 0 + 9 + 0-preview0 $(MajorVersion).$(MinorVersion).$(PatchVersion) $(MajorVersion).$(MinorVersion).$(PatchVersion) $(MajorVersion).0.0.0