Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public AzureStorageDurabilityProviderFactory(
WorkerRuntimeType runtimeType = platformInfo.GetWorkerRuntimeType();
if (runtimeType == WorkerRuntimeType.DotNetIsolated ||
runtimeType == WorkerRuntimeType.Java ||
(runtimeType == WorkerRuntimeType.Python && platformInfo.GrpcManuallyRequested) ||
runtimeType == WorkerRuntimeType.Custom)
{
this.useSeparateQueueForEntityWorkItems = true;
Expand Down
19 changes: 19 additions & 0 deletions src/WebJobs.Extensions.DurableTask/DefaultPlatformInformation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,26 @@

private WorkerRuntimeType? workerRuntimeType;

public bool GrpcManuallyRequested
{
get
{
string? grpcSetting = this.ReadEnviromentVariable("DURABLE_FUNCTIONS_GRPC_OPT_IN");
if (string.IsNullOrEmpty(grpcSetting))
{
return false;
}

if (bool.TryParse(grpcSetting, out bool grpcManuallyRequested))
{
return grpcManuallyRequested;
}

return false;
}
}

public DefaultPlatformInformation(INameResolver nameResolver, ILoggerFactory loggerFactory)

Check failure on line 44 in src/WebJobs.Extensions.DurableTask/DefaultPlatformInformation.cs

View workflow job for this annotation

GitHub Actions / build

Check failure on line 44 in src/WebJobs.Extensions.DurableTask/DefaultPlatformInformation.cs

View workflow job for this annotation

GitHub Actions / build

Check warning on line 44 in src/WebJobs.Extensions.DurableTask/DefaultPlatformInformation.cs

View workflow job for this annotation

GitHub Actions / e2e-azurestorage-linux

Check warning on line 44 in src/WebJobs.Extensions.DurableTask/DefaultPlatformInformation.cs

View workflow job for this annotation

GitHub Actions / e2e-azurestorage-linux

Check warning on line 44 in src/WebJobs.Extensions.DurableTask/DefaultPlatformInformation.cs

View workflow job for this annotation

GitHub Actions / e2e-dts

Check warning on line 44 in src/WebJobs.Extensions.DurableTask/DefaultPlatformInformation.cs

View workflow job for this annotation

GitHub Actions / e2e-dts

Check warning on line 44 in src/WebJobs.Extensions.DurableTask/DefaultPlatformInformation.cs

View workflow job for this annotation

GitHub Actions / e2e-mssql

Check warning on line 44 in src/WebJobs.Extensions.DurableTask/DefaultPlatformInformation.cs

View workflow job for this annotation

GitHub Actions / e2e-mssql

Check warning on line 44 in src/WebJobs.Extensions.DurableTask/DefaultPlatformInformation.cs

View workflow job for this annotation

GitHub Actions / e2e-azurestorage-windows

Check warning on line 44 in src/WebJobs.Extensions.DurableTask/DefaultPlatformInformation.cs

View workflow job for this annotation

GitHub Actions / e2e-azurestorage-windows

Check warning on line 44 in src/WebJobs.Extensions.DurableTask/DefaultPlatformInformation.cs

View workflow job for this annotation

GitHub Actions / build

Check warning on line 44 in src/WebJobs.Extensions.DurableTask/DefaultPlatformInformation.cs

View workflow job for this annotation

GitHub Actions / build

Check warning on line 44 in src/WebJobs.Extensions.DurableTask/DefaultPlatformInformation.cs

View workflow job for this annotation

GitHub Actions / build

Check warning on line 44 in src/WebJobs.Extensions.DurableTask/DefaultPlatformInformation.cs

View workflow job for this annotation

GitHub Actions / build

Check warning on line 44 in src/WebJobs.Extensions.DurableTask/DefaultPlatformInformation.cs

View workflow job for this annotation

GitHub Actions / build

Check warning on line 44 in src/WebJobs.Extensions.DurableTask/DefaultPlatformInformation.cs

View workflow job for this annotation

GitHub Actions / build

{
this.nameResolver = nameResolver;

Expand Down
15 changes: 8 additions & 7 deletions src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Reflection;
using System.Text;
Expand All @@ -22,11 +21,8 @@
using Microsoft.Azure.WebJobs.Extensions.DurableTask.Correlation;
using Microsoft.Azure.WebJobs.Extensions.DurableTask.Grpc;
using Microsoft.Azure.WebJobs.Extensions.DurableTask.Listener;
using Microsoft.Azure.WebJobs.Extensions.DurableTask.Storage;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Azure.WebJobs.Host.Config;
using Microsoft.Azure.WebJobs.Host.Executors;
using Microsoft.Azure.WebJobs.Host.Scale;
using Microsoft.Azure.WebJobs.Logging;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
Expand Down Expand Up @@ -171,9 +167,11 @@ public DurableTaskExtension(
// Starting with .NET isolated and Java, we have a more efficient out-of-process
// function invocation protocol. Other languages will use the existing protocol.
WorkerRuntimeType runtimeType = this.PlatformInformationService.GetWorkerRuntimeType();
if (runtimeType == WorkerRuntimeType.DotNetIsolated ||
bool manuallyRequestedGrpc = this.PlatformInformationService.GrpcManuallyRequested;
if (manuallyRequestedGrpc ||
(runtimeType == WorkerRuntimeType.DotNetIsolated ||
runtimeType == WorkerRuntimeType.Java ||
runtimeType == WorkerRuntimeType.Custom)
runtimeType == WorkerRuntimeType.Custom))
{
this.OutOfProcProtocol = OutOfProcOrchestrationProtocol.MiddlewarePassthrough;
this.localGrpcListener = LocalGrpcListener.Create(this, this.Options.GrpcListenerMode);
Expand Down Expand Up @@ -464,8 +462,11 @@ private void StartLocalHttpServer()
WorkerRuntimeType.DotNetIsolated => false,
WorkerRuntimeType.Java => false,

// Python only uses the HTTP server if gRPC was not manually requested
// If it was, we assume the user is using the durabletask-based SDK and doesn't need HTTP
WorkerRuntimeType.Python => !this.PlatformInformationService.GrpcManuallyRequested,

// everything else - assume the HTTP server
WorkerRuntimeType.Python => true,
WorkerRuntimeType.Node => true,
WorkerRuntimeType.PowerShell => true,
WorkerRuntimeType.Unknown => true,
Expand Down
21 changes: 21 additions & 0 deletions src/WebJobs.Extensions.DurableTask/EntityScheduler/EntityId.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,27 @@
return this.schedulerId;
}

/// <summary>
/// Returns the entity ID for a given instance ID.
/// </summary>
/// <param name="instanceId">The instance ID.</param>
/// <returns>the corresponding entity ID.</returns>
public static EntityId FromString(string instanceId)
{
if (string.IsNullOrEmpty(instanceId))
{
throw new ArgumentException(nameof(instanceId));
}

Check failure on line 90 in src/WebJobs.Extensions.DurableTask/EntityScheduler/EntityId.cs

View workflow job for this annotation

GitHub Actions / build

Check failure on line 90 in src/WebJobs.Extensions.DurableTask/EntityScheduler/EntityId.cs

View workflow job for this annotation

GitHub Actions / build

Check warning on line 90 in src/WebJobs.Extensions.DurableTask/EntityScheduler/EntityId.cs

View workflow job for this annotation

GitHub Actions / e2e-azurestorage-linux

Check warning on line 90 in src/WebJobs.Extensions.DurableTask/EntityScheduler/EntityId.cs

View workflow job for this annotation

GitHub Actions / e2e-azurestorage-linux

Check warning on line 90 in src/WebJobs.Extensions.DurableTask/EntityScheduler/EntityId.cs

View workflow job for this annotation

GitHub Actions / e2e-dts

Check warning on line 90 in src/WebJobs.Extensions.DurableTask/EntityScheduler/EntityId.cs

View workflow job for this annotation

GitHub Actions / e2e-dts

Check warning on line 90 in src/WebJobs.Extensions.DurableTask/EntityScheduler/EntityId.cs

View workflow job for this annotation

GitHub Actions / e2e-mssql

Check warning on line 90 in src/WebJobs.Extensions.DurableTask/EntityScheduler/EntityId.cs

View workflow job for this annotation

GitHub Actions / e2e-mssql

Check warning on line 90 in src/WebJobs.Extensions.DurableTask/EntityScheduler/EntityId.cs

View workflow job for this annotation

GitHub Actions / e2e-azurestorage-windows

Check warning on line 90 in src/WebJobs.Extensions.DurableTask/EntityScheduler/EntityId.cs

View workflow job for this annotation

GitHub Actions / e2e-azurestorage-windows

Check warning on line 90 in src/WebJobs.Extensions.DurableTask/EntityScheduler/EntityId.cs

View workflow job for this annotation

GitHub Actions / build

Check warning on line 90 in src/WebJobs.Extensions.DurableTask/EntityScheduler/EntityId.cs

View workflow job for this annotation

GitHub Actions / build

Check warning on line 90 in src/WebJobs.Extensions.DurableTask/EntityScheduler/EntityId.cs

View workflow job for this annotation

GitHub Actions / build

Check warning on line 90 in src/WebJobs.Extensions.DurableTask/EntityScheduler/EntityId.cs

View workflow job for this annotation

GitHub Actions / build

Check warning on line 90 in src/WebJobs.Extensions.DurableTask/EntityScheduler/EntityId.cs

View workflow job for this annotation

GitHub Actions / build

Check warning on line 90 in src/WebJobs.Extensions.DurableTask/EntityScheduler/EntityId.cs

View workflow job for this annotation

GitHub Actions / build

var pos = instanceId.IndexOf('@', 1);
if (pos <= 0 || instanceId[0] != '@')
{
throw new ArgumentException($"Instance ID '{instanceId}' is not a valid entity ID.", nameof(instanceId));
}

Check failure on line 95 in src/WebJobs.Extensions.DurableTask/EntityScheduler/EntityId.cs

View workflow job for this annotation

GitHub Actions / build

Check failure on line 95 in src/WebJobs.Extensions.DurableTask/EntityScheduler/EntityId.cs

View workflow job for this annotation

GitHub Actions / build

Check warning on line 95 in src/WebJobs.Extensions.DurableTask/EntityScheduler/EntityId.cs

View workflow job for this annotation

GitHub Actions / e2e-azurestorage-linux

Check warning on line 95 in src/WebJobs.Extensions.DurableTask/EntityScheduler/EntityId.cs

View workflow job for this annotation

GitHub Actions / e2e-azurestorage-linux

Check warning on line 95 in src/WebJobs.Extensions.DurableTask/EntityScheduler/EntityId.cs

View workflow job for this annotation

GitHub Actions / e2e-dts

Check warning on line 95 in src/WebJobs.Extensions.DurableTask/EntityScheduler/EntityId.cs

View workflow job for this annotation

GitHub Actions / e2e-dts

Check warning on line 95 in src/WebJobs.Extensions.DurableTask/EntityScheduler/EntityId.cs

View workflow job for this annotation

GitHub Actions / e2e-mssql

Check warning on line 95 in src/WebJobs.Extensions.DurableTask/EntityScheduler/EntityId.cs

View workflow job for this annotation

GitHub Actions / e2e-mssql

Check warning on line 95 in src/WebJobs.Extensions.DurableTask/EntityScheduler/EntityId.cs

View workflow job for this annotation

GitHub Actions / e2e-azurestorage-windows

Check warning on line 95 in src/WebJobs.Extensions.DurableTask/EntityScheduler/EntityId.cs

View workflow job for this annotation

GitHub Actions / e2e-azurestorage-windows

Check warning on line 95 in src/WebJobs.Extensions.DurableTask/EntityScheduler/EntityId.cs

View workflow job for this annotation

GitHub Actions / build

Check warning on line 95 in src/WebJobs.Extensions.DurableTask/EntityScheduler/EntityId.cs

View workflow job for this annotation

GitHub Actions / build

Check warning on line 95 in src/WebJobs.Extensions.DurableTask/EntityScheduler/EntityId.cs

View workflow job for this annotation

GitHub Actions / build

Check warning on line 95 in src/WebJobs.Extensions.DurableTask/EntityScheduler/EntityId.cs

View workflow job for this annotation

GitHub Actions / build

Check warning on line 95 in src/WebJobs.Extensions.DurableTask/EntityScheduler/EntityId.cs

View workflow job for this annotation

GitHub Actions / build

Check warning on line 95 in src/WebJobs.Extensions.DurableTask/EntityScheduler/EntityId.cs

View workflow job for this annotation

GitHub Actions / build

var entityName = instanceId.Substring(1, pos - 1);
var entityKey = instanceId.Substring(pos + 1);
return new EntityId(entityName, entityKey);
}

/// <inheritdoc/>
public override bool Equals(object obj)
{
Expand Down
8 changes: 8 additions & 0 deletions src/WebJobs.Extensions.DurableTask/IPlatformInformation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,15 @@
/// </summary>
[Obsolete]
public interface IPlatformInformation
{

Check failure on line 79 in src/WebJobs.Extensions.DurableTask/IPlatformInformation.cs

View workflow job for this annotation

GitHub Actions / build

An opening brace should not be followed by a blank line. (https://github.com/DotNetAnalyzers/StyleCopAnalyzers/blob/master/documentation/SA1505.md)

Check failure on line 79 in src/WebJobs.Extensions.DurableTask/IPlatformInformation.cs

View workflow job for this annotation

GitHub Actions / build

An opening brace should not be followed by a blank line. (https://github.com/DotNetAnalyzers/StyleCopAnalyzers/blob/master/documentation/SA1505.md)

Check warning on line 79 in src/WebJobs.Extensions.DurableTask/IPlatformInformation.cs

View workflow job for this annotation

GitHub Actions / e2e-azurestorage-linux

An opening brace should not be followed by a blank line. (https://github.com/DotNetAnalyzers/StyleCopAnalyzers/blob/master/documentation/SA1505.md)

Check warning on line 79 in src/WebJobs.Extensions.DurableTask/IPlatformInformation.cs

View workflow job for this annotation

GitHub Actions / e2e-azurestorage-linux

An opening brace should not be followed by a blank line. (https://github.com/DotNetAnalyzers/StyleCopAnalyzers/blob/master/documentation/SA1505.md)

Check warning on line 79 in src/WebJobs.Extensions.DurableTask/IPlatformInformation.cs

View workflow job for this annotation

GitHub Actions / e2e-dts

An opening brace should not be followed by a blank line. (https://github.com/DotNetAnalyzers/StyleCopAnalyzers/blob/master/documentation/SA1505.md)

Check warning on line 79 in src/WebJobs.Extensions.DurableTask/IPlatformInformation.cs

View workflow job for this annotation

GitHub Actions / e2e-dts

An opening brace should not be followed by a blank line. (https://github.com/DotNetAnalyzers/StyleCopAnalyzers/blob/master/documentation/SA1505.md)

Check warning on line 79 in src/WebJobs.Extensions.DurableTask/IPlatformInformation.cs

View workflow job for this annotation

GitHub Actions / e2e-mssql

An opening brace should not be followed by a blank line. (https://github.com/DotNetAnalyzers/StyleCopAnalyzers/blob/master/documentation/SA1505.md)

Check warning on line 79 in src/WebJobs.Extensions.DurableTask/IPlatformInformation.cs

View workflow job for this annotation

GitHub Actions / e2e-mssql

An opening brace should not be followed by a blank line. (https://github.com/DotNetAnalyzers/StyleCopAnalyzers/blob/master/documentation/SA1505.md)

Check warning on line 79 in src/WebJobs.Extensions.DurableTask/IPlatformInformation.cs

View workflow job for this annotation

GitHub Actions / e2e-azurestorage-windows

An opening brace should not be followed by a blank line. (https://github.com/DotNetAnalyzers/StyleCopAnalyzers/blob/master/documentation/SA1505.md)

Check warning on line 79 in src/WebJobs.Extensions.DurableTask/IPlatformInformation.cs

View workflow job for this annotation

GitHub Actions / e2e-azurestorage-windows

An opening brace should not be followed by a blank line. (https://github.com/DotNetAnalyzers/StyleCopAnalyzers/blob/master/documentation/SA1505.md)

Check warning on line 79 in src/WebJobs.Extensions.DurableTask/IPlatformInformation.cs

View workflow job for this annotation

GitHub Actions / build

An opening brace should not be followed by a blank line. (https://github.com/DotNetAnalyzers/StyleCopAnalyzers/blob/master/documentation/SA1505.md)

Check warning on line 79 in src/WebJobs.Extensions.DurableTask/IPlatformInformation.cs

View workflow job for this annotation

GitHub Actions / build

An opening brace should not be followed by a blank line. (https://github.com/DotNetAnalyzers/StyleCopAnalyzers/blob/master/documentation/SA1505.md)

Check warning on line 79 in src/WebJobs.Extensions.DurableTask/IPlatformInformation.cs

View workflow job for this annotation

GitHub Actions / build

An opening brace should not be followed by a blank line. (https://github.com/DotNetAnalyzers/StyleCopAnalyzers/blob/master/documentation/SA1505.md)

Check warning on line 79 in src/WebJobs.Extensions.DurableTask/IPlatformInformation.cs

View workflow job for this annotation

GitHub Actions / build

An opening brace should not be followed by a blank line. (https://github.com/DotNetAnalyzers/StyleCopAnalyzers/blob/master/documentation/SA1505.md)

Check warning on line 79 in src/WebJobs.Extensions.DurableTask/IPlatformInformation.cs

View workflow job for this annotation

GitHub Actions / build

An opening brace should not be followed by a blank line. (https://github.com/DotNetAnalyzers/StyleCopAnalyzers/blob/master/documentation/SA1505.md)

Check warning on line 79 in src/WebJobs.Extensions.DurableTask/IPlatformInformation.cs

View workflow job for this annotation

GitHub Actions / build

An opening brace should not be followed by a blank line. (https://github.com/DotNetAnalyzers/StyleCopAnalyzers/blob/master/documentation/SA1505.md)

/// <summary>
/// Determines whether the user has manually requested the gRPC protocol for worker communications.
/// </summary>
/// Used in Python when using the new durabletask-based Functions SDK.
/// <returns>True if the user has requested gRPC, False otherwise.</returns>
bool GrpcManuallyRequested { get; }

/// <summary>
/// Determine the underlying plan is Consumption or not.
/// </summary>
Expand Down
73 changes: 72 additions & 1 deletion src/WebJobs.Extensions.DurableTask/ProtobufUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
using System.Collections;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Reflection;
Expand Down Expand Up @@ -314,11 +313,83 @@ public static OrchestratorAction ToOrchestratorAction(P.OrchestratorAction a)
}

return action;
case P.OrchestratorAction.OrchestratorActionTypeOneofCase.SendEntityMessage:
RequestMessage? entityMessage = null;
string? eventName = null;
string? targetInstance = null;
switch (a.SendEntityMessage.EntityMessageTypeCase)
{
case P.SendEntityMessageAction.EntityMessageTypeOneofCase.EntityLockRequested:
entityMessage = new RequestMessage()
{
Operation = null,
Id = Guid.Parse(a.SendEntityMessage.EntityLockRequested.CriticalSectionId),
LockSet = a.SendEntityMessage.EntityLockRequested.LockSet.Skip(1).Select(s => EntityId.FromString(s)).ToArray(),
Position = a.SendEntityMessage.EntityLockRequested.Position,
ParentInstanceId = a.SendEntityMessage.EntityLockRequested.ParentInstanceId,
};
targetInstance = a.SendEntityMessage.EntityLockRequested.LockSet.ElementAt(0);
eventName = EncodeEventName(null);
break;
case P.SendEntityMessageAction.EntityMessageTypeOneofCase.EntityUnlockSent:
entityMessage = new RequestMessage()
{
Id = Guid.Parse(a.SendEntityMessage.EntityUnlockSent.CriticalSectionId),
ParentInstanceId = a.SendEntityMessage.EntityUnlockSent.ParentInstanceId,
};
targetInstance = a.SendEntityMessage.EntityUnlockSent.TargetInstanceId;
eventName = "release";
break;
case P.SendEntityMessageAction.EntityMessageTypeOneofCase.EntityOperationCalled:
entityMessage = new RequestMessage()
{
Operation = a.SendEntityMessage.EntityOperationCalled.Operation,
IsSignal = false,
Input = a.SendEntityMessage.EntityOperationCalled.Input,
Id = Guid.Parse(a.SendEntityMessage.EntityOperationCalled.RequestId),
ScheduledTime = a.SendEntityMessage.EntityOperationCalled.ScheduledTime?.ToDateTime(),
ParentInstanceId = a.SendEntityMessage.EntityOperationCalled.ParentInstanceId,
ParentExecutionId = a.SendEntityMessage.EntityOperationCalled.ParentExecutionId,
};
targetInstance = a.SendEntityMessage.EntityOperationCalled.TargetInstanceId;
eventName = EncodeEventName(a.SendEntityMessage.EntityOperationCalled.ScheduledTime?.ToDateTime());
break;
case P.SendEntityMessageAction.EntityMessageTypeOneofCase.EntityOperationSignaled:
entityMessage = new RequestMessage()
{
Operation = a.SendEntityMessage.EntityOperationSignaled.Operation,
IsSignal = true,
Input = a.SendEntityMessage.EntityOperationSignaled.Input,
Id = Guid.Parse(a.SendEntityMessage.EntityOperationSignaled.RequestId),
ScheduledTime = a.SendEntityMessage.EntityOperationSignaled.ScheduledTime?.ToDateTime(),
};
targetInstance = a.SendEntityMessage.EntityOperationSignaled.TargetInstanceId;
eventName = EncodeEventName(a.SendEntityMessage.EntityOperationSignaled.ScheduledTime?.ToDateTime());
break;
default:
throw new NotSupportedException($"Deserialization of SendEntityMessage action type '{a.SendEntityMessage.EntityMessageTypeCase}' is not supported.");
}

return new SendEventOrchestratorAction
{
Id = a.Id,
Instance = new OrchestrationInstance
{
InstanceId = targetInstance,
},
EventName = eventName, // TODO: Determine event name for entity messages
EventData = JsonConvert.SerializeObject(entityMessage, new JsonSerializerSettings() { TypeNameHandling = TypeNameHandling.None }),
};

throw new NotSupportedException("Deserialization of SendEntityMessage action is not supported.");
default:
throw new NotSupportedException($"Received unsupported action type '{a.OrchestratorActionTypeCase}'.");
}
}

private static string EncodeEventName(DateTime? scheduledTime)
=> scheduledTime.HasValue ? $"op@{scheduledTime.Value:o}" : "op";

[return: NotNullIfNotNull("parameters")]
public static P.OrchestratorEntityParameters? ToProtobuf(this TaskOrchestrationEntityParameters? parameters)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
<AssemblyName>Microsoft.Azure.WebJobs.Extensions.DurableTask</AssemblyName>
<RootNamespace>Microsoft.Azure.WebJobs.Extensions.DurableTask</RootNamespace>
<MajorVersion>3</MajorVersion>
<MinorVersion>8</MinorVersion>
<PatchVersion>0</PatchVersion>
<MinorVersion>9</MinorVersion>
<PatchVersion>0-preview0</PatchVersion>
<VersionPrefix>$(MajorVersion).$(MinorVersion).$(PatchVersion)</VersionPrefix>
<FileVersion>$(MajorVersion).$(MinorVersion).$(PatchVersion)</FileVersion>
<AssemblyVersion>$(MajorVersion).0.0.0</AssemblyVersion>
Expand Down
Loading