Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
5c80501
save
YunchuWang Aug 11, 2025
560ecab
tests
YunchuWang Aug 13, 2025
0f3e624
add sample
YunchuWang Aug 13, 2025
d2b514f
precise to second
YunchuWang Aug 13, 2025
a565bb3
Merge branch 'main' into wangbill/large-payload
YunchuWang Aug 13, 2025
4ec3f0c
Merge branch 'wangbill/large-payload' of https://github.com/microsoft…
YunchuWang Aug 13, 2025
8575113
Merge branch 'main' into wangbill/large-payload
YunchuWang Aug 19, 2025
9d2bfa3
some fb
YunchuWang Aug 27, 2025
ef0961a
rename dts to blob
YunchuWang Aug 28, 2025
6387ad2
some fb
YunchuWang Aug 28, 2025
b4527cc
enabled always
YunchuWang Aug 28, 2025
9168dc9
split package
YunchuWang Sep 1, 2025
b5bedd0
update sample
YunchuWang Sep 1, 2025
6cebf0e
remove enablestorage
YunchuWang Sep 1, 2025
ce51c0d
comment
YunchuWang Sep 1, 2025
ecf89de
testname update
YunchuWang Sep 1, 2025
3bd6c9a
add entity sample for largepayload
YunchuWang Sep 2, 2025
20d3e8f
some fb
YunchuWang Sep 6, 2025
e653680
fix
YunchuWang Sep 6, 2025
499c281
fb
YunchuWang Sep 6, 2025
e841e5e
fb
YunchuWang Sep 6, 2025
f182442
fb
YunchuWang Sep 6, 2025
e484f42
fb
YunchuWang Sep 6, 2025
fb6d3fb
test
YunchuWang Sep 6, 2025
f734b1b
enable compression
YunchuWang Sep 8, 2025
50a210e
update sample
YunchuWang Sep 8, 2025
833406f
add gzip encoding header to detect for decompression
YunchuWang Sep 9, 2025
5ebe0b0
initial add async versions ser/deser
YunchuWang Sep 9, 2025
db91bf6
retry on blob upload/download
YunchuWang Sep 11, 2025
6a51f62
Merge branch 'main' into wangbill/large-payload
YunchuWang Sep 11, 2025
c1a87b9
update calllers
YunchuWang Sep 10, 2025
3244c10
Merge branch 'wangbill/large-payload' of https://github.com/microsoft…
YunchuWang Sep 11, 2025
4d4aeba
continue updating async
YunchuWang Sep 11, 2025
62dda3e
more update
YunchuWang Sep 13, 2025
4f0e5e3
update all async
YunchuWang Sep 13, 2025
af7a4c8
enhance
YunchuWang Sep 14, 2025
281fed2
disallow sync calls when largepayload enabled
YunchuWang Sep 14, 2025
52ec85c
more
YunchuWang Sep 14, 2025
a5549a3
more
YunchuWang Sep 14, 2025
738892b
refactor
YunchuWang Sep 14, 2025
1064329
more
YunchuWang Sep 14, 2025
cd05d6a
more
YunchuWang Sep 14, 2025
b3cbb43
more
YunchuWang Sep 14, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
<!-- Azure.* Packages -->
<ItemGroup>
<PackageVersion Include="Azure.Identity" Version="1.13.1" />
<PackageVersion Include="Azure.Storage.Blobs" Version="12.20.0" />
<PackageVersion Include="Microsoft.Azure.Functions.Worker" Version="1.21.0" />
</ItemGroup>

Expand All @@ -33,6 +34,11 @@
</ItemGroup>

<!-- Grpc / Protobuf Packages -->

