Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
ff00cbc
Partial orchestration workitem completion support
YunchuWang Nov 25, 2025
7d432ca
add tests
YunchuWang Nov 29, 2025
b206ec3
Update src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs
YunchuWang Dec 1, 2025
168e2f0
Update src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs
YunchuWang Dec 1, 2025
95013b7
Update src/InProcessTestHost/Sidecar/Grpc/TaskHubGrpcServer.cs
YunchuWang Dec 1, 2025
ffc7b7b
Merge branch 'main' into wangbill/autochunk
YunchuWang Dec 1, 2025
989f89d
feedback
YunchuWang Dec 1, 2025
87d7f95
update tests
YunchuWang Dec 1, 2025
5448636
test update
YunchuWang Dec 1, 2025
9ddc213
increase test timeout
YunchuWang Dec 1, 2025
a5b778c
save
YunchuWang Dec 1, 2025
4ffe575
make orchestration fail is single action oversized
YunchuWang Dec 3, 2025
c4b8341
advise
YunchuWang Dec 3, 2025
fa0d2c9
todo
YunchuWang Dec 3, 2025
f480c3f
Merge branch 'main' into wangbill/autochunk
YunchuWang Dec 3, 2025
6517fcd
Merge branch 'main' into wangbill/autochunk
YunchuWang Dec 3, 2025
e76982a
Update src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs
YunchuWang Dec 3, 2025
53b2284
Update src/Worker/Grpc/GrpcDurableTaskWorkerOptions.cs
YunchuWang Dec 3, 2025
eb09564
feedback
YunchuWang Dec 4, 2025
d8f939a
Merge branch 'wangbill/autochunk' of https://github.com/microsoft/dur…
YunchuWang Dec 4, 2025
e3b86cc
remove still in first chunk
YunchuWang Dec 4, 2025
f73b5d4
Update src/Worker/Grpc/GrpcDurableTaskWorkerOptions.cs
YunchuWang Dec 4, 2025
27a40e1
Merge branch 'main' into wangbill/autochunk
YunchuWang Dec 4, 2025
6079088
feedback
YunchuWang Dec 5, 2025
e97b329
Merge branch 'main' into wangbill/autochunk
YunchuWang Dec 5, 2025
2ed8160
rename
YunchuWang Dec 5, 2025
b67e4a1
exclude baserequest from chunk filling
YunchuWang Dec 5, 2025
7b349df
Merge branch 'main' into wangbill/autochunk
YunchuWang Dec 5, 2025
395be15
feedback
YunchuWang Dec 5, 2025
ae33892
Merge branch 'wangbill/autochunk' of https://github.com/microsoft/dur…
YunchuWang Dec 5, 2025
fccdc13
Merge branch 'main' into wangbill/autochunk
YunchuWang Dec 5, 2025
deffc05
Update src/Worker/Grpc/GrpcDurableTaskWorkerOptions.cs
YunchuWang Dec 5, 2025
cd062a9
Merge branch 'wangbill/autochunk' of https://github.com/microsoft/dur…
YunchuWang Dec 5, 2025
3065a09
update proto
YunchuWang Dec 5, 2025
3d01238
Merge branch 'main' into wangbill/autochunk
YunchuWang Dec 5, 2025
0cc7b1d
Merge branch 'wangbill/autochunk' of https://github.com/microsoft/dur…
YunchuWang Dec 5, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/Grpc/orchestrator_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,13 @@ message OrchestratorResponse {

// Whether or not a history is required to complete the original OrchestratorRequest and none was provided.
bool requiresHistory = 7;

// True if this is a partial (chunked) completion. The backend must keep the work item open until the final chunk (isPartial=false).
bool isPartial = 8;

// Zero-based position of the current chunk within a chunked completion sequence.
// This field is omitted for non-chunked completions.
google.protobuf.Int32Value chunkIndex = 9;
}

