From ff00cbc25d543dc91fc98875c718bdba7f2fa683 Mon Sep 17 00:00:00 2001 From: peterstone2017 <12449837+YunchuWang@users.noreply.github.com> Date: Tue, 25 Nov 2025 12:55:59 -0800 Subject: [PATCH 01/24] Partial orchestration workitem completion support --- src/Grpc/orchestrator_service.proto | 3 + src/Grpc/versions.txt | 2 +- .../Grpc/GrpcDurableTaskWorker.Processor.cs | 105 +++++++++++++++++- .../Grpc/GrpcDurableTaskWorkerOptions.cs | 10 ++ 4 files changed, 118 insertions(+), 2 deletions(-) diff --git a/src/Grpc/orchestrator_service.proto b/src/Grpc/orchestrator_service.proto index 1a86c0a27..fb3c86820 100644 --- a/src/Grpc/orchestrator_service.proto +++ b/src/Grpc/orchestrator_service.proto @@ -356,6 +356,9 @@ message OrchestratorResponse { // Whether or not a history is required to complete the original OrchestratorRequest and none was provided. bool requiresHistory = 7; + + // True if this is a partial (chunked) completion. The backend must keep the work item open until the final chunk (isPartial=false). + bool isPartial = 8; } message CreateInstanceRequest { diff --git a/src/Grpc/versions.txt b/src/Grpc/versions.txt index 69b075cd3..52f3e8688 100644 --- a/src/Grpc/versions.txt +++ b/src/Grpc/versions.txt @@ -1,2 +1,2 @@ -# The following files were downloaded from branch main at 2025-11-14 16:36:47 UTC +# The following files were downloaded from branch main at 2025-11-25 20:53:40 UTC https://raw.githubusercontent.com/microsoft/durabletask-protobuf/9f762f1301b91e3e7c736b9c5a29c2e09f2a850e/protos/orchestrator_service.proto diff --git a/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs b/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs index ffd38cf1f..f04cb5232 100644 --- a/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs +++ b/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs @@ -2,11 +2,13 @@ // Licensed under the MIT License. using System.Diagnostics; +using System.Linq; using System.Text; using DurableTask.Core; using DurableTask.Core.Entities; using DurableTask.Core.Entities.OperationFormat; using DurableTask.Core.History; +using Google.Protobuf; using Microsoft.DurableTask.Abstractions; using Microsoft.DurableTask.Entities; using Microsoft.DurableTask.Tracing; @@ -755,7 +757,12 @@ await this.client.AbandonTaskOrchestratorWorkItemAsync( response.Actions.Count, GetActionsListForLogging(response.Actions)); - await this.client.CompleteOrchestratorTaskAsync(response, cancellationToken: cancellationToken); + // Auto-chunk the response if it exceeds the maximum size + int maxChunkBytes = this.worker.grpcOptions.MaxCompleteOrchestrationWorkItemSizePerChunk; + await this.CompleteOrchestratorTaskWithChunkingAsync( + response, + maxChunkBytes, + cancellationToken); } async Task OnRunActivityAsync(P.ActivityRequest request, string completionToken, CancellationToken cancellation) @@ -914,5 +921,101 @@ async Task OnRunEntityBatchAsync( await this.client.CompleteEntityTaskAsync(response, cancellationToken: cancellation); } + + /// + /// Completes an orchestration task with automatic chunking if the response exceeds the maximum size. + /// + /// The orchestrator response to send. + /// The maximum size in bytes for each chunk. + /// The cancellation token. + async Task CompleteOrchestratorTaskWithChunkingAsync( + P.OrchestratorResponse response, + int maxChunkBytes, + CancellationToken cancellationToken) + { + // Validate that no single action exceeds the maximum chunk size + static void ValidateActionsSize(IEnumerable actions, int maxChunkBytes) + { + foreach (P.OrchestratorAction action in actions) + { + int actionSize = action.CalculateSize(); + if (actionSize > maxChunkBytes) + { + throw new InvalidOperationException( + $"A single orchestrator action of type {action.OrchestratorActionTypeCase} with id {action.Id} " + + $"exceeds the {maxChunkBytes / 1024 / 1024}MB limit: {actionSize / 1024 / 1024}MB"); + } + } + } + + ValidateActionsSize(response.Actions, maxChunkBytes); + + // Helper to add an action to the current chunk if it fits + static bool TryAddAction( + Google.Protobuf.Collections.RepeatedField dest, + P.OrchestratorAction action, + ref int currentSize, + int maxChunkBytes) + { + int actionSize = action.CalculateSize(); + if (currentSize + actionSize > maxChunkBytes) + { + return false; + } + + dest.Add(action); + currentSize += actionSize; + return true; + } + + // Check if the entire response fits in one chunk + int totalSize = response.CalculateSize(); + if (totalSize <= maxChunkBytes) + { + // Response fits in one chunk, send it directly (isPartial defaults to false) + await this.client.CompleteOrchestratorTaskAsync(response, cancellationToken: cancellationToken); + return; + } + + // Response is too large, split into multiple chunks + int actionsCompletedSoFar = 0; + bool isFirstChunk = true; + List allActions = response.Actions.ToList(); + + while (actionsCompletedSoFar < allActions.Count) + { + P.OrchestratorResponse chunkedResponse = new() + { + InstanceId = response.InstanceId, + CustomStatus = response.CustomStatus, + CompletionToken = response.CompletionToken, + OrchestrationTraceContext = actionsCompletedSoFar == 0 ? response.OrchestrationTraceContext : null, // Only include trace context in first chunk + RequiresHistory = response.RequiresHistory, + }; + + int chunkSize = chunkedResponse.CalculateSize(); + + // Fill the chunk with actions until we reach the size limit + while (actionsCompletedSoFar < allActions.Count && + TryAddAction(chunkedResponse.Actions, allActions[actionsCompletedSoFar], ref chunkSize, maxChunkBytes)) + { + actionsCompletedSoFar++; + } + + // Determine if this is a partial chunk (more actions remaining) + bool isPartial = actionsCompletedSoFar < allActions.Count; + chunkedResponse.IsPartial = isPartial; + + if (isFirstChunk) + { + isFirstChunk = false; + } else { + chunkedResponse.NumEventsProcessed = -1; + } + + // Send the chunk + await this.client.CompleteOrchestratorTaskAsync(chunkedResponse, cancellationToken: cancellationToken); + } + } } } diff --git a/src/Worker/Grpc/GrpcDurableTaskWorkerOptions.cs b/src/Worker/Grpc/GrpcDurableTaskWorkerOptions.cs index 78d520878..0f8b85886 100644 --- a/src/Worker/Grpc/GrpcDurableTaskWorkerOptions.cs +++ b/src/Worker/Grpc/GrpcDurableTaskWorkerOptions.cs @@ -31,6 +31,16 @@ public sealed class GrpcDurableTaskWorkerOptions : DurableTaskWorkerOptions /// public HashSet Capabilities { get; } = new() { P.WorkerCapability.HistoryStreaming }; + /// + /// Gets or sets the maximum size of all actions in a complete orchestration work item chunk. + /// The default value is 3.9MB. We leave some headroom to account for request size overhead. + /// + /// + /// This value is used to limit the size of the complete orchestration work item chunk request. + /// If the response exceeds this limit, it will be automatically split into multiple chunks. + /// + public int MaxCompleteOrchestrationWorkItemSizePerChunk { get; set; } = 4089446; // 3.9MB + /// /// Gets the internal protocol options. These are used to control backend-dependent features. /// From 7d432cae9e0aa452064b12702f87b5be7a30de40 Mon Sep 17 00:00:00 2001 From: peterstone2017 <12449837+YunchuWang@users.noreply.github.com> Date: Fri, 28 Nov 2025 23:09:45 -0800 Subject: [PATCH 02/24] add tests --- .../CacheClearingOrchestratorV2.cs | 40 +++++ .../Sidecar/Grpc/TaskHubGrpcServer.cs | 70 +++++++- .../Grpc/GrpcDurableTaskWorker.Processor.cs | 3 +- test/Grpc.IntegrationTests/AutochunkTests.cs | 157 ++++++++++++++++++ 4 files changed, 262 insertions(+), 8 deletions(-) create mode 100644 samples/ScheduleWebApp/Orchestrations/CacheClearingOrchestratorV2.cs create mode 100644 test/Grpc.IntegrationTests/AutochunkTests.cs diff --git a/samples/ScheduleWebApp/Orchestrations/CacheClearingOrchestratorV2.cs b/samples/ScheduleWebApp/Orchestrations/CacheClearingOrchestratorV2.cs new file mode 100644 index 000000000..c10687816 --- /dev/null +++ b/samples/ScheduleWebApp/Orchestrations/CacheClearingOrchestratorV2.cs @@ -0,0 +1,40 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using Microsoft.DurableTask; +using ScheduleWebApp.Activities; + +namespace ScheduleWebApp.Orchestrations; + +public class CacheClearingOrchestratorV2 : TaskOrchestrator +{ + public override async Task RunAsync(TaskOrchestrationContext context, string scheduleId) + { + ILogger logger = context.CreateReplaySafeLogger(nameof(CacheClearingOrchestratorV2)); + try + { + logger.LogInformation("Starting CacheClearingOrchestration for schedule ID: {ScheduleId}", scheduleId); + + TaskOptions options = new TaskOptions(tags: new Dictionary + { + { "scheduleId", scheduleId } + }); + + // Schedule all activities first to ensure deterministic ordering + Task[] tasks = Enumerable.Range(0, 100) + .Select(i => context.CallActivityAsync(nameof(CacheClearingActivity), new string('A', 4 * 1024), options)) + .ToArray(); + + await Task.WhenAll(tasks); + + logger.LogInformation("CacheClearingOrchestration completed for schedule ID: {ScheduleId}", scheduleId); + + return "ok"; + } + catch (Exception ex) + { + logger.LogError(ex, "Error in CacheClearingOrchestration for schedule ID: {ScheduleId}", scheduleId); + throw; + } + } +} \ No newline at end of file diff --git a/src/InProcessTestHost/Sidecar/Grpc/TaskHubGrpcServer.cs b/src/InProcessTestHost/Sidecar/Grpc/TaskHubGrpcServer.cs index b25177ae9..e0bdfe542 100644 --- a/src/InProcessTestHost/Sidecar/Grpc/TaskHubGrpcServer.cs +++ b/src/InProcessTestHost/Sidecar/Grpc/TaskHubGrpcServer.cs @@ -5,6 +5,7 @@ using System.Diagnostics; using System.Globalization; using DurableTask.Core; +using DurableTask.Core.Command; using DurableTask.Core.History; using DurableTask.Core.Query; using Google.Protobuf.WellKnownTypes; @@ -30,6 +31,16 @@ public class TaskHubGrpcServer : P.TaskHubSidecarService.TaskHubSidecarServiceBa readonly ConcurrentDictionary> pendingOrchestratorTasks = new(StringComparer.OrdinalIgnoreCase); readonly ConcurrentDictionary> pendingActivityTasks = new(StringComparer.OrdinalIgnoreCase); + readonly ConcurrentDictionary partialOrchestratorChunks = new(StringComparer.OrdinalIgnoreCase); + + /// + /// Helper class to accumulate partial orchestrator chunks. + /// + sealed class PartialOrchestratorChunk + { + public TaskCompletionSource TaskCompletionSource { get; set; } = null!; + public List AccumulatedActions { get; } = new(); + } readonly ILogger log; readonly IOrchestrationService service; @@ -191,7 +202,7 @@ async Task WaitForWorkItemClientConnection() /// A create instance response. public override async Task StartInstance(P.CreateInstanceRequest request, ServerCallContext context) { - var instance = new OrchestrationInstance + OrchestrationInstance instance = new OrchestrationInstance { InstanceId = request.InstanceId ?? Guid.NewGuid().ToString("N"), ExecutionId = Guid.NewGuid().ToString(), @@ -471,20 +482,20 @@ static P.GetInstanceResponse CreateGetInstanceResponse(OrchestrationState state, { // Get the original orchestration state IList states = await this.client.GetOrchestrationStateAsync(request.InstanceId, false); - + if (states == null || states.Count == 0) { throw new RpcException(new Status(StatusCode.NotFound, $"An orchestration with the instanceId {request.InstanceId} was not found.")); } OrchestrationState state = states[0]; - + // Check if the state is null if (state == null) { throw new RpcException(new Status(StatusCode.NotFound, $"An orchestration with the instanceId {request.InstanceId} was not found.")); } - + string newInstanceId = request.RestartWithNewInstanceId ? Guid.NewGuid().ToString("N") : request.InstanceId; // Create a new orchestration instance @@ -535,6 +546,51 @@ static P.GetInstanceResponse CreateGetInstanceResponse(OrchestrationState state, /// Returns an empty ack back to the remote SDK that we've received the completion. public override Task CompleteOrchestratorTask(P.OrchestratorResponse request, ServerCallContext context) { + if (request.IsPartial) + { + // This is a partial chunk - accumulate actions but don't complete yet + PartialOrchestratorChunk partialChunk = this.partialOrchestratorChunks.GetOrAdd( + request.InstanceId, + _ => + { + // First chunk - get the TCS and initialize the partial chunk + if (!this.pendingOrchestratorTasks.TryGetValue(request.InstanceId, out TaskCompletionSource? tcs)) + { + throw new RpcException(new Status(StatusCode.NotFound, $"Orchestration not found")); + } + + return new PartialOrchestratorChunk + { + TaskCompletionSource = tcs, + }; + }); + + // Accumulate actions from this chunk + partialChunk.AccumulatedActions.AddRange(request.Actions.Select(ProtobufUtils.ToOrchestratorAction)); + + return EmptyCompleteTaskResponse; + } + + // This is the final chunk (or a single non-chunked response) + if (this.partialOrchestratorChunks.TryRemove(request.InstanceId, out PartialOrchestratorChunk? existingPartialChunk)) + { + // We've been accumulating chunks - combine with final chunk + existingPartialChunk.AccumulatedActions.AddRange(request.Actions.Select(ProtobufUtils.ToOrchestratorAction)); + + GrpcOrchestratorExecutionResult res = new() + { + Actions = existingPartialChunk.AccumulatedActions, + CustomStatus = request.CustomStatus, // Use custom status from final chunk + }; + + // Remove the TCS from pending tasks and complete it + this.pendingOrchestratorTasks.TryRemove(request.InstanceId, out _); + existingPartialChunk.TaskCompletionSource.TrySetResult(res); + + return EmptyCompleteTaskResponse; + } + + // Single non-chunked response (no partial chunks) if (!this.pendingOrchestratorTasks.TryRemove( request.InstanceId, out TaskCompletionSource? tcs)) @@ -646,6 +702,7 @@ public override async Task GetWorkItems(P.GetWorkItemsRequest request, IServerSt } } + /// public override async Task StreamInstanceHistory(P.StreamInstanceHistoryRequest request, IServerStreamWriter responseStream, ServerCallContext context) { if (this.streamingPastEvents.TryGetValue(request.InstanceId, out List? pastEvents)) @@ -684,7 +741,7 @@ async Task ITaskExecutor.ExecuteOrchestrator( IEnumerable pastEvents, IEnumerable newEvents) { - var executionStartedEvent = pastEvents.OfType().FirstOrDefault(); + ExecutionStartedEvent? executionStartedEvent = pastEvents.OfType().FirstOrDefault(); P.OrchestrationTraceContext? orchestrationTraceContext = executionStartedEvent?.ParentTraceContext?.SpanId is not null ? new P.OrchestrationTraceContext @@ -701,7 +758,7 @@ async Task ITaskExecutor.ExecuteOrchestrator( try { - var orkRequest = new P.OrchestratorRequest + P.OrchestratorRequest orkRequest = new P.OrchestratorRequest { InstanceId = instance.InstanceId, ExecutionId = instance.ExecutionId, @@ -829,6 +886,7 @@ TaskCompletionSource CreateTaskCompletionSource void RemoveOrchestratorTaskCompletionSource(string instanceId) { this.pendingOrchestratorTasks.TryRemove(instanceId, out _); + this.partialOrchestratorChunks.TryRemove(instanceId, out _); } TaskCompletionSource CreateTaskCompletionSourceForActivity(string instanceId, int taskId) diff --git a/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs b/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs index f04cb5232..e3a6320b5 100644 --- a/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs +++ b/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs @@ -758,10 +758,9 @@ await this.client.AbandonTaskOrchestratorWorkItemAsync( GetActionsListForLogging(response.Actions)); // Auto-chunk the response if it exceeds the maximum size - int maxChunkBytes = this.worker.grpcOptions.MaxCompleteOrchestrationWorkItemSizePerChunk; await this.CompleteOrchestratorTaskWithChunkingAsync( response, - maxChunkBytes, + this.worker.grpcOptions.MaxCompleteOrchestrationWorkItemSizePerChunk, cancellationToken); } diff --git a/test/Grpc.IntegrationTests/AutochunkTests.cs b/test/Grpc.IntegrationTests/AutochunkTests.cs new file mode 100644 index 000000000..43dc5b706 --- /dev/null +++ b/test/Grpc.IntegrationTests/AutochunkTests.cs @@ -0,0 +1,157 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using Microsoft.DurableTask.Client; +using Microsoft.DurableTask.Worker; +using Xunit.Abstractions; + +namespace Microsoft.DurableTask.Grpc.Tests; + +/// +/// Integration tests for validating autochunk functionality when orchestration completion responses +/// exceed the maximum chunk size and are automatically split into multiple chunks. +/// +public class AutochunkTests(ITestOutputHelper output, GrpcSidecarFixture sidecarFixture) : IntegrationTestBase(output, sidecarFixture) +{ + /// + /// Validates that orchestrations complete successfully when the completion response + /// exceeds the chunk size and must be split into multiple chunks. + /// + [Fact] + public async Task Autochunk_MultipleChunks_CompletesSuccessfully() + { + const int ActivityCount = 15; + const int PayloadSizePerActivity = 3 * 1024; // 3KB per activity + const int ChunkSize = 10 * 1024; // 10KB chunks (small to force chunking) + TaskName orchestratorName = nameof(Autochunk_MultipleChunks_CompletesSuccessfully); + TaskName activityName = "Echo"; + + await using HostTestLifetime server = await this.StartWorkerAsync(b => + { + // Set a small chunk size to force chunking + b.UseGrpc(opt => opt.MaxCompleteOrchestrationWorkItemSizePerChunk = ChunkSize); + b.AddTasks(tasks => tasks + .AddOrchestratorFunc(orchestratorName, async ctx => + { + // Start all activities in parallel so they're all in the same completion response + List> tasks = new List>(); + for (int i = 0; i < ActivityCount; i++) + { + string payload = new string((char)('A' + (i % 26)), PayloadSizePerActivity); + tasks.Add(ctx.CallActivityAsync(activityName, payload)); + } + string[] results = await Task.WhenAll(tasks); + return results.Length; + }) + .AddActivityFunc(activityName, (ctx, input) => Task.FromResult(input))); + }); + + string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); + OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync( + instanceId, getInputsAndOutputs: true, this.TimeoutToken); + + Assert.NotNull(metadata); + Assert.Equal(instanceId, metadata.InstanceId); + Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); + Assert.Equal(ActivityCount, metadata.ReadOutputAs()); + } + + /// + /// Validates autochunking with many timers that together exceed the chunk size. + /// + [Fact] + public async Task Autochunk_ManyTimers_CompletesSuccessfully() + { + const int TimerCount = 100; + const int ChunkSize = 100; // 100B chunks + TaskName orchestratorName = nameof(Autochunk_ManyTimers_CompletesSuccessfully); + + await using HostTestLifetime server = await this.StartWorkerAsync(b => + { + // Set a small chunk size to force chunking + b.UseGrpc(opt => opt.MaxCompleteOrchestrationWorkItemSizePerChunk = ChunkSize); + b.AddTasks(tasks => tasks.AddOrchestratorFunc(orchestratorName, async ctx => + { + // Start all timers in parallel so they're all in the same completion response + List timerTasks = new List(); + for (int i = 0; i < TimerCount; i++) + { + timerTasks.Add(ctx.CreateTimer(TimeSpan.FromMilliseconds(10), CancellationToken.None)); + } + await Task.WhenAll(timerTasks); + return TimerCount; + })); + }); + + string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); + OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync( + instanceId, getInputsAndOutputs: true, this.TimeoutToken); + + Assert.NotNull(metadata); + Assert.Equal(instanceId, metadata.InstanceId); + Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); + Assert.Equal(TimerCount, metadata.ReadOutputAs()); + } + + /// + /// Validates autochunking with mixed action types (activities, timers, sub-orchestrations). + /// + [Fact] + public async Task Autochunk_MixedActions_CompletesSuccessfully() + { + const int ActivityCount = 8; + const int TimerCount = 5; + const int SubOrchCount = 3; + const int PayloadSizePerActivity = 2 * 1024; // 2KB per activity + const int ChunkSize = 8 * 1024; // 8KB chunks + TaskName orchestratorName = nameof(Autochunk_MixedActions_CompletesSuccessfully); + TaskName activityName = "Echo"; + TaskName subOrchName = "SubOrch"; + + await using HostTestLifetime server = await this.StartWorkerAsync(b => + { + // Set a small chunk size to force chunking + b.UseGrpc(opt => opt.MaxCompleteOrchestrationWorkItemSizePerChunk = ChunkSize); + b.AddTasks(tasks => tasks + .AddOrchestratorFunc(orchestratorName, async ctx => + { + // Start all actions in parallel so they're all in the same completion response + List allTasks = new List(); + + // Activities + for (int i = 0; i < ActivityCount; i++) + { + string payload = new string('A', PayloadSizePerActivity); + allTasks.Add(ctx.CallActivityAsync(activityName, payload)); + } + + // Timers + for (int i = 0; i < TimerCount; i++) + { + allTasks.Add(ctx.CreateTimer(TimeSpan.FromMilliseconds(10), CancellationToken.None)); + } + + // Sub-orchestrations + for (int i = 0; i < SubOrchCount; i++) + { + allTasks.Add(ctx.CallSubOrchestratorAsync(subOrchName, i)); + } + + await Task.WhenAll(allTasks); + return ActivityCount + TimerCount + SubOrchCount; + }) + .AddOrchestratorFunc(subOrchName, (ctx, input) => Task.FromResult(input)) + .AddActivityFunc(activityName, (ctx, input) => Task.FromResult(input))); + }); + + string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); + OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync( + instanceId, getInputsAndOutputs: true, this.TimeoutToken); + + Assert.NotNull(metadata); + Assert.Equal(instanceId, metadata.InstanceId); + Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); + Assert.Equal(ActivityCount + TimerCount + SubOrchCount, metadata.ReadOutputAs()); + } +} + From b206ec3796c93e20fa8740da51cb3e15f8f51e3f Mon Sep 17 00:00:00 2001 From: wangbill Date: Mon, 1 Dec 2025 08:28:48 -0800 Subject: [PATCH 03/24] Update src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs b/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs index e3a6320b5..c9a8c1f0e 100644 --- a/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs +++ b/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs @@ -942,7 +942,7 @@ static void ValidateActionsSize(IEnumerable actions, int m { throw new InvalidOperationException( $"A single orchestrator action of type {action.OrchestratorActionTypeCase} with id {action.Id} " + - $"exceeds the {maxChunkBytes / 1024 / 1024}MB limit: {actionSize / 1024 / 1024}MB"); + $"exceeds the {maxChunkBytes / 1024.0 / 1024.0:F2}MB limit: {actionSize / 1024.0 / 1024.0:F2}MB"); } } } From 168e2f0bc29e06af3c178bdea1a4488f84cf8e86 Mon Sep 17 00:00:00 2001 From: wangbill Date: Mon, 1 Dec 2025 08:30:11 -0800 Subject: [PATCH 04/24] Update src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs b/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs index c9a8c1f0e..6031a68f9 100644 --- a/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs +++ b/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs @@ -988,7 +988,7 @@ static bool TryAddAction( InstanceId = response.InstanceId, CustomStatus = response.CustomStatus, CompletionToken = response.CompletionToken, - OrchestrationTraceContext = actionsCompletedSoFar == 0 ? response.OrchestrationTraceContext : null, // Only include trace context in first chunk + OrchestrationTraceContext = isFirstChunk ? response.OrchestrationTraceContext : null, // Only include trace context in first chunk RequiresHistory = response.RequiresHistory, }; From 95013b740080392431ec9d985b39fe9ed1bb629c Mon Sep 17 00:00:00 2001 From: wangbill Date: Mon, 1 Dec 2025 08:36:27 -0800 Subject: [PATCH 05/24] Update src/InProcessTestHost/Sidecar/Grpc/TaskHubGrpcServer.cs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- src/InProcessTestHost/Sidecar/Grpc/TaskHubGrpcServer.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/InProcessTestHost/Sidecar/Grpc/TaskHubGrpcServer.cs b/src/InProcessTestHost/Sidecar/Grpc/TaskHubGrpcServer.cs index e0bdfe542..4f3497d7c 100644 --- a/src/InProcessTestHost/Sidecar/Grpc/TaskHubGrpcServer.cs +++ b/src/InProcessTestHost/Sidecar/Grpc/TaskHubGrpcServer.cs @@ -556,7 +556,7 @@ static P.GetInstanceResponse CreateGetInstanceResponse(OrchestrationState state, // First chunk - get the TCS and initialize the partial chunk if (!this.pendingOrchestratorTasks.TryGetValue(request.InstanceId, out TaskCompletionSource? tcs)) { - throw new RpcException(new Status(StatusCode.NotFound, $"Orchestration not found")); + throw new RpcException(new Status(StatusCode.NotFound, $"Orchestration with instance ID '{request.InstanceId}' not found")); } return new PartialOrchestratorChunk From 989f89d9d60ce7aa8cd98ad3a45d280bb32aab2e Mon Sep 17 00:00:00 2001 From: peterstone2017 <12449837+YunchuWang@users.noreply.github.com> Date: Mon, 1 Dec 2025 08:46:08 -0800 Subject: [PATCH 06/24] feedback --- .../Grpc/GrpcDurableTaskWorker.Processor.cs | 7 +- .../Grpc/GrpcDurableTaskWorkerOptions.cs | 36 ++++++- .../GrpcDurableTaskWorkerOptionsTests.cs | 101 ++++++++++++++++++ 3 files changed, 140 insertions(+), 4 deletions(-) create mode 100644 test/Worker/Grpc.Tests/GrpcDurableTaskWorkerOptionsTests.cs diff --git a/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs b/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs index 6031a68f9..2943393e9 100644 --- a/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs +++ b/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs @@ -757,7 +757,6 @@ await this.client.AbandonTaskOrchestratorWorkItemAsync( response.Actions.Count, GetActionsListForLogging(response.Actions)); - // Auto-chunk the response if it exceeds the maximum size await this.CompleteOrchestratorTaskWithChunkingAsync( response, this.worker.grpcOptions.MaxCompleteOrchestrationWorkItemSizePerChunk, @@ -988,7 +987,6 @@ static bool TryAddAction( InstanceId = response.InstanceId, CustomStatus = response.CustomStatus, CompletionToken = response.CompletionToken, - OrchestrationTraceContext = isFirstChunk ? response.OrchestrationTraceContext : null, // Only include trace context in first chunk RequiresHistory = response.RequiresHistory, }; @@ -1007,8 +1005,11 @@ static bool TryAddAction( if (isFirstChunk) { + chunkedResponse.OrchestrationTraceContext = response.OrchestrationTraceContext; isFirstChunk = false; - } else { + } + else + { chunkedResponse.NumEventsProcessed = -1; } diff --git a/src/Worker/Grpc/GrpcDurableTaskWorkerOptions.cs b/src/Worker/Grpc/GrpcDurableTaskWorkerOptions.cs index 0f8b85886..e0ded1654 100644 --- a/src/Worker/Grpc/GrpcDurableTaskWorkerOptions.cs +++ b/src/Worker/Grpc/GrpcDurableTaskWorkerOptions.cs @@ -10,6 +10,18 @@ namespace Microsoft.DurableTask.Worker.Grpc; /// public sealed class GrpcDurableTaskWorkerOptions : DurableTaskWorkerOptions { + /// + /// The minimum allowed size (in bytes) for complete orchestration work item chunks. + /// + public const int MinCompleteOrchestrationWorkItemSizePerChunkBytes = 1 * 1024 * 1024; // 1 MB + + /// + /// The maximum allowed size (in bytes) for complete orchestration work item chunks. + /// + public const int MaxCompleteOrchestrationWorkItemSizePerChunkBytes = 4_089_446; // 3.9 MB + + int maxCompleteOrchestrationWorkItemSizePerChunk = MaxCompleteOrchestrationWorkItemSizePerChunkBytes; + /// /// Gets or sets the address of the gRPC endpoint to connect to. Default is localhost:4001. /// @@ -39,7 +51,29 @@ public sealed class GrpcDurableTaskWorkerOptions : DurableTaskWorkerOptions /// This value is used to limit the size of the complete orchestration work item chunk request. /// If the response exceeds this limit, it will be automatically split into multiple chunks. /// - public int MaxCompleteOrchestrationWorkItemSizePerChunk { get; set; } = 4089446; // 3.9MB + /// + /// Thrown when the value is less than 1 MB or greater than 3.9 MB. + /// + public int MaxCompleteOrchestrationWorkItemSizePerChunk + { + get => this.maxCompleteOrchestrationWorkItemSizePerChunk; + set + { + if (value < MinCompleteOrchestrationWorkItemSizePerChunkBytes || + value > MaxCompleteOrchestrationWorkItemSizePerChunkBytes) + { + string message = $"MaxCompleteOrchestrationWorkItemSizePerChunk must be between " + + $"{MinCompleteOrchestrationWorkItemSizePerChunkBytes} bytes (1 MB) and " + + $"{MaxCompleteOrchestrationWorkItemSizePerChunkBytes} bytes (3.9 MB), inclusive."; + throw new ArgumentOutOfRangeException( + nameof(this.MaxCompleteOrchestrationWorkItemSizePerChunk), + value, + message); + } + + this.maxCompleteOrchestrationWorkItemSizePerChunk = value; + } + } /// /// Gets the internal protocol options. These are used to control backend-dependent features. diff --git a/test/Worker/Grpc.Tests/GrpcDurableTaskWorkerOptionsTests.cs b/test/Worker/Grpc.Tests/GrpcDurableTaskWorkerOptionsTests.cs new file mode 100644 index 000000000..1750d09e8 --- /dev/null +++ b/test/Worker/Grpc.Tests/GrpcDurableTaskWorkerOptionsTests.cs @@ -0,0 +1,101 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace Microsoft.DurableTask.Worker.Grpc.Tests; + +/// +/// Tests for validation. +/// +public class GrpcDurableTaskWorkerOptionsTests +{ + [Fact] + public void Default_MaxCompleteOrchestrationWorkItemSizePerChunk_IsWithinRange() + { + // Arrange + var options = new GrpcDurableTaskWorkerOptions(); + + // Act + int value = options.MaxCompleteOrchestrationWorkItemSizePerChunk; + + // Assert + value.Should().BeGreaterOrEqualTo( + GrpcDurableTaskWorkerOptions.MinCompleteOrchestrationWorkItemSizePerChunkBytes); + value.Should().BeLessOrEqualTo( + GrpcDurableTaskWorkerOptions.MaxCompleteOrchestrationWorkItemSizePerChunkBytes); + } + + [Fact] + public void Setting_MaxCompleteOrchestrationWorkItemSizePerChunk_BelowMin_Throws() + { + // Arrange + var options = new GrpcDurableTaskWorkerOptions(); + int belowMin = GrpcDurableTaskWorkerOptions.MinCompleteOrchestrationWorkItemSizePerChunkBytes - 1; + + // Act + Action act = () => options.MaxCompleteOrchestrationWorkItemSizePerChunk = belowMin; + + // Assert + act.Should() + .Throw() + .WithParameterName(nameof(GrpcDurableTaskWorkerOptions.MaxCompleteOrchestrationWorkItemSizePerChunk)); + } + + [Fact] + public void Setting_MaxCompleteOrchestrationWorkItemSizePerChunk_AboveMax_Throws() + { + // Arrange + var options = new GrpcDurableTaskWorkerOptions(); + int aboveMax = GrpcDurableTaskWorkerOptions.MaxCompleteOrchestrationWorkItemSizePerChunkBytes + 1; + + // Act + Action act = () => options.MaxCompleteOrchestrationWorkItemSizePerChunk = aboveMax; + + // Assert + act.Should() + .Throw() + .WithParameterName(nameof(GrpcDurableTaskWorkerOptions.MaxCompleteOrchestrationWorkItemSizePerChunk)); + } + + [Fact] + public void Setting_MaxCompleteOrchestrationWorkItemSizePerChunk_AtMinBoundary_Succeeds() + { + // Arrange + var options = new GrpcDurableTaskWorkerOptions(); + int minValue = GrpcDurableTaskWorkerOptions.MinCompleteOrchestrationWorkItemSizePerChunkBytes; + + // Act + options.MaxCompleteOrchestrationWorkItemSizePerChunk = minValue; + + // Assert + options.MaxCompleteOrchestrationWorkItemSizePerChunk.Should().Be(minValue); + } + + [Fact] + public void Setting_MaxCompleteOrchestrationWorkItemSizePerChunk_AtMaxBoundary_Succeeds() + { + // Arrange + var options = new GrpcDurableTaskWorkerOptions(); + int maxValue = GrpcDurableTaskWorkerOptions.MaxCompleteOrchestrationWorkItemSizePerChunkBytes; + + // Act + options.MaxCompleteOrchestrationWorkItemSizePerChunk = maxValue; + + // Assert + options.MaxCompleteOrchestrationWorkItemSizePerChunk.Should().Be(maxValue); + } + + [Fact] + public void Setting_MaxCompleteOrchestrationWorkItemSizePerChunk_WithinRange_Succeeds() + { + // Arrange + var options = new GrpcDurableTaskWorkerOptions(); + int withinRange = 2 * 1024 * 1024; // 2 MB + + // Act + options.MaxCompleteOrchestrationWorkItemSizePerChunk = withinRange; + + // Assert + options.MaxCompleteOrchestrationWorkItemSizePerChunk.Should().Be(withinRange); + } +} + From 87d7f95d1e57e668fecd7036281522bd3c0fd92a Mon Sep 17 00:00:00 2001 From: peterstone2017 <12449837+YunchuWang@users.noreply.github.com> Date: Mon, 1 Dec 2025 09:31:46 -0800 Subject: [PATCH 07/24] update tests --- test/Grpc.IntegrationTests/AutochunkTests.cs | 23 +++++++++++++------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/test/Grpc.IntegrationTests/AutochunkTests.cs b/test/Grpc.IntegrationTests/AutochunkTests.cs index 43dc5b706..84269c20b 100644 --- a/test/Grpc.IntegrationTests/AutochunkTests.cs +++ b/test/Grpc.IntegrationTests/AutochunkTests.cs @@ -3,6 +3,7 @@ using Microsoft.DurableTask.Client; using Microsoft.DurableTask.Worker; +using Microsoft.DurableTask.Worker.Grpc; using Xunit.Abstractions; namespace Microsoft.DurableTask.Grpc.Tests; @@ -20,9 +21,11 @@ public class AutochunkTests(ITestOutputHelper output, GrpcSidecarFixture sidecar [Fact] public async Task Autochunk_MultipleChunks_CompletesSuccessfully() { - const int ActivityCount = 15; + // Use minimum allowed chunk size (1 MB) and ensure total payload exceeds it to trigger chunking + // 360 activities × 3KB = ~1.05 MB, exceeding 1 MB chunk size while completing within timeout + const int ActivityCount = 360; const int PayloadSizePerActivity = 3 * 1024; // 3KB per activity - const int ChunkSize = 10 * 1024; // 10KB chunks (small to force chunking) + const int ChunkSize = GrpcDurableTaskWorkerOptions.MinCompleteOrchestrationWorkItemSizePerChunkBytes; // 1 MB (minimum allowed) TaskName orchestratorName = nameof(Autochunk_MultipleChunks_CompletesSuccessfully); TaskName activityName = "Echo"; @@ -62,8 +65,11 @@ public async Task Autochunk_MultipleChunks_CompletesSuccessfully() [Fact] public async Task Autochunk_ManyTimers_CompletesSuccessfully() { - const int TimerCount = 100; - const int ChunkSize = 100; // 100B chunks + // Use minimum allowed chunk size (1 MB) and use many timers to exceed it + // Timers are small, so we need a large number to exceed 1 MB chunk size + // Using 10000 timers which should be sufficient to test chunking without being too slow + const int TimerCount = 10000; + const int ChunkSize = GrpcDurableTaskWorkerOptions.MinCompleteOrchestrationWorkItemSizePerChunkBytes; // 1 MB (minimum allowed) TaskName orchestratorName = nameof(Autochunk_ManyTimers_CompletesSuccessfully); await using HostTestLifetime server = await this.StartWorkerAsync(b => @@ -99,11 +105,12 @@ public async Task Autochunk_ManyTimers_CompletesSuccessfully() [Fact] public async Task Autochunk_MixedActions_CompletesSuccessfully() { - const int ActivityCount = 8; - const int TimerCount = 5; - const int SubOrchCount = 3; + // Use minimum allowed chunk size (1 MB) and ensure total payload exceeds it to trigger chunking + const int ActivityCount = 300; // 300 activities × 2KB = 600KB, plus timers and sub-orchestrations to exceed 1 MB + const int TimerCount = 1000; // Additional timers to help exceed chunk size + const int SubOrchCount = 50; // Additional sub-orchestrations const int PayloadSizePerActivity = 2 * 1024; // 2KB per activity - const int ChunkSize = 8 * 1024; // 8KB chunks + const int ChunkSize = GrpcDurableTaskWorkerOptions.MinCompleteOrchestrationWorkItemSizePerChunkBytes; // 1 MB (minimum allowed) TaskName orchestratorName = nameof(Autochunk_MixedActions_CompletesSuccessfully); TaskName activityName = "Echo"; TaskName subOrchName = "SubOrch"; From 5448636faa9fa1f7484857b8d23d964961e28868 Mon Sep 17 00:00:00 2001 From: peterstone2017 <12449837+YunchuWang@users.noreply.github.com> Date: Mon, 1 Dec 2025 10:05:19 -0800 Subject: [PATCH 08/24] test update --- .../CacheClearingOrchestratorV2.cs | 40 -------------- test/Grpc.IntegrationTests/AutochunkTests.cs | 52 +++++++++++++++++++ 2 files changed, 52 insertions(+), 40 deletions(-) delete mode 100644 samples/ScheduleWebApp/Orchestrations/CacheClearingOrchestratorV2.cs diff --git a/samples/ScheduleWebApp/Orchestrations/CacheClearingOrchestratorV2.cs b/samples/ScheduleWebApp/Orchestrations/CacheClearingOrchestratorV2.cs deleted file mode 100644 index c10687816..000000000 --- a/samples/ScheduleWebApp/Orchestrations/CacheClearingOrchestratorV2.cs +++ /dev/null @@ -1,40 +0,0 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT License. - -using Microsoft.DurableTask; -using ScheduleWebApp.Activities; - -namespace ScheduleWebApp.Orchestrations; - -public class CacheClearingOrchestratorV2 : TaskOrchestrator -{ - public override async Task RunAsync(TaskOrchestrationContext context, string scheduleId) - { - ILogger logger = context.CreateReplaySafeLogger(nameof(CacheClearingOrchestratorV2)); - try - { - logger.LogInformation("Starting CacheClearingOrchestration for schedule ID: {ScheduleId}", scheduleId); - - TaskOptions options = new TaskOptions(tags: new Dictionary - { - { "scheduleId", scheduleId } - }); - - // Schedule all activities first to ensure deterministic ordering - Task[] tasks = Enumerable.Range(0, 100) - .Select(i => context.CallActivityAsync(nameof(CacheClearingActivity), new string('A', 4 * 1024), options)) - .ToArray(); - - await Task.WhenAll(tasks); - - logger.LogInformation("CacheClearingOrchestration completed for schedule ID: {ScheduleId}", scheduleId); - - return "ok"; - } - catch (Exception ex) - { - logger.LogError(ex, "Error in CacheClearingOrchestration for schedule ID: {ScheduleId}", scheduleId); - throw; - } - } -} \ No newline at end of file diff --git a/test/Grpc.IntegrationTests/AutochunkTests.cs b/test/Grpc.IntegrationTests/AutochunkTests.cs index 84269c20b..0691a6903 100644 --- a/test/Grpc.IntegrationTests/AutochunkTests.cs +++ b/test/Grpc.IntegrationTests/AutochunkTests.cs @@ -2,6 +2,7 @@ // Licensed under the MIT License. using Microsoft.DurableTask.Client; +using Microsoft.DurableTask.Tests.Logging; using Microsoft.DurableTask.Worker; using Microsoft.DurableTask.Worker.Grpc; using Xunit.Abstractions; @@ -160,5 +161,56 @@ public async Task Autochunk_MixedActions_CompletesSuccessfully() Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); Assert.Equal(ActivityCount + TimerCount + SubOrchCount, metadata.ReadOutputAs()); } + + /// + /// Validates that an InvalidOperationException is thrown when a single orchestrator action + /// exceeds the MaxCompleteOrchestrationWorkItemSizePerChunk limit. + /// + [Fact] + public async Task Autochunk_SingleActionExceedsChunkSize_ThrowsInvalidOperationException() + { + const int ChunkSize = GrpcDurableTaskWorkerOptions.MinCompleteOrchestrationWorkItemSizePerChunkBytes; // 1 MB + // Create a payload that exceeds the chunk size (1 MB + some overhead) + const int PayloadSize = ChunkSize + 100 * 1024; // 1.1 MB payload + TaskName orchestratorName = nameof(Autochunk_SingleActionExceedsChunkSize_ThrowsInvalidOperationException); + TaskName activityName = "Echo"; + + await using HostTestLifetime server = await this.StartWorkerAsync(b => + { + b.UseGrpc(opt => opt.MaxCompleteOrchestrationWorkItemSizePerChunk = ChunkSize); + b.AddTasks(tasks => tasks + .AddOrchestratorFunc(orchestratorName, async ctx => + { + // Attempt to schedule an activity with a payload that exceeds the chunk size + string largePayload = new string('A', PayloadSize); + return await ctx.CallActivityAsync(activityName, largePayload); + }) + .AddActivityFunc(activityName, (ctx, input) => Task.FromResult(input))); + }); + + string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); + + // Wait a bit for the orchestration to process and the exception to be thrown + await Task.Delay(TimeSpan.FromSeconds(2), this.TimeoutToken); + + // The exception is caught and the work item is abandoned, so the orchestration won't complete. + // Instead, verify the exception was thrown by checking the logs. + IReadOnlyCollection logs = this.GetLogs(); + + // Find the log entry with the InvalidOperationException + LogEntry? errorLog = logs.FirstOrDefault(log => + log.Exception is InvalidOperationException && + log.Exception.Message.Contains("exceeds the", StringComparison.OrdinalIgnoreCase) && + log.Exception.Message.Contains("MB limit", StringComparison.OrdinalIgnoreCase)); + + Assert.NotNull(errorLog); + Assert.NotNull(errorLog.Exception); + Assert.IsType(errorLog.Exception); + + // Verify the error message contains the expected information + Assert.Contains("exceeds the", errorLog.Exception.Message, StringComparison.OrdinalIgnoreCase); + Assert.Contains("MB limit", errorLog.Exception.Message, StringComparison.OrdinalIgnoreCase); + Assert.Contains("ScheduleTask", errorLog.Exception.Message, StringComparison.OrdinalIgnoreCase); + } } From 9ddc2138d42480cd93d9b961f6589b8285f81ead Mon Sep 17 00:00:00 2001 From: peterstone2017 <12449837+YunchuWang@users.noreply.github.com> Date: Mon, 1 Dec 2025 10:50:29 -0800 Subject: [PATCH 09/24] increase test timeout --- test/Grpc.IntegrationTests/AutochunkTests.cs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/test/Grpc.IntegrationTests/AutochunkTests.cs b/test/Grpc.IntegrationTests/AutochunkTests.cs index 0691a6903..976a1c348 100644 --- a/test/Grpc.IntegrationTests/AutochunkTests.cs +++ b/test/Grpc.IntegrationTests/AutochunkTests.cs @@ -51,8 +51,9 @@ public async Task Autochunk_MultipleChunks_CompletesSuccessfully() }); string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync( - instanceId, getInputsAndOutputs: true, this.TimeoutToken); + instanceId, getInputsAndOutputs: true, cts.Token); Assert.NotNull(metadata); Assert.Equal(instanceId, metadata.InstanceId); @@ -91,8 +92,9 @@ public async Task Autochunk_ManyTimers_CompletesSuccessfully() }); string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync( - instanceId, getInputsAndOutputs: true, this.TimeoutToken); + instanceId, getInputsAndOutputs: true, cts.Token); Assert.NotNull(metadata); Assert.Equal(instanceId, metadata.InstanceId); @@ -153,8 +155,9 @@ public async Task Autochunk_MixedActions_CompletesSuccessfully() }); string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync( - instanceId, getInputsAndOutputs: true, this.TimeoutToken); + instanceId, getInputsAndOutputs: true, cts.Token); Assert.NotNull(metadata); Assert.Equal(instanceId, metadata.InstanceId); From a5b778c4ad01d25a542f5b6f73d5366360f02cd8 Mon Sep 17 00:00:00 2001 From: peterstone2017 <12449837+YunchuWang@users.noreply.github.com> Date: Mon, 1 Dec 2025 11:08:54 -0800 Subject: [PATCH 10/24] save --- .../Sidecar/Grpc/TaskHubGrpcServer.cs | 21 ++++-- test/Grpc.IntegrationTests/AutochunkTests.cs | 69 ++++--------------- 2 files changed, 30 insertions(+), 60 deletions(-) diff --git a/src/InProcessTestHost/Sidecar/Grpc/TaskHubGrpcServer.cs b/src/InProcessTestHost/Sidecar/Grpc/TaskHubGrpcServer.cs index 4f3497d7c..70edcf557 100644 --- a/src/InProcessTestHost/Sidecar/Grpc/TaskHubGrpcServer.cs +++ b/src/InProcessTestHost/Sidecar/Grpc/TaskHubGrpcServer.cs @@ -38,8 +38,21 @@ public class TaskHubGrpcServer : P.TaskHubSidecarService.TaskHubSidecarServiceBa /// sealed class PartialOrchestratorChunk { + readonly object lockObject = new(); + public TaskCompletionSource TaskCompletionSource { get; set; } = null!; public List AccumulatedActions { get; } = new(); + + /// + /// Thread-safely adds actions to the accumulated actions list. + /// + public void AddActions(IEnumerable actions) + { + lock (this.lockObject) + { + this.AccumulatedActions.AddRange(actions); + } + } } readonly ILogger log; @@ -565,8 +578,8 @@ static P.GetInstanceResponse CreateGetInstanceResponse(OrchestrationState state, }; }); - // Accumulate actions from this chunk - partialChunk.AccumulatedActions.AddRange(request.Actions.Select(ProtobufUtils.ToOrchestratorAction)); + // Accumulate actions from this chunk (thread-safe) + partialChunk.AddActions(request.Actions.Select(ProtobufUtils.ToOrchestratorAction)); return EmptyCompleteTaskResponse; } @@ -574,8 +587,8 @@ static P.GetInstanceResponse CreateGetInstanceResponse(OrchestrationState state, // This is the final chunk (or a single non-chunked response) if (this.partialOrchestratorChunks.TryRemove(request.InstanceId, out PartialOrchestratorChunk? existingPartialChunk)) { - // We've been accumulating chunks - combine with final chunk - existingPartialChunk.AccumulatedActions.AddRange(request.Actions.Select(ProtobufUtils.ToOrchestratorAction)); + // We've been accumulating chunks - combine with final chunk (thread-safe) + existingPartialChunk.AddActions(request.Actions.Select(ProtobufUtils.ToOrchestratorAction)); GrpcOrchestratorExecutionResult res = new() { diff --git a/test/Grpc.IntegrationTests/AutochunkTests.cs b/test/Grpc.IntegrationTests/AutochunkTests.cs index 976a1c348..636b73f31 100644 --- a/test/Grpc.IntegrationTests/AutochunkTests.cs +++ b/test/Grpc.IntegrationTests/AutochunkTests.cs @@ -22,10 +22,8 @@ public class AutochunkTests(ITestOutputHelper output, GrpcSidecarFixture sidecar [Fact] public async Task Autochunk_MultipleChunks_CompletesSuccessfully() { - // Use minimum allowed chunk size (1 MB) and ensure total payload exceeds it to trigger chunking - // 360 activities × 3KB = ~1.05 MB, exceeding 1 MB chunk size while completing within timeout - const int ActivityCount = 360; - const int PayloadSizePerActivity = 3 * 1024; // 3KB per activity + const int ActivityCount = 36; + const int PayloadSizePerActivity = 30 * 1024; const int ChunkSize = GrpcDurableTaskWorkerOptions.MinCompleteOrchestrationWorkItemSizePerChunkBytes; // 1 MB (minimum allowed) TaskName orchestratorName = nameof(Autochunk_MultipleChunks_CompletesSuccessfully); TaskName activityName = "Echo"; @@ -51,7 +49,7 @@ public async Task Autochunk_MultipleChunks_CompletesSuccessfully() }); string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); - using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + using CancellationTokenSource cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync( instanceId, getInputsAndOutputs: true, cts.Token); @@ -61,47 +59,6 @@ public async Task Autochunk_MultipleChunks_CompletesSuccessfully() Assert.Equal(ActivityCount, metadata.ReadOutputAs()); } - /// - /// Validates autochunking with many timers that together exceed the chunk size. - /// - [Fact] - public async Task Autochunk_ManyTimers_CompletesSuccessfully() - { - // Use minimum allowed chunk size (1 MB) and use many timers to exceed it - // Timers are small, so we need a large number to exceed 1 MB chunk size - // Using 10000 timers which should be sufficient to test chunking without being too slow - const int TimerCount = 10000; - const int ChunkSize = GrpcDurableTaskWorkerOptions.MinCompleteOrchestrationWorkItemSizePerChunkBytes; // 1 MB (minimum allowed) - TaskName orchestratorName = nameof(Autochunk_ManyTimers_CompletesSuccessfully); - - await using HostTestLifetime server = await this.StartWorkerAsync(b => - { - // Set a small chunk size to force chunking - b.UseGrpc(opt => opt.MaxCompleteOrchestrationWorkItemSizePerChunk = ChunkSize); - b.AddTasks(tasks => tasks.AddOrchestratorFunc(orchestratorName, async ctx => - { - // Start all timers in parallel so they're all in the same completion response - List timerTasks = new List(); - for (int i = 0; i < TimerCount; i++) - { - timerTasks.Add(ctx.CreateTimer(TimeSpan.FromMilliseconds(10), CancellationToken.None)); - } - await Task.WhenAll(timerTasks); - return TimerCount; - })); - }); - - string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); - using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); - OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync( - instanceId, getInputsAndOutputs: true, cts.Token); - - Assert.NotNull(metadata); - Assert.Equal(instanceId, metadata.InstanceId); - Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); - Assert.Equal(TimerCount, metadata.ReadOutputAs()); - } - /// /// Validates autochunking with mixed action types (activities, timers, sub-orchestrations). /// @@ -109,10 +66,10 @@ public async Task Autochunk_ManyTimers_CompletesSuccessfully() public async Task Autochunk_MixedActions_CompletesSuccessfully() { // Use minimum allowed chunk size (1 MB) and ensure total payload exceeds it to trigger chunking - const int ActivityCount = 300; // 300 activities × 2KB = 600KB, plus timers and sub-orchestrations to exceed 1 MB - const int TimerCount = 1000; // Additional timers to help exceed chunk size - const int SubOrchCount = 50; // Additional sub-orchestrations - const int PayloadSizePerActivity = 2 * 1024; // 2KB per activity + const int ActivityCount = 30; + const int TimerCount = 100; + const int SubOrchCount = 50; + const int PayloadSizePerActivity = 20 * 1024; const int ChunkSize = GrpcDurableTaskWorkerOptions.MinCompleteOrchestrationWorkItemSizePerChunkBytes; // 1 MB (minimum allowed) TaskName orchestratorName = nameof(Autochunk_MixedActions_CompletesSuccessfully); TaskName activityName = "Echo"; @@ -155,7 +112,7 @@ public async Task Autochunk_MixedActions_CompletesSuccessfully() }); string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); - using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + using CancellationTokenSource cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync( instanceId, getInputsAndOutputs: true, cts.Token); @@ -192,24 +149,24 @@ public async Task Autochunk_SingleActionExceedsChunkSize_ThrowsInvalidOperationE }); string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); - + // Wait a bit for the orchestration to process and the exception to be thrown await Task.Delay(TimeSpan.FromSeconds(2), this.TimeoutToken); - + // The exception is caught and the work item is abandoned, so the orchestration won't complete. // Instead, verify the exception was thrown by checking the logs. IReadOnlyCollection logs = this.GetLogs(); - + // Find the log entry with the InvalidOperationException LogEntry? errorLog = logs.FirstOrDefault(log => log.Exception is InvalidOperationException && log.Exception.Message.Contains("exceeds the", StringComparison.OrdinalIgnoreCase) && log.Exception.Message.Contains("MB limit", StringComparison.OrdinalIgnoreCase)); - + Assert.NotNull(errorLog); Assert.NotNull(errorLog.Exception); Assert.IsType(errorLog.Exception); - + // Verify the error message contains the expected information Assert.Contains("exceeds the", errorLog.Exception.Message, StringComparison.OrdinalIgnoreCase); Assert.Contains("MB limit", errorLog.Exception.Message, StringComparison.OrdinalIgnoreCase); From 4ffe57594a42ef3bc7d4e8262e1c256f70055219 Mon Sep 17 00:00:00 2001 From: peterstone2017 <12449837+YunchuWang@users.noreply.github.com> Date: Wed, 3 Dec 2025 11:34:36 -0800 Subject: [PATCH 11/24] make orchestration fail is single action oversized --- .../Grpc/GrpcDurableTaskWorker.Processor.cs | 42 ++++++++++++++++--- test/Grpc.IntegrationTests/AutochunkTests.cs | 39 +++++++---------- 2 files changed, 51 insertions(+), 30 deletions(-) diff --git a/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs b/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs index 2943393e9..0d3b1e802 100644 --- a/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs +++ b/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs @@ -932,21 +932,53 @@ async Task CompleteOrchestratorTaskWithChunkingAsync( CancellationToken cancellationToken) { // Validate that no single action exceeds the maximum chunk size - static void ValidateActionsSize(IEnumerable actions, int maxChunkBytes) + static P.TaskFailureDetails? ValidateActionsSize(IEnumerable actions, int maxChunkBytes) { foreach (P.OrchestratorAction action in actions) { int actionSize = action.CalculateSize(); if (actionSize > maxChunkBytes) { - throw new InvalidOperationException( - $"A single orchestrator action of type {action.OrchestratorActionTypeCase} with id {action.Id} " + - $"exceeds the {maxChunkBytes / 1024.0 / 1024.0:F2}MB limit: {actionSize / 1024.0 / 1024.0:F2}MB"); + string errorMessage = $"A single orchestrator action of type {action.OrchestratorActionTypeCase} with id {action.Id} " + + $"exceeds the {maxChunkBytes / 1024.0 / 1024.0:F2}MB limit: {actionSize / 1024.0 / 1024.0:F2}MB"; + + return new P.TaskFailureDetails + { + ErrorType = typeof(InvalidOperationException).FullName, + ErrorMessage = errorMessage, + IsNonRetriable = true, + }; } } + + return null; } - ValidateActionsSize(response.Actions, maxChunkBytes); + P.TaskFailureDetails? validationFailure = ValidateActionsSize(response.Actions, maxChunkBytes); + if (validationFailure != null) + { + // Complete the orchestration with a failed status and failure details + P.OrchestratorResponse failureResponse = new() + { + InstanceId = response.InstanceId, + CompletionToken = response.CompletionToken, + OrchestrationTraceContext = response.OrchestrationTraceContext, + Actions = + { + new P.OrchestratorAction + { + CompleteOrchestration = new P.CompleteOrchestrationAction + { + OrchestrationStatus = P.OrchestrationStatus.Failed, + FailureDetails = validationFailure, + }, + }, + }, + }; + + await this.client.CompleteOrchestratorTaskAsync(failureResponse, cancellationToken: cancellationToken); + return; + } // Helper to add an action to the current chunk if it fits static bool TryAddAction( diff --git a/test/Grpc.IntegrationTests/AutochunkTests.cs b/test/Grpc.IntegrationTests/AutochunkTests.cs index 636b73f31..63b51b695 100644 --- a/test/Grpc.IntegrationTests/AutochunkTests.cs +++ b/test/Grpc.IntegrationTests/AutochunkTests.cs @@ -123,16 +123,16 @@ public async Task Autochunk_MixedActions_CompletesSuccessfully() } /// - /// Validates that an InvalidOperationException is thrown when a single orchestrator action - /// exceeds the MaxCompleteOrchestrationWorkItemSizePerChunk limit. + /// Validates that when a single orchestrator action exceeds the MaxCompleteOrchestrationWorkItemSizePerChunk limit, + /// the orchestration completes with a failed status and proper failure details. /// [Fact] - public async Task Autochunk_SingleActionExceedsChunkSize_ThrowsInvalidOperationException() + public async Task Autochunk_SingleActionExceedsChunkSize_CompletesWithFailedStatus() { const int ChunkSize = GrpcDurableTaskWorkerOptions.MinCompleteOrchestrationWorkItemSizePerChunkBytes; // 1 MB // Create a payload that exceeds the chunk size (1 MB + some overhead) const int PayloadSize = ChunkSize + 100 * 1024; // 1.1 MB payload - TaskName orchestratorName = nameof(Autochunk_SingleActionExceedsChunkSize_ThrowsInvalidOperationException); + TaskName orchestratorName = nameof(Autochunk_SingleActionExceedsChunkSize_CompletesWithFailedStatus); TaskName activityName = "Echo"; await using HostTestLifetime server = await this.StartWorkerAsync(b => @@ -149,28 +149,17 @@ public async Task Autochunk_SingleActionExceedsChunkSize_ThrowsInvalidOperationE }); string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); + using CancellationTokenSource cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync( + instanceId, getInputsAndOutputs: true, cts.Token); - // Wait a bit for the orchestration to process and the exception to be thrown - await Task.Delay(TimeSpan.FromSeconds(2), this.TimeoutToken); - - // The exception is caught and the work item is abandoned, so the orchestration won't complete. - // Instead, verify the exception was thrown by checking the logs. - IReadOnlyCollection logs = this.GetLogs(); - - // Find the log entry with the InvalidOperationException - LogEntry? errorLog = logs.FirstOrDefault(log => - log.Exception is InvalidOperationException && - log.Exception.Message.Contains("exceeds the", StringComparison.OrdinalIgnoreCase) && - log.Exception.Message.Contains("MB limit", StringComparison.OrdinalIgnoreCase)); - - Assert.NotNull(errorLog); - Assert.NotNull(errorLog.Exception); - Assert.IsType(errorLog.Exception); - - // Verify the error message contains the expected information - Assert.Contains("exceeds the", errorLog.Exception.Message, StringComparison.OrdinalIgnoreCase); - Assert.Contains("MB limit", errorLog.Exception.Message, StringComparison.OrdinalIgnoreCase); - Assert.Contains("ScheduleTask", errorLog.Exception.Message, StringComparison.OrdinalIgnoreCase); + Assert.NotNull(metadata); + Assert.Equal(instanceId, metadata.InstanceId); + Assert.Equal(OrchestrationRuntimeStatus.Failed, metadata.RuntimeStatus); + Assert.NotNull(metadata.FailureDetails); + Assert.Contains("exceeds the", metadata.FailureDetails.ErrorMessage, StringComparison.OrdinalIgnoreCase); + Assert.Contains("MB limit", metadata.FailureDetails.ErrorMessage, StringComparison.OrdinalIgnoreCase); + Assert.Equal(typeof(InvalidOperationException).FullName, metadata.FailureDetails.ErrorType); } } From c4b83411bc4eb0b41a7cc95805b20fc7559cbc1a Mon Sep 17 00:00:00 2001 From: peterstone2017 <12449837+YunchuWang@users.noreply.github.com> Date: Wed, 3 Dec 2025 12:03:16 -0800 Subject: [PATCH 12/24] advise --- src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs b/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs index 0d3b1e802..18a99b169 100644 --- a/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs +++ b/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs @@ -940,7 +940,8 @@ async Task CompleteOrchestratorTaskWithChunkingAsync( if (actionSize > maxChunkBytes) { string errorMessage = $"A single orchestrator action of type {action.OrchestratorActionTypeCase} with id {action.Id} " + - $"exceeds the {maxChunkBytes / 1024.0 / 1024.0:F2}MB limit: {actionSize / 1024.0 / 1024.0:F2}MB"; + $"exceeds the {maxChunkBytes / 1024.0 / 1024.0:F2}MB limit: {actionSize / 1024.0 / 1024.0:F2}MB. " + + "Enable large-payload externalization to Azure Blob Storage to support oversized actions."; return new P.TaskFailureDetails { From fa0d2c9f3cc239046838d975795aaaefc6d5fa7e Mon Sep 17 00:00:00 2001 From: peterstone2017 <12449837+YunchuWang@users.noreply.github.com> Date: Wed, 3 Dec 2025 12:04:58 -0800 Subject: [PATCH 13/24] todo --- src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs b/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs index 18a99b169..ad874ae72 100644 --- a/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs +++ b/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs @@ -939,10 +939,10 @@ async Task CompleteOrchestratorTaskWithChunkingAsync( int actionSize = action.CalculateSize(); if (actionSize > maxChunkBytes) { + // TODO: large payload doc is not available yet on aka.ms,add doc link to below error message string errorMessage = $"A single orchestrator action of type {action.OrchestratorActionTypeCase} with id {action.Id} " + $"exceeds the {maxChunkBytes / 1024.0 / 1024.0:F2}MB limit: {actionSize / 1024.0 / 1024.0:F2}MB. " + "Enable large-payload externalization to Azure Blob Storage to support oversized actions."; - return new P.TaskFailureDetails { ErrorType = typeof(InvalidOperationException).FullName, From e76982ab77f933f38cc834f79e3d2e626ffa5b97 Mon Sep 17 00:00:00 2001 From: wangbill Date: Wed, 3 Dec 2025 14:47:41 -0800 Subject: [PATCH 14/24] Update src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs b/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs index ad874ae72..a65ffc048 100644 --- a/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs +++ b/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs @@ -939,7 +939,7 @@ async Task CompleteOrchestratorTaskWithChunkingAsync( int actionSize = action.CalculateSize(); if (actionSize > maxChunkBytes) { - // TODO: large payload doc is not available yet on aka.ms,add doc link to below error message + // TODO: large payload doc is not available yet on aka.ms, add doc link to below error message string errorMessage = $"A single orchestrator action of type {action.OrchestratorActionTypeCase} with id {action.Id} " + $"exceeds the {maxChunkBytes / 1024.0 / 1024.0:F2}MB limit: {actionSize / 1024.0 / 1024.0:F2}MB. " + "Enable large-payload externalization to Azure Blob Storage to support oversized actions."; From 53b2284aa6760808157df452e315bd753eff391b Mon Sep 17 00:00:00 2001 From: wangbill Date: Wed, 3 Dec 2025 14:49:09 -0800 Subject: [PATCH 15/24] Update src/Worker/Grpc/GrpcDurableTaskWorkerOptions.cs Co-authored-by: sophiatev <38052607+sophiatev@users.noreply.github.com> --- src/Worker/Grpc/GrpcDurableTaskWorkerOptions.cs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/Worker/Grpc/GrpcDurableTaskWorkerOptions.cs b/src/Worker/Grpc/GrpcDurableTaskWorkerOptions.cs index e0ded1654..026f3833e 100644 --- a/src/Worker/Grpc/GrpcDurableTaskWorkerOptions.cs +++ b/src/Worker/Grpc/GrpcDurableTaskWorkerOptions.cs @@ -48,7 +48,8 @@ public sealed class GrpcDurableTaskWorkerOptions : DurableTaskWorkerOptions /// The default value is 3.9MB. We leave some headroom to account for request size overhead. /// /// - /// This value is used to limit the size of the complete orchestration work item chunk request. + /// This value is used to limit the size of the complete orchestration work item request. + /// If the response exceeds this limit, it will be automatically split into multiple chunks of maximum size OrchestrationWorkItemChunkSizeInBytes /// If the response exceeds this limit, it will be automatically split into multiple chunks. /// /// From eb09564c3d568170a775077a1ec61a8bd98d382f Mon Sep 17 00:00:00 2001 From: peterstone2017 <12449837+YunchuWang@users.noreply.github.com> Date: Wed, 3 Dec 2025 17:56:54 -0800 Subject: [PATCH 16/24] feedback --- src/Grpc/orchestrator_service.proto | 2 + .../Grpc/GrpcDurableTaskWorker.Processor.cs | 24 ++++++---- .../Grpc/GrpcDurableTaskWorkerOptions.cs | 26 +++++------ test/Grpc.IntegrationTests/AutochunkTests.cs | 18 ++++---- .../GrpcDurableTaskWorkerOptionsTests.cs | 46 +++++++++---------- 5 files changed, 60 insertions(+), 56 deletions(-) diff --git a/src/Grpc/orchestrator_service.proto b/src/Grpc/orchestrator_service.proto index fb3c86820..44f5e33de 100644 --- a/src/Grpc/orchestrator_service.proto +++ b/src/Grpc/orchestrator_service.proto @@ -359,6 +359,8 @@ message OrchestratorResponse { // True if this is a partial (chunked) completion. The backend must keep the work item open until the final chunk (isPartial=false). bool isPartial = 8; + + google.protobuf.Int32Value chunkIndex = 9; } message CreateInstanceRequest { diff --git a/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs b/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs index ad874ae72..928ab6d95 100644 --- a/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs +++ b/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs @@ -759,7 +759,7 @@ await this.client.AbandonTaskOrchestratorWorkItemAsync( await this.CompleteOrchestratorTaskWithChunkingAsync( response, - this.worker.grpcOptions.MaxCompleteOrchestrationWorkItemSizePerChunk, + this.worker.grpcOptions.CompleteOrchestrationWorkItemChunkSizeInBytes, cancellationToken); } @@ -1009,11 +1009,11 @@ static bool TryAddAction( } // Response is too large, split into multiple chunks - int actionsCompletedSoFar = 0; - bool isFirstChunk = true; + int actionsCompletedSoFar = 0, chunkIndex = 0; List allActions = response.Actions.ToList(); + bool allChunksCompleted = false; - while (actionsCompletedSoFar < allActions.Count) + while (!allChunksCompleted) { P.OrchestratorResponse chunkedResponse = new() { @@ -1021,6 +1021,8 @@ static bool TryAddAction( CustomStatus = response.CustomStatus, CompletionToken = response.CompletionToken, RequiresHistory = response.RequiresHistory, + NumEventsProcessed = 0, + ChunkIndex = chunkIndex, }; int chunkSize = chunkedResponse.CalculateSize(); @@ -1035,17 +1037,19 @@ static bool TryAddAction( // Determine if this is a partial chunk (more actions remaining) bool isPartial = actionsCompletedSoFar < allActions.Count; chunkedResponse.IsPartial = isPartial; - - if (isFirstChunk) + if (!isPartial) { - chunkedResponse.OrchestrationTraceContext = response.OrchestrationTraceContext; - isFirstChunk = false; + allChunksCompleted = true; + chunkedResponse.NumEventsProcessed = null; } - else + + if (chunkIndex == 0) { - chunkedResponse.NumEventsProcessed = -1; + chunkedResponse.OrchestrationTraceContext = response.OrchestrationTraceContext; } + chunkIndex++; + // Send the chunk await this.client.CompleteOrchestratorTaskAsync(chunkedResponse, cancellationToken: cancellationToken); } diff --git a/src/Worker/Grpc/GrpcDurableTaskWorkerOptions.cs b/src/Worker/Grpc/GrpcDurableTaskWorkerOptions.cs index e0ded1654..f8720a4b9 100644 --- a/src/Worker/Grpc/GrpcDurableTaskWorkerOptions.cs +++ b/src/Worker/Grpc/GrpcDurableTaskWorkerOptions.cs @@ -13,14 +13,14 @@ public sealed class GrpcDurableTaskWorkerOptions : DurableTaskWorkerOptions /// /// The minimum allowed size (in bytes) for complete orchestration work item chunks. /// - public const int MinCompleteOrchestrationWorkItemSizePerChunkBytes = 1 * 1024 * 1024; // 1 MB + public const int MinCompleteOrchestrationWorkItemChunkSizeInBytes = 1 * 1024 * 1024; // 1 MB /// /// The maximum allowed size (in bytes) for complete orchestration work item chunks. /// - public const int MaxCompleteOrchestrationWorkItemSizePerChunkBytes = 4_089_446; // 3.9 MB + public const int MaxCompleteOrchestrationWorkItemChunkSizeBytes = 4_089_446; // 3.9 MB - int maxCompleteOrchestrationWorkItemSizePerChunk = MaxCompleteOrchestrationWorkItemSizePerChunkBytes; + int completeOrchestrationWorkItemChunkSizeInBytes = MaxCompleteOrchestrationWorkItemChunkSizeBytes; /// /// Gets or sets the address of the gRPC endpoint to connect to. Default is localhost:4001. @@ -48,30 +48,30 @@ public sealed class GrpcDurableTaskWorkerOptions : DurableTaskWorkerOptions /// The default value is 3.9MB. We leave some headroom to account for request size overhead. /// /// - /// This value is used to limit the size of the complete orchestration work item chunk request. + /// This value is used to limit the size of the complete orchestration work item request. /// If the response exceeds this limit, it will be automatically split into multiple chunks. /// /// /// Thrown when the value is less than 1 MB or greater than 3.9 MB. /// - public int MaxCompleteOrchestrationWorkItemSizePerChunk + public int CompleteOrchestrationWorkItemChunkSizeInBytes { - get => this.maxCompleteOrchestrationWorkItemSizePerChunk; + get => this.completeOrchestrationWorkItemChunkSizeInBytes; set { - if (value < MinCompleteOrchestrationWorkItemSizePerChunkBytes || - value > MaxCompleteOrchestrationWorkItemSizePerChunkBytes) + if (value < MinCompleteOrchestrationWorkItemChunkSizeInBytes || + value > MaxCompleteOrchestrationWorkItemChunkSizeBytes) { - string message = $"MaxCompleteOrchestrationWorkItemSizePerChunk must be between " + - $"{MinCompleteOrchestrationWorkItemSizePerChunkBytes} bytes (1 MB) and " + - $"{MaxCompleteOrchestrationWorkItemSizePerChunkBytes} bytes (3.9 MB), inclusive."; + string message = $"CompleteOrchestrationWorkItemChunkSizeInBytes must be between " + + $"{MinCompleteOrchestrationWorkItemChunkSizeInBytes} bytes (1 MB) and " + + $"{MaxCompleteOrchestrationWorkItemChunkSizeBytes} bytes (3.9 MB), inclusive."; throw new ArgumentOutOfRangeException( - nameof(this.MaxCompleteOrchestrationWorkItemSizePerChunk), + nameof(this.CompleteOrchestrationWorkItemChunkSizeInBytes), value, message); } - this.maxCompleteOrchestrationWorkItemSizePerChunk = value; + this.completeOrchestrationWorkItemChunkSizeInBytes = value; } } diff --git a/test/Grpc.IntegrationTests/AutochunkTests.cs b/test/Grpc.IntegrationTests/AutochunkTests.cs index 63b51b695..0bab2254f 100644 --- a/test/Grpc.IntegrationTests/AutochunkTests.cs +++ b/test/Grpc.IntegrationTests/AutochunkTests.cs @@ -24,14 +24,14 @@ public async Task Autochunk_MultipleChunks_CompletesSuccessfully() { const int ActivityCount = 36; const int PayloadSizePerActivity = 30 * 1024; - const int ChunkSize = GrpcDurableTaskWorkerOptions.MinCompleteOrchestrationWorkItemSizePerChunkBytes; // 1 MB (minimum allowed) + const int ChunkSize = GrpcDurableTaskWorkerOptions.MinCompleteOrchestrationWorkItemChunkSizeInBytes; // 1 MB (minimum allowed) TaskName orchestratorName = nameof(Autochunk_MultipleChunks_CompletesSuccessfully); TaskName activityName = "Echo"; await using HostTestLifetime server = await this.StartWorkerAsync(b => { // Set a small chunk size to force chunking - b.UseGrpc(opt => opt.MaxCompleteOrchestrationWorkItemSizePerChunk = ChunkSize); + b.UseGrpc(opt => opt.CompleteOrchestrationWorkItemChunkSizeInBytes = ChunkSize); b.AddTasks(tasks => tasks .AddOrchestratorFunc(orchestratorName, async ctx => { @@ -70,7 +70,7 @@ public async Task Autochunk_MixedActions_CompletesSuccessfully() const int TimerCount = 100; const int SubOrchCount = 50; const int PayloadSizePerActivity = 20 * 1024; - const int ChunkSize = GrpcDurableTaskWorkerOptions.MinCompleteOrchestrationWorkItemSizePerChunkBytes; // 1 MB (minimum allowed) + const int ChunkSize = GrpcDurableTaskWorkerOptions.MinCompleteOrchestrationWorkItemChunkSizeInBytes; // 1 MB (minimum allowed) TaskName orchestratorName = nameof(Autochunk_MixedActions_CompletesSuccessfully); TaskName activityName = "Echo"; TaskName subOrchName = "SubOrch"; @@ -78,7 +78,7 @@ public async Task Autochunk_MixedActions_CompletesSuccessfully() await using HostTestLifetime server = await this.StartWorkerAsync(b => { // Set a small chunk size to force chunking - b.UseGrpc(opt => opt.MaxCompleteOrchestrationWorkItemSizePerChunk = ChunkSize); + b.UseGrpc(opt => opt.CompleteOrchestrationWorkItemChunkSizeInBytes = ChunkSize); b.AddTasks(tasks => tasks .AddOrchestratorFunc(orchestratorName, async ctx => { @@ -123,13 +123,13 @@ public async Task Autochunk_MixedActions_CompletesSuccessfully() } /// - /// Validates that when a single orchestrator action exceeds the MaxCompleteOrchestrationWorkItemSizePerChunk limit, + /// Validates that when a single orchestrator action exceeds the CompleteOrchestrationWorkItemChunkSizeInBytes limit, /// the orchestration completes with a failed status and proper failure details. /// [Fact] public async Task Autochunk_SingleActionExceedsChunkSize_CompletesWithFailedStatus() { - const int ChunkSize = GrpcDurableTaskWorkerOptions.MinCompleteOrchestrationWorkItemSizePerChunkBytes; // 1 MB + const int ChunkSize = GrpcDurableTaskWorkerOptions.MinCompleteOrchestrationWorkItemChunkSizeInBytes; // 1 MB // Create a payload that exceeds the chunk size (1 MB + some overhead) const int PayloadSize = ChunkSize + 100 * 1024; // 1.1 MB payload TaskName orchestratorName = nameof(Autochunk_SingleActionExceedsChunkSize_CompletesWithFailedStatus); @@ -137,7 +137,7 @@ public async Task Autochunk_SingleActionExceedsChunkSize_CompletesWithFailedStat await using HostTestLifetime server = await this.StartWorkerAsync(b => { - b.UseGrpc(opt => opt.MaxCompleteOrchestrationWorkItemSizePerChunk = ChunkSize); + b.UseGrpc(opt => opt.CompleteOrchestrationWorkItemChunkSizeInBytes = ChunkSize); b.AddTasks(tasks => tasks .AddOrchestratorFunc(orchestratorName, async ctx => { @@ -157,9 +157,7 @@ public async Task Autochunk_SingleActionExceedsChunkSize_CompletesWithFailedStat Assert.Equal(instanceId, metadata.InstanceId); Assert.Equal(OrchestrationRuntimeStatus.Failed, metadata.RuntimeStatus); Assert.NotNull(metadata.FailureDetails); - Assert.Contains("exceeds the", metadata.FailureDetails.ErrorMessage, StringComparison.OrdinalIgnoreCase); - Assert.Contains("MB limit", metadata.FailureDetails.ErrorMessage, StringComparison.OrdinalIgnoreCase); - Assert.Equal(typeof(InvalidOperationException).FullName, metadata.FailureDetails.ErrorType); + Assert.Equal("System.InvalidOperationException: A single orchestrator action of type ScheduleTask with id 0 exceeds the 1.00MB limit: 1.10MB. Enable large-payload externalization to Azure Blob Storage to support oversized actions.", metadata.FailureDetails.ToString()); } } diff --git a/test/Worker/Grpc.Tests/GrpcDurableTaskWorkerOptionsTests.cs b/test/Worker/Grpc.Tests/GrpcDurableTaskWorkerOptionsTests.cs index 1750d09e8..3c2ba74d5 100644 --- a/test/Worker/Grpc.Tests/GrpcDurableTaskWorkerOptionsTests.cs +++ b/test/Worker/Grpc.Tests/GrpcDurableTaskWorkerOptionsTests.cs @@ -9,93 +9,93 @@ namespace Microsoft.DurableTask.Worker.Grpc.Tests; public class GrpcDurableTaskWorkerOptionsTests { [Fact] - public void Default_MaxCompleteOrchestrationWorkItemSizePerChunk_IsWithinRange() + public void Default_CompleteOrchestrationWorkItemChunkSizeInBytes_IsWithinRange() { // Arrange var options = new GrpcDurableTaskWorkerOptions(); // Act - int value = options.MaxCompleteOrchestrationWorkItemSizePerChunk; + int value = options.CompleteOrchestrationWorkItemChunkSizeInBytes; // Assert value.Should().BeGreaterOrEqualTo( - GrpcDurableTaskWorkerOptions.MinCompleteOrchestrationWorkItemSizePerChunkBytes); + GrpcDurableTaskWorkerOptions.MinCompleteOrchestrationWorkItemChunkSizeInBytes); value.Should().BeLessOrEqualTo( - GrpcDurableTaskWorkerOptions.MaxCompleteOrchestrationWorkItemSizePerChunkBytes); + GrpcDurableTaskWorkerOptions.MaxCompleteOrchestrationWorkItemChunkSizeBytes); } [Fact] - public void Setting_MaxCompleteOrchestrationWorkItemSizePerChunk_BelowMin_Throws() + public void Setting_CompleteOrchestrationWorkItemChunkSizeInBytes_BelowMin_Throws() { // Arrange var options = new GrpcDurableTaskWorkerOptions(); - int belowMin = GrpcDurableTaskWorkerOptions.MinCompleteOrchestrationWorkItemSizePerChunkBytes - 1; + int belowMin = GrpcDurableTaskWorkerOptions.MinCompleteOrchestrationWorkItemChunkSizeInBytes - 1; // Act - Action act = () => options.MaxCompleteOrchestrationWorkItemSizePerChunk = belowMin; + Action act = () => options.CompleteOrchestrationWorkItemChunkSizeInBytes = belowMin; // Assert act.Should() .Throw() - .WithParameterName(nameof(GrpcDurableTaskWorkerOptions.MaxCompleteOrchestrationWorkItemSizePerChunk)); + .WithParameterName(nameof(GrpcDurableTaskWorkerOptions.CompleteOrchestrationWorkItemChunkSizeInBytes)); } [Fact] - public void Setting_MaxCompleteOrchestrationWorkItemSizePerChunk_AboveMax_Throws() + public void Setting_CompleteOrchestrationWorkItemChunkSizeInBytes_AboveMax_Throws() { // Arrange var options = new GrpcDurableTaskWorkerOptions(); - int aboveMax = GrpcDurableTaskWorkerOptions.MaxCompleteOrchestrationWorkItemSizePerChunkBytes + 1; + int aboveMax = GrpcDurableTaskWorkerOptions.MaxCompleteOrchestrationWorkItemChunkSizeBytes + 1; // Act - Action act = () => options.MaxCompleteOrchestrationWorkItemSizePerChunk = aboveMax; + Action act = () => options.CompleteOrchestrationWorkItemChunkSizeInBytes = aboveMax; // Assert act.Should() .Throw() - .WithParameterName(nameof(GrpcDurableTaskWorkerOptions.MaxCompleteOrchestrationWorkItemSizePerChunk)); + .WithParameterName(nameof(GrpcDurableTaskWorkerOptions.CompleteOrchestrationWorkItemChunkSizeInBytes)); } [Fact] - public void Setting_MaxCompleteOrchestrationWorkItemSizePerChunk_AtMinBoundary_Succeeds() + public void Setting_CompleteOrchestrationWorkItemChunkSizeInBytes_AtMinBoundary_Succeeds() { // Arrange var options = new GrpcDurableTaskWorkerOptions(); - int minValue = GrpcDurableTaskWorkerOptions.MinCompleteOrchestrationWorkItemSizePerChunkBytes; + int minValue = GrpcDurableTaskWorkerOptions.MinCompleteOrchestrationWorkItemChunkSizeInBytes; // Act - options.MaxCompleteOrchestrationWorkItemSizePerChunk = minValue; + options.CompleteOrchestrationWorkItemChunkSizeInBytes = minValue; // Assert - options.MaxCompleteOrchestrationWorkItemSizePerChunk.Should().Be(minValue); + options.CompleteOrchestrationWorkItemChunkSizeInBytes.Should().Be(minValue); } [Fact] - public void Setting_MaxCompleteOrchestrationWorkItemSizePerChunk_AtMaxBoundary_Succeeds() + public void Setting_CompleteOrchestrationWorkItemChunkSizeInBytes_AtMaxBoundary_Succeeds() { // Arrange var options = new GrpcDurableTaskWorkerOptions(); - int maxValue = GrpcDurableTaskWorkerOptions.MaxCompleteOrchestrationWorkItemSizePerChunkBytes; + int maxValue = GrpcDurableTaskWorkerOptions.MaxCompleteOrchestrationWorkItemChunkSizeBytes; // Act - options.MaxCompleteOrchestrationWorkItemSizePerChunk = maxValue; + options.CompleteOrchestrationWorkItemChunkSizeInBytes = maxValue; // Assert - options.MaxCompleteOrchestrationWorkItemSizePerChunk.Should().Be(maxValue); + options.CompleteOrchestrationWorkItemChunkSizeInBytes.Should().Be(maxValue); } [Fact] - public void Setting_MaxCompleteOrchestrationWorkItemSizePerChunk_WithinRange_Succeeds() + public void Setting_CompleteOrchestrationWorkItemChunkSizeInBytes_WithinRange_Succeeds() { // Arrange var options = new GrpcDurableTaskWorkerOptions(); int withinRange = 2 * 1024 * 1024; // 2 MB // Act - options.MaxCompleteOrchestrationWorkItemSizePerChunk = withinRange; + options.CompleteOrchestrationWorkItemChunkSizeInBytes = withinRange; // Assert - options.MaxCompleteOrchestrationWorkItemSizePerChunk.Should().Be(withinRange); + options.CompleteOrchestrationWorkItemChunkSizeInBytes.Should().Be(withinRange); } } From e3b86cc99824885b6cdb1dd31d3da23782ea5eb0 Mon Sep 17 00:00:00 2001 From: peterstone2017 <12449837+YunchuWang@users.noreply.github.com> Date: Wed, 3 Dec 2025 19:02:05 -0800 Subject: [PATCH 17/24] remove still in first chunk --- src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs b/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs index 27f96b28c..c28c363cc 100644 --- a/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs +++ b/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs @@ -1040,11 +1040,11 @@ static bool TryAddAction( if (!isPartial) { allChunksCompleted = true; - chunkedResponse.NumEventsProcessed = null; } if (chunkIndex == 0) { + chunkedResponse.NumEventsProcessed = null; chunkedResponse.OrchestrationTraceContext = response.OrchestrationTraceContext; } From f73b5d4e4771481fc0821381c42170476e60626f Mon Sep 17 00:00:00 2001 From: wangbill Date: Thu, 4 Dec 2025 13:02:02 -0800 Subject: [PATCH 18/24] Update src/Worker/Grpc/GrpcDurableTaskWorkerOptions.cs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- src/Worker/Grpc/GrpcDurableTaskWorkerOptions.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Worker/Grpc/GrpcDurableTaskWorkerOptions.cs b/src/Worker/Grpc/GrpcDurableTaskWorkerOptions.cs index 0c9cc0b3d..57c2f13bd 100644 --- a/src/Worker/Grpc/GrpcDurableTaskWorkerOptions.cs +++ b/src/Worker/Grpc/GrpcDurableTaskWorkerOptions.cs @@ -49,7 +49,7 @@ public sealed class GrpcDurableTaskWorkerOptions : DurableTaskWorkerOptions /// /// /// This value is used to limit the size of the complete orchestration work item request. - /// If the response exceeds this limit, it will be automatically split into multiple chunks of maximum size OrchestrationWorkItemChunkSizeInBytes + /// If the response exceeds this limit, it will be automatically split into multiple chunks of maximum size CompleteOrchestrationWorkItemChunkSizeInBytes /// /// /// Thrown when the value is less than 1 MB or greater than 3.9 MB. From 6079088a3b38ee58e086e5c21b8d1f11de810d7d Mon Sep 17 00:00:00 2001 From: peterstone2017 <12449837+YunchuWang@users.noreply.github.com> Date: Thu, 4 Dec 2025 16:00:18 -0800 Subject: [PATCH 19/24] feedback --- src/InProcessTestHost/Sidecar/Grpc/TaskHubGrpcServer.cs | 2 +- src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/src/InProcessTestHost/Sidecar/Grpc/TaskHubGrpcServer.cs b/src/InProcessTestHost/Sidecar/Grpc/TaskHubGrpcServer.cs index 70edcf557..dafcc01b2 100644 --- a/src/InProcessTestHost/Sidecar/Grpc/TaskHubGrpcServer.cs +++ b/src/InProcessTestHost/Sidecar/Grpc/TaskHubGrpcServer.cs @@ -593,7 +593,7 @@ static P.GetInstanceResponse CreateGetInstanceResponse(OrchestrationState state, GrpcOrchestratorExecutionResult res = new() { Actions = existingPartialChunk.AccumulatedActions, - CustomStatus = request.CustomStatus, // Use custom status from final chunk + CustomStatus = request.CustomStatus, }; // Remove the TCS from pending tasks and complete it diff --git a/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs b/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs index c28c363cc..fe7ada4e2 100644 --- a/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs +++ b/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs @@ -1044,6 +1044,9 @@ static bool TryAddAction( if (chunkIndex == 0) { + // The first chunk preserves the original response's NumEventsProcessed value (null) + // When this is set to null, backend by default handles all the messages in the workitem. + // For subsequent chunks, we set it to 0 since all messages are already handled in first chunk. chunkedResponse.NumEventsProcessed = null; chunkedResponse.OrchestrationTraceContext = response.OrchestrationTraceContext; } From 2ed816058560df345e735730cc02129e8e31d403 Mon Sep 17 00:00:00 2001 From: peterstone2017 <12449837+YunchuWang@users.noreply.github.com> Date: Thu, 4 Dec 2025 16:47:13 -0800 Subject: [PATCH 20/24] rename --- src/Worker/Grpc/GrpcDurableTaskWorkerOptions.cs | 8 ++++---- .../Grpc.Tests/GrpcDurableTaskWorkerOptionsTests.cs | 6 +++--- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/Worker/Grpc/GrpcDurableTaskWorkerOptions.cs b/src/Worker/Grpc/GrpcDurableTaskWorkerOptions.cs index 57c2f13bd..406439d77 100644 --- a/src/Worker/Grpc/GrpcDurableTaskWorkerOptions.cs +++ b/src/Worker/Grpc/GrpcDurableTaskWorkerOptions.cs @@ -18,9 +18,9 @@ public sealed class GrpcDurableTaskWorkerOptions : DurableTaskWorkerOptions /// /// The maximum allowed size (in bytes) for complete orchestration work item chunks. /// - public const int MaxCompleteOrchestrationWorkItemChunkSizeBytes = 4_089_446; // 3.9 MB + public const int MaxCompleteOrchestrationWorkItemChunkSizeInBytes = 4_089_446; // 3.9 MB - int completeOrchestrationWorkItemChunkSizeInBytes = MaxCompleteOrchestrationWorkItemChunkSizeBytes; + int completeOrchestrationWorkItemChunkSizeInBytes = MaxCompleteOrchestrationWorkItemChunkSizeInBytes; /// /// Gets or sets the address of the gRPC endpoint to connect to. Default is localhost:4001. @@ -60,11 +60,11 @@ public int CompleteOrchestrationWorkItemChunkSizeInBytes set { if (value < MinCompleteOrchestrationWorkItemChunkSizeInBytes || - value > MaxCompleteOrchestrationWorkItemChunkSizeBytes) + value > MaxCompleteOrchestrationWorkItemChunkSizeInBytes) { string message = $"CompleteOrchestrationWorkItemChunkSizeInBytes must be between " + $"{MinCompleteOrchestrationWorkItemChunkSizeInBytes} bytes (1 MB) and " + - $"{MaxCompleteOrchestrationWorkItemChunkSizeBytes} bytes (3.9 MB), inclusive."; + $"{MaxCompleteOrchestrationWorkItemChunkSizeInBytes} bytes (3.9 MB), inclusive."; throw new ArgumentOutOfRangeException( nameof(this.CompleteOrchestrationWorkItemChunkSizeInBytes), value, diff --git a/test/Worker/Grpc.Tests/GrpcDurableTaskWorkerOptionsTests.cs b/test/Worker/Grpc.Tests/GrpcDurableTaskWorkerOptionsTests.cs index 3c2ba74d5..8aaeda998 100644 --- a/test/Worker/Grpc.Tests/GrpcDurableTaskWorkerOptionsTests.cs +++ b/test/Worker/Grpc.Tests/GrpcDurableTaskWorkerOptionsTests.cs @@ -21,7 +21,7 @@ public void Default_CompleteOrchestrationWorkItemChunkSizeInBytes_IsWithinRange( value.Should().BeGreaterOrEqualTo( GrpcDurableTaskWorkerOptions.MinCompleteOrchestrationWorkItemChunkSizeInBytes); value.Should().BeLessOrEqualTo( - GrpcDurableTaskWorkerOptions.MaxCompleteOrchestrationWorkItemChunkSizeBytes); + GrpcDurableTaskWorkerOptions.MaxCompleteOrchestrationWorkItemChunkSizeInBytes); } [Fact] @@ -45,7 +45,7 @@ public void Setting_CompleteOrchestrationWorkItemChunkSizeInBytes_AboveMax_Throw { // Arrange var options = new GrpcDurableTaskWorkerOptions(); - int aboveMax = GrpcDurableTaskWorkerOptions.MaxCompleteOrchestrationWorkItemChunkSizeBytes + 1; + int aboveMax = GrpcDurableTaskWorkerOptions.MaxCompleteOrchestrationWorkItemChunkSizeInBytes + 1; // Act Action act = () => options.CompleteOrchestrationWorkItemChunkSizeInBytes = aboveMax; @@ -75,7 +75,7 @@ public void Setting_CompleteOrchestrationWorkItemChunkSizeInBytes_AtMaxBoundary_ { // Arrange var options = new GrpcDurableTaskWorkerOptions(); - int maxValue = GrpcDurableTaskWorkerOptions.MaxCompleteOrchestrationWorkItemChunkSizeBytes; + int maxValue = GrpcDurableTaskWorkerOptions.MaxCompleteOrchestrationWorkItemChunkSizeInBytes; // Act options.CompleteOrchestrationWorkItemChunkSizeInBytes = maxValue; From b67e4a1d8f257a6686afa447a2297f857fa25371 Mon Sep 17 00:00:00 2001 From: peterstone2017 <12449837+YunchuWang@users.noreply.github.com> Date: Fri, 5 Dec 2025 01:03:44 -0800 Subject: [PATCH 21/24] exclude baserequest from chunk filling --- src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs b/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs index fe7ada4e2..effedd217 100644 --- a/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs +++ b/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs @@ -1025,11 +1025,11 @@ static bool TryAddAction( ChunkIndex = chunkIndex, }; - int chunkSize = chunkedResponse.CalculateSize(); + int chunkPayloadSize = 0; // Fill the chunk with actions until we reach the size limit while (actionsCompletedSoFar < allActions.Count && - TryAddAction(chunkedResponse.Actions, allActions[actionsCompletedSoFar], ref chunkSize, maxChunkBytes)) + TryAddAction(chunkedResponse.Actions, allActions[actionsCompletedSoFar], ref chunkPayloadSize, maxChunkBytes)) { actionsCompletedSoFar++; } From 395be1531b06df761b491052158196aec8d1764b Mon Sep 17 00:00:00 2001 From: peterstone2017 <12449837+YunchuWang@users.noreply.github.com> Date: Fri, 5 Dec 2025 02:05:33 -0800 Subject: [PATCH 22/24] feedback --- src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs b/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs index effedd217..0049bb0af 100644 --- a/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs +++ b/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs @@ -1011,9 +1011,9 @@ static bool TryAddAction( // Response is too large, split into multiple chunks int actionsCompletedSoFar = 0, chunkIndex = 0; List allActions = response.Actions.ToList(); - bool allChunksCompleted = false; + bool isPartial = true; - while (!allChunksCompleted) + while (isPartial) { P.OrchestratorResponse chunkedResponse = new() { @@ -1035,12 +1035,8 @@ static bool TryAddAction( } // Determine if this is a partial chunk (more actions remaining) - bool isPartial = actionsCompletedSoFar < allActions.Count; + isPartial = actionsCompletedSoFar < allActions.Count; chunkedResponse.IsPartial = isPartial; - if (!isPartial) - { - allChunksCompleted = true; - } if (chunkIndex == 0) { From deffc056081a12eeeba5f673cb084d58d4117f6a Mon Sep 17 00:00:00 2001 From: wangbill Date: Fri, 5 Dec 2025 09:18:49 -0800 Subject: [PATCH 23/24] Update src/Worker/Grpc/GrpcDurableTaskWorkerOptions.cs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- src/Worker/Grpc/GrpcDurableTaskWorkerOptions.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Worker/Grpc/GrpcDurableTaskWorkerOptions.cs b/src/Worker/Grpc/GrpcDurableTaskWorkerOptions.cs index 406439d77..52372f65d 100644 --- a/src/Worker/Grpc/GrpcDurableTaskWorkerOptions.cs +++ b/src/Worker/Grpc/GrpcDurableTaskWorkerOptions.cs @@ -62,7 +62,7 @@ public int CompleteOrchestrationWorkItemChunkSizeInBytes if (value < MinCompleteOrchestrationWorkItemChunkSizeInBytes || value > MaxCompleteOrchestrationWorkItemChunkSizeInBytes) { - string message = $"CompleteOrchestrationWorkItemChunkSizeInBytes must be between " + + string message = $"{nameof(CompleteOrchestrationWorkItemChunkSizeInBytes)} must be between " + $"{MinCompleteOrchestrationWorkItemChunkSizeInBytes} bytes (1 MB) and " + $"{MaxCompleteOrchestrationWorkItemChunkSizeInBytes} bytes (3.9 MB), inclusive."; throw new ArgumentOutOfRangeException( From 3065a09336a80d71b74d2da5a0fdfde6b3b1860b Mon Sep 17 00:00:00 2001 From: peterstone2017 <12449837+YunchuWang@users.noreply.github.com> Date: Fri, 5 Dec 2025 11:49:55 -0800 Subject: [PATCH 24/24] update proto --- src/Grpc/orchestrator_service.proto | 2 ++ src/Grpc/versions.txt | 4 ++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/Grpc/orchestrator_service.proto b/src/Grpc/orchestrator_service.proto index 44f5e33de..f58cf37b5 100644 --- a/src/Grpc/orchestrator_service.proto +++ b/src/Grpc/orchestrator_service.proto @@ -360,6 +360,8 @@ message OrchestratorResponse { // True if this is a partial (chunked) completion. The backend must keep the work item open until the final chunk (isPartial=false). bool isPartial = 8; + // Zero-based position of the current chunk within a chunked completion sequence. + // This field is omitted for non-chunked completions. google.protobuf.Int32Value chunkIndex = 9; } diff --git a/src/Grpc/versions.txt b/src/Grpc/versions.txt index 52f3e8688..2876119ff 100644 --- a/src/Grpc/versions.txt +++ b/src/Grpc/versions.txt @@ -1,2 +1,2 @@ -# The following files were downloaded from branch main at 2025-11-25 20:53:40 UTC -https://raw.githubusercontent.com/microsoft/durabletask-protobuf/9f762f1301b91e3e7c736b9c5a29c2e09f2a850e/protos/orchestrator_service.proto +# The following files were downloaded from branch main at 2025-12-05 19:49:32 UTC +https://raw.githubusercontent.com/microsoft/durabletask-protobuf/1774123378045dd87cd3307c50da4cdd4551a817/protos/orchestrator_service.proto