Skip to content
Closed
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
5c80501
save
YunchuWang Aug 11, 2025
560ecab
tests
YunchuWang Aug 13, 2025
0f3e624
add sample
YunchuWang Aug 13, 2025
d2b514f
precise to second
YunchuWang Aug 13, 2025
a565bb3
Merge branch 'main' into wangbill/large-payload
YunchuWang Aug 13, 2025
4ec3f0c
Merge branch 'wangbill/large-payload' of https://github.com/microsoft…
YunchuWang Aug 13, 2025
8575113
Merge branch 'main' into wangbill/large-payload
YunchuWang Aug 19, 2025
9d2bfa3
some fb
YunchuWang Aug 27, 2025
ef0961a
rename dts to blob
YunchuWang Aug 28, 2025
6387ad2
some fb
YunchuWang Aug 28, 2025
b4527cc
enabled always
YunchuWang Aug 28, 2025
9168dc9
split package
YunchuWang Sep 1, 2025
b5bedd0
update sample
YunchuWang Sep 1, 2025
6cebf0e
remove enablestorage
YunchuWang Sep 1, 2025
ce51c0d
comment
YunchuWang Sep 1, 2025
ecf89de
testname update
YunchuWang Sep 1, 2025
3bd6c9a
add entity sample for largepayload
YunchuWang Sep 2, 2025
20d3e8f
some fb
YunchuWang Sep 6, 2025
e653680
fix
YunchuWang Sep 6, 2025
499c281
fb
YunchuWang Sep 6, 2025
e841e5e
fb
YunchuWang Sep 6, 2025
f182442
fb
YunchuWang Sep 6, 2025
e484f42
fb
YunchuWang Sep 6, 2025
fb6d3fb
test
YunchuWang Sep 6, 2025
f734b1b
enable compression
YunchuWang Sep 8, 2025
50a210e
update sample
YunchuWang Sep 8, 2025
833406f
add gzip encoding header to detect for decompression
YunchuWang Sep 9, 2025
5ebe0b0
initial add async versions ser/deser
YunchuWang Sep 9, 2025
db91bf6
retry on blob upload/download
YunchuWang Sep 11, 2025
6a51f62
Merge branch 'main' into wangbill/large-payload
YunchuWang Sep 11, 2025
c1a87b9
update calllers
YunchuWang Sep 10, 2025
3244c10
Merge branch 'wangbill/large-payload' of https://github.com/microsoft…
YunchuWang Sep 11, 2025
4d4aeba
continue updating async
YunchuWang Sep 11, 2025
62dda3e
more update
YunchuWang Sep 13, 2025
4f0e5e3
update all async
YunchuWang Sep 13, 2025
af7a4c8
enhance
YunchuWang Sep 14, 2025
281fed2
disallow sync calls when largepayload enabled
YunchuWang Sep 14, 2025
52ec85c
more
YunchuWang Sep 14, 2025
a5549a3
more
YunchuWang Sep 14, 2025
738892b
refactor
YunchuWang Sep 14, 2025
1064329
more
YunchuWang Sep 14, 2025
cd05d6a
more
YunchuWang Sep 14, 2025
b3cbb43
more
YunchuWang Sep 14, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
<!-- Azure.* Packages -->
<ItemGroup>
<PackageVersion Include="Azure.Identity" Version="1.13.1" />
<PackageVersion Include="Azure.Storage.Blobs" Version="12.20.0" />
<PackageVersion Include="Microsoft.Azure.Functions.Worker" Version="1.21.0" />
</ItemGroup>

Expand All @@ -33,6 +34,11 @@
</ItemGroup>

<!-- Grpc / Protobuf Packages -->

<!-- BCL Packages -->
<ItemGroup>
<PackageVersion Include="System.ComponentModel.Annotations" Version="5.0.0" />
</ItemGroup>
<ItemGroup>
<PackageVersion Include="Google.Protobuf" Version="3.21.12" />
<PackageVersion Include="Grpc.Core" Version="2.46.5" />
Expand Down
23 changes: 23 additions & 0 deletions samples/LargePayloadConsoleApp/LargePayloadConsoleApp.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Hosting" />
<PackageReference Include="Azure.Identity" />
<PackageReference Include="Grpc.Net.Client" />
</ItemGroup>

<ItemGroup>
<!-- Using p2p references so we can show latest changes in samples. -->
<ProjectReference Include="$(SrcRoot)Client/AzureManaged/Client.AzureManaged.csproj" />
<ProjectReference Include="$(SrcRoot)Worker/AzureManaged/Worker.AzureManaged.csproj" />
</ItemGroup>

</Project>


