Skip to content
Open
Show file tree
Hide file tree
Changes from 60 commits
Commits
Show all changes
61 commits
Select commit Hold shift + click to select a range
5c80501
save
YunchuWang Aug 11, 2025
560ecab
tests
YunchuWang Aug 13, 2025
0f3e624
add sample
YunchuWang Aug 13, 2025
d2b514f
precise to second
YunchuWang Aug 13, 2025
a565bb3
Merge branch 'main' into wangbill/large-payload
YunchuWang Aug 13, 2025
4ec3f0c
Merge branch 'wangbill/large-payload' of https://github.com/microsoft…
YunchuWang Aug 13, 2025
8575113
Merge branch 'main' into wangbill/large-payload
YunchuWang Aug 19, 2025
9d2bfa3
some fb
YunchuWang Aug 27, 2025
ef0961a
rename dts to blob
YunchuWang Aug 28, 2025
6387ad2
some fb
YunchuWang Aug 28, 2025
b4527cc
enabled always
YunchuWang Aug 28, 2025
9168dc9
split package
YunchuWang Sep 1, 2025
b5bedd0
update sample
YunchuWang Sep 1, 2025
6cebf0e
remove enablestorage
YunchuWang Sep 1, 2025
ce51c0d
comment
YunchuWang Sep 1, 2025
ecf89de
testname update
YunchuWang Sep 1, 2025
3bd6c9a
add entity sample for largepayload
YunchuWang Sep 2, 2025
20d3e8f
some fb
YunchuWang Sep 6, 2025
e653680
fix
YunchuWang Sep 6, 2025
499c281
fb
YunchuWang Sep 6, 2025
e841e5e
fb
YunchuWang Sep 6, 2025
f182442
fb
YunchuWang Sep 6, 2025
e484f42
fb
YunchuWang Sep 6, 2025
fb6d3fb
test
YunchuWang Sep 6, 2025
f734b1b
enable compression
YunchuWang Sep 8, 2025
50a210e
update sample
YunchuWang Sep 8, 2025
833406f
add gzip encoding header to detect for decompression
YunchuWang Sep 9, 2025
8b6b9f4
cleanup
YunchuWang Sep 16, 2025
2d05b30
add interceptor v1
YunchuWang Sep 16, 2025
05c1287
add interceptor v2
YunchuWang Sep 16, 2025
23ebed1
sample good
YunchuWang Sep 17, 2025
694456d
Merge branch 'main' into wangbill/largepayloadv2
YunchuWang Sep 17, 2025
66e80d0
add blob retry
YunchuWang Sep 17, 2025
aa02fa6
save
YunchuWang Sep 17, 2025
a64283f
save
YunchuWang Sep 17, 2025
dd75b8d
save
YunchuWang Sep 17, 2025
b9d764f
save
YunchuWang Sep 17, 2025
989c99d
test
YunchuWang Sep 17, 2025
1c407dd
resume/suspend
YunchuWang Sep 17, 2025
c6fa543
historystateevent
YunchuWang Sep 17, 2025
bf04363
streaming chunk
YunchuWang Sep 17, 2025
e30dfda
Merge branch 'main' into wangbill/largepayloadv2
YunchuWang Sep 17, 2025
fbecfd2
test fix
YunchuWang Sep 17, 2025
9d40f6e
Merge branch 'wangbill/largepayloadv2' of https://github.com/microsof…
YunchuWang Sep 17, 2025
a5b5bd1
history event streaming test added
YunchuWang Sep 17, 2025
a16c8ed
Merge branch 'main' into wangbill/largepayloadv2
YunchuWang Sep 18, 2025
78465d5
cleanup
YunchuWang Sep 20, 2025
cf64aff
Interface for supporting azuremanaged
YunchuWang Sep 21, 2025
089a148
one pkg experiment
YunchuWang Sep 29, 2025
863fc2d
rename managedbackendinterceptor
YunchuWang Sep 30, 2025
f8dfbdb
one option configure
YunchuWang Oct 1, 2025
3b6d174
update test
YunchuWang Oct 1, 2025
e7f48ec
test fix
YunchuWang Oct 1, 2025
069201f
update proto
YunchuWang Oct 1, 2025
8ce977f
one package finalize
YunchuWang Oct 1, 2025
a46b930
abstract payloadstore
YunchuWang Oct 1, 2025
d095443
remove retry
YunchuWang Oct 2, 2025
eaff753
refactor
YunchuWang Oct 2, 2025
8da17bd
msi support
YunchuWang Oct 2, 2025
204ff20
Merge branch 'main' into wangbill/largepayloadv2
YunchuWang Oct 2, 2025
ad30242
feedback
YunchuWang Oct 6, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
<!-- Azure.* Packages -->
<ItemGroup>
<PackageVersion Include="Azure.Identity" Version="1.13.1" />
<PackageVersion Include="Azure.Storage.Blobs" Version="12.20.0" />
<PackageVersion Include="Microsoft.Azure.Functions.Worker" Version="1.21.0" />
</ItemGroup>

