Skip to content

Commit 1fef0e4

Browse files
jviauhalspang
andauthored
Support class-based durable function invocations (#3229)
* Support class-based durable function invocations This change set adds support for: 1. Invoking class-based orchestration, entity, and activities directly. No source generator needed. 2. Moves handling invoking or func-based tasks out of the middleware and into a dedicated executor. Co-authored-by: Hal Spang <[email protected]>
1 parent 310e929 commit 1fef0e4

29 files changed

+1783
-206
lines changed

.github/workflows/smoketest-dotnet-isolated-v4.yml

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -73,15 +73,13 @@ jobs:
7373
# when building the smoke test app in docker, causing the build to fail. This is a temporary workaround until the
7474
# root cause is identified and fixed.
7575

76-
# Due to a known issue with class-based orchestrators, this test fails to discover the orchestrator definition.
77-
# Should re-enable once this bug is fixed: https://github.com/microsoft/durabletask-dotnet/issues/247
78-
# - name: Run smoke tests (Hello Cities)
79-
# shell: pwsh
80-
# run: azurite --silent --blobPort 10000 --queuePort 10001 --tablePort 10002 &
81-
# cd ./test/SmokeTests/OOProcSmokeTests/DotNetIsolated && func host start --port 7071 &
82-
# ./test/SmokeTests/OOProcSmokeTests/DotNetIsolated/run-smoke-tests.ps1 -HttpStartPath api/StartHelloCitiesTyped
76+
- name: Run smoke tests (Hello Cities (Typed))
77+
shell: pwsh
78+
run: azurite --silent --blobPort 10000 --queuePort 10001 --tablePort 10002 &
79+
cd ./test/SmokeTests/OOProcSmokeTests/DotNetIsolated && func host start --port 7071 &
80+
./test/SmokeTests/OOProcSmokeTests/DotNetIsolated/run-smoke-tests.ps1 -HttpStartPath api/StartHelloCitiesTyped
8381

84-
- name: Run smoke tests (Hello Cities)
82+
- name: Run smoke tests (Hello Cities (Untyped))
8583
shell: pwsh
8684
run: azurite --silent --blobPort 10000 --queuePort 10001 --tablePort 10002 &
8785
cd ./test/SmokeTests/OOProcSmokeTests/DotNetIsolated && func host start --port 7071 &
@@ -108,4 +106,10 @@ jobs:
108106
shell: pwsh
109107
run: azurite --silent --blobPort 10000 --queuePort 10001 --tablePort 10002 &
110108
cd ./test/SmokeTests/OOProcSmokeTests/DotNetIsolated && func host start --port 7071 &
111-
./test/SmokeTests/OOProcSmokeTests/DotNetIsolated/run-smoke-tests.ps1 -HttpStartPath api/durable_HttpStartOOMOrchestrator
109+
./test/SmokeTests/OOProcSmokeTests/DotNetIsolated/run-smoke-tests.ps1 -HttpStartPath api/durable_HttpStartOOMOrchestrator
110+
111+
- name: Run smoke tests (Entity Counting (Typed))
112+
shell: pwsh
113+
run: azurite --silent --blobPort 10000 --queuePort 10001 --tablePort 10002 &
114+
cd ./test/SmokeTests/OOProcSmokeTests/DotNetIsolated && func host start --port 7071 &
115+
./test/SmokeTests/OOProcSmokeTests/DotNetIsolated/run-smoke-tests.ps1 -HttpStartPath api/StartEntityOrchestration

Directory.Packages.props

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
<PackageVersion Include="Microsoft.Azure.DurableTask.ApplicationInsights" Version="0.8.0" />
1717
<PackageVersion Include="Microsoft.Azure.DurableTask.AzureStorage" Version="2.7.0" />
1818
<PackageVersion Include="Microsoft.Azure.DurableTask.Core" Version="3.6.0" />
19-
<PackageVersion Include="Microsoft.Azure.Functions.Worker.Core" Version="2.0.0" />
19+
<PackageVersion Include="Microsoft.Azure.Functions.Worker.Core" Version="2.2.0" />
2020
<PackageVersion Include="Microsoft.Azure.Functions.Worker.Extensions.Abstractions" Version="1.3.0" />
2121
<PackageVersion Include="Microsoft.Azure.Functions.Worker.Extensions.Storage.Blobs" Version="6.7.0" />
2222
<PackageVersion Include="Microsoft.Azure.WebJobs" Version="3.0.39" />
@@ -51,14 +51,16 @@
5151
<!-- Dependencies used by both tests and samples -->
5252
<ItemGroup>
5353
<PackageVersion Include="Microsoft.ApplicationInsights.WorkerService" Version="2.22.0" />
54-
<PackageVersion Include="Microsoft.Azure.Functions.Worker" Version="2.0.0" />
55-
<!-- TODO: Update this to Worker SDK 2.x -->
56-
<PackageVersion Include="Microsoft.Azure.Functions.Worker.Sdk" Version="1.18.1" />
54+
<PackageVersion Include="Microsoft.Azure.DurableTask.Emulator" Version="2.5.4" />
55+
<PackageVersion Include="Microsoft.Azure.DurableTask.Netherite.AzureFunctions" Version="3.1.0" />
56+
<PackageVersion Include="Microsoft.Azure.DurableTask.Redis" Version="0.1.9-alpha" />
57+
<PackageVersion Include="Microsoft.Azure.Functions.Worker" Version="2.2.0" />
5758
<PackageVersion Include="Microsoft.Azure.Functions.Worker.ApplicationInsights" Version="2.0.0" />
5859
<PackageVersion Include="Microsoft.Azure.Functions.Worker.Extensions.DurableTask.AzureManaged" Version="0.4.2-alpha" />
5960
<PackageVersion Include="Microsoft.Azure.Functions.Worker.Extensions.DurableTask.SqlServer" Version="1.5.2" />
6061
<PackageVersion Include="Microsoft.Azure.Functions.Worker.Extensions.Http" Version="3.3.0" />
6162
<PackageVersion Include="Microsoft.Azure.Functions.Worker.Extensions.Http.AspNetCore" Version="2.0.2" />
63+
<PackageVersion Include="Microsoft.Azure.Functions.Worker.Sdk" Version="2.0.5"/>
6264
<PackageVersion Include="Microsoft.Azure.WebJobs.Extensions.DurableTask" Version="2.13.7" />
6365
<PackageVersion Include="Microsoft.Azure.WebJobs.Extensions.Http" Version="3.2.0" />
6466
<PackageVersion Include="Microsoft.Azure.WebJobs.Extensions.Storage" Version="5.1.3" />
Lines changed: 7 additions & 116 deletions
Original file line numberDiff line numberDiff line change
@@ -1,137 +1,28 @@
11
// Copyright (c) .NET Foundation. All rights reserved.
22
// Licensed under the MIT License. See License.txt in the project root for license information.
33

4-
using System;
5-
using System.Diagnostics.CodeAnalysis;
64
using System.Threading.Tasks;
7-
using Microsoft.Azure.Functions.Worker.Extensions.DurableTask.Exceptions;
5+
using Microsoft.Azure.Functions.Worker.Extensions.DurableTask.Execution;
6+
using Microsoft.Azure.Functions.Worker.Invocation;
87
using Microsoft.Azure.Functions.Worker.Middleware;
9-
using Microsoft.DurableTask.Worker;
10-
using Microsoft.DurableTask.Worker.Grpc;
118

129
namespace Microsoft.Azure.Functions.Worker.Extensions.DurableTask;
1310

1411
/// <summary>
1512
/// A middleware to handle orchestration triggers.
1613
/// </summary>
17-
internal class DurableTaskFunctionsMiddleware(ExtendedSessionsCache extendedSessionsCache) : IFunctionsWorkerMiddleware
14+
internal class DurableTaskFunctionsMiddleware(DurableFunctionExecutor invoker) : IFunctionsWorkerMiddleware
1815
{
19-
private readonly ExtendedSessionsCache extendedSessionsCache = extendedSessionsCache;
20-
2116
/// <inheritdoc />
2217
public Task Invoke(FunctionContext functionContext, FunctionExecutionDelegate next)
2318
{
24-
if (IsOrchestrationTrigger(functionContext, out BindingMetadata? triggerBinding))
25-
{
26-
return this.RunOrchestrationAsync(functionContext, triggerBinding, next);
27-
}
28-
29-
if (IsEntityTrigger(functionContext, out triggerBinding))
30-
{
31-
return RunEntityAsync(functionContext, triggerBinding, next);
32-
}
33-
34-
if (IsActivityTrigger(functionContext, out triggerBinding))
19+
if (functionContext.TryGetOrchestrationBinding(out _)
20+
|| functionContext.TryGetEntityBinding(out _)
21+
|| functionContext.TryGetActivityBinding(out _))
3522
{
36-
return RunActivityAsync(functionContext, triggerBinding, next);
23+
functionContext.Features.Set<IFunctionExecutor>(invoker);
3724
}
3825

3926
return next(functionContext);
4027
}
41-
42-
private static bool IsOrchestrationTrigger(
43-
FunctionContext context, [NotNullWhen(true)] out BindingMetadata? orchestrationTriggerBinding)
44-
{
45-
foreach (BindingMetadata binding in context.FunctionDefinition.InputBindings.Values)
46-
{
47-
if (string.Equals(binding.Type, "orchestrationTrigger", StringComparison.OrdinalIgnoreCase))
48-
{
49-
orchestrationTriggerBinding = binding;
50-
return true;
51-
}
52-
}
53-
54-
orchestrationTriggerBinding = null;
55-
return false;
56-
}
57-
58-
async Task RunOrchestrationAsync(
59-
FunctionContext context, BindingMetadata triggerBinding, FunctionExecutionDelegate next)
60-
{
61-
InputBindingData<object> triggerInputData = await context.BindInputAsync<object>(triggerBinding);
62-
if (triggerInputData?.Value is not string encodedOrchestratorState)
63-
{
64-
throw new InvalidOperationException("Orchestration history state was either missing from the input or not a string value.");
65-
}
66-
67-
FunctionsOrchestrator orchestrator = new(context, next, triggerInputData);
68-
string orchestratorOutput = GrpcOrchestrationRunner.LoadAndRun(
69-
encodedOrchestratorState, orchestrator, this.extendedSessionsCache, context.InstanceServices);
70-
71-
// Send the encoded orchestrator output as the return value seen by the functions host extension
72-
context.GetInvocationResult().Value = orchestratorOutput;
73-
}
74-
75-
private static bool IsEntityTrigger(
76-
FunctionContext context, [NotNullWhen(true)] out BindingMetadata? entityTriggerBinding)
77-
{
78-
foreach (BindingMetadata binding in context.FunctionDefinition.InputBindings.Values)
79-
{
80-
if (string.Equals(binding.Type, "entityTrigger", StringComparison.OrdinalIgnoreCase))
81-
{
82-
entityTriggerBinding = binding;
83-
return true;
84-
}
85-
}
86-
87-
entityTriggerBinding = null;
88-
return false;
89-
}
90-
91-
static async Task RunEntityAsync(
92-
FunctionContext context, BindingMetadata triggerBinding, FunctionExecutionDelegate next)
93-
{
94-
InputBindingData<object> triggerInputData = await context.BindInputAsync<object>(triggerBinding);
95-
if (triggerInputData?.Value is not string encodedEntityBatch)
96-
{
97-
throw new InvalidOperationException("Entity batch was either missing from the input or not a string value.");
98-
}
99-
100-
TaskEntityDispatcher dispatcher = new(encodedEntityBatch, context.InstanceServices);
101-
triggerInputData.Value = dispatcher;
102-
103-
await next(context);
104-
context.GetInvocationResult().Value = dispatcher.Result;
105-
}
106-
107-
private static bool IsActivityTrigger(
108-
FunctionContext context, [NotNullWhen(true)] out BindingMetadata? activityTriggerBinding)
109-
{
110-
foreach (BindingMetadata binding in context.FunctionDefinition.InputBindings.Values)
111-
{
112-
if (string.Equals(binding.Type, "activityTrigger", StringComparison.OrdinalIgnoreCase))
113-
{
114-
activityTriggerBinding = binding;
115-
return true;
116-
}
117-
}
118-
119-
activityTriggerBinding = null;
120-
return false;
121-
}
122-
123-
private static async Task RunActivityAsync(FunctionContext functionContext, BindingMetadata triggerBinding, FunctionExecutionDelegate next)
124-
{
125-
try
126-
{
127-
await next(functionContext);
128-
return;
129-
}
130-
catch (Exception ex)
131-
{
132-
// Get the exception properties provider from the service provider if available
133-
IExceptionPropertiesProvider? exceptionPropertiesProvider = functionContext.InstanceServices.GetService(typeof(IExceptionPropertiesProvider)) as IExceptionPropertiesProvider;
134-
throw new DurableSerializationException(ex, exceptionPropertiesProvider);
135-
}
136-
}
13728
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the MIT License. See License.txt in the project root for license information.
3+
4+
using System;
5+
using System.Threading.Tasks;
6+
using Microsoft.Azure.Functions.Worker.Extensions.DurableTask.Exceptions;
7+
using Microsoft.DurableTask;
8+
9+
namespace Microsoft.Azure.Functions.Worker.Extensions.DurableTask.Execution;
10+
11+
internal partial class DurableFunctionExecutor
12+
{
13+
// Must point to a PUBLIC method.
14+
// Functions runtime will validate this, even though it is never called.
15+
public static readonly string ActivityEntryPoint =
16+
$"{typeof(DurableFunctionExecutor).FullName}.{nameof(Activity)}";
17+
18+
public void Activity()
19+
{
20+
throw new NotImplementedException(
21+
"Do not call this method. It is a placeholder for activity function metadata.");
22+
}
23+
24+
private async ValueTask RunActivityAsync(FunctionContext context, BindingMetadata triggerBinding)
25+
{
26+
try
27+
{
28+
if (context.FunctionDefinition.EntryPoint == ActivityEntryPoint)
29+
{
30+
await this.RunDirectActivityAsync(context, triggerBinding);
31+
return;
32+
}
33+
34+
await inner.ExecuteAsync(context);
35+
return;
36+
}
37+
catch (Exception ex)
38+
{
39+
throw new DurableSerializationException(ex, exceptionPropertiesProvider);
40+
}
41+
}
42+
43+
private async Task RunDirectActivityAsync(FunctionContext context, BindingMetadata triggerBinding)
44+
{
45+
if (!factory.TryCreateActivity(
46+
context.FunctionDefinition.Name, context.InstanceServices, out ITaskActivity? activity))
47+
{
48+
throw new InvalidOperationException(
49+
$"No activity with name '{context.FunctionDefinition.Name}' is registered.");
50+
}
51+
52+
InputBindingData<object> triggerInputData = await context.BindInputAsync<object>(triggerBinding);
53+
if (triggerInputData?.Value is not string { } data)
54+
{
55+
throw new InvalidOperationException(
56+
"Activity input data was either missing from the input or not a JSON string.");
57+
}
58+
59+
object? input = this.Converter.Deserialize(data, activity.InputType);
60+
object? activityResult = await activity.RunAsync(new FunctionsTaskActivityContext(context), input);
61+
context.GetInvocationResult().Value = activityResult;
62+
}
63+
64+
private sealed class FunctionsTaskActivityContext(FunctionContext context)
65+
: TaskActivityContext
66+
{
67+
public FunctionContext Context { get; } = context;
68+
69+
public override TaskName Name { get; } = context.FunctionDefinition.Name;
70+
71+
public override string InstanceId { get; } = context.GetInstanceId();
72+
}
73+
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the MIT License. See License.txt in the project root for license information.
3+
4+
using System;
5+
using System.Threading.Tasks;
6+
using Microsoft.DurableTask.Entities;
7+
using Microsoft.DurableTask.Worker;
8+
using Microsoft.DurableTask.Worker.Grpc;
9+
10+
namespace Microsoft.Azure.Functions.Worker.Extensions.DurableTask.Execution;
11+
12+
internal partial class DurableFunctionExecutor
13+
{
14+
// Must point to a PUBLIC method.
15+
// Functions runtime will validate this, even though it is never called.
16+
public static readonly string EntityEntryPoint =
17+
$"{typeof(DurableFunctionExecutor).FullName}.{nameof(Entity)}";
18+
19+
public void Entity()
20+
{
21+
throw new NotImplementedException(
22+
"Do not call this method. It is a placeholder for entity function metadata.");
23+
}
24+
25+
private async ValueTask RunEntityAsync(FunctionContext context, BindingMetadata triggerBinding)
26+
{
27+
InputBindingData<object> triggerInputData = await context.BindInputAsync<object>(triggerBinding);
28+
if (triggerInputData?.Value is not string encodedEntityBatch)
29+
{
30+
throw new InvalidOperationException(
31+
"Entity batch was either missing from the input or not a string value.");
32+
}
33+
34+
if (context.FunctionDefinition.EntryPoint == EntityEntryPoint)
35+
{
36+
await this.RunDirectEntityAsync(context, encodedEntityBatch);
37+
return;
38+
}
39+
40+
TaskEntityDispatcher dispatcher = new(encodedEntityBatch, context.InstanceServices);
41+
triggerInputData.Value = dispatcher;
42+
await inner.ExecuteAsync(context);
43+
context.GetInvocationResult().Value = dispatcher.Result;
44+
}
45+
46+
private async Task RunDirectEntityAsync(
47+
FunctionContext context, string encodedEntityBatch)
48+
{
49+
if (factory is not IDurableTaskFactory2 factory2)
50+
{
51+
throw new InvalidOperationException(
52+
"The registered durable task factory does not support entity invocations.");
53+
}
54+
55+
if (!factory2.TryCreateEntity(
56+
context.FunctionDefinition.Name, context.InstanceServices, out ITaskEntity? entity))
57+
{
58+
throw new InvalidOperationException(
59+
$"No entity with name '{context.FunctionDefinition.Name}' is registered.");
60+
}
61+
62+
string result = await GrpcEntityRunner.LoadAndRunAsync(
63+
encodedEntityBatch, entity, context.InstanceServices);
64+
context.GetInvocationResult().Value = result;
65+
}
66+
}

0 commit comments

Comments
 (0)