<!-- BCL Packages -->
<ItemGroup>
<PackageVersion Include="System.ComponentModel.Annotations" Version="5.0.0" />
</ItemGroup>
<ItemGroup>
<PackageVersion Include="Google.Protobuf" Version="3.21.12" />
<PackageVersion Include="Grpc.Core" Version="2.46.5" />
Expand Down
14 changes: 14 additions & 0 deletions Microsoft.DurableTask.sln
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ScheduleWebApp", "samples\S
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ScheduledTasks.Tests", "test\ScheduledTasks.Tests\ScheduledTasks.Tests.csproj", "{D2779F32-A548-44F8-B60A-6AC018966C79}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "LargePayloadConsoleApp", "samples\LargePayloadConsoleApp\LargePayloadConsoleApp.csproj", "{6EB9D002-62C8-D6C1-62A8-14C54CA6DBBC}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "AzureBlobPayloads", "src\Extensions\AzureBlobPayloads\AzureBlobPayloads.csproj", "{FE1DA748-D6DB-E168-BC42-6DBBCEAF229C}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -247,6 +251,14 @@ Global
{D2779F32-A548-44F8-B60A-6AC018966C79}.Debug|Any CPU.Build.0 = Debug|Any CPU
{D2779F32-A548-44F8-B60A-6AC018966C79}.Release|Any CPU.ActiveCfg = Release|Any CPU
{D2779F32-A548-44F8-B60A-6AC018966C79}.Release|Any CPU.Build.0 = Release|Any CPU
{6EB9D002-62C8-D6C1-62A8-14C54CA6DBBC}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{6EB9D002-62C8-D6C1-62A8-14C54CA6DBBC}.Debug|Any CPU.Build.0 = Debug|Any CPU
{6EB9D002-62C8-D6C1-62A8-14C54CA6DBBC}.Release|Any CPU.ActiveCfg = Release|Any CPU
{6EB9D002-62C8-D6C1-62A8-14C54CA6DBBC}.Release|Any CPU.Build.0 = Release|Any CPU
{FE1DA748-D6DB-E168-BC42-6DBBCEAF229C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{FE1DA748-D6DB-E168-BC42-6DBBCEAF229C}.Debug|Any CPU.Build.0 = Debug|Any CPU
{FE1DA748-D6DB-E168-BC42-6DBBCEAF229C}.Release|Any CPU.ActiveCfg = Release|Any CPU
{FE1DA748-D6DB-E168-BC42-6DBBCEAF229C}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -293,6 +305,8 @@ Global
{A89B766C-987F-4C9F-8937-D0AB9FE640C8} = {EFF7632B-821E-4CFC-B4A0-ED4B24296B17}
{100348B5-4D97-4A3F-B777-AB14F276F8FE} = {EFF7632B-821E-4CFC-B4A0-ED4B24296B17}
{D2779F32-A548-44F8-B60A-6AC018966C79} = {E5637F81-2FB9-4CD7-900D-455363B142A7}
{6EB9D002-62C8-D6C1-62A8-14C54CA6DBBC} = {EFF7632B-821E-4CFC-B4A0-ED4B24296B17}
{FE1DA748-D6DB-E168-BC42-6DBBCEAF229C} = {8AFC9781-F6F1-4696-BB4A-9ED7CA9D612B}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {AB41CB55-35EA-4986-A522-387AB3402E71}
Expand Down
24 changes: 24 additions & 0 deletions samples/LargePayloadConsoleApp/LargePayloadConsoleApp.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Hosting" />
<PackageReference Include="Azure.Identity" />
<PackageReference Include="Grpc.Net.Client" />
</ItemGroup>

<ItemGroup>
<!-- Using p2p references so we can show latest changes in samples. -->
<ProjectReference Include="$(SrcRoot)Client/AzureManaged/Client.AzureManaged.csproj" />
<ProjectReference Include="$(SrcRoot)Worker/AzureManaged/Worker.AzureManaged.csproj" />
<ProjectReference Include="$(SrcRoot)Extensions/AzureBlobPayloads/AzureBlobPayloads.csproj" />
</ItemGroup>

</Project>


204 changes: 204 additions & 0 deletions samples/LargePayloadConsoleApp/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using Microsoft.DurableTask.Client;
using Microsoft.DurableTask.Client.AzureManaged;
using Microsoft.DurableTask.Client.Entities;
using Microsoft.DurableTask.Entities;
using Microsoft.DurableTask.Worker;
using Microsoft.DurableTask.Worker.AzureManaged;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

// Demonstrates Large Payload Externalization using Azure Blob Storage.
// This sample uses Azurite/emulator by default via UseDevelopmentStorage=true.

HostApplicationBuilder builder = Host.CreateApplicationBuilder(args);

// Connection string for Durable Task Scheduler
string schedulerConnectionString = builder.Configuration.GetValue<string>("DURABLE_TASK_SCHEDULER_CONNECTION_STRING")
?? throw new InvalidOperationException("Missing required configuration 'DURABLE_TASK_SCHEDULER_CONNECTION_STRING'");

// Configure Durable Task client with Durable Task Scheduler and externalized payloads
builder.Services.AddDurableTaskClient(b =>
{
b.UseDurableTaskScheduler(schedulerConnectionString);
// Ensure entity APIs are enabled for the client
b.Configure(o => { o.EnableEntitySupport = true; o.EnableLargePayloadSupport = true; });
b.UseExternalizedPayloads(opts =>
{
// Keep threshold small to force externalization for demo purposes
opts.ExternalizeThresholdBytes = 1024; // 1KB
opts.ConnectionString = builder.Configuration.GetValue<string>("DURABLETASK_STORAGE") ?? "UseDevelopmentStorage=true";
opts.ContainerName = builder.Configuration.GetValue<string>("DURABLETASK_PAYLOAD_CONTAINER");
});
});

// Configure Durable Task worker with tasks and externalized payloads
builder.Services.AddDurableTaskWorker(b =>
{
b.UseDurableTaskScheduler(schedulerConnectionString);
b.AddTasks(tasks =>
{
// Orchestrator: call activity first, return its output (should equal original input)
tasks.AddOrchestratorFunc<string, string>("LargeInputEcho", async (ctx, input) =>
{
string echoed = await ctx.CallActivityAsync<string>("Echo", input);
return echoed;
});

// Activity: validate it receives raw input (not token) and return it
tasks.AddActivityFunc<string, string>("Echo", (ctx, value) =>
{
if (value is null)
{
return string.Empty;
}

// If we ever see a token in the activity, externalization is not being resolved correctly.
if (value.StartsWith("blob:v1:", StringComparison.Ordinal))
{
throw new InvalidOperationException("Activity received a payload token instead of raw input.");
}

return value;
});

// Entity samples
// 1) Large entity operation input (worker externalizes input; entity receives resolved payload)
tasks.AddOrchestratorFunc<object?, int>(
"LargeEntityOperationInput",
(ctx, _) => ctx.Entities.CallEntityAsync<int>(
new EntityInstanceId(nameof(EchoLengthEntity), "1"),
operationName: "EchoLength",
input: new string('E', 700 * 1024)));
tasks.AddEntity<EchoLengthEntity>(nameof(EchoLengthEntity));

// 2) Large entity operation output (worker externalizes output; orchestrator reads resolved payload)
tasks.AddOrchestratorFunc<object?, int>(
"LargeEntityOperationOutput",
async (ctx, _) => (await ctx.Entities.CallEntityAsync<string>(
new EntityInstanceId(nameof(LargeResultEntity), "1"),
operationName: "Produce",
input: 850 * 1024)).Length);
tasks.AddEntity<LargeResultEntity>(nameof(LargeResultEntity));

// 3) Large entity state (worker externalizes state; client resolves on query)
tasks.AddOrchestratorFunc<object?, object?>(
"LargeEntityState",
async (ctx, _) =>
{
await ctx.Entities.CallEntityAsync(
new EntityInstanceId(nameof(StateEntity), "1"),
operationName: "Set",
input: new string('S', 900 * 1024));
return null;
});
tasks.AddEntity<StateEntity>(nameof(StateEntity));
});
b.UseExternalizedPayloads(opts =>
{
opts.ExternalizeThresholdBytes = 1024; // mirror client
opts.ConnectionString = builder.Configuration.GetValue<string>("DURABLETASK_STORAGE") ?? "UseDevelopmentStorage=true";
opts.ContainerName = builder.Configuration.GetValue<string>("DURABLETASK_PAYLOAD_CONTAINER");
});
// Ensure entity APIs are enabled for the worker
b.Configure(o => { o.EnableEntitySupport = true; o.EnableLargePayloadSupport = true; });
});

IHost host = builder.Build();
await host.StartAsync();

await using DurableTaskClient client = host.Services.GetRequiredService<DurableTaskClient>();

// Option A: Directly pass an oversized input to orchestration to trigger externalization
string largeInput = new string('B', 1024 * 1024); // 1MB
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync("LargeInputEcho", largeInput);
Console.WriteLine($"Started orchestration with direct large input. Instance: {instanceId}");


using CancellationTokenSource cts = new CancellationTokenSource(TimeSpan.FromSeconds(120));
OrchestrationMetadata result = await client.WaitForInstanceCompletionAsync(
instanceId,
getInputsAndOutputs: true,
cts.Token);

Console.WriteLine($"RuntimeStatus: {result.RuntimeStatus}");
string deserializedInput = result.ReadInputAs<string>() ?? string.Empty;
string deserializedOutput = result.ReadOutputAs<string>() ?? string.Empty;

Console.WriteLine($"SerializedInput: {result.SerializedInput}");
Console.WriteLine($"SerializedOutput: {result.SerializedOutput}");
Console.WriteLine($"Deserialized input equals original: {deserializedInput == largeInput}");
Console.WriteLine($"Deserialized output equals original: {deserializedOutput == largeInput}");
Console.WriteLine($"Deserialized input length: {deserializedInput.Length}");



// Run entity samples
Console.WriteLine();
Console.WriteLine("Running LargeEntityOperationInput...");
string largeEntityInput = new string('E', 700 * 1024); // 700KB
string entityInputInstance = await client.ScheduleNewOrchestrationInstanceAsync("LargeEntityOperationInput");
OrchestrationMetadata entityInputResult = await client.WaitForInstanceCompletionAsync(entityInputInstance, getInputsAndOutputs: true, cts.Token);
int entityInputLength = entityInputResult.ReadOutputAs<int>();
Console.WriteLine($"Status: {entityInputResult.RuntimeStatus}, Output length: {entityInputLength}");
Console.WriteLine($"Deserialized input length equals original: {entityInputLength == largeEntityInput.Length}");

Console.WriteLine();
Console.WriteLine("Running LargeEntityOperationOutput...");
int largeEntityOutputLength = 850 * 1024; // 850KB
string entityOutputInstance = await client.ScheduleNewOrchestrationInstanceAsync("LargeEntityOperationOutput");
OrchestrationMetadata entityOutputResult = await client.WaitForInstanceCompletionAsync(entityOutputInstance, getInputsAndOutputs: true, cts.Token);
int entityOutputLength = entityOutputResult.ReadOutputAs<int>();
Console.WriteLine($"Status: {entityOutputResult.RuntimeStatus}, Output length: {entityOutputLength}");
Console.WriteLine($"Deserialized output length equals original: {entityOutputLength == largeEntityOutputLength}");

Console.WriteLine();
Console.WriteLine("Running LargeEntityState and querying state...");
string largeEntityState = new string('S', 900 * 1024); // 900KB
string entityStateInstance = await client.ScheduleNewOrchestrationInstanceAsync("LargeEntityState");
OrchestrationMetadata entityStateOrch = await client.WaitForInstanceCompletionAsync(entityStateInstance, getInputsAndOutputs: true, cts.Token);
Console.WriteLine($"Status: {entityStateOrch.RuntimeStatus}");
EntityMetadata<string>? state = await client.Entities.GetEntityAsync<string>(new EntityInstanceId(nameof(StateEntity), "1"), includeState: true);
int stateLength = state?.State?.Length ?? 0;
Console.WriteLine($"State length: {stateLength}");
Console.WriteLine($"Deserialized state equals original: {state?.State == largeEntityState}");





public class EchoLengthEntity : TaskEntity<int>
{
public int EchoLength(string input)
{
return input.Length;
}
}

public class LargeResultEntity : TaskEntity<object?>
{
public string Produce(int length)
{
return new string('R', length);
}
}

public class StateEntity : TaskEntity<string?>
{
protected override string? InitializeState(TaskEntityOperation entityOperation)
{
// Avoid Activator.CreateInstance<string>() which throws; start as null (no state)
return null;
}

public void Set(string value)
{
this.State = value;
}
}



29 changes: 29 additions & 0 deletions samples/LargePayloadConsoleApp/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Large Payload Externalization Sample

This sample demonstrates configuring Durable Task to externalize large payloads to Azure Blob Storage using `UseExternalizedPayloads` on both client and worker, connecting via Durable Task Scheduler (no local sidecar).

- Defaults to Azurite/Storage Emulator via `UseDevelopmentStorage=true`.
- Threshold is set to 1KB for demo, so even modest inputs are externalized.

## Prerequisites

- A Durable Task Scheduler connection string (e.g., from Azure portal) in `DURABLE_TASK_SCHEDULER_CONNECTION_STRING`.
- Optional: Run Azurite (if not using real Azure Storage) for payload storage tokens.

## Configure

Environment variables (optional):

- `DURABLETASK_STORAGE`: Azure Storage connection string. Defaults to `UseDevelopmentStorage=true`.
- `DURABLETASK_PAYLOAD_CONTAINER`: Blob container name. Defaults to `durabletask-payloads`.

## Run

```bash
# from repo root
dotnet run --project samples/LargePayloadConsoleApp/LargePayloadConsoleApp.csproj
```

The app starts an orchestration with a 1MB input, which is externalized by the client and resolved by the worker. The console shows a token-like serialized input and a deserialized input length.


81 changes: 81 additions & 0 deletions samples/LargePayloadConsoleApp/run.ps1
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
Param(
[Parameter(Mandatory = $true)]
[string]$SchedulerConnectionString,

[string]$StorageConnectionString = "UseDevelopmentStorage=true",

[string]$PayloadContainer = "durabletask-payloads",

[switch]$StartAzurite,

[switch]$VerboseLogging
)

$ErrorActionPreference = "Stop"

function Write-Info($msg) {
Write-Host "[info] $msg"
}

function Start-AzuriteDocker {
param(
[string]$ContainerName = "durabletask-azurite"
)

if (-not (Get-Command docker -ErrorAction SilentlyContinue)) {
Write-Info "Docker not found; skipping Azurite startup."
return $false
}

try {
$existing = (docker ps -a --filter "name=$ContainerName" --format "{{.ID}}")
if ($existing) {
Write-Info "Starting existing Azurite container '$ContainerName'..."
docker start $ContainerName | Out-Null
return $true
}

Write-Info "Launching Azurite in Docker as '$ContainerName' on ports 10000-10002..."
docker run -d -p 10000:10000 -p 10001:10001 -p 10002:10002 --name $ContainerName mcr.microsoft.com/azure-storage/azurite | Out-Null
Start-Sleep -Seconds 2
return $true
}
catch {
Write-Warning "Failed to start Azurite via Docker: $_"
return $false
}
}

try {
# Set required/optional environment variables for the sample
$env:DURABLE_TASK_SCHEDULER_CONNECTION_STRING = $SchedulerConnectionString
$env:DURABLETASK_STORAGE = $StorageConnectionString
$env:DURABLETASK_PAYLOAD_CONTAINER = $PayloadContainer

Write-Info "DURABLE_TASK_SCHEDULER_CONNECTION_STRING is set."
Write-Info "DURABLETASK_STORAGE = '$($env:DURABLETASK_STORAGE)'"
Write-Info "DURABLETASK_PAYLOAD_CONTAINER = '$($env:DURABLETASK_PAYLOAD_CONTAINER)'"

if ($StartAzurite) {
$started = Start-AzuriteDocker
if ($started) {
Write-Info "Azurite is running (Docker)."
}
}

$projectPath = Join-Path $PSScriptRoot "LargePayloadConsoleApp.csproj"
if (-not (Test-Path $projectPath)) {
throw "Project file not found at $projectPath"
}

Write-Info "Running sample..."
$argsList = @("run", "--project", $projectPath)
if ($VerboseLogging) { $argsList += @("-v", "detailed") }

& dotnet @argsList
}
catch {
Write-Error $_
exit 1
}

Loading
Loading