diff --git a/Directory.Packages.props b/Directory.Packages.props index 34509fe6..b90b4723 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -24,6 +24,7 @@ + @@ -34,6 +35,11 @@ + + + + + diff --git a/Microsoft.DurableTask.sln b/Microsoft.DurableTask.sln index 26c2e80d..383f2e15 100644 --- a/Microsoft.DurableTask.sln +++ b/Microsoft.DurableTask.sln @@ -93,6 +93,10 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ScheduleWebApp", "samples\S EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ScheduledTasks.Tests", "test\ScheduledTasks.Tests\ScheduledTasks.Tests.csproj", "{D2779F32-A548-44F8-B60A-6AC018966C79}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "LargePayloadConsoleApp", "samples\LargePayloadConsoleApp\LargePayloadConsoleApp.csproj", "{6EB9D002-62C8-D6C1-62A8-14C54CA6DBBC}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "AzureBlobPayloads", "src\Extensions\AzureBlobPayloads\AzureBlobPayloads.csproj", "{FE1DA748-D6DB-E168-BC42-6DBBCEAF229C}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -247,6 +251,14 @@ Global {D2779F32-A548-44F8-B60A-6AC018966C79}.Debug|Any CPU.Build.0 = Debug|Any CPU {D2779F32-A548-44F8-B60A-6AC018966C79}.Release|Any CPU.ActiveCfg = Release|Any CPU {D2779F32-A548-44F8-B60A-6AC018966C79}.Release|Any CPU.Build.0 = Release|Any CPU + {6EB9D002-62C8-D6C1-62A8-14C54CA6DBBC}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {6EB9D002-62C8-D6C1-62A8-14C54CA6DBBC}.Debug|Any CPU.Build.0 = Debug|Any CPU + {6EB9D002-62C8-D6C1-62A8-14C54CA6DBBC}.Release|Any CPU.ActiveCfg = Release|Any CPU + {6EB9D002-62C8-D6C1-62A8-14C54CA6DBBC}.Release|Any CPU.Build.0 = Release|Any CPU + {FE1DA748-D6DB-E168-BC42-6DBBCEAF229C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {FE1DA748-D6DB-E168-BC42-6DBBCEAF229C}.Debug|Any CPU.Build.0 = Debug|Any CPU + {FE1DA748-D6DB-E168-BC42-6DBBCEAF229C}.Release|Any CPU.ActiveCfg = Release|Any CPU + 
{FE1DA748-D6DB-E168-BC42-6DBBCEAF229C}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -293,6 +305,8 @@ Global {A89B766C-987F-4C9F-8937-D0AB9FE640C8} = {EFF7632B-821E-4CFC-B4A0-ED4B24296B17} {100348B5-4D97-4A3F-B777-AB14F276F8FE} = {EFF7632B-821E-4CFC-B4A0-ED4B24296B17} {D2779F32-A548-44F8-B60A-6AC018966C79} = {E5637F81-2FB9-4CD7-900D-455363B142A7} + {6EB9D002-62C8-D6C1-62A8-14C54CA6DBBC} = {EFF7632B-821E-4CFC-B4A0-ED4B24296B17} + {FE1DA748-D6DB-E168-BC42-6DBBCEAF229C} = {8AFC9781-F6F1-4696-BB4A-9ED7CA9D612B} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {AB41CB55-35EA-4986-A522-387AB3402E71} diff --git a/samples/LargePayloadConsoleApp/LargePayloadConsoleApp.csproj b/samples/LargePayloadConsoleApp/LargePayloadConsoleApp.csproj new file mode 100644 index 00000000..b0f2914c --- /dev/null +++ b/samples/LargePayloadConsoleApp/LargePayloadConsoleApp.csproj @@ -0,0 +1,24 @@ + + + + Exe + net8.0 + enable + + + + + + + + + + + + + + + + + + diff --git a/samples/LargePayloadConsoleApp/Program.cs b/samples/LargePayloadConsoleApp/Program.cs new file mode 100644 index 00000000..9e956180 --- /dev/null +++ b/samples/LargePayloadConsoleApp/Program.cs @@ -0,0 +1,198 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using Microsoft.DurableTask.Client; +using Microsoft.DurableTask.Client.AzureManaged; +using Microsoft.DurableTask.Client.Entities; +using Microsoft.DurableTask.Entities; +using Microsoft.DurableTask.Worker; +using Microsoft.DurableTask.Worker.AzureManaged; +using Microsoft.DurableTask; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; + +// Demonstrates Large Payload Externalization using Azure Blob Storage. +// This sample uses Azurite/emulator by default via UseDevelopmentStorage=true. 
+ +HostApplicationBuilder builder = Host.CreateApplicationBuilder(args); + +// Connection string for Durable Task Scheduler +string schedulerConnectionString = builder.Configuration.GetValue("DURABLE_TASK_SCHEDULER_CONNECTION_STRING") + ?? throw new InvalidOperationException("Missing required configuration 'DURABLE_TASK_SCHEDULER_CONNECTION_STRING'"); + +// 1) Register shared payload store ONCE +builder.Services.AddExternalizedPayloadStore(opts => +{ + // Keep threshold small to force externalization for demo purposes + opts.ExternalizeThresholdBytes = 1024; // 1KB + opts.ConnectionString = builder.Configuration.GetValue("DURABLETASK_STORAGE") ?? "UseDevelopmentStorage=true"; + opts.ContainerName = builder.Configuration.GetValue("DURABLETASK_PAYLOAD_CONTAINER"); +}); + +// 2) Configure Durable Task client +builder.Services.AddDurableTaskClient(b => +{ + b.UseDurableTaskScheduler(schedulerConnectionString); + b.Configure(o => o.EnableEntitySupport = true); + + // Use shared store (no duplication of options) + b.UseExternalizedPayloads(); +}); + +// 3) Configure Durable Task worker +builder.Services.AddDurableTaskWorker(b => +{ + b.UseDurableTaskScheduler(schedulerConnectionString); + + b.AddTasks(tasks => + { + // Orchestrator: call activity first, return its output (should equal original input) + tasks.AddOrchestratorFunc("LargeInputEcho", async (ctx, input) => + { + string echoed = await ctx.CallActivityAsync("Echo", input); + return echoed; + }); + + // Activity: validate it receives raw input (not token) and return it + tasks.AddActivityFunc("Echo", (ctx, value) => + { + if (value is null) + { + return string.Empty; + } + + if (value.StartsWith("blob:v1:", StringComparison.Ordinal)) + { + throw new InvalidOperationException("Activity received a payload token instead of raw input."); + } + + return value; + }); + + // Entity samples + tasks.AddOrchestratorFunc( + "LargeEntityOperationInput", + (ctx, _) => ctx.Entities.CallEntityAsync( + new 
EntityInstanceId(nameof(EchoLengthEntity), "1"), + operationName: "EchoLength", + input: new string('E', 700 * 1024))); + tasks.AddEntity(nameof(EchoLengthEntity)); + + tasks.AddOrchestratorFunc( + "LargeEntityOperationOutput", + async (ctx, _) => (await ctx.Entities.CallEntityAsync( + new EntityInstanceId(nameof(LargeResultEntity), "1"), + operationName: "Produce", + input: 850 * 1024)).Length); + tasks.AddEntity(nameof(LargeResultEntity)); + + tasks.AddOrchestratorFunc( + "LargeEntityState", + async (ctx, _) => + { + await ctx.Entities.CallEntityAsync( + new EntityInstanceId(nameof(StateEntity), "1"), + operationName: "Set", + input: new string('S', 900 * 1024)); + return null; + }); + tasks.AddEntity(nameof(StateEntity)); + }); + + // Use shared store (no duplication of options) + b.UseExternalizedPayloads(); + + b.Configure(o => o.EnableEntitySupport = true); +}); + +IHost host = builder.Build(); +await host.StartAsync(); + +await using DurableTaskClient client = host.Services.GetRequiredService(); + +// Option A: Directly pass an oversized input to orchestration to trigger externalization +string largeInput = new string('B', 1024 * 1024); // 1MB +string instanceId = await client.ScheduleNewOrchestrationInstanceAsync("LargeInputEcho", largeInput); +Console.WriteLine($"Started orchestration with direct large input. Instance: {instanceId}"); + + +using CancellationTokenSource cts = new CancellationTokenSource(TimeSpan.FromSeconds(120)); +OrchestrationMetadata result = await client.WaitForInstanceCompletionAsync( + instanceId, + getInputsAndOutputs: true, + cts.Token); + +Console.WriteLine($"RuntimeStatus: {result.RuntimeStatus}"); +string deserializedInput = result.ReadInputAs() ?? string.Empty; +string deserializedOutput = result.ReadOutputAs() ?? 
string.Empty; + +Console.WriteLine($"SerializedInput: {result.SerializedInput}"); +Console.WriteLine($"SerializedOutput: {result.SerializedOutput}"); +Console.WriteLine($"Deserialized input equals original: {deserializedInput == largeInput}"); +Console.WriteLine($"Deserialized output equals original: {deserializedOutput == largeInput}"); +Console.WriteLine($"Deserialized input length: {deserializedInput.Length}"); + +// Run entity samples +Console.WriteLine(); +Console.WriteLine("Running LargeEntityOperationInput..."); +string largeEntityInput = new string('E', 700 * 1024); // 700KB +string entityInputInstance = await client.ScheduleNewOrchestrationInstanceAsync("LargeEntityOperationInput"); +OrchestrationMetadata entityInputResult = await client.WaitForInstanceCompletionAsync(entityInputInstance, getInputsAndOutputs: true, cts.Token); +int entityInputLength = entityInputResult.ReadOutputAs(); +Console.WriteLine($"Status: {entityInputResult.RuntimeStatus}, Output length: {entityInputLength}"); +Console.WriteLine($"Deserialized input length equals original: {entityInputLength == largeEntityInput.Length}"); + +Console.WriteLine(); +Console.WriteLine("Running LargeEntityOperationOutput..."); +int largeEntityOutputLength = 850 * 1024; // 850KB +string entityOutputInstance = await client.ScheduleNewOrchestrationInstanceAsync("LargeEntityOperationOutput"); +OrchestrationMetadata entityOutputResult = await client.WaitForInstanceCompletionAsync(entityOutputInstance, getInputsAndOutputs: true, cts.Token); +int entityOutputLength = entityOutputResult.ReadOutputAs(); +Console.WriteLine($"Status: {entityOutputResult.RuntimeStatus}, Output length: {entityOutputLength}"); +Console.WriteLine($"Deserialized output length equals original: {entityOutputLength == largeEntityOutputLength}"); + +Console.WriteLine(); +Console.WriteLine("Running LargeEntityState and querying state..."); +string largeEntityState = new string('S', 900 * 1024); // 900KB +string entityStateInstance = await 
client.ScheduleNewOrchestrationInstanceAsync("LargeEntityState"); +OrchestrationMetadata entityStateOrch = await client.WaitForInstanceCompletionAsync(entityStateInstance, getInputsAndOutputs: true, cts.Token); +Console.WriteLine($"Status: {entityStateOrch.RuntimeStatus}"); +EntityMetadata? state = await client.Entities.GetEntityAsync(new EntityInstanceId(nameof(StateEntity), "1"), includeState: true); +int stateLength = state?.State?.Length ?? 0; +Console.WriteLine($"State length: {stateLength}"); +Console.WriteLine($"Deserialized state equals original: {state?.State == largeEntityState}"); + + + + + +public class EchoLengthEntity : TaskEntity +{ + public int EchoLength(string input) + { + return input.Length; + } +} + +public class LargeResultEntity : TaskEntity +{ + public string Produce(int length) + { + return new string('R', length); + } +} + +public class StateEntity : TaskEntity +{ + protected override string? InitializeState(TaskEntityOperation entityOperation) + { + // Avoid Activator.CreateInstance() which throws; start as null (no state) + return null; + } + + public void Set(string value) + { + this.State = value; + } +} \ No newline at end of file diff --git a/samples/LargePayloadConsoleApp/Properties/launchSettings.json b/samples/LargePayloadConsoleApp/Properties/launchSettings.json new file mode 100644 index 00000000..89f6b592 --- /dev/null +++ b/samples/LargePayloadConsoleApp/Properties/launchSettings.json @@ -0,0 +1,12 @@ +{ + "profiles": { + "LargePayloadConsoleApp": { + "commandName": "Project", + "environmentVariables": { + "DURABLE_TASK_SCHEDULER_CONNECTION_STRING": "", + "DURABLETASK_STORAGE": "", + "DURABLETASK_PAYLOAD_CONTAINER": "" + } + } + } +} diff --git a/samples/LargePayloadConsoleApp/README.md b/samples/LargePayloadConsoleApp/README.md new file mode 100644 index 00000000..812098ef --- /dev/null +++ b/samples/LargePayloadConsoleApp/README.md @@ -0,0 +1,29 @@ +# Large Payload Externalization Sample + +This sample demonstrates 
configuring Durable Task to externalize large payloads to Azure Blob Storage using `UseExternalizedPayloads` on both client and worker, connecting via Durable Task Scheduler (no local sidecar). + +- Defaults to Azurite/Storage Emulator via `UseDevelopmentStorage=true`. +- Threshold is set to 1KB for demo, so even modest inputs are externalized. + +## Prerequisites + +- A Durable Task Scheduler connection string (e.g., from Azure portal) in `DURABLE_TASK_SCHEDULER_CONNECTION_STRING`. +- Optional: Run Azurite (if not using real Azure Storage) for payload storage tokens. + +## Configure + +Environment variables (optional): + +- `DURABLETASK_STORAGE`: Azure Storage connection string. Defaults to `UseDevelopmentStorage=true`. +- `DURABLETASK_PAYLOAD_CONTAINER`: Blob container name. Defaults to `durabletask-payloads`. + +## Run + +```bash +# from repo root +dotnet run --project samples/LargePayloadConsoleApp/LargePayloadConsoleApp.csproj +``` + +The app starts an orchestration with a 1MB input, which is externalized by the client and resolved by the worker. The console shows a token-like serialized input and a deserialized input length. + + diff --git a/src/Extensions/AzureBlobPayloads/AzureBlobPayloads.csproj b/src/Extensions/AzureBlobPayloads/AzureBlobPayloads.csproj new file mode 100644 index 00000000..8462b0f1 --- /dev/null +++ b/src/Extensions/AzureBlobPayloads/AzureBlobPayloads.csproj @@ -0,0 +1,30 @@ + + + + netstandard2.0;net6.0 + Azure Blob Storage externalized payload support for Durable Task. 
+ Microsoft.DurableTask.Extensions.AzureBlobPayloads + Microsoft.DurableTask + true + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/src/Extensions/AzureBlobPayloads/DependencyInjection/DurableTaskClientBuilderExtensions.AzureBlobPayloads.cs b/src/Extensions/AzureBlobPayloads/DependencyInjection/DurableTaskClientBuilderExtensions.AzureBlobPayloads.cs new file mode 100644 index 00000000..0817bcea --- /dev/null +++ b/src/Extensions/AzureBlobPayloads/DependencyInjection/DurableTaskClientBuilderExtensions.AzureBlobPayloads.cs @@ -0,0 +1,61 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using Grpc.Core.Interceptors; +using Microsoft.DurableTask.Client; +using Microsoft.DurableTask.Client.Grpc; +using Microsoft.DurableTask.Converters; +using Microsoft.DurableTask.Worker.Grpc.Internal; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Options; + +namespace Microsoft.DurableTask; + +/// +/// Extension methods to enable externalized payloads using Azure Blob Storage for Durable Task Client. +/// +public static class DurableTaskClientBuilderExtensionsAzureBlobPayloads +{ + /// + /// Enables externalized payload storage using a pre-configured shared payload store. + /// This overload helps ensure client and worker use the same configuration. + /// + /// The builder to configure. + /// The original builder, for call chaining. 
+ public static IDurableTaskClientBuilder UseExternalizedPayloads( + this IDurableTaskClientBuilder builder) + { + Check.NotNull(builder); + return UseExternalizedPayloadsCore(builder); + } + + static IDurableTaskClientBuilder UseExternalizedPayloadsCore(IDurableTaskClientBuilder builder) + { + // Wrap the gRPC CallInvoker with our interceptor when using the gRPC client + builder.Services + .AddOptions(builder.Name) + .PostConfigure>((opt, store, monitor) => + { + LargePayloadStorageOptions opts = monitor.Get(builder.Name); + if (opt.Channel is not null) + { + Grpc.Core.CallInvoker invoker = opt.Channel.Intercept(new AzureBlobPayloadsSideCarInterceptor(store, opts)); + opt.CallInvoker = invoker; + + // Ensure client uses the intercepted invoker path + opt.Channel = null; + } + else if (opt.CallInvoker is not null) + { + opt.CallInvoker = opt.CallInvoker.Intercept(new AzureBlobPayloadsSideCarInterceptor(store, opts)); + } + else + { + throw new ArgumentException( + "Channel or CallInvoker must be provided to use Azure Blob Payload Externalization feature"); + } + }); + + return builder; + } +} diff --git a/src/Extensions/AzureBlobPayloads/DependencyInjection/DurableTaskWorkerBuilderExtensions.AzureBlobPayloads.cs b/src/Extensions/AzureBlobPayloads/DependencyInjection/DurableTaskWorkerBuilderExtensions.AzureBlobPayloads.cs new file mode 100644 index 00000000..d65e30b8 --- /dev/null +++ b/src/Extensions/AzureBlobPayloads/DependencyInjection/DurableTaskWorkerBuilderExtensions.AzureBlobPayloads.cs @@ -0,0 +1,84 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. 
+ +using Grpc.Core.Interceptors; +using Grpc.Net.Client; +using Microsoft.DurableTask.Converters; +using Microsoft.DurableTask.Worker; +using Microsoft.DurableTask.Worker.Grpc; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Options; + +namespace Microsoft.DurableTask; + +/// +/// Extension methods to enable externalized payloads using Azure Blob Storage for Durable Task Worker. +/// +public static class DurableTaskWorkerBuilderExtensionsAzureBlobPayloads +{ + /// + /// Enables externalized payload storage using Azure Blob Storage for the specified worker builder. + /// + /// The builder to configure. + /// The callback to configure the storage options. + /// The original builder, for call chaining. + public static IDurableTaskWorkerBuilder UseExternalizedPayloads( + this IDurableTaskWorkerBuilder builder, + Action configure) + { + Check.NotNull(builder); + Check.NotNull(configure); + + builder.Services.Configure(builder.Name, configure); + builder.Services.AddSingleton(sp => + { + LargePayloadStorageOptions opts = sp.GetRequiredService>().Get(builder.Name); + return new BlobPayloadStore(opts); + }); + + return UseExternalizedPayloadsCore(builder); + } + + /// + /// Enables externalized payload storage using a pre-configured shared payload store. + /// This overload helps ensure client and worker use the same configuration. + /// + /// The builder to configure. + /// The original builder, for call chaining. 
+ public static IDurableTaskWorkerBuilder UseExternalizedPayloads( + this IDurableTaskWorkerBuilder builder) + { + Check.NotNull(builder); + return UseExternalizedPayloadsCore(builder); + } + + static IDurableTaskWorkerBuilder UseExternalizedPayloadsCore(IDurableTaskWorkerBuilder builder) + { + // Wrap the gRPC CallInvoker with our interceptor when using the gRPC worker + builder.Services + .AddOptions(builder.Name) + .PostConfigure>((opt, store, monitor) => + { + LargePayloadStorageOptions opts = monitor.Get(builder.Name); + if (opt.Channel is not null) + { + var invoker = opt.Channel.Intercept(new AzureBlobPayloadsSideCarInterceptor(store, opts)); + opt.CallInvoker = invoker; + + // Ensure worker uses the intercepted invoker path + opt.Channel = null; + } + else if (opt.CallInvoker is not null) + { + opt.CallInvoker = opt.CallInvoker.Intercept(new AzureBlobPayloadsSideCarInterceptor(store, opts)); + } + else + { + throw new ArgumentException( + "Channel or CallInvoker must be provided to use Azure Blob Payload Externalization feature"); + } + }); + + return builder; + } +} diff --git a/src/Extensions/AzureBlobPayloads/DependencyInjection/ServiceCollectionExtensions.AzureBlobPayloads.cs b/src/Extensions/AzureBlobPayloads/DependencyInjection/ServiceCollectionExtensions.AzureBlobPayloads.cs new file mode 100644 index 00000000..787888b6 --- /dev/null +++ b/src/Extensions/AzureBlobPayloads/DependencyInjection/ServiceCollectionExtensions.AzureBlobPayloads.cs @@ -0,0 +1,45 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Options; + +namespace Microsoft.DurableTask; + +/// +/// DI extensions for configuring a shared Azure Blob payload store used by both client and worker. +/// +public static class ServiceCollectionExtensionsAzureBlobPayloads +{ + /// + /// Registers a shared Azure Blob-based externalized payload store and its options. 
+ /// The provided options apply to all named Durable Task builders (client/worker), + /// so UseExternalizedPayloads() can be called without repeating configuration. + /// + /// The service collection. + /// The configuration callback for the payload store. + /// The original service collection. + public static IServiceCollection AddExternalizedPayloadStore( + this IServiceCollection services, + Action configure) + { + Check.NotNull(services); + Check.NotNull(configure); + + // Apply once to ALL names (IConfigureOptions hits every named options instance), + // so monitor.Get(builder.Name) in the client/worker extensions will see the same config. + services.Configure(configure); + + // Provide a single shared PayloadStore instance built from the default options. + services.AddSingleton(sp => + { + IOptionsMonitor monitor = + sp.GetRequiredService>(); + + LargePayloadStorageOptions opts = monitor.Get(Options.DefaultName); + return new BlobPayloadStore(opts); + }); + + return services; + } +} diff --git a/src/Extensions/AzureBlobPayloads/Factories/AzureBlobPayloadCallInvokerFactory.cs b/src/Extensions/AzureBlobPayloads/Factories/AzureBlobPayloadCallInvokerFactory.cs new file mode 100644 index 00000000..62beb8bd --- /dev/null +++ b/src/Extensions/AzureBlobPayloads/Factories/AzureBlobPayloadCallInvokerFactory.cs @@ -0,0 +1,26 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using Grpc.Core; +using Grpc.Core.Interceptors; +using Grpc.Net.Client; + +namespace Microsoft.DurableTask; + +/// +/// Static factory for creating large payload interceptors without exposing internal implementation details. +/// +public static class AzureBlobPayloadCallInvokerFactory +{ + /// + /// Creates a CallInvoker with large payload support interceptor applied to the given GrpcChannel. + /// + /// The gRPC channel to intercept. + /// The large payload storage options. + /// A CallInvoker with the large payload interceptor applied. 
+ public static CallInvoker Create(GrpcChannel channel, LargePayloadStorageOptions options) + { + PayloadStore payloadStore = new BlobPayloadStore(options); + return channel.CreateCallInvoker().Intercept(new AzureBlobPayloadsManagedBackendInterceptor(payloadStore, options)); + } +} diff --git a/src/Extensions/AzureBlobPayloads/Interceptors/AzureBlobPayloadsManagedBackendInterceptor.cs b/src/Extensions/AzureBlobPayloads/Interceptors/AzureBlobPayloadsManagedBackendInterceptor.cs new file mode 100644 index 00000000..5e2f1acd --- /dev/null +++ b/src/Extensions/AzureBlobPayloads/Interceptors/AzureBlobPayloadsManagedBackendInterceptor.cs @@ -0,0 +1,495 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using P = Microsoft.DurableTask.AzureManagedBackend.Protobuf; + +namespace Microsoft.DurableTask; + +/// +/// gRPC interceptor that externalizes large payloads to an on requests +/// and resolves known payload tokens on responses for Azure Managed Backend. +/// +public sealed class AzureBlobPayloadsManagedBackendInterceptor(PayloadStore payloadStore, LargePayloadStorageOptions options) + : BasePayloadInterceptor(payloadStore, options) +{ + /// + protected override async Task ExternalizeRequestPayloadsAsync(TRequest request, CancellationToken cancellation) + { + // Azure Managed Backend -> Backend Service + // Note: This interceptor is designed for backend_service.proto types, but since those types + // are not yet generated with the updated namespace, we'll use the existing orchestrator_service.proto types + // for now. The user should run refresh-protos.ps1 to generate the proper types. 
+ switch (request) + { + case P.CreateInstanceRequest r: + r.Input = await this.MaybeExternalizeAsync(r.Input, cancellation); + break; + case P.RaiseEventRequest r: + r.Input = await this.MaybeExternalizeAsync(r.Input, cancellation); + break; + case P.TerminateRequest r: + r.Output = await this.MaybeExternalizeAsync(r.Output, cancellation); + break; + case P.SuspendRequest r: + r.Reason = await this.MaybeExternalizeAsync(r.Reason, cancellation); + break; + case P.ResumeRequest r: + r.Reason = await this.MaybeExternalizeAsync(r.Reason, cancellation); + break; + case P.SignalEntityRequest r: + r.Input = await this.MaybeExternalizeAsync(r.Input, cancellation); + break; + case P.ActivityResponse r: + r.Result = await this.MaybeExternalizeAsync(r.Result, cancellation); + break; + case P.OrchestratorResponse r: + await this.ExternalizeOrchestratorResponseAsync(r, cancellation); + break; + case P.EntityBatchResult r: + await this.ExternalizeEntityBatchResultAsync(r, cancellation); + break; + case P.EntityBatchRequest r: + await this.ExternalizeEntityBatchRequestAsync(r, cancellation); + break; + case P.EntityRequest r: + r.EntityState = await this.MaybeExternalizeAsync(r.EntityState, cancellation); + break; + } + } + + async Task ExternalizeOrchestratorResponseAsync(P.OrchestratorResponse r, CancellationToken cancellation) + { + r.CustomStatus = await this.MaybeExternalizeAsync(r.CustomStatus, cancellation); + foreach (P.OrchestratorAction a in r.Actions) + { + if (a.CompleteOrchestration is { } complete) + { + complete.Result = await this.MaybeExternalizeAsync(complete.Result, cancellation); + complete.Details = await this.MaybeExternalizeAsync(complete.Details, cancellation); + } + + if (a.TerminateOrchestration is { } term) + { + term.Reason = await this.MaybeExternalizeAsync(term.Reason, cancellation); + } + + if (a.ScheduleTask is { } schedule) + { + schedule.Input = await this.MaybeExternalizeAsync(schedule.Input, cancellation); + } + + if (a.CreateSubOrchestration 
is { } sub) + { + sub.Input = await this.MaybeExternalizeAsync(sub.Input, cancellation); + } + + if (a.SendEvent is { } sendEvt) + { + sendEvt.Data = await this.MaybeExternalizeAsync(sendEvt.Data, cancellation); + } + + if (a.SendEntityMessage is { } entityMsg) + { + if (entityMsg.EntityOperationSignaled is { } sig) + { + sig.Input = await this.MaybeExternalizeAsync(sig.Input, cancellation); + } + + if (entityMsg.EntityOperationCalled is { } called) + { + called.Input = await this.MaybeExternalizeAsync(called.Input, cancellation); + } + } + } + } + + async Task ExternalizeEntityBatchResultAsync(P.EntityBatchResult r, CancellationToken cancellation) + { + r.EntityState = await this.MaybeExternalizeAsync(r.EntityState, cancellation); + if (r.Results != null) + { + foreach (P.OperationResult result in r.Results) + { + if (result.Success is { } success) + { + success.Result = await this.MaybeExternalizeAsync(success.Result, cancellation); + } + } + } + + if (r.Actions != null) + { + foreach (P.OperationAction action in r.Actions) + { + if (action.SendSignal is { } sendSig) + { + sendSig.Input = await this.MaybeExternalizeAsync(sendSig.Input, cancellation); + } + + if (action.StartNewOrchestration is { } start) + { + start.Input = await this.MaybeExternalizeAsync(start.Input, cancellation); + } + } + } + } + + async Task ExternalizeEntityBatchRequestAsync(P.EntityBatchRequest r, CancellationToken cancellation) + { + r.EntityState = await this.MaybeExternalizeAsync(r.EntityState, cancellation); + if (r.Operations != null) + { + foreach (P.OperationRequest op in r.Operations) + { + op.Input = await this.MaybeExternalizeAsync(op.Input, cancellation); + } + } + } + + async Task ExternalizeHistoryEventAsync(P.HistoryEvent e, CancellationToken cancellation) + { + switch (e.EventTypeCase) + { + case P.HistoryEvent.EventTypeOneofCase.ExecutionStarted: + if (e.ExecutionStarted is { } es) + { + es.Input = await this.MaybeExternalizeAsync(es.Input, cancellation); + } + + break; + 
case P.HistoryEvent.EventTypeOneofCase.ExecutionCompleted: + if (e.ExecutionCompleted is { } ec) + { + ec.Result = await this.MaybeExternalizeAsync(ec.Result, cancellation); + } + + break; + case P.HistoryEvent.EventTypeOneofCase.EventRaised: + if (e.EventRaised is { } er) + { + er.Input = await this.MaybeExternalizeAsync(er.Input, cancellation); + } + + break; + case P.HistoryEvent.EventTypeOneofCase.TaskScheduled: + if (e.TaskScheduled is { } ts) + { + ts.Input = await this.MaybeExternalizeAsync(ts.Input, cancellation); + } + + break; + case P.HistoryEvent.EventTypeOneofCase.TaskCompleted: + if (e.TaskCompleted is { } tc) + { + tc.Result = await this.MaybeExternalizeAsync(tc.Result, cancellation); + } + + break; + case P.HistoryEvent.EventTypeOneofCase.SubOrchestrationInstanceCreated: + if (e.SubOrchestrationInstanceCreated is { } soc) + { + soc.Input = await this.MaybeExternalizeAsync(soc.Input, cancellation); + } + + break; + case P.HistoryEvent.EventTypeOneofCase.SubOrchestrationInstanceCompleted: + if (e.SubOrchestrationInstanceCompleted is { } sox) + { + sox.Result = await this.MaybeExternalizeAsync(sox.Result, cancellation); + } + + break; + case P.HistoryEvent.EventTypeOneofCase.EventSent: + if (e.EventSent is { } esent) + { + esent.Input = await this.MaybeExternalizeAsync(esent.Input, cancellation); + } + + break; + case P.HistoryEvent.EventTypeOneofCase.GenericEvent: + if (e.GenericEvent is { } ge) + { + ge.Data = await this.MaybeExternalizeAsync(ge.Data, cancellation); + } + + break; + case P.HistoryEvent.EventTypeOneofCase.ContinueAsNew: + if (e.ContinueAsNew is { } can) + { + can.Input = await this.MaybeExternalizeAsync(can.Input, cancellation); + } + + break; + case P.HistoryEvent.EventTypeOneofCase.ExecutionTerminated: + if (e.ExecutionTerminated is { } et) + { + et.Input = await this.MaybeExternalizeAsync(et.Input, cancellation); + } + + break; + case P.HistoryEvent.EventTypeOneofCase.ExecutionSuspended: + if (e.ExecutionSuspended is { } esus) + { 
+ esus.Input = await this.MaybeExternalizeAsync(esus.Input, cancellation); + } + + break; + case P.HistoryEvent.EventTypeOneofCase.ExecutionResumed: + if (e.ExecutionResumed is { } eres) + { + eres.Input = await this.MaybeExternalizeAsync(eres.Input, cancellation); + } + + break; + case P.HistoryEvent.EventTypeOneofCase.EntityOperationSignaled: + if (e.EntityOperationSignaled is { } eos) + { + eos.Input = await this.MaybeExternalizeAsync(eos.Input, cancellation); + } + + break; + case P.HistoryEvent.EventTypeOneofCase.EntityOperationCalled: + if (e.EntityOperationCalled is { } eoc) + { + eoc.Input = await this.MaybeExternalizeAsync(eoc.Input, cancellation); + } + + break; + case P.HistoryEvent.EventTypeOneofCase.EntityOperationCompleted: + if (e.EntityOperationCompleted is { } ecomp) + { + ecomp.Output = await this.MaybeExternalizeAsync(ecomp.Output, cancellation); + } + + break; + case P.HistoryEvent.EventTypeOneofCase.HistoryState: + if (e.HistoryState is { } hs && hs.OrchestrationState is { } os) + { + os.Input = await this.MaybeExternalizeAsync(os.Input, cancellation); + os.Output = await this.MaybeExternalizeAsync(os.Output, cancellation); + os.CustomStatus = await this.MaybeExternalizeAsync(os.CustomStatus, cancellation); + } + + break; + } + } + + /// + protected override async Task ResolveResponsePayloadsAsync(TResponse response, CancellationToken cancellation) + { + // Backend Service -> Azure Managed Backend + // Note: This interceptor is designed for backend_service.proto types, but since those types + // are not yet generated with the updated namespace, we'll use the existing orchestrator_service.proto types + // for now. The user should run refresh-protos.ps1 to generate the proper types. 
+ switch (response) + { + case P.GetInstanceResponse r when r.OrchestrationState is { } s: + await this.MaybeResolveAsync(v => s.Input = v, s.Input, cancellation); + await this.MaybeResolveAsync(v => s.Output = v, s.Output, cancellation); + await this.MaybeResolveAsync(v => s.CustomStatus = v, s.CustomStatus, cancellation); + break; + case P.HistoryChunk c when c.Events != null: + foreach (P.HistoryEvent e in c.Events) + { + await this.ResolveEventPayloadsAsync(e, cancellation); + } + + break; + case P.QueryInstancesResponse r: + foreach (P.OrchestrationState s in r.OrchestrationState) + { + await this.MaybeResolveAsync(v => s.Input = v, s.Input, cancellation); + await this.MaybeResolveAsync(v => s.Output = v, s.Output, cancellation); + await this.MaybeResolveAsync(v => s.CustomStatus = v, s.CustomStatus, cancellation); + } + + break; + case P.GetEntityResponse r when r.Entity is { } em: + await this.MaybeResolveAsync(v => em.SerializedState = v, em.SerializedState, cancellation); + break; + case P.QueryEntitiesResponse r: + foreach (P.EntityMetadata em in r.Entities) + { + await this.MaybeResolveAsync(v => em.SerializedState = v, em.SerializedState, cancellation); + } + + break; + case P.WorkItem wi: + // Resolve activity input + if (wi.ActivityRequest is { } ar) + { + await this.MaybeResolveAsync(v => ar.Input = v, ar.Input, cancellation); + } + + // Resolve orchestration input embedded in ExecutionStarted event and external events + if (wi.OrchestratorRequest is { } or) + { + foreach (P.HistoryEvent? e in or.PastEvents) + { + await this.ResolveEventPayloadsAsync(e, cancellation); + } + + foreach (P.HistoryEvent? 
e in or.NewEvents) + { + await this.ResolveEventPayloadsAsync(e, cancellation); + } + } + + // Resolve entity V1 batch request (OperationRequest inputs and entity state) + if (wi.EntityRequest is { } er1) + { + await this.MaybeResolveAsync(v => er1.EntityState = v, er1.EntityState, cancellation); + if (er1.Operations != null) + { + foreach (P.OperationRequest op in er1.Operations) + { + await this.MaybeResolveAsync(v => op.Input = v, op.Input, cancellation); + } + } + } + + // Resolve entity V2 request (history-based operation requests and entity state) + if (wi.EntityRequestV2 is { } er2) + { + await this.MaybeResolveAsync(v => er2.EntityState = v, er2.EntityState, cancellation); + if (er2.OperationRequests != null) + { + foreach (P.HistoryEvent opEvt in er2.OperationRequests) + { + await this.ResolveEventPayloadsAsync(opEvt, cancellation); + } + } + } + + break; + } + } + + async Task ResolveEventPayloadsAsync(P.HistoryEvent e, CancellationToken cancellation) + { + switch (e.EventTypeCase) + { + case P.HistoryEvent.EventTypeOneofCase.ExecutionStarted: + if (e.ExecutionStarted is { } es) + { + await this.MaybeResolveAsync(v => es.Input = v, es.Input, cancellation); + } + + break; + case P.HistoryEvent.EventTypeOneofCase.ExecutionCompleted: + if (e.ExecutionCompleted is { } ec) + { + await this.MaybeResolveAsync(v => ec.Result = v, ec.Result, cancellation); + } + + break; + case P.HistoryEvent.EventTypeOneofCase.EventRaised: + if (e.EventRaised is { } er) + { + await this.MaybeResolveAsync(v => er.Input = v, er.Input, cancellation); + } + + break; + case P.HistoryEvent.EventTypeOneofCase.TaskScheduled: + if (e.TaskScheduled is { } ts) + { + await this.MaybeResolveAsync(v => ts.Input = v, ts.Input, cancellation); + } + + break; + case P.HistoryEvent.EventTypeOneofCase.TaskCompleted: + if (e.TaskCompleted is { } tc) + { + await this.MaybeResolveAsync(v => tc.Result = v, tc.Result, cancellation); + } + + break; + case 
P.HistoryEvent.EventTypeOneofCase.SubOrchestrationInstanceCreated: + if (e.SubOrchestrationInstanceCreated is { } soc) + { + await this.MaybeResolveAsync(v => soc.Input = v, soc.Input, cancellation); + } + + break; + case P.HistoryEvent.EventTypeOneofCase.SubOrchestrationInstanceCompleted: + if (e.SubOrchestrationInstanceCompleted is { } sox) + { + await this.MaybeResolveAsync(v => sox.Result = v, sox.Result, cancellation); + } + + break; + case P.HistoryEvent.EventTypeOneofCase.EventSent: + if (e.EventSent is { } esent) + { + await this.MaybeResolveAsync(v => esent.Input = v, esent.Input, cancellation); + } + + break; + case P.HistoryEvent.EventTypeOneofCase.GenericEvent: + if (e.GenericEvent is { } ge) + { + await this.MaybeResolveAsync(v => ge.Data = v, ge.Data, cancellation); + } + + break; + case P.HistoryEvent.EventTypeOneofCase.ContinueAsNew: + if (e.ContinueAsNew is { } can) + { + await this.MaybeResolveAsync(v => can.Input = v, can.Input, cancellation); + } + + break; + case P.HistoryEvent.EventTypeOneofCase.ExecutionTerminated: + if (e.ExecutionTerminated is { } et) + { + await this.MaybeResolveAsync(v => et.Input = v, et.Input, cancellation); + } + + break; + case P.HistoryEvent.EventTypeOneofCase.ExecutionSuspended: + if (e.ExecutionSuspended is { } esus) + { + await this.MaybeResolveAsync(v => esus.Input = v, esus.Input, cancellation); + } + + break; + case P.HistoryEvent.EventTypeOneofCase.ExecutionResumed: + if (e.ExecutionResumed is { } eres) + { + await this.MaybeResolveAsync(v => eres.Input = v, eres.Input, cancellation); + } + + break; + case P.HistoryEvent.EventTypeOneofCase.EntityOperationSignaled: + if (e.EntityOperationSignaled is { } eos) + { + await this.MaybeResolveAsync(v => eos.Input = v, eos.Input, cancellation); + } + + break; + case P.HistoryEvent.EventTypeOneofCase.EntityOperationCalled: + if (e.EntityOperationCalled is { } eoc) + { + await this.MaybeResolveAsync(v => eoc.Input = v, eoc.Input, cancellation); + } + + break; + case 
P.HistoryEvent.EventTypeOneofCase.EntityOperationCompleted: + if (e.EntityOperationCompleted is { } ecomp) + { + await this.MaybeResolveAsync(v => ecomp.Output = v, ecomp.Output, cancellation); + } + + break; + case P.HistoryEvent.EventTypeOneofCase.HistoryState: + if (e.HistoryState is { } hs && hs.OrchestrationState is { } os) + { + await this.MaybeResolveAsync(v => os.Input = v, os.Input, cancellation); + await this.MaybeResolveAsync(v => os.Output = v, os.Output, cancellation); + await this.MaybeResolveAsync(v => os.CustomStatus = v, os.CustomStatus, cancellation); + } + + break; + } + } +} diff --git a/src/Extensions/AzureBlobPayloads/Interceptors/AzureBlobPayloadsSideCarInterceptor.cs b/src/Extensions/AzureBlobPayloads/Interceptors/AzureBlobPayloadsSideCarInterceptor.cs new file mode 100644 index 00000000..60db027e --- /dev/null +++ b/src/Extensions/AzureBlobPayloads/Interceptors/AzureBlobPayloadsSideCarInterceptor.cs @@ -0,0 +1,363 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using Grpc.Core.Interceptors; + +using P = Microsoft.DurableTask.Protobuf; + +namespace Microsoft.DurableTask; + +/// +/// gRPC interceptor that externalizes large payloads to an on requests +/// and resolves known payload tokens on responses for SideCar. 
+/// +public sealed class AzureBlobPayloadsSideCarInterceptor(PayloadStore payloadStore, LargePayloadStorageOptions options) + : BasePayloadInterceptor(payloadStore, options) +{ + /// + protected override async Task ExternalizeRequestPayloadsAsync(TRequest request, CancellationToken cancellation) + { + // Client -> sidecar + switch (request) + { + case P.CreateInstanceRequest r: + r.Input = await this.MaybeExternalizeAsync(r.Input, cancellation); + break; + case P.RaiseEventRequest r: + r.Input = await this.MaybeExternalizeAsync(r.Input, cancellation); + break; + case P.TerminateRequest r: + r.Output = await this.MaybeExternalizeAsync(r.Output, cancellation); + break; + case P.SuspendRequest r: + r.Reason = await this.MaybeExternalizeAsync(r.Reason, cancellation); + break; + case P.ResumeRequest r: + r.Reason = await this.MaybeExternalizeAsync(r.Reason, cancellation); + break; + case P.SignalEntityRequest r: + r.Input = await this.MaybeExternalizeAsync(r.Input, cancellation); + break; + case P.ActivityResponse r: + r.Result = await this.MaybeExternalizeAsync(r.Result, cancellation); + break; + case P.OrchestratorResponse r: + await this.ExternalizeOrchestratorResponseAsync(r, cancellation); + break; + case P.EntityBatchResult r: + await this.ExternalizeEntityBatchResultAsync(r, cancellation); + break; + case P.EntityBatchRequest r: + await this.ExternalizeEntityBatchRequestAsync(r, cancellation); + break; + case P.EntityRequest r: + r.EntityState = await this.MaybeExternalizeAsync(r.EntityState, cancellation); + break; + } + } + + /// + protected override async Task ResolveResponsePayloadsAsync(TResponse response, CancellationToken cancellation) + { + // Sidecar -> client/worker + switch (response) + { + case P.GetInstanceResponse r when r.OrchestrationState is { } s: + await this.MaybeResolveAsync(v => s.Input = v, s.Input, cancellation); + await this.MaybeResolveAsync(v => s.Output = v, s.Output, cancellation); + await this.MaybeResolveAsync(v => s.CustomStatus = 
v, s.CustomStatus, cancellation); + break; + case P.HistoryChunk c when c.Events != null: + foreach (P.HistoryEvent e in c.Events) + { + await this.ResolveEventPayloadsAsync(e, cancellation); + } + + break; + case P.QueryInstancesResponse r: + foreach (P.OrchestrationState s in r.OrchestrationState) + { + await this.MaybeResolveAsync(v => s.Input = v, s.Input, cancellation); + await this.MaybeResolveAsync(v => s.Output = v, s.Output, cancellation); + await this.MaybeResolveAsync(v => s.CustomStatus = v, s.CustomStatus, cancellation); + } + + break; + case P.GetEntityResponse r when r.Entity is { } em: + await this.MaybeResolveAsync(v => em.SerializedState = v, em.SerializedState, cancellation); + break; + case P.QueryEntitiesResponse r: + foreach (P.EntityMetadata em in r.Entities) + { + await this.MaybeResolveAsync(v => em.SerializedState = v, em.SerializedState, cancellation); + } + + break; + case P.WorkItem wi: + // Resolve activity input + if (wi.ActivityRequest is { } ar) + { + await this.MaybeResolveAsync(v => ar.Input = v, ar.Input, cancellation); + } + + // Resolve orchestration input embedded in ExecutionStarted event and external events + if (wi.OrchestratorRequest is { } or) + { + foreach (P.HistoryEvent? e in or.PastEvents) + { + await this.ResolveEventPayloadsAsync(e, cancellation); + } + + foreach (P.HistoryEvent? 
e in or.NewEvents) + { + await this.ResolveEventPayloadsAsync(e, cancellation); + } + } + + // Resolve entity V1 batch request (OperationRequest inputs and entity state) + if (wi.EntityRequest is { } er1) + { + await this.MaybeResolveAsync(v => er1.EntityState = v, er1.EntityState, cancellation); + if (er1.Operations != null) + { + foreach (P.OperationRequest op in er1.Operations) + { + await this.MaybeResolveAsync(v => op.Input = v, op.Input, cancellation); + } + } + } + + // Resolve entity V2 request (history-based operation requests and entity state) + if (wi.EntityRequestV2 is { } er2) + { + await this.MaybeResolveAsync(v => er2.EntityState = v, er2.EntityState, cancellation); + if (er2.OperationRequests != null) + { + foreach (P.HistoryEvent opEvt in er2.OperationRequests) + { + await this.ResolveEventPayloadsAsync(opEvt, cancellation); + } + } + } + + break; + } + } + + async Task ExternalizeOrchestratorResponseAsync(P.OrchestratorResponse r, CancellationToken cancellation) + { + r.CustomStatus = await this.MaybeExternalizeAsync(r.CustomStatus, cancellation); + foreach (P.OrchestratorAction a in r.Actions) + { + if (a.CompleteOrchestration is { } complete) + { + complete.Result = await this.MaybeExternalizeAsync(complete.Result, cancellation); + complete.Details = await this.MaybeExternalizeAsync(complete.Details, cancellation); + } + + if (a.TerminateOrchestration is { } term) + { + term.Reason = await this.MaybeExternalizeAsync(term.Reason, cancellation); + } + + if (a.ScheduleTask is { } schedule) + { + schedule.Input = await this.MaybeExternalizeAsync(schedule.Input, cancellation); + } + + if (a.CreateSubOrchestration is { } sub) + { + sub.Input = await this.MaybeExternalizeAsync(sub.Input, cancellation); + } + + if (a.SendEvent is { } sendEvt) + { + sendEvt.Data = await this.MaybeExternalizeAsync(sendEvt.Data, cancellation); + } + + if (a.SendEntityMessage is { } entityMsg) + { + if (entityMsg.EntityOperationSignaled is { } sig) + { + sig.Input = await 
this.MaybeExternalizeAsync(sig.Input, cancellation); + } + + if (entityMsg.EntityOperationCalled is { } called) + { + called.Input = await this.MaybeExternalizeAsync(called.Input, cancellation); + } + } + } + } + + async Task ExternalizeEntityBatchResultAsync(P.EntityBatchResult r, CancellationToken cancellation) + { + r.EntityState = await this.MaybeExternalizeAsync(r.EntityState, cancellation); + if (r.Results != null) + { + foreach (P.OperationResult result in r.Results) + { + if (result.Success is { } success) + { + success.Result = await this.MaybeExternalizeAsync(success.Result, cancellation); + } + } + } + + if (r.Actions != null) + { + foreach (P.OperationAction action in r.Actions) + { + if (action.SendSignal is { } sendSig) + { + sendSig.Input = await this.MaybeExternalizeAsync(sendSig.Input, cancellation); + } + + if (action.StartNewOrchestration is { } start) + { + start.Input = await this.MaybeExternalizeAsync(start.Input, cancellation); + } + } + } + } + + async Task ExternalizeEntityBatchRequestAsync(P.EntityBatchRequest r, CancellationToken cancellation) + { + r.EntityState = await this.MaybeExternalizeAsync(r.EntityState, cancellation); + if (r.Operations != null) + { + foreach (P.OperationRequest op in r.Operations) + { + op.Input = await this.MaybeExternalizeAsync(op.Input, cancellation); + } + } + } + + async Task ResolveEventPayloadsAsync(P.HistoryEvent e, CancellationToken cancellation) + { + switch (e.EventTypeCase) + { + case P.HistoryEvent.EventTypeOneofCase.ExecutionStarted: + if (e.ExecutionStarted is { } es) + { + await this.MaybeResolveAsync(v => es.Input = v, es.Input, cancellation); + } + + break; + case P.HistoryEvent.EventTypeOneofCase.ExecutionCompleted: + if (e.ExecutionCompleted is { } ec) + { + await this.MaybeResolveAsync(v => ec.Result = v, ec.Result, cancellation); + } + + break; + case P.HistoryEvent.EventTypeOneofCase.EventRaised: + if (e.EventRaised is { } er) + { + await this.MaybeResolveAsync(v => er.Input = v, er.Input, 
cancellation); + } + + break; + case P.HistoryEvent.EventTypeOneofCase.TaskScheduled: + if (e.TaskScheduled is { } ts) + { + await this.MaybeResolveAsync(v => ts.Input = v, ts.Input, cancellation); + } + + break; + case P.HistoryEvent.EventTypeOneofCase.TaskCompleted: + if (e.TaskCompleted is { } tc) + { + await this.MaybeResolveAsync(v => tc.Result = v, tc.Result, cancellation); + } + + break; + case P.HistoryEvent.EventTypeOneofCase.SubOrchestrationInstanceCreated: + if (e.SubOrchestrationInstanceCreated is { } soc) + { + await this.MaybeResolveAsync(v => soc.Input = v, soc.Input, cancellation); + } + + break; + case P.HistoryEvent.EventTypeOneofCase.SubOrchestrationInstanceCompleted: + if (e.SubOrchestrationInstanceCompleted is { } sox) + { + await this.MaybeResolveAsync(v => sox.Result = v, sox.Result, cancellation); + } + + break; + case P.HistoryEvent.EventTypeOneofCase.EventSent: + if (e.EventSent is { } esent) + { + await this.MaybeResolveAsync(v => esent.Input = v, esent.Input, cancellation); + } + + break; + case P.HistoryEvent.EventTypeOneofCase.GenericEvent: + if (e.GenericEvent is { } ge) + { + await this.MaybeResolveAsync(v => ge.Data = v, ge.Data, cancellation); + } + + break; + case P.HistoryEvent.EventTypeOneofCase.ContinueAsNew: + if (e.ContinueAsNew is { } can) + { + await this.MaybeResolveAsync(v => can.Input = v, can.Input, cancellation); + } + + break; + case P.HistoryEvent.EventTypeOneofCase.ExecutionTerminated: + if (e.ExecutionTerminated is { } et) + { + await this.MaybeResolveAsync(v => et.Input = v, et.Input, cancellation); + } + + break; + case P.HistoryEvent.EventTypeOneofCase.ExecutionSuspended: + if (e.ExecutionSuspended is { } esus) + { + await this.MaybeResolveAsync(v => esus.Input = v, esus.Input, cancellation); + } + + break; + case P.HistoryEvent.EventTypeOneofCase.ExecutionResumed: + if (e.ExecutionResumed is { } eres) + { + await this.MaybeResolveAsync(v => eres.Input = v, eres.Input, cancellation); + } + + break; + case 
P.HistoryEvent.EventTypeOneofCase.EntityOperationSignaled: + if (e.EntityOperationSignaled is { } eos) + { + await this.MaybeResolveAsync(v => eos.Input = v, eos.Input, cancellation); + } + + break; + case P.HistoryEvent.EventTypeOneofCase.EntityOperationCalled: + if (e.EntityOperationCalled is { } eoc) + { + await this.MaybeResolveAsync(v => eoc.Input = v, eoc.Input, cancellation); + } + + break; + case P.HistoryEvent.EventTypeOneofCase.EntityOperationCompleted: + if (e.EntityOperationCompleted is { } ecomp) + { + await this.MaybeResolveAsync(v => ecomp.Output = v, ecomp.Output, cancellation); + } + + break; + case P.HistoryEvent.EventTypeOneofCase.HistoryState: + if (e.HistoryState is { } hs && hs.OrchestrationState is { } os) + { + await this.MaybeResolveAsync(v => os.Input = v, os.Input, cancellation); + await this.MaybeResolveAsync(v => os.Output = v, os.Output, cancellation); + await this.MaybeResolveAsync(v => os.CustomStatus = v, os.CustomStatus, cancellation); + } + + break; + } + } +} diff --git a/src/Extensions/AzureBlobPayloads/Interceptors/BasePayloadInterceptor.cs b/src/Extensions/AzureBlobPayloads/Interceptors/BasePayloadInterceptor.cs new file mode 100644 index 00000000..80e90f88 --- /dev/null +++ b/src/Extensions/AzureBlobPayloads/Interceptors/BasePayloadInterceptor.cs @@ -0,0 +1,223 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using System.Text; +using Grpc.Core; +using Grpc.Core.Interceptors; + +namespace Microsoft.DurableTask; + +/// +/// Base class for gRPC interceptors that externalize large payloads to an on requests +/// and resolves known payload tokens on responses. +/// +/// The namespace for request message types. +/// The namespace for response message types. 
+public abstract class BasePayloadInterceptor : Interceptor + where TRequestNamespace : class + where TResponseNamespace : class +{ + readonly PayloadStore payloadStore; + readonly LargePayloadStorageOptions options; + + /// + /// Initializes a new instance of the class. + /// + /// The payload store. + /// The options. + protected BasePayloadInterceptor(PayloadStore payloadStore, LargePayloadStorageOptions options) + { + this.payloadStore = payloadStore; + this.options = options; + } + + // Unary: externalize on request, resolve on response + + /// + public override AsyncUnaryCall AsyncUnaryCall( + TRequest request, + ClientInterceptorContext context, + AsyncUnaryCallContinuation continuation) + { + // Build the underlying call lazily after async externalization + Task> startCallTask = Task.Run(async () => + { + // Externalize first; if this fails, do not proceed to send the gRPC call + await this.ExternalizeRequestPayloadsAsync(request, context.Options.CancellationToken); + + // Only if externalization succeeds, proceed with the continuation + return continuation(request, context); + }); + + async Task ResponseAsync() + { + AsyncUnaryCall innerCall = await startCallTask; + TResponse response = await innerCall.ResponseAsync; + await this.ResolveResponsePayloadsAsync(response, context.Options.CancellationToken); + return response; + } + + async Task ResponseHeadersAsync() + { + AsyncUnaryCall innerCall = await startCallTask; + return await innerCall.ResponseHeadersAsync; + } + + Status GetStatus() + { + if (startCallTask.IsCanceled) + { + return new Status(StatusCode.Cancelled, "Call was cancelled."); + } + + if (startCallTask.IsFaulted) + { + return new Status(StatusCode.Internal, startCallTask.Exception?.Message ?? 
"Unknown error"); + } + + if (startCallTask.Status == TaskStatus.RanToCompletion) + { + return startCallTask.Result.GetStatus(); + } + + // Not started yet; unknown + return new Status(StatusCode.Unknown, string.Empty); + } + + Metadata GetTrailers() + { + return startCallTask.Status == TaskStatus.RanToCompletion ? startCallTask.Result.GetTrailers() : []; + } + + void Dispose() + { + _ = startCallTask.ContinueWith( + t => + { + if (t.Status == TaskStatus.RanToCompletion) + { + t.Result.Dispose(); + } + }, + CancellationToken.None, + TaskContinuationOptions.ExecuteSynchronously, + TaskScheduler.Default); + } + + return new AsyncUnaryCall( + ResponseAsync(), + ResponseHeadersAsync(), + GetStatus, + GetTrailers, + Dispose); + } + + // Server streaming: resolve payloads in streamed responses (e.g., GetWorkItems) + + /// + public override AsyncServerStreamingCall AsyncServerStreamingCall( + TRequest request, + ClientInterceptorContext context, + AsyncServerStreamingCallContinuation continuation) + { + // For streaming, request externalization is not needed currently + AsyncServerStreamingCall call = continuation(request, context); + + IAsyncStreamReader wrapped = new TransformingStreamReader(call.ResponseStream, async (msg, ct) => + { + await this.ResolveResponsePayloadsAsync(msg, ct); + return msg; + }); + + return new AsyncServerStreamingCall( + wrapped, + call.ResponseHeadersAsync, + call.GetStatus, + call.GetTrailers, + call.Dispose); + } + + /// + /// Externalizes large payloads in request messages. + /// + /// The request type. + /// The request to process. + /// Cancellation token. + /// A task representing the async operation. + protected abstract Task ExternalizeRequestPayloadsAsync(TRequest request, CancellationToken cancellation); + + /// + /// Resolves payload tokens in response messages. + /// + /// The response type. + /// The response to process. + /// Cancellation token. + /// A task representing the async operation. 
+ protected abstract Task ResolveResponsePayloadsAsync(TResponse response, CancellationToken cancellation); + + /// + /// Externalizes a payload if it exceeds the threshold. + /// + /// The value to potentially externalize. + /// Cancellation token. + /// A task that returns the externalized token or the original value. + protected async Task MaybeExternalizeAsync(string? value, CancellationToken cancellation) + { + if (string.IsNullOrEmpty(value)) + { + return value; + } + + int size = Encoding.UTF8.GetByteCount(value); + if (size < this.options.ExternalizeThresholdBytes) + { + return value; + } + + return await this.payloadStore.UploadAsync(value, cancellation); + } + + /// + /// Resolves a payload token if it's known to the store. + /// + /// Action to assign the resolved value. + /// The value to potentially resolve. + /// Cancellation token. + /// A task representing the async operation. + protected async Task MaybeResolveAsync(Action assign, string? value, CancellationToken cancellation) + { + if (string.IsNullOrEmpty(value) || !this.payloadStore.IsKnownPayloadToken(value)) + { + return; + } + + string resolved = await this.payloadStore.DownloadAsync(value, cancellation); + assign(resolved); + } + + sealed class TransformingStreamReader : IAsyncStreamReader + { + readonly IAsyncStreamReader inner; + readonly Func> transform; + + public TransformingStreamReader(IAsyncStreamReader inner, Func> transform) + { + this.inner = inner; + this.transform = transform; + } + + public T Current { get; private set; } = default!; + + public async Task MoveNext(CancellationToken cancellationToken) + { + bool hasNext = await this.inner.MoveNext(cancellationToken); + if (!hasNext) + { + return false; + } + + this.Current = await this.transform(this.inner.Current, cancellationToken); + return true; + } + } +} diff --git a/src/Extensions/AzureBlobPayloads/Options/LargePayloadStorageOptions.cs b/src/Extensions/AzureBlobPayloads/Options/LargePayloadStorageOptions.cs new file mode 
100644 index 00000000..115a9b08 --- /dev/null +++ b/src/Extensions/AzureBlobPayloads/Options/LargePayloadStorageOptions.cs @@ -0,0 +1,93 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using Azure.Core; + +// Intentionally no DataAnnotations to avoid extra package requirements in minimal hosts. +namespace Microsoft.DurableTask; + +/// +/// Options for externalized payload storage, used by SDKs to store large payloads out-of-band. +/// Supports both connection string and identity-based authentication. +/// +/// +/// Connection string authentication: +/// +/// var options = new LargePayloadStorageOptions("DefaultEndpointsProtocol=https;AccountName=mystorageaccount;AccountKey=..."); +/// +/// +/// Identity-based authentication: +/// +/// var options = new LargePayloadStorageOptions( +/// new Uri("https://mystorageaccount.blob.core.windows.net"), +/// new DefaultAzureCredential()); +/// +/// +/// +public sealed class LargePayloadStorageOptions +{ + /// + /// Initializes a new instance of the class. + /// Parameterless constructor required for options activation. + /// + public LargePayloadStorageOptions() + { + } + + /// + /// Initializes a new instance of the class. + /// + /// The Azure Storage connection string to the customer's storage account. + public LargePayloadStorageOptions(string connectionString) + { + Check.NotNullOrEmpty(connectionString, nameof(connectionString)); + this.ConnectionString = connectionString; + } + + /// + /// Initializes a new instance of the class. + /// + /// The Azure Storage account URI. + /// The credential to use for authentication. + public LargePayloadStorageOptions(Uri accountUri, TokenCredential credential) + { + Check.NotNull(accountUri, nameof(accountUri)); + Check.NotNull(credential, nameof(credential)); + this.AccountUri = accountUri; + this.Credential = credential; + } + + /// + /// Gets or sets the threshold in bytes at which payloads are externalized. Default is 900_000 bytes. 
+ /// + public int ExternalizeThresholdBytes { get; set; } = 900_000; // leave headroom below 1MB + + /// + /// Gets or sets the Azure Storage connection string to the customer's storage account. + /// Either this or and must be set. + /// + public string ConnectionString { get; set; } = string.Empty; + + /// + /// Gets or sets the Azure Storage account URI. + /// Either this and or must be set. + /// + public Uri? AccountUri { get; set; } + + /// + /// Gets or sets the credential to use for authentication. + /// Either this and or must be set. + /// + public TokenCredential? Credential { get; set; } + + /// + /// Gets or sets the blob container name to use for payloads. Defaults to "durabletask-payloads". + /// + public string ContainerName { get; set; } = "durabletask-payloads"; + + /// + /// Gets or sets a value indicating whether payloads should be gzip-compressed when stored. + /// Defaults to true for reduced storage and bandwidth. + /// + public bool CompressPayloads { get; set; } = true; +} diff --git a/src/Extensions/AzureBlobPayloads/PayloadStore/BlobPayloadStore.cs b/src/Extensions/AzureBlobPayloads/PayloadStore/BlobPayloadStore.cs new file mode 100644 index 00000000..5d0757a4 --- /dev/null +++ b/src/Extensions/AzureBlobPayloads/PayloadStore/BlobPayloadStore.cs @@ -0,0 +1,166 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using System.IO.Compression; +using System.Text; +using Azure.Core; +using Azure.Storage.Blobs; +using Azure.Storage.Blobs.Models; + +namespace Microsoft.DurableTask; + +/// +/// Azure Blob Storage implementation of . +/// Stores payloads as blobs and returns opaque tokens in the form "blob:v1:<container>:<blobName>". 
/// </summary>
sealed class BlobPayloadStore : PayloadStore
{
    const string TokenPrefix = "blob:v1:";
    const string ContentEncodingGzip = "gzip";
    const int MaxRetryAttempts = 8;
    const int BaseDelayMs = 250;
    const int MaxDelayMs = 10_000;
    const int NetworkTimeoutMinutes = 2;