Expand All @@ -34,6 +35,11 @@
</ItemGroup>

<!-- Grpc / Protobuf Packages -->

<!-- BCL Packages -->
<ItemGroup>
<PackageVersion Include="System.ComponentModel.Annotations" Version="5.0.0" />
</ItemGroup>
<ItemGroup>
<PackageVersion Include="Google.Protobuf" Version="3.21.12" />
<PackageVersion Include="Grpc.Core" Version="2.46.5" />
Expand Down
14 changes: 14 additions & 0 deletions Microsoft.DurableTask.sln
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ScheduleWebApp", "samples\S
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ScheduledTasks.Tests", "test\ScheduledTasks.Tests\ScheduledTasks.Tests.csproj", "{D2779F32-A548-44F8-B60A-6AC018966C79}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "LargePayloadConsoleApp", "samples\LargePayloadConsoleApp\LargePayloadConsoleApp.csproj", "{6EB9D002-62C8-D6C1-62A8-14C54CA6DBBC}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "AzureBlobPayloads", "src\Extensions\AzureBlobPayloads\AzureBlobPayloads.csproj", "{FE1DA748-D6DB-E168-BC42-6DBBCEAF229C}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -247,6 +251,14 @@ Global
{D2779F32-A548-44F8-B60A-6AC018966C79}.Debug|Any CPU.Build.0 = Debug|Any CPU
{D2779F32-A548-44F8-B60A-6AC018966C79}.Release|Any CPU.ActiveCfg = Release|Any CPU
{D2779F32-A548-44F8-B60A-6AC018966C79}.Release|Any CPU.Build.0 = Release|Any CPU
{6EB9D002-62C8-D6C1-62A8-14C54CA6DBBC}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{6EB9D002-62C8-D6C1-62A8-14C54CA6DBBC}.Debug|Any CPU.Build.0 = Debug|Any CPU
{6EB9D002-62C8-D6C1-62A8-14C54CA6DBBC}.Release|Any CPU.ActiveCfg = Release|Any CPU
{6EB9D002-62C8-D6C1-62A8-14C54CA6DBBC}.Release|Any CPU.Build.0 = Release|Any CPU
{FE1DA748-D6DB-E168-BC42-6DBBCEAF229C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{FE1DA748-D6DB-E168-BC42-6DBBCEAF229C}.Debug|Any CPU.Build.0 = Debug|Any CPU
{FE1DA748-D6DB-E168-BC42-6DBBCEAF229C}.Release|Any CPU.ActiveCfg = Release|Any CPU
{FE1DA748-D6DB-E168-BC42-6DBBCEAF229C}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -293,6 +305,8 @@ Global
{A89B766C-987F-4C9F-8937-D0AB9FE640C8} = {EFF7632B-821E-4CFC-B4A0-ED4B24296B17}
{100348B5-4D97-4A3F-B777-AB14F276F8FE} = {EFF7632B-821E-4CFC-B4A0-ED4B24296B17}
{D2779F32-A548-44F8-B60A-6AC018966C79} = {E5637F81-2FB9-4CD7-900D-455363B142A7}
{6EB9D002-62C8-D6C1-62A8-14C54CA6DBBC} = {EFF7632B-821E-4CFC-B4A0-ED4B24296B17}
{FE1DA748-D6DB-E168-BC42-6DBBCEAF229C} = {8AFC9781-F6F1-4696-BB4A-9ED7CA9D612B}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {AB41CB55-35EA-4986-A522-387AB3402E71}
Expand Down
24 changes: 24 additions & 0 deletions samples/LargePayloadConsoleApp/LargePayloadConsoleApp.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Hosting" />
<PackageReference Include="Azure.Identity" />
<PackageReference Include="Grpc.Net.Client" />
</ItemGroup>

<ItemGroup>
<!-- Using p2p references so we can show latest changes in samples. -->
<ProjectReference Include="$(SrcRoot)Client/AzureManaged/Client.AzureManaged.csproj" />
<ProjectReference Include="$(SrcRoot)Worker/AzureManaged/Worker.AzureManaged.csproj" />
<ProjectReference Include="$(SrcRoot)Extensions/AzureBlobPayloads/AzureBlobPayloads.csproj" />
</ItemGroup>

</Project>


198 changes: 198 additions & 0 deletions samples/LargePayloadConsoleApp/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using Microsoft.DurableTask.Client;
using Microsoft.DurableTask.Client.AzureManaged;
using Microsoft.DurableTask.Client.Entities;
using Microsoft.DurableTask.Entities;
using Microsoft.DurableTask.Worker;
using Microsoft.DurableTask.Worker.AzureManaged;
using Microsoft.DurableTask;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

// Demonstrates Large Payload Externalization using Azure Blob Storage.
// This sample uses Azurite/emulator by default via UseDevelopmentStorage=true.

HostApplicationBuilder builder = Host.CreateApplicationBuilder(args);

// Connection string for Durable Task Scheduler
string schedulerConnectionString = builder.Configuration.GetValue<string>("DURABLE_TASK_SCHEDULER_CONNECTION_STRING")
?? throw new InvalidOperationException("Missing required configuration 'DURABLE_TASK_SCHEDULER_CONNECTION_STRING'");

// 1) Register shared payload store ONCE
builder.Services.AddExternalizedPayloadStore(opts =>
{
// Keep threshold small to force externalization for demo purposes
opts.ExternalizeThresholdBytes = 1024; // 1KB
opts.ConnectionString = builder.Configuration.GetValue<string>("DURABLETASK_STORAGE") ?? "UseDevelopmentStorage=true";
opts.ContainerName = builder.Configuration.GetValue<string>("DURABLETASK_PAYLOAD_CONTAINER");
});

// 2) Configure Durable Task client
builder.Services.AddDurableTaskClient(b =>
{
b.UseDurableTaskScheduler(schedulerConnectionString);
b.Configure(o => o.EnableEntitySupport = true);

// Use shared store (no duplication of options)
b.UseExternalizedPayloads();
});

// 3) Configure Durable Task worker
builder.Services.AddDurableTaskWorker(b =>
{
b.UseDurableTaskScheduler(schedulerConnectionString);

b.AddTasks(tasks =>
{
// Orchestrator: call activity first, return its output (should equal original input)
tasks.AddOrchestratorFunc<string, string>("LargeInputEcho", async (ctx, input) =>
{
string echoed = await ctx.CallActivityAsync<string>("Echo", input);
return echoed;
});

// Activity: validate it receives raw input (not token) and return it
tasks.AddActivityFunc<string, string>("Echo", (ctx, value) =>
{
if (value is null)
{
return string.Empty;
}

if (value.StartsWith("blob:v1:", StringComparison.Ordinal))
{
throw new InvalidOperationException("Activity received a payload token instead of raw input.");
}

return value;
});

// Entity samples
tasks.AddOrchestratorFunc<object?, int>(
"LargeEntityOperationInput",
(ctx, _) => ctx.Entities.CallEntityAsync<int>(
new EntityInstanceId(nameof(EchoLengthEntity), "1"),
operationName: "EchoLength",
input: new string('E', 700 * 1024)));
tasks.AddEntity<EchoLengthEntity>(nameof(EchoLengthEntity));

tasks.AddOrchestratorFunc<object?, int>(
"LargeEntityOperationOutput",
async (ctx, _) => (await ctx.Entities.CallEntityAsync<string>(
new EntityInstanceId(nameof(LargeResultEntity), "1"),
operationName: "Produce",
input: 850 * 1024)).Length);
tasks.AddEntity<LargeResultEntity>(nameof(LargeResultEntity));

tasks.AddOrchestratorFunc<object?, object?>(
"LargeEntityState",
async (ctx, _) =>
{
await ctx.Entities.CallEntityAsync(
new EntityInstanceId(nameof(StateEntity), "1"),
operationName: "Set",
input: new string('S', 900 * 1024));
return null;
});
tasks.AddEntity<StateEntity>(nameof(StateEntity));
});

// Use shared store (no duplication of options)
b.UseExternalizedPayloads();

b.Configure(o => o.EnableEntitySupport = true);
});

IHost host = builder.Build();
await host.StartAsync();

await using DurableTaskClient client = host.Services.GetRequiredService<DurableTaskClient>();

// Option A: Directly pass an oversized input to orchestration to trigger externalization
string largeInput = new string('B', 1024 * 1024); // 1MB
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync("LargeInputEcho", largeInput);
Console.WriteLine($"Started orchestration with direct large input. Instance: {instanceId}");


using CancellationTokenSource cts = new CancellationTokenSource(TimeSpan.FromSeconds(120));
OrchestrationMetadata result = await client.WaitForInstanceCompletionAsync(
instanceId,
getInputsAndOutputs: true,
cts.Token);

Console.WriteLine($"RuntimeStatus: {result.RuntimeStatus}");
string deserializedInput = result.ReadInputAs<string>() ?? string.Empty;
string deserializedOutput = result.ReadOutputAs<string>() ?? string.Empty;

Console.WriteLine($"SerializedInput: {result.SerializedInput}");
Console.WriteLine($"SerializedOutput: {result.SerializedOutput}");
Console.WriteLine($"Deserialized input equals original: {deserializedInput == largeInput}");
Console.WriteLine($"Deserialized output equals original: {deserializedOutput == largeInput}");
Console.WriteLine($"Deserialized input length: {deserializedInput.Length}");

// Run entity samples
Console.WriteLine();
Console.WriteLine("Running LargeEntityOperationInput...");
string largeEntityInput = new string('E', 700 * 1024); // 700KB
string entityInputInstance = await client.ScheduleNewOrchestrationInstanceAsync("LargeEntityOperationInput");
OrchestrationMetadata entityInputResult = await client.WaitForInstanceCompletionAsync(entityInputInstance, getInputsAndOutputs: true, cts.Token);
int entityInputLength = entityInputResult.ReadOutputAs<int>();
Console.WriteLine($"Status: {entityInputResult.RuntimeStatus}, Output length: {entityInputLength}");
Console.WriteLine($"Deserialized input length equals original: {entityInputLength == largeEntityInput.Length}");

Console.WriteLine();
Console.WriteLine("Running LargeEntityOperationOutput...");
int largeEntityOutputLength = 850 * 1024; // 850KB
string entityOutputInstance = await client.ScheduleNewOrchestrationInstanceAsync("LargeEntityOperationOutput");
OrchestrationMetadata entityOutputResult = await client.WaitForInstanceCompletionAsync(entityOutputInstance, getInputsAndOutputs: true, cts.Token);
int entityOutputLength = entityOutputResult.ReadOutputAs<int>();
Console.WriteLine($"Status: {entityOutputResult.RuntimeStatus}, Output length: {entityOutputLength}");
Console.WriteLine($"Deserialized output length equals original: {entityOutputLength == largeEntityOutputLength}");

Console.WriteLine();
Console.WriteLine("Running LargeEntityState and querying state...");
string largeEntityState = new string('S', 900 * 1024); // 900KB
string entityStateInstance = await client.ScheduleNewOrchestrationInstanceAsync("LargeEntityState");
OrchestrationMetadata entityStateOrch = await client.WaitForInstanceCompletionAsync(entityStateInstance, getInputsAndOutputs: true, cts.Token);
Console.WriteLine($"Status: {entityStateOrch.RuntimeStatus}");
EntityMetadata<string>? state = await client.Entities.GetEntityAsync<string>(new EntityInstanceId(nameof(StateEntity), "1"), includeState: true);
int stateLength = state?.State?.Length ?? 0;
Console.WriteLine($"State length: {stateLength}");
Console.WriteLine($"Deserialized state equals original: {state?.State == largeEntityState}");





public class EchoLengthEntity : TaskEntity<int>
{
public int EchoLength(string input)
{
return input.Length;
}
}

public class LargeResultEntity : TaskEntity<object?>
{
public string Produce(int length)
{
return new string('R', length);
}
}

public class StateEntity : TaskEntity<string?>
{
protected override string? InitializeState(TaskEntityOperation entityOperation)
{
// Avoid Activator.CreateInstance<string>() which throws; start as null (no state)
return null;
}

public void Set(string value)
{
this.State = value;
}
}
12 changes: 12 additions & 0 deletions samples/LargePayloadConsoleApp/Properties/launchSettings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"profiles": {
"LargePayloadConsoleApp": {
"commandName": "Project",
"environmentVariables": {
"DURABLE_TASK_SCHEDULER_CONNECTION_STRING": "",
"DURABLETASK_STORAGE": "",
"DURABLETASK_PAYLOAD_CONTAINER": ""
}
}
}
}
29 changes: 29 additions & 0 deletions samples/LargePayloadConsoleApp/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Large Payload Externalization Sample