message CreateInstanceRequest {
Expand Down
4 changes: 2 additions & 2 deletions src/Grpc/versions.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
# The following files were downloaded from branch main at 2025-11-14 16:36:47 UTC
https://raw.githubusercontent.com/microsoft/durabletask-protobuf/9f762f1301b91e3e7c736b9c5a29c2e09f2a850e/protos/orchestrator_service.proto
# The following files were downloaded from branch main at 2025-12-05 19:49:32 UTC
https://raw.githubusercontent.com/microsoft/durabletask-protobuf/1774123378045dd87cd3307c50da4cdd4551a817/protos/orchestrator_service.proto
76 changes: 73 additions & 3 deletions src/InProcessTestHost/Sidecar/Grpc/TaskHubGrpcServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Diagnostics;
using System.Globalization;
using DurableTask.Core;
using DurableTask.Core.Command;
using DurableTask.Core.History;
using DurableTask.Core.Query;
using Google.Protobuf.WellKnownTypes;
Expand All @@ -30,6 +31,29 @@ public class TaskHubGrpcServer : P.TaskHubSidecarService.TaskHubSidecarServiceBa

readonly ConcurrentDictionary<string, TaskCompletionSource<GrpcOrchestratorExecutionResult>> pendingOrchestratorTasks = new(StringComparer.OrdinalIgnoreCase);
readonly ConcurrentDictionary<string, TaskCompletionSource<ActivityExecutionResult>> pendingActivityTasks = new(StringComparer.OrdinalIgnoreCase);
readonly ConcurrentDictionary<string, PartialOrchestratorChunk> partialOrchestratorChunks = new(StringComparer.OrdinalIgnoreCase);

/// <summary>
/// Helper class to accumulate partial orchestrator chunks.
/// </summary>
sealed class PartialOrchestratorChunk
{
readonly object lockObject = new();

public TaskCompletionSource<GrpcOrchestratorExecutionResult> TaskCompletionSource { get; set; } = null!;
public List<OrchestratorAction> AccumulatedActions { get; } = new();

/// <summary>
/// Thread-safely adds actions to the accumulated actions list.
/// </summary>
public void AddActions(IEnumerable<OrchestratorAction> actions)
{
lock (this.lockObject)
{
this.AccumulatedActions.AddRange(actions);
}
}
}

readonly ILogger log;
readonly IOrchestrationService service;
Expand Down Expand Up @@ -191,7 +215,7 @@ async Task WaitForWorkItemClientConnection()
/// <returns>A create instance response.</returns>
public override async Task<P.CreateInstanceResponse> StartInstance(P.CreateInstanceRequest request, ServerCallContext context)
{
var instance = new OrchestrationInstance
OrchestrationInstance instance = new OrchestrationInstance
{
InstanceId = request.InstanceId ?? Guid.NewGuid().ToString("N"),
ExecutionId = Guid.NewGuid().ToString(),
Expand Down Expand Up @@ -535,6 +559,51 @@ static P.GetInstanceResponse CreateGetInstanceResponse(OrchestrationState state,
/// <returns>Returns an empty ack back to the remote SDK that we've received the completion.</returns>
public override Task<P.CompleteTaskResponse> CompleteOrchestratorTask(P.OrchestratorResponse request, ServerCallContext context)
{
if (request.IsPartial)
{
// This is a partial chunk - accumulate actions but don't complete yet
PartialOrchestratorChunk partialChunk = this.partialOrchestratorChunks.GetOrAdd(
request.InstanceId,
_ =>
{
// First chunk - get the TCS and initialize the partial chunk
if (!this.pendingOrchestratorTasks.TryGetValue(request.InstanceId, out TaskCompletionSource<GrpcOrchestratorExecutionResult>? tcs))
{
throw new RpcException(new Status(StatusCode.NotFound, $"Orchestration with instance ID '{request.InstanceId}' not found"));
}

return new PartialOrchestratorChunk
{
TaskCompletionSource = tcs,
};
});

// Accumulate actions from this chunk (thread-safe)
partialChunk.AddActions(request.Actions.Select(ProtobufUtils.ToOrchestratorAction));

return EmptyCompleteTaskResponse;
}

// This is the final chunk (or a single non-chunked response)
if (this.partialOrchestratorChunks.TryRemove(request.InstanceId, out PartialOrchestratorChunk? existingPartialChunk))
{
// We've been accumulating chunks - combine with final chunk (thread-safe)
existingPartialChunk.AddActions(request.Actions.Select(ProtobufUtils.ToOrchestratorAction));

GrpcOrchestratorExecutionResult res = new()
{
Actions = existingPartialChunk.AccumulatedActions,
CustomStatus = request.CustomStatus,
};

// Remove the TCS from pending tasks and complete it
this.pendingOrchestratorTasks.TryRemove(request.InstanceId, out _);
existingPartialChunk.TaskCompletionSource.TrySetResult(res);

return EmptyCompleteTaskResponse;
}

// Single non-chunked response (no partial chunks)
if (!this.pendingOrchestratorTasks.TryRemove(
request.InstanceId,
out TaskCompletionSource<GrpcOrchestratorExecutionResult>? tcs))
Expand Down Expand Up @@ -691,7 +760,7 @@ async Task<GrpcOrchestratorExecutionResult> ITaskExecutor.ExecuteOrchestrator(
IEnumerable<HistoryEvent> pastEvents,
IEnumerable<HistoryEvent> newEvents)
{
var executionStartedEvent = pastEvents.OfType<ExecutionStartedEvent>().FirstOrDefault();
ExecutionStartedEvent? executionStartedEvent = pastEvents.OfType<ExecutionStartedEvent>().FirstOrDefault();

P.OrchestrationTraceContext? orchestrationTraceContext = executionStartedEvent?.ParentTraceContext?.SpanId is not null
? new P.OrchestrationTraceContext
Expand All @@ -708,7 +777,7 @@ async Task<GrpcOrchestratorExecutionResult> ITaskExecutor.ExecuteOrchestrator(

try
{
var orkRequest = new P.OrchestratorRequest
P.OrchestratorRequest orkRequest = new P.OrchestratorRequest
{
InstanceId = instance.InstanceId,
ExecutionId = instance.ExecutionId,
Expand Down Expand Up @@ -836,6 +905,7 @@ TaskCompletionSource<GrpcOrchestratorExecutionResult> CreateTaskCompletionSource
void RemoveOrchestratorTaskCompletionSource(string instanceId)
{
this.pendingOrchestratorTasks.TryRemove(instanceId, out _);
this.partialOrchestratorChunks.TryRemove(instanceId, out _);
}

TaskCompletionSource<ActivityExecutionResult> CreateTaskCompletionSourceForActivity(string instanceId, int taskId)
Expand Down
141 changes: 140 additions & 1 deletion src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@
// Licensed under the MIT License.

using System.Diagnostics;
using System.Linq;
using System.Text;
using DurableTask.Core;
using DurableTask.Core.Entities;
using DurableTask.Core.Entities.OperationFormat;
using DurableTask.Core.History;
using Google.Protobuf;
using Microsoft.DurableTask.Abstractions;
using Microsoft.DurableTask.Entities;
using Microsoft.DurableTask.Tracing;
Expand Down Expand Up @@ -755,7 +757,10 @@ await this.client.AbandonTaskOrchestratorWorkItemAsync(
response.Actions.Count,
GetActionsListForLogging(response.Actions));

await this.client.CompleteOrchestratorTaskAsync(response, cancellationToken: cancellationToken);
await this.CompleteOrchestratorTaskWithChunkingAsync(
response,
this.worker.grpcOptions.CompleteOrchestrationWorkItemChunkSizeInBytes,
cancellationToken);
}

async Task OnRunActivityAsync(P.ActivityRequest request, string completionToken, CancellationToken cancellation)
Expand Down Expand Up @@ -914,5 +919,139 @@ async Task OnRunEntityBatchAsync(

await this.client.CompleteEntityTaskAsync(response, cancellationToken: cancellation);
}

/// <summary>
/// Completes an orchestration task with automatic chunking if the response exceeds the maximum size.
/// </summary>
/// <param name="response">The orchestrator response to send.</param>
/// <param name="maxChunkBytes">The maximum size in bytes for each chunk.</param>
/// <param name="cancellationToken">The cancellation token.</param>
async Task CompleteOrchestratorTaskWithChunkingAsync(
P.OrchestratorResponse response,
int maxChunkBytes,
CancellationToken cancellationToken)
{
// Validate that no single action exceeds the maximum chunk size
static P.TaskFailureDetails? ValidateActionsSize(IEnumerable<P.OrchestratorAction> actions, int maxChunkBytes)
{
foreach (P.OrchestratorAction action in actions)
{
int actionSize = action.CalculateSize();
if (actionSize > maxChunkBytes)
{
// TODO: large payload doc is not available yet on aka.ms, add doc link to below error message
string errorMessage = $"A single orchestrator action of type {action.OrchestratorActionTypeCase} with id {action.Id} " +
$"exceeds the {maxChunkBytes / 1024.0 / 1024.0:F2}MB limit: {actionSize / 1024.0 / 1024.0:F2}MB. " +
"Enable large-payload externalization to Azure Blob Storage to support oversized actions.";
return new P.TaskFailureDetails
{
ErrorType = typeof(InvalidOperationException).FullName,
ErrorMessage = errorMessage,
IsNonRetriable = true,
};
}
}

return null;
}

P.TaskFailureDetails? validationFailure = ValidateActionsSize(response.Actions, maxChunkBytes);
if (validationFailure != null)
{
// Complete the orchestration with a failed status and failure details
P.OrchestratorResponse failureResponse = new()
{
InstanceId = response.InstanceId,
CompletionToken = response.CompletionToken,
OrchestrationTraceContext = response.OrchestrationTraceContext,
Actions =
{
new P.OrchestratorAction
{
CompleteOrchestration = new P.CompleteOrchestrationAction
{
OrchestrationStatus = P.OrchestrationStatus.Failed,
FailureDetails = validationFailure,
},
},
},
};

await this.client.CompleteOrchestratorTaskAsync(failureResponse, cancellationToken: cancellationToken);
return;
}

// Helper to add an action to the current chunk if it fits
static bool TryAddAction(
Google.Protobuf.Collections.RepeatedField<P.OrchestratorAction> dest,
P.OrchestratorAction action,
ref int currentSize,
int maxChunkBytes)
{
int actionSize = action.CalculateSize();
if (currentSize + actionSize > maxChunkBytes)
{
return false;
}

dest.Add(action);
currentSize += actionSize;
return true;
}

// Check if the entire response fits in one chunk
int totalSize = response.CalculateSize();
if (totalSize <= maxChunkBytes)
{
// Response fits in one chunk, send it directly (isPartial defaults to false)
await this.client.CompleteOrchestratorTaskAsync(response, cancellationToken: cancellationToken);
return;
}

// Response is too large, split into multiple chunks
int actionsCompletedSoFar = 0, chunkIndex = 0;
List<P.OrchestratorAction> allActions = response.Actions.ToList();
bool isPartial = true;

while (isPartial)
{
P.OrchestratorResponse chunkedResponse = new()
{
InstanceId = response.InstanceId,
CustomStatus = response.CustomStatus,
CompletionToken = response.CompletionToken,
RequiresHistory = response.RequiresHistory,
NumEventsProcessed = 0,
ChunkIndex = chunkIndex,
};

int chunkPayloadSize = 0;

// Fill the chunk with actions until we reach the size limit
while (actionsCompletedSoFar < allActions.Count &&
TryAddAction(chunkedResponse.Actions, allActions[actionsCompletedSoFar], ref chunkPayloadSize, maxChunkBytes))
{
actionsCompletedSoFar++;
}

// Determine if this is a partial chunk (more actions remaining)
isPartial = actionsCompletedSoFar < allActions.Count;
chunkedResponse.IsPartial = isPartial;

if (chunkIndex == 0)
{
// The first chunk preserves the original response's NumEventsProcessed value (null)
// When this is set to null, backend by default handles all the messages in the workitem.
// For subsequent chunks, we set it to 0 since all messages are already handled in first chunk.
chunkedResponse.NumEventsProcessed = null;
chunkedResponse.OrchestrationTraceContext = response.OrchestrationTraceContext;
}

chunkIndex++;

// Send the chunk
await this.client.CompleteOrchestratorTaskAsync(chunkedResponse, cancellationToken: cancellationToken);
}
}
}
}
44 changes: 44 additions & 0 deletions src/Worker/Grpc/GrpcDurableTaskWorkerOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,18 @@ namespace Microsoft.DurableTask.Worker.Grpc;
/// </summary>
public sealed class GrpcDurableTaskWorkerOptions : DurableTaskWorkerOptions
{
/// <summary>
/// The minimum allowed size (in bytes) for complete orchestration work item chunks.
/// </summary>
public const int MinCompleteOrchestrationWorkItemChunkSizeInBytes = 1 * 1024 * 1024; // 1 MB

/// <summary>
/// The maximum allowed size (in bytes) for complete orchestration work item chunks.
/// </summary>
public const int MaxCompleteOrchestrationWorkItemChunkSizeInBytes = 4_089_446; // 3.9 MB

int completeOrchestrationWorkItemChunkSizeInBytes = MaxCompleteOrchestrationWorkItemChunkSizeInBytes;

/// <summary>
/// Gets or sets the address of the gRPC endpoint to connect to. Default is localhost:4001.
/// </summary>
Expand All @@ -31,6 +43,38 @@ public sealed class GrpcDurableTaskWorkerOptions : DurableTaskWorkerOptions
/// </summary>
public HashSet<P.WorkerCapability> Capabilities { get; } = new() { P.WorkerCapability.HistoryStreaming };

/// <summary>
/// Gets or sets the maximum size of all actions in a complete orchestration work item chunk.
/// The default value is 3.9MB. We leave some headroom to account for request size overhead.
/// </summary>
/// <remarks>
/// This value is used to limit the size of the complete orchestration work item request.
/// If the response exceeds this limit, it will be automatically split into multiple chunks of maximum size CompleteOrchestrationWorkItemChunkSizeInBytes
/// </remarks>
/// <exception cref="ArgumentOutOfRangeException">
/// Thrown when the value is less than 1 MB or greater than 3.9 MB.
/// </exception>
public int CompleteOrchestrationWorkItemChunkSizeInBytes
{
get => this.completeOrchestrationWorkItemChunkSizeInBytes;
set
{
if (value < MinCompleteOrchestrationWorkItemChunkSizeInBytes ||
value > MaxCompleteOrchestrationWorkItemChunkSizeInBytes)
{
string message = $"{nameof(CompleteOrchestrationWorkItemChunkSizeInBytes)} must be between " +
$"{MinCompleteOrchestrationWorkItemChunkSizeInBytes} bytes (1 MB) and " +
$"{MaxCompleteOrchestrationWorkItemChunkSizeInBytes} bytes (3.9 MB), inclusive.";
throw new ArgumentOutOfRangeException(
nameof(this.CompleteOrchestrationWorkItemChunkSizeInBytes),
value,
message);
}

this.completeOrchestrationWorkItemChunkSizeInBytes = value;
}
}

/// <summary>
/// Gets the internal protocol options. These are used to control backend-dependent features.
/// </summary>
Expand Down
Loading
Loading