106 changes: 106 additions & 0 deletions samples/LargePayloadConsoleApp/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using Microsoft.DurableTask;
using Microsoft.DurableTask.Client;
using Microsoft.DurableTask.Client.AzureManaged;
using Microsoft.DurableTask.Converters;
using Microsoft.DurableTask.Worker;
using Microsoft.DurableTask.Worker.AzureManaged;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Configuration;

// Demonstrates Large Payload Externalization using Azure Blob Storage.
// This sample uses Azurite/emulator by default via UseDevelopmentStorage=true.

HostApplicationBuilder builder = Host.CreateApplicationBuilder(args);

// Connection string for Durable Task Scheduler
string schedulerConnectionString = builder.Configuration.GetValue<string>("DURABLE_TASK_SCHEDULER_CONNECTION_STRING")
?? throw new InvalidOperationException("Missing required configuration 'DURABLE_TASK_SCHEDULER_CONNECTION_STRING'");

// Configure Durable Task client with Durable Task Scheduler and externalized payloads
builder.Services.AddDurableTaskClient(b =>
{
b.UseDurableTaskScheduler(schedulerConnectionString);
b.UseExternalizedPayloads(opts =>
{
opts.Enabled = true;
// Keep threshold small to force externalization for demo purposes
opts.ExternalizeThresholdBytes = 1024; // 1KB
// Default to local Azurite/emulator. Override via environment or appsettings if desired.
opts.ConnectionString = Environment.GetEnvironmentVariable("DURABLETASK_STORAGE") ?? "UseDevelopmentStorage=true";
opts.ContainerName = Environment.GetEnvironmentVariable("DURABLETASK_PAYLOAD_CONTAINER") ?? "durabletask-payloads";
});
});

// Configure Durable Task worker with tasks and externalized payloads
builder.Services.AddDurableTaskWorker(b =>
{
b.UseDurableTaskScheduler(schedulerConnectionString);
b.AddTasks(tasks =>
{
// Orchestrator: call activity first, return its output (should equal original input)
tasks.AddOrchestratorFunc<string, string>("LargeInputEcho", async (ctx, input) =>
{
string echoed = await ctx.CallActivityAsync<string>("Echo", input);
return echoed;
});

// Activity: validate it receives raw input (not token) and return it
tasks.AddActivityFunc<string, string>("Echo", (ctx, value) =>
{
if (value is null)
{
return string.Empty;
}

// If we ever see a token in the activity, externalization is not being resolved correctly.
if (value.StartsWith("dts:v1:", StringComparison.Ordinal))
{
throw new InvalidOperationException("Activity received a payload token instead of raw input.");
}

return value;
});
});
b.UseExternalizedPayloads(opts =>
{
opts.Enabled = true;
opts.ExternalizeThresholdBytes = 1024; // mirror client
opts.ConnectionString = Environment.GetEnvironmentVariable("DURABLETASK_STORAGE") ?? "UseDevelopmentStorage=true";
opts.ContainerName = Environment.GetEnvironmentVariable("DURABLETASK_PAYLOAD_CONTAINER") ?? "durabletask-payloads";
});
});

IHost host = builder.Build();
await host.StartAsync();

await using DurableTaskClient client = host.Services.GetRequiredService<DurableTaskClient>();

// Option A: Directly pass an oversized input to orchestration to trigger externalization
string largeInput = new string('B', 1024 * 1024); // 1MB
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync("LargeInputEcho", largeInput);
Console.WriteLine($"Started orchestration with direct large input. Instance: {instanceId}");


using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(120));
OrchestrationMetadata result = await client.WaitForInstanceCompletionAsync(
instanceId,
getInputsAndOutputs: true,
cts.Token);

Console.WriteLine($"RuntimeStatus: {result.RuntimeStatus}");
Console.WriteLine($"UsesExternalStorage (result converter): {result.DataConverter?.UsesExternalStorage ?? false}");
string deserializedInput = result.ReadInputAs<string>() ?? string.Empty;
string deserializedOutput = result.ReadOutputAs<string>() ?? string.Empty;

Console.WriteLine($"SerializedInput: {result.SerializedInput}");
Console.WriteLine($"SerializedOutput: {result.SerializedOutput}");
Console.WriteLine($"Deserialized input equals original: {deserializedInput == largeInput}");
Console.WriteLine($"Deserialized output equals original: {deserializedOutput == largeInput}");
Console.WriteLine($"Deserialized input length: {deserializedInput.Length}");



29 changes: 29 additions & 0 deletions samples/LargePayloadConsoleApp/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Large Payload Externalization Sample

