-
Notifications
You must be signed in to change notification settings - Fork 53
Large payload azure blob externalization support #468
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
75 commits
Select commit
Hold shift + click to select a range
5c80501
save
YunchuWang 560ecab
tests
YunchuWang 0f3e624
add sample
YunchuWang d2b514f
precise to second
YunchuWang a565bb3
Merge branch 'main' into wangbill/large-payload
YunchuWang 4ec3f0c
Merge branch 'wangbill/large-payload' of https://github.com/microsoft…
YunchuWang 8575113
Merge branch 'main' into wangbill/large-payload
YunchuWang 9d2bfa3
some fb
YunchuWang ef0961a
rename dts to blob
YunchuWang 6387ad2
some fb
YunchuWang b4527cc
enabled always
YunchuWang 9168dc9
split package
YunchuWang b5bedd0
update sample
YunchuWang 6cebf0e
remove enablestorage
YunchuWang ce51c0d
comment
YunchuWang ecf89de
testname update
YunchuWang 3bd6c9a
add entity sample for largepayload
YunchuWang 20d3e8f
some fb
YunchuWang e653680
fix
YunchuWang 499c281
fb
YunchuWang e841e5e
fb
YunchuWang f182442
fb
YunchuWang e484f42
fb
YunchuWang fb6d3fb
test
YunchuWang f734b1b
enable compression
YunchuWang 50a210e
update sample
YunchuWang 833406f
add gzip encoding header to detect for decompression
YunchuWang 8b6b9f4
cleanup
YunchuWang 2d05b30
add interceptor v1
YunchuWang 05c1287
add interceptor v2
YunchuWang 23ebed1
sample good
YunchuWang 694456d
Merge branch 'main' into wangbill/largepayloadv2
YunchuWang 66e80d0
add blob retry
YunchuWang aa02fa6
save
YunchuWang a64283f
save
YunchuWang dd75b8d
save
YunchuWang b9d764f
save
YunchuWang 989c99d
test
YunchuWang 1c407dd
resume/suspend
YunchuWang c6fa543
historystateevent
YunchuWang bf04363
streaming chunk
YunchuWang e30dfda
Merge branch 'main' into wangbill/largepayloadv2
YunchuWang fbecfd2
test fix
YunchuWang 9d40f6e
Merge branch 'wangbill/largepayloadv2' of https://github.com/microsof…
YunchuWang a5b5bd1
history event streaming test added
YunchuWang a16c8ed
Merge branch 'main' into wangbill/largepayloadv2
YunchuWang 78465d5
cleanup
YunchuWang cf64aff
Interface for supporting azuremanaged
YunchuWang 089a148
one pkg experiment
YunchuWang 863fc2d
rename managedbackendinterceptor
YunchuWang f8dfbdb
one option configure
YunchuWang 3b6d174
update test
YunchuWang e7f48ec
test fix
YunchuWang 069201f
update proto
YunchuWang 8ce977f
one package finalize
YunchuWang a46b930
abstract payloadstore
YunchuWang d095443
remove retry
YunchuWang eaff753
refactor
YunchuWang 8da17bd
msi support
YunchuWang 204ff20
Merge branch 'main' into wangbill/largepayloadv2
YunchuWang ad30242
feedback
YunchuWang e2d8d42
cleanup
YunchuWang 95bca14
managed interceptor setup v1
YunchuWang 746fe71
comment out interceptor, need to be multi package
YunchuWang 3779d50
cleanup
YunchuWang c16d875
cleanup
YunchuWang 73b7635
Merge branch 'main' into wangbill/largepayloadv2
YunchuWang ae2e89c
cleanup
YunchuWang 677d6be
Merge branch 'wangbill/largepayloadv2' of https://github.com/microsof…
YunchuWang b9898ab
reset grpc changes
YunchuWang ca9240f
Merge branch 'main' into wangbill/largepayloadv2
YunchuWang 9a1f66f
feedback
YunchuWang a69743b
Merge branch 'wangbill/largepayloadv2' of https://github.com/microsof…
YunchuWang 64e0993
feedback
YunchuWang 6ecd92f
Enhance LargePayloadStorageOptions to enforce a maximum threshold of …
YunchuWang File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
24 changes: 24 additions & 0 deletions
24
samples/LargePayloadConsoleApp/LargePayloadConsoleApp.csproj
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,24 @@ | ||
| <Project Sdk="Microsoft.NET.Sdk"> | ||
|
|
||
| <PropertyGroup> | ||
| <OutputType>Exe</OutputType> | ||
| <TargetFramework>net8.0</TargetFramework> | ||
| <Nullable>enable</Nullable> | ||
| </PropertyGroup> | ||
|
|
||
| <ItemGroup> | ||
| <PackageReference Include="Microsoft.Extensions.Hosting" /> | ||
| <PackageReference Include="Azure.Identity" /> | ||
| <PackageReference Include="Grpc.Net.Client" /> | ||
| </ItemGroup> | ||
|
|
||
| <ItemGroup> | ||
| <!-- Using p2p references so we can show latest changes in samples. --> | ||
| <ProjectReference Include="$(SrcRoot)Client/AzureManaged/Client.AzureManaged.csproj" /> | ||
| <ProjectReference Include="$(SrcRoot)Worker/AzureManaged/Worker.AzureManaged.csproj" /> | ||
| <ProjectReference Include="$(SrcRoot)Extensions/AzureBlobPayloads/AzureBlobPayloads.csproj" /> | ||
| </ItemGroup> | ||
|
|
||
| </Project> | ||
|
|
||
|
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,194 @@ | ||
| // Copyright (c) Microsoft Corporation. | ||
| // Licensed under the MIT License. | ||
|
|
||
| using Microsoft.DurableTask.Client; | ||
| using Microsoft.DurableTask.Client.AzureManaged; | ||
| using Microsoft.DurableTask.Client.Entities; | ||
| using Microsoft.DurableTask.Entities; | ||
| using Microsoft.DurableTask.Worker; | ||
| using Microsoft.DurableTask.Worker.AzureManaged; | ||
| using Microsoft.DurableTask; | ||
| using Microsoft.Extensions.Configuration; | ||
| using Microsoft.Extensions.DependencyInjection; | ||
| using Microsoft.Extensions.Hosting; | ||
|
|
||
| // Demonstrates Large Payload Externalization using Azure Blob Storage. | ||
| // This sample uses Azurite/emulator by default via UseDevelopmentStorage=true. | ||
|
|
||
| HostApplicationBuilder builder = Host.CreateApplicationBuilder(args); | ||
|
|
||
| // Connection string for Durable Task Scheduler | ||
| string schedulerConnectionString = builder.Configuration.GetValue<string>("DURABLE_TASK_SCHEDULER_CONNECTION_STRING") | ||
| ?? throw new InvalidOperationException("Missing required configuration 'DURABLE_TASK_SCHEDULER_CONNECTION_STRING'"); | ||
|
|
||
| // 1) Register shared payload store ONCE | ||
| builder.Services.AddExternalizedPayloadStore(opts => | ||
| { | ||
| // Keep threshold small to force externalization for demo purposes | ||
| opts.ExternalizeThresholdBytes = 1024; // 1KB | ||
| opts.ConnectionString = builder.Configuration.GetValue<string>("DURABLETASK_STORAGE") ?? "UseDevelopmentStorage=true"; | ||
| opts.ContainerName = builder.Configuration.GetValue<string>("DURABLETASK_PAYLOAD_CONTAINER"); | ||
| }); | ||
|
|
||
| // 2) Configure Durable Task client | ||
| builder.Services.AddDurableTaskClient(b => | ||
| { | ||
| b.UseDurableTaskScheduler(schedulerConnectionString); | ||
| b.Configure(o => o.EnableEntitySupport = true); | ||
|
|
||
| // Use shared store (no duplication of options) | ||
| b.UseExternalizedPayloads(); | ||
| }); | ||
|
|
||
| // 3) Configure Durable Task worker | ||
| builder.Services.AddDurableTaskWorker(b => | ||
| { | ||
| b.UseDurableTaskScheduler(schedulerConnectionString); | ||
|
|
||
| b.AddTasks(tasks => | ||
| { | ||
| // Orchestrator: call activity first, return its output (should equal original input) | ||
| tasks.AddOrchestratorFunc<string, string>("LargeInputEcho", async (ctx, input) => | ||
| { | ||
| string echoed = await ctx.CallActivityAsync<string>("Echo", input); | ||
| return echoed; | ||
| }); | ||
|
|
||
| // Activity: validate it receives raw input (not token) and return it | ||
| tasks.AddActivityFunc<string, string>("Echo", (ctx, value) => | ||
| { | ||
| if (value is null) | ||
| { | ||
| return string.Empty; | ||
| } | ||
|
|
||
| if (value.StartsWith("blob:v1:", StringComparison.Ordinal)) | ||
| { | ||
| throw new InvalidOperationException("Activity received a payload token instead of raw input."); | ||
| } | ||
|
|
||
| return value; | ||
| }); | ||
|
|
||
| // Entity samples | ||
| tasks.AddOrchestratorFunc<object?, int>( | ||
| "LargeEntityOperationInput", | ||
| (ctx, _) => ctx.Entities.CallEntityAsync<int>( | ||
| new EntityInstanceId(nameof(EchoLengthEntity), "1"), | ||
| operationName: "EchoLength", | ||
| input: new string('E', 700 * 1024))); | ||
| tasks.AddEntity<EchoLengthEntity>(nameof(EchoLengthEntity)); | ||
|
|
||
| tasks.AddOrchestratorFunc<object?, int>( | ||
| "LargeEntityOperationOutput", | ||
| async (ctx, _) => (await ctx.Entities.CallEntityAsync<string>( | ||
| new EntityInstanceId(nameof(LargeResultEntity), "1"), | ||
| operationName: "Produce", | ||
| input: 850 * 1024)).Length); | ||
| tasks.AddEntity<LargeResultEntity>(nameof(LargeResultEntity)); | ||
|
|
||
| tasks.AddOrchestratorFunc<object?, object?>( | ||
| "LargeEntityState", | ||
| async (ctx, _) => | ||
| { | ||
| await ctx.Entities.CallEntityAsync( | ||
| new EntityInstanceId(nameof(StateEntity), "1"), | ||
| operationName: "Set", | ||
| input: new string('S', 900 * 1024)); | ||
| return null; | ||
| }); | ||
| tasks.AddEntity<StateEntity>(nameof(StateEntity)); | ||
| }); | ||
|
|
||
| // Use shared store (no duplication of options) | ||
| b.UseExternalizedPayloads(); | ||
|
|
||
| b.Configure(o => o.EnableEntitySupport = true); | ||
| }); | ||
|
|
||
| IHost host = builder.Build(); | ||
| await host.StartAsync(); | ||
|
|
||
| await using DurableTaskClient client = host.Services.GetRequiredService<DurableTaskClient>(); | ||
|
|
||
| // Option A: Directly pass an oversized input to orchestration to trigger externalization | ||
| string largeInput = new string('B', 1024 * 1024); // 1MB | ||
| string instanceId = await client.ScheduleNewOrchestrationInstanceAsync("LargeInputEcho", largeInput); | ||
| Console.WriteLine($"Started orchestration with direct large input. Instance: {instanceId}"); | ||
|
|
||
|
|
||
| using CancellationTokenSource cts = new CancellationTokenSource(TimeSpan.FromSeconds(120)); | ||
| OrchestrationMetadata result = await client.WaitForInstanceCompletionAsync( | ||
| instanceId, | ||
| getInputsAndOutputs: true, | ||
| cts.Token); | ||
|
|
||
| Console.WriteLine($"RuntimeStatus: {result.RuntimeStatus}"); | ||
| string deserializedInput = result.ReadInputAs<string>() ?? string.Empty; | ||
| string deserializedOutput = result.ReadOutputAs<string>() ?? string.Empty; | ||
|
|
||
| Console.WriteLine($"SerializedInput: {result.SerializedInput}"); | ||
| Console.WriteLine($"SerializedOutput: {result.SerializedOutput}"); | ||
| Console.WriteLine($"Deserialized input equals original: {deserializedInput == largeInput}"); | ||
| Console.WriteLine($"Deserialized output equals original: {deserializedOutput == largeInput}"); | ||
| Console.WriteLine($"Deserialized input length: {deserializedInput.Length}"); | ||
|
|
||
| // Run entity samples | ||
| Console.WriteLine(); | ||
| Console.WriteLine("Running LargeEntityOperationInput..."); | ||
| string largeEntityInput = new string('E', 700 * 1024); // 700KB | ||
| string entityInputInstance = await client.ScheduleNewOrchestrationInstanceAsync("LargeEntityOperationInput"); | ||
| OrchestrationMetadata entityInputResult = await client.WaitForInstanceCompletionAsync(entityInputInstance, getInputsAndOutputs: true, cts.Token); | ||
| int entityInputLength = entityInputResult.ReadOutputAs<int>(); | ||
| Console.WriteLine($"Status: {entityInputResult.RuntimeStatus}, Input length: {entityInputLength}"); | ||
| Console.WriteLine($"Deserialized input length equals original: {entityInputLength == largeEntityInput.Length}"); | ||
|
|
||
| Console.WriteLine(); | ||
| Console.WriteLine("Running LargeEntityOperationOutput..."); | ||
| int largeEntityOutputLength = 850 * 1024; // 850KB | ||
| string entityOutputInstance = await client.ScheduleNewOrchestrationInstanceAsync("LargeEntityOperationOutput"); | ||
| OrchestrationMetadata entityOutputResult = await client.WaitForInstanceCompletionAsync(entityOutputInstance, getInputsAndOutputs: true, cts.Token); | ||
| int entityOutputLength = entityOutputResult.ReadOutputAs<int>(); | ||
| Console.WriteLine($"Status: {entityOutputResult.RuntimeStatus}, Output length: {entityOutputLength}"); | ||
| Console.WriteLine($"Deserialized output length equals original: {entityOutputLength == largeEntityOutputLength}"); | ||
|
|
||
| Console.WriteLine(); | ||
| Console.WriteLine("Running LargeEntityState and querying state..."); | ||
| string largeEntityState = new string('S', 900 * 1024); // 900KB | ||
| string entityStateInstance = await client.ScheduleNewOrchestrationInstanceAsync("LargeEntityState"); | ||
| OrchestrationMetadata entityStateOrch = await client.WaitForInstanceCompletionAsync(entityStateInstance, getInputsAndOutputs: true, cts.Token); | ||
| Console.WriteLine($"Status: {entityStateOrch.RuntimeStatus}"); | ||
| EntityMetadata<string>? state = await client.Entities.GetEntityAsync<string>(new EntityInstanceId(nameof(StateEntity), "1"), includeState: true); | ||
| int stateLength = state?.State?.Length ?? 0; | ||
| Console.WriteLine($"State length: {stateLength}"); | ||
| Console.WriteLine($"Deserialized state equals original: {state?.State == largeEntityState}"); | ||
|
|
||
| public class EchoLengthEntity : TaskEntity<int> | ||
| { | ||
| public int EchoLength(string input) | ||
| { | ||
| return input.Length; | ||
| } | ||
| } | ||
|
|
||
| public class LargeResultEntity : TaskEntity<object?> | ||
| { | ||
| public string Produce(int length) | ||
| { | ||
| return new string('R', length); | ||
| } | ||
| } | ||
|
|
||
| public class StateEntity : TaskEntity<string?> | ||
| { | ||
| protected override string? InitializeState(TaskEntityOperation entityOperation) | ||
| { | ||
| // Avoid Activator.CreateInstance<string>() which throws; start as null (no state) | ||
| return null; | ||
| } | ||
|
|
||
| public void Set(string value) | ||
| { | ||
| this.State = value; | ||
| } | ||
| } |
12 changes: 12 additions & 0 deletions
12
samples/LargePayloadConsoleApp/Properties/launchSettings.json
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,12 @@ | ||
| { | ||
| "profiles": { | ||
| "LargePayloadConsoleApp": { | ||
| "commandName": "Project", | ||
| "environmentVariables": { | ||
| "DURABLE_TASK_SCHEDULER_CONNECTION_STRING": "", | ||
| "DURABLETASK_STORAGE": "", | ||
| "DURABLETASK_PAYLOAD_CONTAINER": "" | ||
| } | ||
| } | ||
| } | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,29 @@ | ||
| # Large Payload Externalization Sample | ||
|
|
||
| This sample demonstrates configuring Durable Task to externalize large payloads to Azure Blob Storage using `UseExternalizedPayloads` on both client and worker, connecting via Durable Task Scheduler (no local sidecar). | ||
|
|
||
| - Defaults to Azurite/Storage Emulator via `UseDevelopmentStorage=true`. | ||
| - Threshold is set to 1KB for demo, so even modest inputs are externalized. | ||
|
|
||
| ## Prerequisites | ||
|
|
||
| - A Durable Task Scheduler connection string (e.g., from Azure portal) in `DURABLE_TASK_SCHEDULER_CONNECTION_STRING`. | ||
| - Optional: Run Azurite (if not using real Azure Storage) for payload storage tokens. | ||
|
|
||
| ## Configure | ||
|
|
||
| Environment variables (optional): | ||
|
|
||
| - `DURABLETASK_STORAGE`: Azure Storage connection string. Defaults to `UseDevelopmentStorage=true`. | ||
| - `DURABLETASK_PAYLOAD_CONTAINER`: Blob container name. Defaults to `durabletask-payloads`. | ||
|
|
||
| ## Run | ||
|
|
||
| ```bash | ||
| # from repo root | ||
| dotnet run --project samples/LargePayloadConsoleApp/LargePayloadConsoleApp.csproj | ||
| ``` | ||
|
|
||
| The app starts an orchestration with a 1MB input, which is externalized by the client and resolved by the worker. The console shows a token-like serialized input and a deserialized input length. | ||
|
|
||
|
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,30 @@ | ||
| <Project Sdk="Microsoft.NET.Sdk"> | ||
|
|
||
| <PropertyGroup> | ||
| <TargetFrameworks>netstandard2.0;net6.0</TargetFrameworks> | ||
| <PackageDescription>Azure Blob Storage externalized payload support for Durable Task.</PackageDescription> | ||
| <AssemblyName>Microsoft.DurableTask.Extensions.AzureBlobPayloads</AssemblyName> | ||
| <RootNamespace>Microsoft.DurableTask</RootNamespace> | ||
| <EnableStyleCop>true</EnableStyleCop> | ||
| </PropertyGroup> | ||
|
|
||
| <ItemGroup> | ||
| <PackageReference Include="Azure.Storage.Blobs" /> | ||
| <PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" /> | ||
| <PackageReference Include="Microsoft.Extensions.Options" /> | ||
| <PackageReference Include="Grpc.Net.Client" /> | ||
| </ItemGroup> | ||
|
|
||
| <ItemGroup> | ||
| <ProjectReference Include="..\..\Abstractions\Abstractions.csproj" /> | ||
| <ProjectReference Include="..\..\Client\Core\Client.csproj" /> | ||
| <ProjectReference Include="..\..\Worker\Core\Worker.csproj" /> | ||
| <ProjectReference Include="..\..\Client\Grpc\Client.Grpc.csproj" /> | ||
| <ProjectReference Include="..\..\Worker\Grpc\Worker.Grpc.csproj" /> | ||
| <ProjectReference Include="..\..\Grpc.AzureManagedBackend\Grpc.AzureManagedBackend.csproj" /> | ||
| </ItemGroup> | ||
|
|
||
| <ItemGroup> | ||
| <SharedSection Include="Core" /> | ||
| </ItemGroup> | ||
| </Project> | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where is Grpc.AzureManagedBackend.csproj?