    readonly BlobContainerClient containerClient;
    readonly LargePayloadStorageOptions options;

    /// <summary>
    /// Initializes a new instance of the <see cref="BlobPayloadStore"/> class.
    /// </summary>
    /// <param name="options">The options for the blob payload store.</param>
    /// <exception cref="ArgumentNullException">Thrown when <paramref name="options"/> is null.</exception>
    /// <exception cref="ArgumentException">Thrown when neither a connection string nor an account URI/credential pair is provided.</exception>
    public BlobPayloadStore(LargePayloadStorageOptions options)
    {
        this.options = options ?? throw new ArgumentNullException(nameof(options));
        Check.NotNullOrEmpty(options.ContainerName, nameof(options.ContainerName));

        // Exactly one authentication mode must be usable: connection string, or AccountUri + Credential.
        bool hasConnectionString = !string.IsNullOrEmpty(options.ConnectionString);
        bool hasIdentityAuth = options.AccountUri != null && options.Credential != null;

        if (!hasConnectionString && !hasIdentityAuth)
        {
            throw new ArgumentException(
                "Either ConnectionString or AccountUri and Credential must be provided.",
                nameof(options));
        }

        BlobClientOptions clientOptions = new()
        {
            Retry =
            {
                Mode = RetryMode.Exponential,
                MaxRetries = MaxRetryAttempts,
                Delay = TimeSpan.FromMilliseconds(BaseDelayMs),

                // BUG FIX: MaxDelayMs is a millisecond value (10_000 ms == 10 s). The previous code
                // passed it to TimeSpan.FromSeconds, capping retries at ~2.8 hours instead of 10 seconds.
                MaxDelay = TimeSpan.FromMilliseconds(MaxDelayMs),
                NetworkTimeout = TimeSpan.FromMinutes(NetworkTimeoutMinutes),
            },
        };

        // Identity-based auth takes precedence when AccountUri and Credential are both set.
        BlobServiceClient serviceClient = hasIdentityAuth
            ? new BlobServiceClient(options.AccountUri, options.Credential, clientOptions)
            : new BlobServiceClient(options.ConnectionString, clientOptions);

        this.containerClient = serviceClient.GetBlobContainerClient(options.ContainerName);
    }