This sample demonstrates configuring Durable Task to externalize large payloads to Azure Blob Storage using `UseExternalizedPayloads` on both client and worker, connecting via Durable Task Scheduler (no local sidecar).

- Defaults to Azurite/Storage Emulator via `UseDevelopmentStorage=true`.
- Threshold is set to 1KB for demo, so even modest inputs are externalized.

## Prerequisites

- A Durable Task Scheduler connection string (e.g., from Azure portal) in `DURABLE_TASK_SCHEDULER_CONNECTION_STRING`.
- Optional: Run Azurite (if not using real Azure Storage) for payload storage tokens.

## Configure

Environment variables (optional):

- `DURABLETASK_STORAGE`: Azure Storage connection string. Defaults to `UseDevelopmentStorage=true`.
- `DURABLETASK_PAYLOAD_CONTAINER`: Blob container name. Defaults to `durabletask-payloads`.

## Run

```bash
# from repo root
dotnet run --project samples/LargePayloadConsoleApp/LargePayloadConsoleApp.csproj
```

The app starts an orchestration with a 1MB input, which is externalized by the client and resolved by the worker. The console shows a token-like serialized input and a deserialized input length.


81 changes: 81 additions & 0 deletions samples/LargePayloadConsoleApp/run.ps1
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
Param(
[Parameter(Mandatory = $true)]
[string]$SchedulerConnectionString,

[string]$StorageConnectionString = "UseDevelopmentStorage=true",

[string]$PayloadContainer = "durabletask-payloads",

[switch]$StartAzurite,

[switch]$VerboseLogging
)

$ErrorActionPreference = "Stop"

function Write-Info($msg) {
Write-Host "[info] $msg"
}

function Start-AzuriteDocker {
param(
[string]$ContainerName = "durabletask-azurite"
)

if (-not (Get-Command docker -ErrorAction SilentlyContinue)) {
Write-Info "Docker not found; skipping Azurite startup."
return $false
}

try {
$existing = (docker ps -a --filter "name=$ContainerName" --format "{{.ID}}")
if ($existing) {
Write-Info "Starting existing Azurite container '$ContainerName'..."
docker start $ContainerName | Out-Null
return $true
}

Write-Info "Launching Azurite in Docker as '$ContainerName' on ports 10000-10002..."
docker run -d -p 10000:10000 -p 10001:10001 -p 10002:10002 --name $ContainerName mcr.microsoft.com/azure-storage/azurite | Out-Null
Start-Sleep -Seconds 2
return $true
}
catch {
Write-Warning "Failed to start Azurite via Docker: $_"
return $false
}
}

try {
# Set required/optional environment variables for the sample
$env:DURABLE_TASK_SCHEDULER_CONNECTION_STRING = $SchedulerConnectionString
$env:DURABLETASK_STORAGE = $StorageConnectionString
$env:DURABLETASK_PAYLOAD_CONTAINER = $PayloadContainer

Write-Info "DURABLE_TASK_SCHEDULER_CONNECTION_STRING is set."
Write-Info "DURABLETASK_STORAGE = '$($env:DURABLETASK_STORAGE)'"
Write-Info "DURABLETASK_PAYLOAD_CONTAINER = '$($env:DURABLETASK_PAYLOAD_CONTAINER)'"

if ($StartAzurite) {
$started = Start-AzuriteDocker
if ($started) {
Write-Info "Azurite is running (Docker)."
}
}

$projectPath = Join-Path $PSScriptRoot "LargePayloadConsoleApp.csproj"
if (-not (Test-Path $projectPath)) {
throw "Project file not found at $projectPath"
}

Write-Info "Running sample..."
$argsList = @("run", "--project", $projectPath)
if ($VerboseLogging) { $argsList += @("-v", "detailed") }

& dotnet @argsList
}
catch {
Write-Error $_
exit 1
}

1 change: 1 addition & 0 deletions src/Abstractions/Abstractions.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" />
<PackageReference Include="System.Text.Json" />
<PackageReference Include="System.Collections.Immutable"/>
<PackageReference Include="Azure.Storage.Blobs" />
</ItemGroup>

<ItemGroup>
Expand Down
98 changes: 98 additions & 0 deletions src/Abstractions/Converters/BlobPayloadStore.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using System.Globalization;
using System.IO.Compression;
using System.Text;
using Azure;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;

namespace Microsoft.DurableTask.Converters;