This sample demonstrates configuring Durable Task to externalize large payloads to Azure Blob Storage using `UseExternalizedPayloads` on both client and worker, connecting via Durable Task Scheduler (no local sidecar).

- Defaults to Azurite/Storage Emulator via `UseDevelopmentStorage=true`.
- Threshold is set to 1KB for demo, so even modest inputs are externalized.

## Prerequisites

- A Durable Task Scheduler connection string (e.g., from Azure portal) in `DURABLE_TASK_SCHEDULER_CONNECTION_STRING`.
- Optional: Run Azurite (if not using real Azure Storage) for payload storage tokens.

## Configure

Environment variables (optional):

- `DURABLETASK_STORAGE`: Azure Storage connection string. Defaults to `UseDevelopmentStorage=true`.
- `DURABLETASK_PAYLOAD_CONTAINER`: Blob container name. Defaults to `durabletask-payloads`.

## Run

```bash
# from repo root
dotnet run --project samples/LargePayloadConsoleApp/LargePayloadConsoleApp.csproj
```

The app starts an orchestration with a 1MB input, which is externalized by the client and resolved by the worker. The console shows a token-like serialized input and a deserialized input length.


30 changes: 30 additions & 0 deletions src/Extensions/AzureBlobPayloads/AzureBlobPayloads.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFrameworks>netstandard2.0;net6.0</TargetFrameworks>
<PackageDescription>Azure Blob Storage externalized payload support for Durable Task.</PackageDescription>
<AssemblyName>Microsoft.DurableTask.Extensions.AzureBlobPayloads</AssemblyName>
<RootNamespace>Microsoft.DurableTask</RootNamespace>
<EnableStyleCop>true</EnableStyleCop>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Azure.Storage.Blobs" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" />
<PackageReference Include="Microsoft.Extensions.Options" />
<PackageReference Include="Grpc.Net.Client" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\Abstractions\Abstractions.csproj" />
<ProjectReference Include="..\..\Client\Core\Client.csproj" />
<ProjectReference Include="..\..\Worker\Core\Worker.csproj" />
<ProjectReference Include="..\..\Client\Grpc\Client.Grpc.csproj" />
<ProjectReference Include="..\..\Worker\Grpc\Worker.Grpc.csproj" />
<ProjectReference Include="..\..\Grpc.AzureManagedBackend\Grpc.AzureManagedBackend.csproj" />
</ItemGroup>

<ItemGroup>
<SharedSection Include="Core" />
</ItemGroup>
</Project>
Loading
Loading