    /// <inheritdoc/>
    public override async Task<string> UploadAsync(string payLoad, CancellationToken cancellationToken)
    {
        // One blob per payload; a GUID-based name guarantees uniqueness.
        string blobName = $"{Guid.NewGuid():N}";
        BlobClient blob = this.containerClient.GetBlobClient(blobName);

        byte[] payloadBuffer = Encoding.UTF8.GetBytes(payLoad);

        // Ensure the container exists (idempotent).
        // NOTE(review): this issues a round trip on every upload; consider caching a created flag — verify concurrency expectations first.
        await this.containerClient.CreateIfNotExistsAsync(PublicAccessType.None, default, default, cancellationToken);

        if (this.options.CompressPayloads)
        {
            BlobOpenWriteOptions writeOptions = new()
            {
                // Record the encoding so DownloadAsync can detect compressed blobs.
                HttpHeaders = new BlobHttpHeaders { ContentEncoding = ContentEncodingGzip },
            };
            using Stream blobStream = await blob.OpenWriteAsync(true, writeOptions, cancellationToken);

            // leaveOpen so blobStream is flushed/committed by its own using scope after the gzip footer is written.
            using GZipStream compressedBlobStream = new(blobStream, System.IO.Compression.CompressionLevel.Optimal, leaveOpen: true);
            await compressedBlobStream.WriteAsync(payloadBuffer, 0, payloadBuffer.Length, cancellationToken);
            await compressedBlobStream.FlushAsync(cancellationToken);
            await blobStream.FlushAsync(cancellationToken);
        }
        else
        {
            using Stream blobStream = await blob.OpenWriteAsync(true, default, cancellationToken);
            await blobStream.WriteAsync(payloadBuffer, 0, payloadBuffer.Length, cancellationToken);
            await blobStream.FlushAsync(cancellationToken);
        }

        return EncodeToken(this.containerClient.Name, blobName);
    }