/// <summary>
/// Azure Blob Storage implementation of <see cref="IPayloadStore"/>.
/// Stores payloads as blobs and returns opaque tokens in the form "dts:v1:&lt;container&gt;:&lt;blobName&gt;".
/// </summary>
public sealed class BlobPayloadStore : IPayloadStore
{
readonly BlobContainerClient containerClient;
readonly LargePayloadStorageOptions options;

/// <summary>
/// Initializes a new instance of the <see cref="BlobPayloadStore"/> class.
/// </summary>
/// <param name="options">The options for the blob payload store.</param>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="options"/> is null.</exception>
/// <exception cref="ArgumentException">Thrown when <paramref name="options.ConnectionString"/> is null or empty.</exception>
public BlobPayloadStore(LargePayloadStorageOptions options)
{
this.options = options ?? throw new ArgumentNullException(nameof(options));

Check.NotNullOrEmpty(options.ConnectionString, nameof(options.ConnectionString));
Check.NotNullOrEmpty(options.ContainerName, nameof(options.ContainerName));

BlobServiceClient serviceClient = new(options.ConnectionString);
this.containerClient = serviceClient.GetBlobContainerClient(options.ContainerName);
}

/// <inheritdoc/>
public async Task<string> UploadAsync(string contentType, ReadOnlyMemory<byte> payloadBytes, CancellationToken cancellationToken)
{
// Ensure container exists
await this.containerClient.CreateIfNotExistsAsync(PublicAccessType.None, cancellationToken: cancellationToken).ConfigureAwait(false);

// One blob per payload using GUID-based name for uniqueness
string timestamp = DateTimeOffset.UtcNow.ToString("yyyy/MM/dd/HH/mm/ss", CultureInfo.InvariantCulture);
string blobName = $"{timestamp}/{Guid.NewGuid():N}";
BlobClient blob = this.containerClient.GetBlobClient(blobName);

byte[] payloadBuffer = payloadBytes.ToArray();

// Compress and upload streaming
using Stream blobStream = await blob.OpenWriteAsync(overwrite: true, cancellationToken: cancellationToken).ConfigureAwait(false);
using GZipStream compressedBlobStream = new(blobStream, CompressionLevel.Optimal, leaveOpen: true);
using MemoryStream payloadStream = new(payloadBuffer, writable: false);

await payloadStream.CopyToAsync(compressedBlobStream, bufferSize: 81920, cancellationToken).ConfigureAwait(false);
await compressedBlobStream.FlushAsync(cancellationToken).ConfigureAwait(false);
await blobStream.FlushAsync(cancellationToken).ConfigureAwait(false);

return EncodeToken(this.containerClient.Name, blobName);
}

/// <inheritdoc/>
public async Task<string> DownloadAsync(string token, CancellationToken cancellationToken)
{
(string container, string name) = DecodeToken(token);
if (!string.Equals(container, this.containerClient.Name, StringComparison.Ordinal))
{
throw new ArgumentException("Token container does not match configured container.", nameof(token));
}

BlobClient blob = this.containerClient.GetBlobClient(name);
using BlobDownloadStreamingResult result = await blob.DownloadStreamingAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
using GZipStream decompressedBlobStream = new GZipStream(result.Content, CompressionMode.Decompress);
using StreamReader reader = new(decompressedBlobStream, Encoding.UTF8);
return await reader.ReadToEndAsync();
}

static string EncodeToken(string container, string name) => $"dts:v1:{container}:{name}";

static (string Container, string Name) DecodeToken(string token)
{
if (!token.StartsWith("dts:v1:", StringComparison.Ordinal))
{
throw new ArgumentException("Invalid external payload token.", nameof(token));
}

string rest = token.Substring("dts:v1:".Length);
int sep = rest.IndexOf(':');
if (sep <= 0 || sep >= rest.Length - 1)
{
throw new ArgumentException("Invalid external payload token format.", nameof(token));
}

return (rest.Substring(0, sep), rest.Substring(sep + 1));
}
}
27 changes: 27 additions & 0 deletions src/Abstractions/Converters/IPayloadStore.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

namespace Microsoft.DurableTask.Converters;

/// <summary>
/// Abstraction for storing and retrieving large payloads out-of-band.
/// </summary>
public interface IPayloadStore
{
/// <summary>
/// Uploads a payload and returns an opaque reference token that can be embedded in orchestration messages.
/// </summary>
/// <param name="contentType">The content type of the payload (e.g., application/json).</param>
/// <param name="payloadBytes">The payload bytes.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>Opaque reference token.</returns>
Task<string> UploadAsync(string contentType, ReadOnlyMemory<byte> payloadBytes, CancellationToken cancellationToken);

/// <summary>
/// Downloads the payload referenced by the token.
/// </summary>
/// <param name="token">The opaque reference token.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>Payload string.</returns>
Task<string> DownloadAsync(string token, CancellationToken cancellationToken);
}
Loading
Loading