    /// <inheritdoc/>
    public override async Task<string> DownloadAsync(string token, CancellationToken cancellationToken)
    {
        (string container, string name) = DecodeToken(token);
        if (!string.Equals(container, this.containerClient.Name, StringComparison.Ordinal))
        {
            throw new ArgumentException("Token container does not match configured container.", nameof(token));
        }

        BlobClient blob = this.containerClient.GetBlobClient(name);

        using BlobDownloadStreamingResult result = await blob.DownloadStreamingAsync(cancellationToken: cancellationToken);
        Stream contentStream = result.Content;

        // Detect compression via the Content-Encoding header written by UploadAsync.
        bool isGzip = string.Equals(
            result.Details.ContentEncoding, ContentEncodingGzip, StringComparison.OrdinalIgnoreCase);

        if (isGzip)
        {
            using GZipStream decompressed = new(contentStream, CompressionMode.Decompress);
            using StreamReader reader = new(decompressed, Encoding.UTF8);
            return await reader.ReadToEndAsync();
        }

        using StreamReader uncompressedReader = new(contentStream, Encoding.UTF8);
        return await uncompressedReader.ReadToEndAsync();
    }

    /// <inheritdoc/>
    public override bool IsKnownPayloadToken(string value)
        => !string.IsNullOrEmpty(value) && value.StartsWith(TokenPrefix, StringComparison.Ordinal);

    // Token format: "blob:v1:<container>:<blobName>". Both helpers share TokenPrefix so the
    // format cannot drift between encode and decode (previously the literal was duplicated).
    static string EncodeToken(string container, string name) => $"{TokenPrefix}{container}:{name}";

    static (string Container, string Name) DecodeToken(string token)
    {
        if (!token.StartsWith(TokenPrefix, StringComparison.Ordinal))
        {
            throw new ArgumentException("Invalid external payload token.", nameof(token));
        }

        string rest = token.Substring(TokenPrefix.Length);
        int sep = rest.IndexOf(':');
        if (sep <= 0 || sep >= rest.Length - 1)
        {
            throw new ArgumentException("Invalid external payload token format.", nameof(token));
        }

        return (rest.Substring(0, sep), rest.Substring(sep + 1));
    }
}
diff --git a/src/Extensions/AzureBlobPayloads/PayloadStore/IPayloadStore.cs
b/src/Extensions/AzureBlobPayloads/PayloadStore/IPayloadStore.cs new file mode 100644 index 00000000..6d6896d6 --- /dev/null +++ b/src/Extensions/AzureBlobPayloads/PayloadStore/IPayloadStore.cs @@ -0,0 +1,34 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace Microsoft.DurableTask; + +/// +/// Abstraction for storing and retrieving large payloads out-of-band. +/// +public abstract class PayloadStore +{ + /// + /// Uploads a payload and returns an opaque reference token that can be embedded in orchestration messages. + /// + /// The payload. + /// Cancellation token. + /// Opaque reference token. + public abstract Task UploadAsync(string payLoad, CancellationToken cancellationToken); + + /// + /// Downloads the payload referenced by the token. + /// + /// The opaque reference token. + /// Cancellation token. + /// Payload string. + public abstract Task DownloadAsync(string token, CancellationToken cancellationToken); + + /// + /// Returns true if the specified value appears to be a token understood by this store. + /// Implementations should not throw for unknown tokens. + /// + /// The value to check. + /// true if the value is a token issued by this store; otherwise, false. + public abstract bool IsKnownPayloadToken(string value); +} \ No newline at end of file diff --git a/src/Grpc.AzureManagedBackend/Grpc.AzureManagedBackend.csproj b/src/Grpc.AzureManagedBackend/Grpc.AzureManagedBackend.csproj new file mode 100644 index 00000000..dcb538b5 --- /dev/null +++ b/src/Grpc.AzureManagedBackend/Grpc.AzureManagedBackend.csproj @@ -0,0 +1,31 @@ + + + + netstandard2.0;net6.0 + The gRPC Protobuf .NET services for Durable Task Framework. 
+ + + + + + + + + + + + + + + + + + + + diff --git a/src/Grpc.AzureManagedBackend/README.md b/src/Grpc.AzureManagedBackend/README.md new file mode 100644 index 00000000..a4f5e97b --- /dev/null +++ b/src/Grpc.AzureManagedBackend/README.md @@ -0,0 +1,24 @@ +# Durable Task Protobuf + +This directory contains the protobuf definitions for the Durable Task SDK, which are used to generate the C# source code for the gRPC service contracts. The official protobuf definitions are maintained in the [Durable Task Protobuf repository](https://github.com/microsoft/durabletask-protobuf). + +## Updating the Protobuf Definitions + +To update the protobuf definitions in this directory, follow these steps: + +1. Make sure you have [PowerShell](https://learn.microsoft.com/powershell/scripting/install/installing-powershell) installed on your machine. +2. Run the following command to download the latest protobuf definitions from the Durable Task SDK repository: + +```powershell +.\refresh-protos.ps1 +``` + +This script will download the latest protobuf definitions from the `https://github.com/microsoft/durabletask-protobuf` repository and copy them to this directory. + +By default, the latest versions of the protobufs are downloaded from the `main` branch. To specify an alternative branch, use the `-branch` parameter: + +```powershell +.\refresh-protos.ps1 -branch +``` + +The `versions.txt` file in this directory contains the list of protobuf files and their commit hashes that were last downloaded. It is updated automatically by the `refresh-protos.ps1` script. diff --git a/src/Grpc.AzureManagedBackend/backend_service.proto b/src/Grpc.AzureManagedBackend/backend_service.proto new file mode 100644 index 00000000..8ac980cf --- /dev/null +++ b/src/Grpc.AzureManagedBackend/backend_service.proto @@ -0,0 +1,282 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. 
+ +syntax = "proto3"; + +package durabletask.protos.backend.v1; + +option csharp_namespace = "Microsoft.DurableTask.AzureManagedBackend.Protobuf"; +option java_package = "com.microsoft.durabletask.implementation.protobuf"; +option go_package = "github.com/microsoft/durabletask-protobuf/internal/protos"; + +import "orchestrator_service.proto"; + + +import "google/protobuf/timestamp.proto"; +import "google/protobuf/wrappers.proto"; +import "google/protobuf/empty.proto"; + +// gRPC service used by Durable Task Framework (DTFx) backend implementations. +// The RPCs in this service are used by DTFx backends to manage orchestration state. +service BackendService { + // Creates a new orchestration instance. + rpc CreateInstance (CreateInstanceRequest) returns (CreateInstanceResponse); + + // Sends an event to an orchestration instance. This RPC is used for raising external events to orchestrations + // and for sending orchestration lifecycle events, such as terminate, suspend, resume, etc. + rpc AddEvent (AddEventRequest) returns (AddEventResponse); + + // Returns metadata about an orchestration instance. + rpc GetInstance (GetInstanceRequest) returns (GetInstanceResponse); + + // Returns metadata about an entity instance. + rpc GetEntity (GetEntityRequest) returns (GetEntityResponse); + + // Returns metadata about multiple orchestration instances using a query. + rpc QueryInstances (QueryInstancesRequest) returns (QueryInstancesResponse); + + // Returns metadata for multiple entities using a query. + rpc QueryEntities(QueryEntitiesRequest) returns (QueryEntitiesResponse); + + // Waits for an orchestration to reach a terminal state and then returns metadata for that orchestration. + rpc WaitForInstance (WaitForInstanceRequest) returns (WaitForInstanceResponse); + + // Purges the state of one or more orchestration instances. 
+ rpc PurgeInstances (PurgeInstancesRequest) returns (PurgeInstancesResponse); + + // Restarts an orchestration instance with the option to use a new instance ID. + rpc RestartInstance (RestartInstanceRequest) returns (RestartInstanceResponse); + + // Cleans entity storage. + rpc CleanEntityStorage(CleanEntityStorageRequest) returns (CleanEntityStorageResponse); + + // Starts a server stream for receiving work items + rpc GetWorkItems (GetWorkItemsRequest) returns (stream WorkItem); + + // Gets orchestration runtime state (history, etc.) for a given orchestration instance. + rpc GetOrchestrationRuntimeState (GetOrchestrationRuntimeStateRequest) returns (GetOrchestrationRuntimeStateResponse); + + // Gets the history of an orchestration instance as a stream of events. + rpc StreamInstanceHistory(StreamInstanceHistoryRequest) returns (stream HistoryChunk); + + // Completes an outstanding activity work item and adds a new event to the target orchestration's inbox. + rpc CompleteActivityWorkItem (CompleteActivityWorkItemRequest) returns (CompleteActivityWorkItemResponse); + + // Abandons an outstanding activity work item. Abandoned work items will be delivered again after some delay. + rpc AbandonActivityWorkItem (AbandonActivityWorkItemRequest) returns (AbandonActivityWorkItemResponse); + + // Completes an outstanding orchestrator work item, and adds a new event to the target orchestration's inbox. + rpc CompleteOrchestrationWorkItem (CompleteOrchestrationWorkItemRequest) returns (CompleteOrchestrationWorkItemResponse); + + // Abandons an outstanding orchestrator work item. Abandoned work items will be delivered again after some delay. + rpc AbandonOrchestrationWorkItem (AbandonOrchestrationWorkItemRequest) returns (AbandonOrchestrationWorkItemResponse); + + // Completes an outstanding entity work item. + rpc CompleteEntityWorkItem (CompleteEntityWorkItemRequest) returns (CompleteEntityWorkItemResponse); + + // Abandons an outstanding entity work item. 
Abandoned work items will be delivered again after some delay. + rpc AbandonEntityWorkItem (AbandonEntityWorkItemRequest) returns (AbandonEntityWorkItemResponse); + + // Sends a health check ping to the backend service. + rpc Ping (PingRequest) returns (PingResponse); + + // Returns the current metrics for the backend service. + rpc GetMetrics (GetMetricsRequest) returns (GetMetricsResponse); + + // "Skip" graceful termination of orchestrations by immediately changing their status in storage to "terminated". + // Note that a maximum of 500 orchestrations can be terminated at a time using this method. + rpc SkipGracefulOrchestrationTerminations(SkipGracefulOrchestrationTerminationsRequest) returns (SkipGracefulOrchestrationTerminationsResponse); +} + +// Request payload for adding new orchestration events. +message AddEventRequest { + // The ID of the orchestration to send an event to. + OrchestrationInstance instance = 1; + // The event to send to the orchestration. + HistoryEvent event = 2; +} + +// Response payload for adding new orchestration events. +message AddEventResponse { + // No fields +} + +// Request payload for waiting for instance completion. +message WaitForInstanceRequest { + string instanceId = 1; + bool getInputsAndOutputs = 2; +} + +// Response payload for waiting for instance completion. +message WaitForInstanceResponse { + bool exists = 1; + OrchestrationState orchestrationState = 2; +} + +// Request parameters for fetching orchestration runtime state. +message GetOrchestrationRuntimeStateRequest { + // The ID of the target orchestration instance. + OrchestrationInstance instance = 1; +} + +// Response payload returned when fetching orchestration runtime state. +message GetOrchestrationRuntimeStateResponse { + // The existing history events for the target orchestration instance. + repeated HistoryEvent history = 1; +} + +// Request payload for completing an activity work item. 
+message CompleteActivityWorkItemRequest { + // The completion token that was provided when the work item was fetched. + string completionToken = 1; + + // The response event that will be sent to the orchestrator. + // This must be either a TaskCompleted event or a TaskFailed event. + HistoryEvent responseEvent = 2; +} + +// Response payload for completing an activity work item. +message CompleteActivityWorkItemResponse { + // No fields +} + +// Request payload for abandoning an activity work item. +message AbandonActivityWorkItemRequest { + // The completion token that was provided when the work item was fetched. + string completionToken = 1; +} + +// Response payload for abandoning an activity work item. +message AbandonActivityWorkItemResponse { + // No fields +} + +// Request payload for completing an orchestration work item. +message CompleteOrchestrationWorkItemRequest { + // The completion token that was provided when the work item was fetched. + string completionToken = 1; + OrchestrationInstance instance = 2; + OrchestrationStatus runtimeStatus = 3; + google.protobuf.StringValue customStatus = 4; + repeated HistoryEvent newHistory = 5; + repeated HistoryEvent newTasks = 6; + repeated HistoryEvent newTimers = 7; + repeated OrchestratorMessage newMessages = 8; + + // The number of work item events that were processed by the orchestrator. + // This field is optional. If not set, the service should assume that the orchestrator processed all events. + google.protobuf.Int32Value numEventsProcessed = 9; + + OrchestrationTraceContext orchestrationTraceContext = 10; +} + +// Response payload for completing an orchestration work item. +message CompleteOrchestrationWorkItemResponse { + // No fields +} + +// A message to be delivered to an orchestration by the backend. +message OrchestratorMessage { + // The ID of the orchestration instance to receive the message. + OrchestrationInstance instance = 1; + // The event payload to be received by the target orchestration. 
+ HistoryEvent event = 2; +} + +// Request payload for abandoning an orchestration work item. +message AbandonOrchestrationWorkItemRequest { + // The completion token that was provided when the work item was fetched. + string completionToken = 1; +} + +// Response payload for abandoning an orchestration work item. +message AbandonOrchestrationWorkItemResponse { + // No fields +} + +// Request payload for completing an entity work item. +message CompleteEntityWorkItemRequest { + // The completion token that was provided when the work item was fetched. + string completionToken = 1; + + // The execution id of the scheduler. + string executionId = 2; + + // The number of requests that were executed. + // If this is smaller than the number of operations in the work item, + // any left-over operations will be sent again with the next work item. + int32 numberOperationsExecuted = 3; + + // The state of the entity after the executed operations, or null if none + google.protobuf.StringValue entityState = 4; + + // The messages that were sent by the executed operations. This must + // include any responses to the operation calls. + repeated OrchestratorMessage messages = 5; +} + +// Response payload for completing an entity work item. +message CompleteEntityWorkItemResponse { + // No fields +} + +// Request payload for abandoning an entity work item. +message AbandonEntityWorkItemRequest { + // The completion token that was provided when the work item was fetched. + string completionToken = 1; + string reason = 2; +} + +// Response payload for abandoning an entity work item. +message AbandonEntityWorkItemResponse { + // No fields +} + +// Request payload for ping operations. +message PingRequest { + // No fields +} + +// Response payload for ping operations. +message PingResponse { + // No fields +} + +// Request payload for fetching backend metrics. 
+message GetMetricsRequest { + // No fields +} + +// Response payload for fetching backend metrics +message GetMetricsResponse { + // The current metrics for the backend service. + BackendMetrics metrics = 1; +} + +// Metrics for the backend service. +message BackendMetrics { + // Activity work item metrics + WorkItemMetrics activityWorkItems = 1 [json_name="activityWorkItems"]; + // Orchestrator work item metrics + WorkItemMetrics orchestratorWorkItems = 2 [json_name="orchestratorWorkItems"]; + // Entity work item metrics + WorkItemMetrics entityWorkItems = 3 [json_name="entityWorkItems"]; + // Metrics related to workers currently connected to the backend + ConnectedWorkerMetrics connectedWorkers = 4 [json_name="connectedWorkers"]; +} + +// Metrics related to work items +message WorkItemMetrics { + // Number of work items that are queued and waiting to be processed + int32 pending = 1 [json_name="pending"]; + // Number of work items that are currently being processed + int32 active = 2 [json_name="active"]; + // Age of the oldest work item in the queue, in seconds + int32 oldestAgeInSeconds = 3 [json_name="oldestAgeInSeconds"]; +} + +// Metrics related to workers currently connected to the backend +message ConnectedWorkerMetrics { + // Number of worker instances that are currently connected to the backend + int32 count = 1 [json_name="count"]; +} diff --git a/src/Grpc.AzureManagedBackend/orchestrator_service.proto b/src/Grpc.AzureManagedBackend/orchestrator_service.proto new file mode 100644 index 00000000..ce458d4e --- /dev/null +++ b/src/Grpc.AzureManagedBackend/orchestrator_service.proto @@ -0,0 +1,826 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. 
+ +syntax = "proto3"; + +option csharp_namespace = "Microsoft.DurableTask.AzureManagedBackend.Protobuf"; +option java_package = "com.microsoft.durabletask.implementation.protobuf"; +option go_package = "/internal/protos"; + +import "google/protobuf/timestamp.proto"; +import "google/protobuf/duration.proto"; +import "google/protobuf/wrappers.proto"; +import "google/protobuf/empty.proto"; +import "google/protobuf/struct.proto"; + +message OrchestrationInstance { + string instanceId = 1; + google.protobuf.StringValue executionId = 2; +} + +message ActivityRequest { + string name = 1; + google.protobuf.StringValue version = 2; + google.protobuf.StringValue input = 3; + OrchestrationInstance orchestrationInstance = 4; + int32 taskId = 5; + TraceContext parentTraceContext = 6; +} + +message ActivityResponse { + string instanceId = 1; + int32 taskId = 2; + google.protobuf.StringValue result = 3; + TaskFailureDetails failureDetails = 4; + string completionToken = 5; +} + +message TaskFailureDetails { + string errorType = 1; + string errorMessage = 2; + google.protobuf.StringValue stackTrace = 3; + TaskFailureDetails innerFailure = 4; + bool isNonRetriable = 5; +} + +enum OrchestrationStatus { + ORCHESTRATION_STATUS_RUNNING = 0; + ORCHESTRATION_STATUS_COMPLETED = 1; + ORCHESTRATION_STATUS_CONTINUED_AS_NEW = 2; + ORCHESTRATION_STATUS_FAILED = 3; + ORCHESTRATION_STATUS_CANCELED = 4; + ORCHESTRATION_STATUS_TERMINATED = 5; + ORCHESTRATION_STATUS_PENDING = 6; + ORCHESTRATION_STATUS_SUSPENDED = 7; +} + +message ParentInstanceInfo { + int32 taskScheduledId = 1; + google.protobuf.StringValue name = 2; + google.protobuf.StringValue version = 3; + OrchestrationInstance orchestrationInstance = 4; +} + +message TraceContext { + string traceParent = 1; + string spanID = 2 [deprecated=true]; + google.protobuf.StringValue traceState = 3; +} + +message ExecutionStartedEvent { + string name = 1; + google.protobuf.StringValue version = 2; + google.protobuf.StringValue input = 3; + 
OrchestrationInstance orchestrationInstance = 4; + ParentInstanceInfo parentInstance = 5; + google.protobuf.Timestamp scheduledStartTimestamp = 6; + TraceContext parentTraceContext = 7; + google.protobuf.StringValue orchestrationSpanID = 8; + map tags = 9; +} + +message ExecutionCompletedEvent { + OrchestrationStatus orchestrationStatus = 1; + google.protobuf.StringValue result = 2; + TaskFailureDetails failureDetails = 3; +} + +message ExecutionTerminatedEvent { + google.protobuf.StringValue input = 1; + bool recurse = 2; +} + +message TaskScheduledEvent { + string name = 1; + google.protobuf.StringValue version = 2; + google.protobuf.StringValue input = 3; + TraceContext parentTraceContext = 4; + map tags = 5; +} + +message TaskCompletedEvent { + int32 taskScheduledId = 1; + google.protobuf.StringValue result = 2; +} + +message TaskFailedEvent { + int32 taskScheduledId = 1; + TaskFailureDetails failureDetails = 2; +} + +message SubOrchestrationInstanceCreatedEvent { + string instanceId = 1; + string name = 2; + google.protobuf.StringValue version = 3; + google.protobuf.StringValue input = 4; + TraceContext parentTraceContext = 5; +} + +message SubOrchestrationInstanceCompletedEvent { + int32 taskScheduledId = 1; + google.protobuf.StringValue result = 2; +} + +message SubOrchestrationInstanceFailedEvent { + int32 taskScheduledId = 1; + TaskFailureDetails failureDetails = 2; +} + +message TimerCreatedEvent { + google.protobuf.Timestamp fireAt = 1; +} + +message TimerFiredEvent { + google.protobuf.Timestamp fireAt = 1; + int32 timerId = 2; +} + +message OrchestratorStartedEvent { + // No payload data +} + +message OrchestratorCompletedEvent { + // No payload data +} + +message EventSentEvent { + string instanceId = 1; + string name = 2; + google.protobuf.StringValue input = 3; +} + +message EventRaisedEvent { + string name = 1; + google.protobuf.StringValue input = 2; +} + +message GenericEvent { + google.protobuf.StringValue data = 1; +} + +message 
HistoryStateEvent { + OrchestrationState orchestrationState = 1; +} + +message ContinueAsNewEvent { + google.protobuf.StringValue input = 1; +} + +message ExecutionSuspendedEvent { + google.protobuf.StringValue input = 1; +} + +message ExecutionResumedEvent { + google.protobuf.StringValue input = 1; +} + +message EntityOperationSignaledEvent { + string requestId = 1; + string operation = 2; + google.protobuf.Timestamp scheduledTime = 3; + google.protobuf.StringValue input = 4; + google.protobuf.StringValue targetInstanceId = 5; // used only within histories, null in messages +} + +message EntityOperationCalledEvent { + string requestId = 1; + string operation = 2; + google.protobuf.Timestamp scheduledTime = 3; + google.protobuf.StringValue input = 4; + google.protobuf.StringValue parentInstanceId = 5; // used only within messages, null in histories + google.protobuf.StringValue parentExecutionId = 6; // used only within messages, null in histories + google.protobuf.StringValue targetInstanceId = 7; // used only within histories, null in messages +} + +message EntityLockRequestedEvent { + string criticalSectionId = 1; + repeated string lockSet = 2; + int32 position = 3; + google.protobuf.StringValue parentInstanceId = 4; // used only within messages, null in histories +} + +message EntityOperationCompletedEvent { + string requestId = 1; + google.protobuf.StringValue output = 2; +} + +message EntityOperationFailedEvent { + string requestId = 1; + TaskFailureDetails failureDetails = 2; +} + +message EntityUnlockSentEvent { + string criticalSectionId = 1; + google.protobuf.StringValue parentInstanceId = 2; // used only within messages, null in histories + google.protobuf.StringValue targetInstanceId = 3; // used only within histories, null in messages +} + +message EntityLockGrantedEvent { + string criticalSectionId = 1; +} + +message HistoryEvent { + int32 eventId = 1; + google.protobuf.Timestamp timestamp = 2; + oneof eventType { + ExecutionStartedEvent 
executionStarted = 3; + ExecutionCompletedEvent executionCompleted = 4; + ExecutionTerminatedEvent executionTerminated = 5; + TaskScheduledEvent taskScheduled = 6; + TaskCompletedEvent taskCompleted = 7; + TaskFailedEvent taskFailed = 8; + SubOrchestrationInstanceCreatedEvent subOrchestrationInstanceCreated = 9; + SubOrchestrationInstanceCompletedEvent subOrchestrationInstanceCompleted = 10; + SubOrchestrationInstanceFailedEvent subOrchestrationInstanceFailed = 11; + TimerCreatedEvent timerCreated = 12; + TimerFiredEvent timerFired = 13; + OrchestratorStartedEvent orchestratorStarted = 14; + OrchestratorCompletedEvent orchestratorCompleted = 15; + EventSentEvent eventSent = 16; + EventRaisedEvent eventRaised = 17; + GenericEvent genericEvent = 18; + HistoryStateEvent historyState = 19; + ContinueAsNewEvent continueAsNew = 20; + ExecutionSuspendedEvent executionSuspended = 21; + ExecutionResumedEvent executionResumed = 22; + EntityOperationSignaledEvent entityOperationSignaled = 23; + EntityOperationCalledEvent entityOperationCalled = 24; + EntityOperationCompletedEvent entityOperationCompleted = 25; + EntityOperationFailedEvent entityOperationFailed = 26; + EntityLockRequestedEvent entityLockRequested = 27; + EntityLockGrantedEvent entityLockGranted = 28; + EntityUnlockSentEvent entityUnlockSent = 29; + } +} + +message ScheduleTaskAction { + string name = 1; + google.protobuf.StringValue version = 2; + google.protobuf.StringValue input = 3; + map tags = 4; + TraceContext parentTraceContext = 5; +} + +message CreateSubOrchestrationAction { + string instanceId = 1; + string name = 2; + google.protobuf.StringValue version = 3; + google.protobuf.StringValue input = 4; + TraceContext parentTraceContext = 5; +} + +message CreateTimerAction { + google.protobuf.Timestamp fireAt = 1; +} + +message SendEventAction { + OrchestrationInstance instance = 1; + string name = 2; + google.protobuf.StringValue data = 3; +} + +message CompleteOrchestrationAction { + 
OrchestrationStatus orchestrationStatus = 1; + google.protobuf.StringValue result = 2; + google.protobuf.StringValue details = 3; + google.protobuf.StringValue newVersion = 4; + repeated HistoryEvent carryoverEvents = 5; + TaskFailureDetails failureDetails = 6; +} + +message TerminateOrchestrationAction { + string instanceId = 1; + google.protobuf.StringValue reason = 2; + bool recurse = 3; +} + +message SendEntityMessageAction { + oneof EntityMessageType { + EntityOperationSignaledEvent entityOperationSignaled = 1; + EntityOperationCalledEvent entityOperationCalled = 2; + EntityLockRequestedEvent entityLockRequested = 3; + EntityUnlockSentEvent entityUnlockSent = 4; + } +} + +message OrchestratorAction { + int32 id = 1; + oneof orchestratorActionType { + ScheduleTaskAction scheduleTask = 2; + CreateSubOrchestrationAction createSubOrchestration = 3; + CreateTimerAction createTimer = 4; + SendEventAction sendEvent = 5; + CompleteOrchestrationAction completeOrchestration = 6; + TerminateOrchestrationAction terminateOrchestration = 7; + SendEntityMessageAction sendEntityMessage = 8; + } +} + +message OrchestrationTraceContext { + google.protobuf.StringValue spanID = 1; + google.protobuf.Timestamp spanStartTime = 2; +} + +message OrchestratorRequest { + string instanceId = 1; + google.protobuf.StringValue executionId = 2; + repeated HistoryEvent pastEvents = 3; + repeated HistoryEvent newEvents = 4; + OrchestratorEntityParameters entityParameters = 5; + bool requiresHistoryStreaming = 6; + map properties = 7; + + OrchestrationTraceContext orchestrationTraceContext = 8; +} + +message OrchestratorResponse { + string instanceId = 1; + repeated OrchestratorAction actions = 2; + google.protobuf.StringValue customStatus = 3; + string completionToken = 4; + + // The number of work item events that were processed by the orchestrator. + // This field is optional. If not set, the service should assume that the orchestrator processed all events. 
+ google.protobuf.Int32Value numEventsProcessed = 5; + OrchestrationTraceContext orchestrationTraceContext = 6; + + // Whether or not a history is required to complete the original OrchestratorRequest and none was provided. + bool requiresHistory = 7; +} + +message CreateInstanceRequest { + string instanceId = 1; + string name = 2; + google.protobuf.StringValue version = 3; + google.protobuf.StringValue input = 4; + google.protobuf.Timestamp scheduledStartTimestamp = 5; + OrchestrationIdReusePolicy orchestrationIdReusePolicy = 6; + google.protobuf.StringValue executionId = 7; + map<string, string> tags = 8; + TraceContext parentTraceContext = 9; + google.protobuf.Timestamp requestTime = 10; +} + +message OrchestrationIdReusePolicy { + repeated OrchestrationStatus replaceableStatus = 1; + reserved 2; +} + +message CreateInstanceResponse { + string instanceId = 1; +} + +message GetInstanceRequest { + string instanceId = 1; + bool getInputsAndOutputs = 2; +} + +message GetInstanceResponse { + bool exists = 1; + OrchestrationState orchestrationState = 2; +} + +message RewindInstanceRequest { + string instanceId = 1; + google.protobuf.StringValue reason = 2; +} + +message RewindInstanceResponse { + // Empty for now. Using explicit type in case we want to add content later. 
+} + +message OrchestrationState { + string instanceId = 1; + string name = 2; + google.protobuf.StringValue version = 3; + OrchestrationStatus orchestrationStatus = 4; + google.protobuf.Timestamp scheduledStartTimestamp = 5; + google.protobuf.Timestamp createdTimestamp = 6; + google.protobuf.Timestamp lastUpdatedTimestamp = 7; + google.protobuf.StringValue input = 8; + google.protobuf.StringValue output = 9; + google.protobuf.StringValue customStatus = 10; + TaskFailureDetails failureDetails = 11; + google.protobuf.StringValue executionId = 12; + google.protobuf.Timestamp completedTimestamp = 13; + google.protobuf.StringValue parentInstanceId = 14; + map tags = 15; +} + +message RaiseEventRequest { + string instanceId = 1; + string name = 2; + google.protobuf.StringValue input = 3; +} + +message RaiseEventResponse { + // No payload +} + +message TerminateRequest { + string instanceId = 1; + google.protobuf.StringValue output = 2; + bool recursive = 3; +} + +message TerminateResponse { + // No payload +} + +message SuspendRequest { + string instanceId = 1; + google.protobuf.StringValue reason = 2; +} + +message SuspendResponse { + // No payload +} + +message ResumeRequest { + string instanceId = 1; + google.protobuf.StringValue reason = 2; +} + +message ResumeResponse { + // No payload +} + +message QueryInstancesRequest { + InstanceQuery query = 1; +} + +message InstanceQuery{ + repeated OrchestrationStatus runtimeStatus = 1; + google.protobuf.Timestamp createdTimeFrom = 2; + google.protobuf.Timestamp createdTimeTo = 3; + repeated google.protobuf.StringValue taskHubNames = 4; + int32 maxInstanceCount = 5; + google.protobuf.StringValue continuationToken = 6; + google.protobuf.StringValue instanceIdPrefix = 7; + bool fetchInputsAndOutputs = 8; +} + +message QueryInstancesResponse { + repeated OrchestrationState orchestrationState = 1; + google.protobuf.StringValue continuationToken = 2; +} + +message PurgeInstancesRequest { + oneof request { + string instanceId = 1; 
+ PurgeInstanceFilter purgeInstanceFilter = 2; + InstanceBatch instanceBatch = 4; + } + bool recursive = 3; +} + +message PurgeInstanceFilter { + google.protobuf.Timestamp createdTimeFrom = 1; + google.protobuf.Timestamp createdTimeTo = 2; + repeated OrchestrationStatus runtimeStatus = 3; +} + +message PurgeInstancesResponse { + int32 deletedInstanceCount = 1; + google.protobuf.BoolValue isComplete = 2; +} + +message RestartInstanceRequest { + string instanceId = 1; + bool restartWithNewInstanceId = 2; +} + +message RestartInstanceResponse { + string instanceId = 1; +} + +message CreateTaskHubRequest { + bool recreateIfExists = 1; +} + +message CreateTaskHubResponse { + // no payload +} + +message DeleteTaskHubRequest { + // no payload +} + +message DeleteTaskHubResponse { + // no payload +} + +message SignalEntityRequest { + string instanceId = 1; + string name = 2; + google.protobuf.StringValue input = 3; + string requestId = 4; + google.protobuf.Timestamp scheduledTime = 5; + TraceContext parentTraceContext = 6; + google.protobuf.Timestamp requestTime = 7; +} + +message SignalEntityResponse { + // no payload +} + +message GetEntityRequest { + string instanceId = 1; + bool includeState = 2; +} + +message GetEntityResponse { + bool exists = 1; + EntityMetadata entity = 2; +} + +message EntityQuery { + google.protobuf.StringValue instanceIdStartsWith = 1; + google.protobuf.Timestamp lastModifiedFrom = 2; + google.protobuf.Timestamp lastModifiedTo = 3; + bool includeState = 4; + bool includeTransient = 5; + google.protobuf.Int32Value pageSize = 6; + google.protobuf.StringValue continuationToken = 7; +} + +message QueryEntitiesRequest { + EntityQuery query = 1; +} + +message QueryEntitiesResponse { + repeated EntityMetadata entities = 1; + google.protobuf.StringValue continuationToken = 2; +} + +message EntityMetadata { + string instanceId = 1; + google.protobuf.Timestamp lastModifiedTime = 2; + int32 backlogQueueSize = 3; + google.protobuf.StringValue lockedBy = 4; 
+ google.protobuf.StringValue serializedState = 5; +} + +message CleanEntityStorageRequest { + google.protobuf.StringValue continuationToken = 1; + bool removeEmptyEntities = 2; + bool releaseOrphanedLocks = 3; +} + +message CleanEntityStorageResponse { + google.protobuf.StringValue continuationToken = 1; + int32 emptyEntitiesRemoved = 2; + int32 orphanedLocksReleased = 3; +} + +message OrchestratorEntityParameters { + google.protobuf.Duration entityMessageReorderWindow = 1; +} + +message EntityBatchRequest { + string instanceId = 1; + google.protobuf.StringValue entityState = 2; + repeated OperationRequest operations = 3; +} + +message EntityBatchResult { + repeated OperationResult results = 1; + repeated OperationAction actions = 2; + google.protobuf.StringValue entityState = 3; + TaskFailureDetails failureDetails = 4; + string completionToken = 5; + repeated OperationInfo operationInfos = 6; // used only with DTS +} + +message EntityRequest { + string instanceId = 1; + string executionId = 2; + google.protobuf.StringValue entityState = 3; // null if entity does not exist + repeated HistoryEvent operationRequests = 4; +} + +message OperationRequest { + string operation = 1; + string requestId = 2; + google.protobuf.StringValue input = 3; + TraceContext traceContext = 4; +} + +message OperationResult { + oneof resultType { + OperationResultSuccess success = 1; + OperationResultFailure failure = 2; + } +} + +message OperationInfo { + string requestId = 1; + OrchestrationInstance responseDestination = 2; // null for signals +} + +message OperationResultSuccess { + google.protobuf.StringValue result = 1; + google.protobuf.Timestamp startTimeUtc = 2; + google.protobuf.Timestamp endTimeUtc = 3; +} + +message OperationResultFailure { + TaskFailureDetails failureDetails = 1; + google.protobuf.Timestamp startTimeUtc = 2; + google.protobuf.Timestamp endTimeUtc = 3; +} + +message OperationAction { + int32 id = 1; + oneof operationActionType { + SendSignalAction sendSignal = 
2; + StartNewOrchestrationAction startNewOrchestration = 3; + } +} + +message SendSignalAction { + string instanceId = 1; + string name = 2; + google.protobuf.StringValue input = 3; + google.protobuf.Timestamp scheduledTime = 4; + google.protobuf.Timestamp requestTime = 5; + TraceContext parentTraceContext = 6; +} + +message StartNewOrchestrationAction { + string instanceId = 1; + string name = 2; + google.protobuf.StringValue version = 3; + google.protobuf.StringValue input = 4; + google.protobuf.Timestamp scheduledTime = 5; + google.protobuf.Timestamp requestTime = 6; + TraceContext parentTraceContext = 7; +} + +message AbandonActivityTaskRequest { + string completionToken = 1; +} + +message AbandonActivityTaskResponse { + // Empty. +} + +message AbandonOrchestrationTaskRequest { + string completionToken = 1; +} + +message AbandonOrchestrationTaskResponse { + // Empty. +} + +message AbandonEntityTaskRequest { + string completionToken = 1; +} + +message AbandonEntityTaskResponse { + // Empty. +} + +message SkipGracefulOrchestrationTerminationsRequest { + InstanceBatch instanceBatch = 1; + google.protobuf.StringValue reason = 2; +} + +message SkipGracefulOrchestrationTerminationsResponse { + // Those instances which could not be terminated because they had locked entities at the time of this termination call, + // are already in a terminal state (completed, failed, terminated, etc.), are not orchestrations, or do not exist (i.e. have been purged) + repeated string unterminatedInstanceIds = 1; +} + +service TaskHubSidecarService { + // Sends a hello request to the sidecar service. + rpc Hello(google.protobuf.Empty) returns (google.protobuf.Empty); + + // Starts a new orchestration instance. + rpc StartInstance(CreateInstanceRequest) returns (CreateInstanceResponse); + + // Gets the status of an existing orchestration instance. 
+ rpc GetInstance(GetInstanceRequest) returns (GetInstanceResponse); + + // Rewinds an orchestration instance to last known good state and replays from there. + rpc RewindInstance(RewindInstanceRequest) returns (RewindInstanceResponse); + + // Restarts an orchestration instance. + rpc RestartInstance(RestartInstanceRequest) returns (RestartInstanceResponse); + + // Waits for an orchestration instance to reach a running or completion state. + rpc WaitForInstanceStart(GetInstanceRequest) returns (GetInstanceResponse); + + // Waits for an orchestration instance to reach a completion state (completed, failed, terminated, etc.). + rpc WaitForInstanceCompletion(GetInstanceRequest) returns (GetInstanceResponse); + + // Raises an event to a running orchestration instance. + rpc RaiseEvent(RaiseEventRequest) returns (RaiseEventResponse); + + // Terminates a running orchestration instance. + rpc TerminateInstance(TerminateRequest) returns (TerminateResponse); + + // Suspends a running orchestration instance. + rpc SuspendInstance(SuspendRequest) returns (SuspendResponse); + + // Resumes a suspended orchestration instance. + rpc ResumeInstance(ResumeRequest) returns (ResumeResponse); + + // rpc DeleteInstance(DeleteInstanceRequest) returns (DeleteInstanceResponse); + + rpc QueryInstances(QueryInstancesRequest) returns (QueryInstancesResponse); + rpc PurgeInstances(PurgeInstancesRequest) returns (PurgeInstancesResponse); + + rpc GetWorkItems(GetWorkItemsRequest) returns (stream WorkItem); + rpc CompleteActivityTask(ActivityResponse) returns (CompleteTaskResponse); + rpc CompleteOrchestratorTask(OrchestratorResponse) returns (CompleteTaskResponse); + rpc CompleteEntityTask(EntityBatchResult) returns (CompleteTaskResponse); + + // Gets the history of an orchestration instance as a stream of events. 
+ rpc StreamInstanceHistory(StreamInstanceHistoryRequest) returns (stream HistoryChunk); + + // Deletes and Creates the necessary resources for the orchestration service and the instance store + rpc CreateTaskHub(CreateTaskHubRequest) returns (CreateTaskHubResponse); + + // Deletes the resources for the orchestration service and optionally the instance store + rpc DeleteTaskHub(DeleteTaskHubRequest) returns (DeleteTaskHubResponse); + + // sends a signal to an entity + rpc SignalEntity(SignalEntityRequest) returns (SignalEntityResponse); + + // get information about a specific entity + rpc GetEntity(GetEntityRequest) returns (GetEntityResponse); + + // query entities + rpc QueryEntities(QueryEntitiesRequest) returns (QueryEntitiesResponse); + + // clean entity storage + rpc CleanEntityStorage(CleanEntityStorageRequest) returns (CleanEntityStorageResponse); + + // Abandons a single work item + rpc AbandonTaskActivityWorkItem(AbandonActivityTaskRequest) returns (AbandonActivityTaskResponse); + + // Abandon an orchestration work item + rpc AbandonTaskOrchestratorWorkItem(AbandonOrchestrationTaskRequest) returns (AbandonOrchestrationTaskResponse); + + // Abandon an entity work item + rpc AbandonTaskEntityWorkItem(AbandonEntityTaskRequest) returns (AbandonEntityTaskResponse); + + // "Skip" graceful termination of orchestrations by immediately changing their status in storage to "terminated". + // Note that a maximum of 500 orchestrations can be terminated at a time using this method. 
+ rpc SkipGracefulOrchestrationTerminations(SkipGracefulOrchestrationTerminationsRequest) returns (SkipGracefulOrchestrationTerminationsResponse); +} + +message GetWorkItemsRequest { + int32 maxConcurrentOrchestrationWorkItems = 1; + int32 maxConcurrentActivityWorkItems = 2; + int32 maxConcurrentEntityWorkItems = 3; + + repeated WorkerCapability capabilities = 10; +} + +enum WorkerCapability { + WORKER_CAPABILITY_UNSPECIFIED = 0; + + // Indicates that the worker is capable of streaming instance history as a more optimized + // alternative to receiving the full history embedded in the orchestrator work-item. + // When set, the service may return work items without any history events as an optimization. + // It is strongly recommended that all SDKs support this capability. + WORKER_CAPABILITY_HISTORY_STREAMING = 1; +} + +message WorkItem { + oneof request { + OrchestratorRequest orchestratorRequest = 1; + ActivityRequest activityRequest = 2; + EntityBatchRequest entityRequest = 3; // (older) used by orchestration services implementations + HealthPing healthPing = 4; + EntityRequest entityRequestV2 = 5; // (newer) used by backend service implementations + } + string completionToken = 10; +} + +message CompleteTaskResponse { + // No payload +} + +message HealthPing { + // No payload +} + +message StreamInstanceHistoryRequest { + string instanceId = 1; + google.protobuf.StringValue executionId = 2; + + // When set to true, the service may return a more optimized response suitable for workers. + bool forWorkItemProcessing = 3; +} + +message HistoryChunk { + repeated HistoryEvent events = 1; +} + +message InstanceBatch { + // A maximum of 500 instance IDs can be provided in this list. 
+ repeated string instanceIds = 1; +} \ No newline at end of file diff --git a/src/Grpc.AzureManagedBackend/refresh-protos.ps1 b/src/Grpc.AzureManagedBackend/refresh-protos.ps1 new file mode 100644 index 00000000..a70e3cb6 --- /dev/null +++ b/src/Grpc.AzureManagedBackend/refresh-protos.ps1 @@ -0,0 +1,66 @@ +#!/usr/bin/env pwsh +param( + [string]$branch = "main" +) + +# Fail with an error if the PowerShell version is less than 7.0 +if ($PSVersionTable.PSVersion -lt [Version]"7.0") { + Write-Error "This script requires PowerShell 7.0 or later." + exit 1 +} + +# Get the commit ID of the latest commit in the durabletask-protobuf repository. +# We need this to download the proto files from the correct commit, avoiding race conditions +# in rare cases where the proto files are updated between the time we download the commit ID +# and the time we download the proto files. +$commitDetails = Invoke-RestMethod -Uri "https://api.github.com/repos/microsoft/durabletask-protobuf/commits/$branch" +$commitId = $commitDetails.sha + +# These are the proto files we need to download from the durabletask-protobuf repository. 
+$protoFileNames = @( + "orchestrator_service.proto", + "backend_service.proto" +) + +# Download each proto file to the local directory using the above commit ID +foreach ($protoFileName in $protoFileNames) { + $url = "https://raw.githubusercontent.com/microsoft/durabletask-protobuf/$commitId/protos/$protoFileName" + $outputFile = "$PSScriptRoot\$protoFileName" + + try { + Invoke-WebRequest -Uri $url -OutFile $outputFile + } + catch { + Write-Error "Failed to download $url to ${outputFile}: $_" + exit 1 + } + + Write-Output "Downloaded $url to $outputFile" +} + +# Post-process all downloaded proto files to update the namespace +foreach ($protoFileName in $protoFileNames) { + $protoFilePath = "$PSScriptRoot\$protoFileName" + if (Test-Path $protoFilePath) { + $content = Get-Content $protoFilePath -Raw + $content = $content -replace 'option csharp_namespace = "Microsoft\.DurableTask\.Protobuf";', 'option csharp_namespace = "Microsoft.DurableTask.AzureManagedBackend.Protobuf";' + Set-Content -Path $protoFilePath -Value $content -NoNewline + } +} + +# Log the commit ID and the URLs of the downloaded proto files to a versions file. +# Overwrite the file if it already exists. 
+$versionsFile = "$PSScriptRoot\versions.txt" +Remove-Item -Path $versionsFile -ErrorAction SilentlyContinue + +Add-Content ` + -Path $versionsFile ` + -Value "# The following files were downloaded from branch $branch at $(Get-Date -Format "yyyy-MM-dd HH:mm:ss" -AsUTC) UTC" + +foreach ($protoFileName in $protoFileNames) { + Add-Content ` + -Path $versionsFile ` + -Value "https://raw.githubusercontent.com/microsoft/durabletask-protobuf/$commitId/protos/$protoFileName" +} + +Write-Host "Wrote commit ID $commitId to $versionsFile" -ForegroundColor Green diff --git a/src/Grpc.AzureManagedBackend/versions.txt b/src/Grpc.AzureManagedBackend/versions.txt new file mode 100644 index 00000000..d5d60da9 --- /dev/null +++ b/src/Grpc.AzureManagedBackend/versions.txt @@ -0,0 +1,3 @@ +# The following files were downloaded from branch main at 2025-10-01 21:40:16 UTC +https://raw.githubusercontent.com/microsoft/durabletask-protobuf/a4e448066e3d85e676839a8bd23036a36b3c5f88/protos/orchestrator_service.proto +https://raw.githubusercontent.com/microsoft/durabletask-protobuf/a4e448066e3d85e676839a8bd23036a36b3c5f88/protos/backend_service.proto diff --git a/src/Grpc/Grpc.csproj b/src/Grpc/Grpc.csproj index 4839af20..2c489063 100644 --- a/src/Grpc/Grpc.csproj +++ b/src/Grpc/Grpc.csproj @@ -18,7 +18,10 @@ - + diff --git a/src/Grpc/orchestrator_service.proto b/src/Grpc/orchestrator_service.proto index df5143bc..52ca10d3 100644 --- a/src/Grpc/orchestrator_service.proto +++ b/src/Grpc/orchestrator_service.proto @@ -469,6 +469,7 @@ message PurgeInstancesRequest { oneof request { string instanceId = 1; PurgeInstanceFilter purgeInstanceFilter = 2; + InstanceBatch instanceBatch = 4; } bool recursive = 3; } @@ -681,8 +682,7 @@ message AbandonEntityTaskResponse { } message SkipGracefulOrchestrationTerminationsRequest { - // A maximum of 500 instance IDs can be provided in this list. 
- repeated string instanceIds = 1; + InstanceBatch instanceBatch = 1; google.protobuf.StringValue reason = 2; } @@ -818,4 +818,9 @@ message StreamInstanceHistoryRequest { message HistoryChunk { repeated HistoryEvent events = 1; +} + +message InstanceBatch { + // A maximum of 500 instance IDs can be provided in this list. + repeated string instanceIds = 1; } \ No newline at end of file diff --git a/src/Grpc/versions.txt b/src/Grpc/versions.txt index 3e4d1b21..51497576 100644 --- a/src/Grpc/versions.txt +++ b/src/Grpc/versions.txt @@ -1,2 +1,3 @@ -# The following files were downloaded from branch main at 2025-09-17 01:45:58 UTC -https://raw.githubusercontent.com/microsoft/durabletask-protobuf/f5745e0d83f608d77871c1894d9260ceaae08967/protos/orchestrator_service.proto +# The following files were downloaded from branch main at 2025-09-29 04:31:40 UTC +https://raw.githubusercontent.com/microsoft/durabletask-protobuf/a4e448066e3d85e676839a8bd23036a36b3c5f88/protos/orchestrator_service.proto +https://raw.githubusercontent.com/microsoft/durabletask-protobuf/a4e448066e3d85e676839a8bd23036a36b3c5f88/protos/backend_service.proto diff --git a/test/Grpc.IntegrationTests/Grpc.IntegrationTests.csproj b/test/Grpc.IntegrationTests/Grpc.IntegrationTests.csproj index e6b0aee7..ba2c8b68 100644 --- a/test/Grpc.IntegrationTests/Grpc.IntegrationTests.csproj +++ b/test/Grpc.IntegrationTests/Grpc.IntegrationTests.csproj @@ -7,6 +7,7 @@ + diff --git a/test/Grpc.IntegrationTests/GrpcSidecar/Grpc/TaskHubGrpcServer.cs b/test/Grpc.IntegrationTests/GrpcSidecar/Grpc/TaskHubGrpcServer.cs index e3a320f7..87f0e631 100644 --- a/test/Grpc.IntegrationTests/GrpcSidecar/Grpc/TaskHubGrpcServer.cs +++ b/test/Grpc.IntegrationTests/GrpcSidecar/Grpc/TaskHubGrpcServer.cs @@ -31,6 +31,9 @@ public class TaskHubGrpcServer : P.TaskHubSidecarService.TaskHubSidecarServiceBa readonly TaskHubDispatcherHost dispatcherHost; readonly IsConnectedSignal isConnectedSignal = new(); readonly SemaphoreSlim sendWorkItemLock = 
new(initialCount: 1); + readonly ConcurrentDictionary> streamingPastEvents = new(StringComparer.OrdinalIgnoreCase); + + volatile bool supportsHistoryStreaming; // Initialized when a client connects to this service to receive work-item commands. IServerStreamWriter? workerToClientStream; @@ -479,6 +482,8 @@ static P.GetInstanceResponse CreateGetInstanceResponse(OrchestrationState state, public override async Task GetWorkItems(P.GetWorkItemsRequest request, IServerStreamWriter responseStream, ServerCallContext context) { + // Record whether the client supports history streaming + this.supportsHistoryStreaming = request.Capabilities.Contains(P.WorkerCapability.HistoryStreaming); // Use a lock to mitigate the race condition where we signal the dispatch host to start but haven't // yet saved a reference to the client response stream. lock (this.isConnectedSignal) @@ -521,6 +526,35 @@ public override async Task GetWorkItems(P.GetWorkItemsRequest request, IServerSt } } + public override async Task StreamInstanceHistory(P.StreamInstanceHistoryRequest request, IServerStreamWriter responseStream, ServerCallContext context) + { + if (this.streamingPastEvents.TryGetValue(request.InstanceId, out List? pastEvents)) + { + const int MaxChunkBytes = 256 * 1024; // 256KB per chunk to simulate chunked streaming + int currentSize = 0; + P.HistoryChunk chunk = new(); + + foreach (P.HistoryEvent e in pastEvents) + { + int eventSize = e.CalculateSize(); + if (currentSize > 0 && currentSize + eventSize > MaxChunkBytes) + { + await responseStream.WriteAsync(chunk); + chunk = new P.HistoryChunk(); + currentSize = 0; + } + + chunk.Events.Add(e); + currentSize += eventSize; + } + + if (chunk.Events.Count > 0) + { + await responseStream.WriteAsync(chunk); + } + } + } + /// /// Invoked by the when a work item is available, proxies the call to execute an orchestrator over a gRPC channel. 
/// @@ -547,16 +581,37 @@ async Task ITaskExecutor.ExecuteOrchestrator( try { + var orkRequest = new P.OrchestratorRequest + { + InstanceId = instance.InstanceId, + ExecutionId = instance.ExecutionId, + NewEvents = { newEvents.Select(ProtobufUtils.ToHistoryEventProto) }, + OrchestrationTraceContext = orchestrationTraceContext, + }; + + // Decide whether to stream based on total past-event size; the threshold here is 1 KiB (NOTE(review): the design note said 1 MiB — confirm whether 1024 is an intentional test-only value or should be 1024 * 1024) + List protoPastEvents = pastEvents.Select(ProtobufUtils.ToHistoryEventProto).ToList(); + int totalBytes = 0; + foreach (P.HistoryEvent ev in protoPastEvents) + { + totalBytes += ev.CalculateSize(); + } + + if (this.supportsHistoryStreaming && totalBytes > (1024)) + { + orkRequest.RequiresHistoryStreaming = true; + // Store past events to serve via StreamInstanceHistory + this.streamingPastEvents[instance.InstanceId] = protoPastEvents; + } + else + { + // Embed full history in the work item + orkRequest.PastEvents.AddRange(protoPastEvents); + } + + await this.SendWorkItemToClientAsync(new P.WorkItem { - OrchestratorRequest = new P.OrchestratorRequest - { - InstanceId = instance.InstanceId, - ExecutionId = instance.ExecutionId, - NewEvents = { newEvents.Select(ProtobufUtils.ToHistoryEventProto) }, - OrchestrationTraceContext = orchestrationTraceContext, - PastEvents = { pastEvents.Select(ProtobufUtils.ToHistoryEventProto) }, - } + OrchestratorRequest = orkRequest, }); } catch diff --git a/test/Grpc.IntegrationTests/IntegrationTestBase.cs b/test/Grpc.IntegrationTests/IntegrationTestBase.cs index 642107ef..13d92a7a 100644 --- a/test/Grpc.IntegrationTests/IntegrationTestBase.cs +++ b/test/Grpc.IntegrationTests/IntegrationTestBase.cs @@ -45,9 +45,12 @@ void IDisposable.Dispose() GC.SuppressFinalize(this); } - protected async Task StartWorkerAsync(Action workerConfigure, Action? clientConfigure = null) + protected async Task StartWorkerAsync( + Action workerConfigure, + Action? clientConfigure = null, + Action? 
servicesConfigure = null) { - IHost host = this.CreateHostBuilder(workerConfigure, clientConfigure).Build(); + IHost host = this.CreateHostBuilder(workerConfigure, clientConfigure, servicesConfigure).Build(); await host.StartAsync(this.TimeoutToken); return new HostTestLifetime(host, this.TimeoutToken); } @@ -57,7 +60,10 @@ protected async Task StartWorkerAsync(Action /// Configures the durable task worker builder. /// Configures the durable task client builder. - protected IHostBuilder CreateHostBuilder(Action workerConfigure, Action? clientConfigure) + protected IHostBuilder CreateHostBuilder( + Action workerConfigure, + Action? clientConfigure, + Action? servicesConfigure) { var host = Host.CreateDefaultBuilder() .ConfigureLogging(b => @@ -69,6 +75,7 @@ protected IHostBuilder CreateHostBuilder(Action worke }) .ConfigureServices((context, services) => { + servicesConfigure?.Invoke(services); services.AddDurableTaskWorker(b => { b.UseGrpc(this.sidecarFixture.Channel); diff --git a/test/Grpc.IntegrationTests/LargePayloadTests.cs b/test/Grpc.IntegrationTests/LargePayloadTests.cs new file mode 100644 index 00000000..6e73f877 --- /dev/null +++ b/test/Grpc.IntegrationTests/LargePayloadTests.cs @@ -0,0 +1,647 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using System.Text.Json; +using Microsoft.DurableTask.Client; +using Microsoft.DurableTask; +using Microsoft.DurableTask.Converters; +using Microsoft.DurableTask.Worker; +using Microsoft.Extensions.DependencyInjection; +using Xunit.Abstractions; + +namespace Microsoft.DurableTask.Grpc.Tests; + +public class LargePayloadTests(ITestOutputHelper output, GrpcSidecarFixture sidecarFixture) : IntegrationTestBase(output, sidecarFixture) +{ + // Validates client externalizes a large orchestration input and worker resolves it. 
+ [Fact] + public async Task LargeOrchestrationInputAndOutputAndCustomStatus() + { + string largeInput = new string('A', 1024 * 1024); // 1MB + TaskName orchestratorName = nameof(LargeOrchestrationInputAndOutputAndCustomStatus); + + InMemoryPayloadStore fakeStore = new InMemoryPayloadStore(); + + await using HostTestLifetime server = await this.StartWorkerAsync( + worker => + { + worker.AddTasks(tasks => tasks.AddOrchestratorFunc( + orchestratorName, + (ctx, input) => + { + ctx.SetCustomStatus(largeInput); + return Task.FromResult(input + input); + })); + + worker.UseExternalizedPayloads(); + + worker.Services.AddSingleton(fakeStore); + }, + client => + { + client.UseExternalizedPayloads(); + + // Override store with in-memory test double + client.Services.AddSingleton(fakeStore); + }, + services => + { + services.AddExternalizedPayloadStore(opts => + { + opts.ExternalizeThresholdBytes = 1024; + opts.ContainerName = "test"; + opts.ConnectionString = "UseDevelopmentStorage=true"; + }); + }); + + string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName, input: largeInput); + + OrchestrationMetadata completed = await server.Client.WaitForInstanceCompletionAsync( + instanceId, getInputsAndOutputs: true, this.TimeoutToken); + + Assert.Equal(OrchestrationRuntimeStatus.Completed, completed.RuntimeStatus); + + // Validate that the input made a roundtrip and was resolved on the worker + // validate input + string? input = completed.ReadInputAs(); + Assert.NotNull(input); + Assert.Equal(largeInput.Length, input!.Length); + Assert.Equal(largeInput, input); + + string? echoed = completed.ReadOutputAs(); + Assert.NotNull(echoed); + Assert.Equal(largeInput.Length * 2, echoed!.Length); + Assert.Equal(largeInput + largeInput, echoed); + + string? 
customStatus = completed.ReadCustomStatusAs(); + Assert.NotNull(customStatus); + Assert.Equal(largeInput.Length, customStatus!.Length); + Assert.Equal(largeInput, customStatus); + + // Ensure client externalized the input + Assert.True(fakeStore.UploadCount >= 1); + Assert.True(fakeStore.DownloadCount >= 1); + Assert.Contains(JsonSerializer.Serialize(largeInput), fakeStore.uploadedPayloads); + Assert.Contains(JsonSerializer.Serialize(largeInput + largeInput), fakeStore.uploadedPayloads); + } + + // Validates history streaming path resolves externalized inputs/outputs in HistoryChunk. + [Fact] + public async Task HistoryStreaming_ResolvesPayloads() + { + // Make payloads large enough so that past events history exceeds 1 MiB to trigger streaming + string largeInput = new string('H', 2 * 1024 * 1024); // 2 MiB + string largeOutput = new string('O', 2 * 1024 * 1024); // 2 MiB + TaskName orch = nameof(HistoryStreaming_ResolvesPayloads); + + InMemoryPayloadStore store = new InMemoryPayloadStore(); + + await using HostTestLifetime server = await this.StartWorkerAsync( + worker => + { + worker.AddTasks(tasks => tasks.AddOrchestratorFunc( + orch, + async (ctx, input) => + { + // Emit several events so that the serialized history size grows + for (int i = 0; i < 50; i++) + { + await ctx.CreateTimer(TimeSpan.FromMilliseconds(10), CancellationToken.None); + } + return largeOutput; + })); + + worker.UseExternalizedPayloads(); + worker.Services.AddSingleton(store); + }, + client => + { + client.UseExternalizedPayloads(); + client.Services.AddSingleton(store); + }, + services => + { + services.AddExternalizedPayloadStore(opts => + { + opts.ExternalizeThresholdBytes = 1024; + opts.ContainerName = "test"; + opts.ConnectionString = "UseDevelopmentStorage=true"; + }); + }); + + // Start orchestration with large input to exercise history input resolution + string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orch, largeInput); + OrchestrationMetadata 
completed = await server.Client.WaitForInstanceCompletionAsync( + instanceId, getInputsAndOutputs: true, this.TimeoutToken); + + Assert.Equal(OrchestrationRuntimeStatus.Completed, completed.RuntimeStatus); + Assert.Equal(largeInput, completed.ReadInputAs()); + Assert.Equal(largeOutput, completed.ReadOutputAs()); + Assert.True(store.UploadCount >= 2); + Assert.True(store.DownloadCount >= 2); + } + + // Validates client externalizes large suspend and resume reasons. + [Fact] + public async Task SuspendAndResume_Reason_IsExternalizedByClient() + { + string largeReason1 = new string('Z', 700 * 1024); // 700KB + string largeReason2 = new string('Y', 650 * 1024); // 650KB + TaskName orchestratorName = nameof(SuspendAndResume_Reason_IsExternalizedByClient); + + InMemoryPayloadStore clientStore = new InMemoryPayloadStore(); + + await using HostTestLifetime server = await this.StartWorkerAsync( + worker => + { + // Long-running orchestrator to give time for suspend/resume + worker.AddTasks(tasks => tasks.AddOrchestratorFunc( + orchestratorName, + async (ctx, _) => + { + await ctx.CreateTimer(TimeSpan.FromMinutes(5), CancellationToken.None); + return "done"; + })); + }, + client => + { + // Enable externalization on the client and use the in-memory store to track uploads + client.UseExternalizedPayloads(); + client.Services.AddSingleton(clientStore); + }, + services => + { + services.AddExternalizedPayloadStore(opts => + { + opts.ExternalizeThresholdBytes = 1024; + opts.ContainerName = "test"; + opts.ConnectionString = "UseDevelopmentStorage=true"; + }); + }); + + string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); + await server.Client.WaitForInstanceStartAsync(instanceId, this.TimeoutToken); + + // Suspend with large reason (should be externalized by client) + await server.Client.SuspendInstanceAsync(instanceId, largeReason1, this.TimeoutToken); + await server.Client.WaitForInstanceStartAsync(instanceId, this.TimeoutToken); + + 
// poll up to 5 seconds to verify it is suspended + var deadline1 = DateTime.UtcNow.AddSeconds(5); + while (true) + { + OrchestrationMetadata? status1 = await server.Client.GetInstanceAsync(instanceId, getInputsAndOutputs: false, this.TimeoutToken); + if (status1 is not null && status1.RuntimeStatus == OrchestrationRuntimeStatus.Suspended) + { + break; + } + + if (DateTime.UtcNow >= deadline1) + { + Assert.NotNull(status1); + Assert.Equal(OrchestrationRuntimeStatus.Suspended, status1!.RuntimeStatus); + } + } + // Resume with large reason (should be externalized by client) + await server.Client.ResumeInstanceAsync(instanceId, largeReason2, this.TimeoutToken); + + // verify it is resumed (poll up to 5 seconds) + var deadline = DateTime.UtcNow.AddSeconds(5); + while (true) + { + OrchestrationMetadata? status = await server.Client.GetInstanceAsync(instanceId, getInputsAndOutputs: false, this.TimeoutToken); + if (status is not null && status.RuntimeStatus == OrchestrationRuntimeStatus.Running) + { + break; + } + + if (DateTime.UtcNow >= deadline) + { + Assert.NotNull(status); + Assert.Equal(OrchestrationRuntimeStatus.Running, status!.RuntimeStatus); + } + + await Task.Delay(TimeSpan.FromSeconds(1), this.TimeoutToken); + } + + + + Assert.True(clientStore.UploadCount >= 2); + Assert.Contains(largeReason1, clientStore.uploadedPayloads); + Assert.Contains(largeReason2, clientStore.uploadedPayloads); + } + + // Validates terminating an instance with a large output payload is externalized by the client. 
+ [Fact] + public async Task LargeTerminateWithPayload() + { + string largeInput = new string('I', 900 * 1024); + string largeOutput = new string('T', 900 * 1024); + TaskName orch = nameof(LargeTerminateWithPayload); + + InMemoryPayloadStore store = new InMemoryPayloadStore(); + + await using HostTestLifetime server = await this.StartWorkerAsync( + worker => + { + worker.AddTasks(tasks => tasks.AddOrchestratorFunc( + orch, + async (ctx, _) => + { + await ctx.CreateTimer(TimeSpan.FromSeconds(30), CancellationToken.None); + return null; + })); + + worker.UseExternalizedPayloads(); + worker.Services.AddSingleton(store); + }, + client => + { + client.UseExternalizedPayloads(); + client.Services.AddSingleton(store); + }, + services => + { + services.AddExternalizedPayloadStore(opts => + { + opts.ExternalizeThresholdBytes = 1024; + opts.ContainerName = "test"; + opts.ConnectionString = "UseDevelopmentStorage=true"; + }); + }); + + string id = await server.Client.ScheduleNewOrchestrationInstanceAsync(orch, largeInput); + await server.Client.WaitForInstanceStartAsync(id, this.TimeoutToken); + + await server.Client.TerminateInstanceAsync(id, new TerminateInstanceOptions { Output = largeOutput }, this.TimeoutToken); + + await server.Client.WaitForInstanceCompletionAsync(id, this.TimeoutToken); + OrchestrationMetadata? status = await server.Client.GetInstanceAsync(id, getInputsAndOutputs: false); + Assert.NotNull(status); + Assert.Equal(OrchestrationRuntimeStatus.Terminated, status!.RuntimeStatus); + Assert.True(store.UploadCount >= 1); + Assert.True(store.DownloadCount >= 1); + Assert.Contains(JsonSerializer.Serialize(largeOutput), store.uploadedPayloads); + } + // Validates large custom status and ContinueAsNew input are externalized and resolved across iterations. 
+ [Fact] + public async Task LargeContinueAsNewAndCustomStatus() + { + string largeStatus = new string('S', 700 * 1024); + string largeNextInput = new string('N', 800 * 1024); + string largeFinalOutput = new string('F', 750 * 1024); + TaskName orch = nameof(LargeContinueAsNewAndCustomStatus); + + var shared = new Dictionary(); + InMemoryPayloadStore workerStore = new InMemoryPayloadStore(shared); + + await using HostTestLifetime server = await this.StartWorkerAsync( + worker => + { + worker.AddTasks(tasks => tasks.AddOrchestratorFunc( + orch, + async (ctx, input) => + { + if (input == null) + { + ctx.SetCustomStatus(largeStatus); + ctx.ContinueAsNew(largeNextInput); + // unreachable + return ""; + } + else + { + // second iteration returns final + return largeFinalOutput; + } + })); + + worker.UseExternalizedPayloads(); + worker.Services.AddSingleton(workerStore); + }, + client => + { + client.UseExternalizedPayloads(); + client.Services.AddSingleton(workerStore); + }, + services => + { + services.AddExternalizedPayloadStore(opts => + { + opts.ExternalizeThresholdBytes = 1024; + opts.ContainerName = "test"; + opts.ConnectionString = "UseDevelopmentStorage=true"; + }); + }); + + string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orch); + OrchestrationMetadata completed = await server.Client.WaitForInstanceCompletionAsync( + instanceId, getInputsAndOutputs: true, this.TimeoutToken); + + Assert.Equal(OrchestrationRuntimeStatus.Completed, completed.RuntimeStatus); + Assert.Equal(largeFinalOutput, completed.ReadOutputAs()); + Assert.Contains(JsonSerializer.Serialize(largeStatus), workerStore.uploadedPayloads); + Assert.Contains(JsonSerializer.Serialize(largeNextInput), workerStore.uploadedPayloads); + Assert.Contains(JsonSerializer.Serialize(largeFinalOutput), workerStore.uploadedPayloads); + } + + // Validates large sub-orchestration input and an activity large output in one flow. 
+ [Fact] + public async Task LargeSubOrchestrationAndActivityOutput() + { + string largeChildInput = new string('C', 650 * 1024); + string largeActivityOutput = new string('A', 820 * 1024); + TaskName parent = nameof(LargeSubOrchestrationAndActivityOutput) + "_Parent"; + TaskName child = nameof(LargeSubOrchestrationAndActivityOutput) + "_Child"; + TaskName activity = "ProduceBig"; + + var shared = new Dictionary(); + InMemoryPayloadStore workerStore = new InMemoryPayloadStore(shared); + + await using HostTestLifetime server = await this.StartWorkerAsync( + worker => + { + worker.AddTasks(tasks => tasks + .AddOrchestratorFunc( + parent, + async (ctx, _) => + { + string echoed = await ctx.CallSubOrchestratorAsync(child, largeChildInput); + string act = await ctx.CallActivityAsync(activity); + return echoed.Length + act.Length; + }) + .AddOrchestratorFunc(child, (ctx, input) => Task.FromResult(input)) + .AddActivityFunc(activity, (ctx) => Task.FromResult(largeActivityOutput))); + + worker.UseExternalizedPayloads(); + worker.Services.AddSingleton(workerStore); + }, + client => + { + client.UseExternalizedPayloads(); + client.Services.AddSingleton(workerStore); + }, + services => + { + services.AddExternalizedPayloadStore(opts => + { + opts.ExternalizeThresholdBytes = 1024; + opts.ContainerName = "test"; + opts.ConnectionString = "UseDevelopmentStorage=true"; + }); + }); + + string id = await server.Client.ScheduleNewOrchestrationInstanceAsync(parent); + OrchestrationMetadata done = await server.Client.WaitForInstanceCompletionAsync( + id, getInputsAndOutputs: true, this.TimeoutToken); + + Assert.Equal(OrchestrationRuntimeStatus.Completed, done.RuntimeStatus); + Assert.Equal(largeChildInput.Length + largeActivityOutput.Length, done.ReadOutputAs()); + Assert.True(workerStore.UploadCount >= 1); + Assert.True(workerStore.DownloadCount >= 1); + Assert.Contains(JsonSerializer.Serialize(largeChildInput), workerStore.uploadedPayloads); + 
Assert.Contains(JsonSerializer.Serialize(largeActivityOutput), workerStore.uploadedPayloads); + } + + // Validates query with fetch I/O resolves large outputs for completed instances. + [Fact] + public async Task LargeQueryFetchInputsAndOutputs() + { + string largeIn = new string('I', 750 * 1024); + string largeOut = new string('Q', 880 * 1024); + TaskName orch = nameof(LargeQueryFetchInputsAndOutputs); + + var shared = new Dictionary(); + InMemoryPayloadStore workerStore = new InMemoryPayloadStore(shared); + + await using HostTestLifetime server = await this.StartWorkerAsync( + worker => + { + worker.AddTasks(tasks => tasks.AddOrchestratorFunc( + orch, + (ctx, input) => Task.FromResult(largeOut))); + + worker.UseExternalizedPayloads(); + worker.Services.AddSingleton(workerStore); + }, + client => + { + client.UseExternalizedPayloads(); + client.Services.AddSingleton(workerStore); + }, + services => + { + services.AddExternalizedPayloadStore(opts => + { + opts.ExternalizeThresholdBytes = 1024; + opts.ContainerName = "test"; + opts.ConnectionString = "UseDevelopmentStorage=true"; + }); + }); + + string id = await server.Client.ScheduleNewOrchestrationInstanceAsync(orch, largeIn); + await server.Client.WaitForInstanceCompletionAsync(id, getInputsAndOutputs: false, this.TimeoutToken); + + var page = server.Client.GetAllInstancesAsync(new OrchestrationQuery { FetchInputsAndOutputs = true, InstanceIdPrefix = id }); + OrchestrationMetadata? 
// Tail of the preceding test (its start is outside this view): scan the query
// page for the target orchestration and assert its large output round-tripped
// through the payload store. Only stripped generic type arguments were
// restored here — confirm `<string>` against the original file.
found = null;
await foreach (var item in page)
{
    if (item.Name == orch.Name)
    {
        found = item;
        break;
    }
}

Assert.NotNull(found);
Assert.Equal(largeOut, found!.ReadOutputAs<string>());
Assert.True(workerStore.DownloadCount >= 1);
Assert.True(workerStore.UploadCount >= 1);
Assert.Contains(JsonSerializer.Serialize(largeIn), workerStore.uploadedPayloads);
Assert.Contains(JsonSerializer.Serialize(largeOut), workerStore.uploadedPayloads);
}

// Validates worker externalizes large activity input and delivers resolved payload to activity.
[Fact]
public async Task LargeActivityInputAndOutput()
{
    string largeParam = new string('P', 700 * 1024); // 700KB — well above the 1KB test threshold below
    TaskName orchestratorName = nameof(LargeActivityInputAndOutput);
    TaskName activityName = "EchoLength";

    InMemoryPayloadStore workerStore = new InMemoryPayloadStore();

    await using HostTestLifetime server = await this.StartWorkerAsync(
        worker =>
        {
            // NOTE(review): generic arguments were stripped in the mangled source;
            // restored as <string?, string> / <string> — verify against the original.
            worker.AddTasks(tasks => tasks
                .AddOrchestratorFunc<string?, string>(
                    orchestratorName,
                    (ctx, _) => ctx.CallActivityAsync<string>(activityName, largeParam))
                .AddActivityFunc<string, string>(activityName, (ctx, input) => input + input));

            worker.UseExternalizedPayloads();

            // Register the in-memory fake as the PayloadStore so the
            // externalization pipeline uses it instead of real blob storage.
            worker.Services.AddSingleton<PayloadStore>(workerStore);
        },
        client => { /* client not needed for externalization path here */ },
        services =>
        {
            services.AddExternalizedPayloadStore(opts =>
            {
                opts.ExternalizeThresholdBytes = 1024;
                opts.ContainerName = "test";
                opts.ConnectionString = "UseDevelopmentStorage=true";
            });
        });

    string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName);
    OrchestrationMetadata completed = await server.Client.WaitForInstanceCompletionAsync(
        instanceId, getInputsAndOutputs: true, this.TimeoutToken);

    Assert.Equal(OrchestrationRuntimeStatus.Completed, completed.RuntimeStatus);

    // validate upload and download count
    Assert.True(workerStore.UploadCount >= 1);
    Assert.True(workerStore.DownloadCount >= 1);

    // validate uploaded payloads include the activity input and output forms.
    // Activity input is serialized as a single-element array; output as a bare string.
    string expectedActivityInputJson = JsonSerializer.Serialize(new[] { largeParam });
    string expectedActivityOutputJson = JsonSerializer.Serialize(largeParam + largeParam);
    Assert.Contains(expectedActivityInputJson, workerStore.uploadedPayloads);
    Assert.Contains(expectedActivityOutputJson, workerStore.uploadedPayloads);
}

// Ensures payloads below the threshold are not externalized by client or worker.
[Fact]
public async Task NoLargePayloads()
{
    string smallPayload = new string('X', 64 * 1024); // 64KB — below the externalization threshold
    TaskName orchestratorName = nameof(NoLargePayloads);

    InMemoryPayloadStore workerStore = new InMemoryPayloadStore();
    InMemoryPayloadStore clientStore = new InMemoryPayloadStore();

    await using HostTestLifetime server = await this.StartWorkerAsync(
        worker =>
        {
            worker.AddTasks(tasks => tasks.AddOrchestratorFunc<string, string>(
                orchestratorName,
                (ctx, input) => Task.FromResult(input)));

            worker.UseExternalizedPayloads();
            worker.Services.AddSingleton<PayloadStore>(workerStore);
        },
        client =>
        {
            client.UseExternalizedPayloads();
            client.Services.AddSingleton<PayloadStore>(clientStore);
        });

    string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName, input: smallPayload);
    OrchestrationMetadata completed = await server.Client.WaitForInstanceCompletionAsync(
        instanceId, getInputsAndOutputs: true, this.TimeoutToken);

    Assert.Equal(OrchestrationRuntimeStatus.Completed, completed.RuntimeStatus);
    Assert.Equal(smallPayload, completed.ReadOutputAs<string>());

    // Nothing crossed the threshold, so neither store should ever be touched.
    Assert.Equal(0, workerStore.UploadCount);
    Assert.Equal(0, workerStore.DownloadCount);
    Assert.Equal(0, clientStore.UploadCount);
    Assert.Equal(0, clientStore.DownloadCount);
}

// Validates client externalizes a large external event payload and worker resolves it.
[Fact]
public async Task LargeExternalEvent()
{
    string largeEvent = new string('E', 512 * 1024); // 512KB — above the 1KB test threshold below
    TaskName orchestratorName = nameof(LargeExternalEvent);
    const string EventName = "LargeEvent";

    // One shared fake so client-side uploads are visible to worker-side downloads.
    InMemoryPayloadStore fakeStore = new InMemoryPayloadStore();

    await using HostTestLifetime server = await this.StartWorkerAsync(
        worker =>
        {
            // NOTE(review): generic argument was stripped in the mangled source;
            // restored as WaitForExternalEvent<string> — verify against the original.
            worker.AddTasks(tasks => tasks.AddOrchestratorFunc(
                orchestratorName,
                async ctx => await ctx.WaitForExternalEvent<string>(EventName)));

            worker.Services.AddSingleton<PayloadStore>(fakeStore);
            worker.UseExternalizedPayloads();
        },
        client =>
        {
            client.Services.AddSingleton<PayloadStore>(fakeStore);
            client.UseExternalizedPayloads();
        },
        services =>
        {
            services.AddExternalizedPayloadStore(opts =>
            {
                opts.ExternalizeThresholdBytes = 1024;
                opts.ContainerName = "test";
                opts.ConnectionString = "UseDevelopmentStorage=true";
            });
        });

    string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName);

    // Wait for the orchestration to start so the event is not raised too early.
    await server.Client.WaitForInstanceStartAsync(instanceId, this.TimeoutToken);

    await server.Client.RaiseEventAsync(instanceId, EventName, largeEvent, this.TimeoutToken);

    OrchestrationMetadata completed = await server.Client.WaitForInstanceCompletionAsync(
        instanceId, getInputsAndOutputs: true, this.TimeoutToken);

    Assert.Equal(OrchestrationRuntimeStatus.Completed, completed.RuntimeStatus);
    string? output = completed.ReadOutputAs<string>();
    Assert.Equal(largeEvent, output);
    Assert.True(fakeStore.UploadCount >= 1);
    Assert.True(fakeStore.DownloadCount >= 1);
    Assert.Contains(JsonSerializer.Serialize(largeEvent), fakeStore.uploadedPayloads);
}

// In-memory test double for PayloadStore: holds payloads in a dictionary
// instead of blob storage. Tokens carry the same "blob:v1:" prefix the real
// store emits so token-detection logic treats them as externalized payloads.
class InMemoryPayloadStore : PayloadStore
{
    const string TokenPrefix = "blob:v1:";

    // Guards tokenToPayload and uploadedPayloads: uploads/downloads may occur
    // from concurrent worker threads, and the counters alone being Interlocked
    // does not make HashSet/Dictionary mutation safe.
    readonly object gate = new();

    readonly Dictionary<string, string> tokenToPayload;

    // Raw payload bodies observed by UploadAsync; tests assert membership.
    public readonly HashSet<string> uploadedPayloads = new();

    public InMemoryPayloadStore()
        : this(new Dictionary<string, string>())
    {
    }

    // Allows two store instances (e.g. client + worker) to share one backing map.
    public InMemoryPayloadStore(Dictionary<string, string> shared)
    {
        this.tokenToPayload = shared;
    }

    int uploadCount;
    public int UploadCount => this.uploadCount;

    int downloadCount;
    public int DownloadCount => this.downloadCount;

    // Stores the payload under a fresh token and records it for assertions.
    public override Task<string> UploadAsync(string payLoad, CancellationToken cancellationToken)
    {
        Interlocked.Increment(ref this.uploadCount);
        string token = $"blob:v1:test:{Guid.NewGuid():N}";
        lock (this.gate)
        {
            this.tokenToPayload[token] = payLoad;
            this.uploadedPayloads.Add(payLoad);
        }

        return Task.FromResult(token);
    }

    // Resolves a token back to its payload; throws KeyNotFoundException for
    // unknown tokens, which surfaces test bugs loudly rather than silently.
    public override Task<string> DownloadAsync(string token, CancellationToken cancellationToken)
    {
        Interlocked.Increment(ref this.downloadCount);
        lock (this.gate)
        {
            return Task.FromResult(this.tokenToPayload[token]);
        }
    }

    // A value is "ours" iff it starts with the blob token prefix (ordinal match).
    public override bool IsKnownPayloadToken(string value)
    {
        return value.StartsWith(TokenPrefix, StringComparison.Ordinal);
    }
}
}