diff --git a/src/Grpc/orchestrator_service.proto b/src/Grpc/orchestrator_service.proto index 1a86c0a27..f58cf37b5 100644 --- a/src/Grpc/orchestrator_service.proto +++ b/src/Grpc/orchestrator_service.proto @@ -356,6 +356,13 @@ message OrchestratorResponse { // Whether or not a history is required to complete the original OrchestratorRequest and none was provided. bool requiresHistory = 7; + + // True if this is a partial (chunked) completion. The backend must keep the work item open until the final chunk (isPartial=false). + bool isPartial = 8; + + // Zero-based position of the current chunk within a chunked completion sequence. + // This field is omitted for non-chunked completions. + google.protobuf.Int32Value chunkIndex = 9; } message CreateInstanceRequest { diff --git a/src/Grpc/versions.txt b/src/Grpc/versions.txt index 69b075cd3..2876119ff 100644 --- a/src/Grpc/versions.txt +++ b/src/Grpc/versions.txt @@ -1,2 +1,2 @@ -# The following files were downloaded from branch main at 2025-11-14 16:36:47 UTC -https://raw.githubusercontent.com/microsoft/durabletask-protobuf/9f762f1301b91e3e7c736b9c5a29c2e09f2a850e/protos/orchestrator_service.proto +# The following files were downloaded from branch main at 2025-12-05 19:49:32 UTC +https://raw.githubusercontent.com/microsoft/durabletask-protobuf/1774123378045dd87cd3307c50da4cdd4551a817/protos/orchestrator_service.proto diff --git a/src/InProcessTestHost/Sidecar/Grpc/TaskHubGrpcServer.cs b/src/InProcessTestHost/Sidecar/Grpc/TaskHubGrpcServer.cs index 15f65dd84..5786d6c52 100644 --- a/src/InProcessTestHost/Sidecar/Grpc/TaskHubGrpcServer.cs +++ b/src/InProcessTestHost/Sidecar/Grpc/TaskHubGrpcServer.cs @@ -5,6 +5,7 @@ using System.Diagnostics; using System.Globalization; using DurableTask.Core; +using DurableTask.Core.Command; using DurableTask.Core.History; using DurableTask.Core.Query; using Google.Protobuf.WellKnownTypes; @@ -30,6 +31,29 @@ public class TaskHubGrpcServer : P.TaskHubSidecarService.TaskHubSidecarServiceBa readonly ConcurrentDictionary> pendingOrchestratorTasks = new(StringComparer.OrdinalIgnoreCase); readonly ConcurrentDictionary> pendingActivityTasks = new(StringComparer.OrdinalIgnoreCase); + readonly ConcurrentDictionary partialOrchestratorChunks = new(StringComparer.OrdinalIgnoreCase); + + /// + /// Helper class to accumulate partial orchestrator chunks. + /// + sealed class PartialOrchestratorChunk + { + readonly object lockObject = new(); + + public TaskCompletionSource TaskCompletionSource { get; set; } = null!; + public List AccumulatedActions { get; } = new(); + + /// + /// Thread-safely adds actions to the accumulated actions list. + /// + public void AddActions(IEnumerable actions) + { + lock (this.lockObject) + { + this.AccumulatedActions.AddRange(actions); + } + } + } readonly ILogger log; readonly IOrchestrationService service; @@ -191,7 +215,7 @@ async Task WaitForWorkItemClientConnection() /// A create instance response. public override async Task StartInstance(P.CreateInstanceRequest request, ServerCallContext context) { - var instance = new OrchestrationInstance + OrchestrationInstance instance = new OrchestrationInstance { InstanceId = request.InstanceId ?? Guid.NewGuid().ToString("N"), ExecutionId = Guid.NewGuid().ToString(), @@ -535,6 +559,51 @@ static P.GetInstanceResponse CreateGetInstanceResponse(OrchestrationState state, /// Returns an empty ack back to the remote SDK that we've received the completion. public override Task CompleteOrchestratorTask(P.OrchestratorResponse request, ServerCallContext context) { + if (request.IsPartial) + { + // This is a partial chunk - accumulate actions but don't complete yet + PartialOrchestratorChunk partialChunk = this.partialOrchestratorChunks.GetOrAdd( + request.InstanceId, + _ => + { + // First chunk - get the TCS and initialize the partial chunk + if (!this.pendingOrchestratorTasks.TryGetValue(request.InstanceId, out TaskCompletionSource? tcs)) + { + throw new RpcException(new Status(StatusCode.NotFound, $"Orchestration with instance ID '{request.InstanceId}' not found")); + } + + return new PartialOrchestratorChunk + { + TaskCompletionSource = tcs, + }; + }); + + // Accumulate actions from this chunk (thread-safe) + partialChunk.AddActions(request.Actions.Select(ProtobufUtils.ToOrchestratorAction)); + + return EmptyCompleteTaskResponse; + } + + // This is the final chunk (or a single non-chunked response) + if (this.partialOrchestratorChunks.TryRemove(request.InstanceId, out PartialOrchestratorChunk? existingPartialChunk)) + { + // We've been accumulating chunks - combine with final chunk (thread-safe) + existingPartialChunk.AddActions(request.Actions.Select(ProtobufUtils.ToOrchestratorAction)); + + GrpcOrchestratorExecutionResult res = new() + { + Actions = existingPartialChunk.AccumulatedActions, + CustomStatus = request.CustomStatus, + }; + + // Remove the TCS from pending tasks and complete it + this.pendingOrchestratorTasks.TryRemove(request.InstanceId, out _); + existingPartialChunk.TaskCompletionSource.TrySetResult(res); + + return EmptyCompleteTaskResponse; + } + + // Single non-chunked response (no partial chunks) if (!this.pendingOrchestratorTasks.TryRemove( request.InstanceId, out TaskCompletionSource? tcs)) @@ -691,7 +760,7 @@ async Task ITaskExecutor.ExecuteOrchestrator( IEnumerable pastEvents, IEnumerable newEvents) { - var executionStartedEvent = pastEvents.OfType().FirstOrDefault(); + ExecutionStartedEvent? executionStartedEvent = pastEvents.OfType().FirstOrDefault(); P.OrchestrationTraceContext? orchestrationTraceContext = executionStartedEvent?.ParentTraceContext?.SpanId is not null ? new P.OrchestrationTraceContext @@ -708,7 +777,7 @@ async Task ITaskExecutor.ExecuteOrchestrator( try { - var orkRequest = new P.OrchestratorRequest + P.OrchestratorRequest orkRequest = new P.OrchestratorRequest { InstanceId = instance.InstanceId, ExecutionId = instance.ExecutionId, @@ -836,6 +905,7 @@ TaskCompletionSource CreateTaskCompletionSource void RemoveOrchestratorTaskCompletionSource(string instanceId) { this.pendingOrchestratorTasks.TryRemove(instanceId, out _); + this.partialOrchestratorChunks.TryRemove(instanceId, out _); } TaskCompletionSource CreateTaskCompletionSourceForActivity(string instanceId, int taskId) diff --git a/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs b/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs index ffd38cf1f..0049bb0af 100644 --- a/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs +++ b/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs @@ -2,11 +2,13 @@ // Licensed under the MIT License. using System.Diagnostics; +using System.Linq; using System.Text; using DurableTask.Core; using DurableTask.Core.Entities; using DurableTask.Core.Entities.OperationFormat; using DurableTask.Core.History; +using Google.Protobuf; using Microsoft.DurableTask.Abstractions; using Microsoft.DurableTask.Entities; using Microsoft.DurableTask.Tracing; @@ -755,7 +757,10 @@ await this.client.AbandonTaskOrchestratorWorkItemAsync( response.Actions.Count, GetActionsListForLogging(response.Actions)); - await this.client.CompleteOrchestratorTaskAsync(response, cancellationToken: cancellationToken); + await this.CompleteOrchestratorTaskWithChunkingAsync( + response, + this.worker.grpcOptions.CompleteOrchestrationWorkItemChunkSizeInBytes, + cancellationToken); } async Task OnRunActivityAsync(P.ActivityRequest request, string completionToken, CancellationToken cancellation) @@ -914,5 +919,139 @@ async Task OnRunEntityBatchAsync( await this.client.CompleteEntityTaskAsync(response, cancellationToken: cancellation); } + + /// + /// Completes an orchestration task with automatic chunking if the response exceeds the maximum size. + /// + /// The orchestrator response to send. + /// The maximum size in bytes for each chunk. + /// The cancellation token. + async Task CompleteOrchestratorTaskWithChunkingAsync( + P.OrchestratorResponse response, + int maxChunkBytes, + CancellationToken cancellationToken) + { + // Validate that no single action exceeds the maximum chunk size + static P.TaskFailureDetails? ValidateActionsSize(IEnumerable actions, int maxChunkBytes) + { + foreach (P.OrchestratorAction action in actions) + { + int actionSize = action.CalculateSize(); + if (actionSize > maxChunkBytes) + { + // TODO: large payload doc is not available yet on aka.ms, add doc link to below error message + string errorMessage = $"A single orchestrator action of type {action.OrchestratorActionTypeCase} with id {action.Id} " + + $"exceeds the {maxChunkBytes / 1024.0 / 1024.0:F2}MB limit: {actionSize / 1024.0 / 1024.0:F2}MB. " + + "Enable large-payload externalization to Azure Blob Storage to support oversized actions."; + return new P.TaskFailureDetails + { + ErrorType = typeof(InvalidOperationException).FullName, + ErrorMessage = errorMessage, + IsNonRetriable = true, + }; + } + } + + return null; + } + + P.TaskFailureDetails? validationFailure = ValidateActionsSize(response.Actions, maxChunkBytes); + if (validationFailure != null) + { + // Complete the orchestration with a failed status and failure details + P.OrchestratorResponse failureResponse = new() + { + InstanceId = response.InstanceId, + CompletionToken = response.CompletionToken, + OrchestrationTraceContext = response.OrchestrationTraceContext, + Actions = + { + new P.OrchestratorAction + { + CompleteOrchestration = new P.CompleteOrchestrationAction + { + OrchestrationStatus = P.OrchestrationStatus.Failed, + FailureDetails = validationFailure, + }, + }, + }, + }; + + await this.client.CompleteOrchestratorTaskAsync(failureResponse, cancellationToken: cancellationToken); + return; + } + + // Helper to add an action to the current chunk if it fits + static bool TryAddAction( + Google.Protobuf.Collections.RepeatedField dest, + P.OrchestratorAction action, + ref int currentSize, + int maxChunkBytes) + { + int actionSize = action.CalculateSize(); + if (currentSize + actionSize > maxChunkBytes) + { + return false; + } + + dest.Add(action); + currentSize += actionSize; + return true; + } + + // Check if the entire response fits in one chunk + int totalSize = response.CalculateSize(); + if (totalSize <= maxChunkBytes) + { + // Response fits in one chunk, send it directly (isPartial defaults to false) + await this.client.CompleteOrchestratorTaskAsync(response, cancellationToken: cancellationToken); + return; + } + + // Response is too large, split into multiple chunks + int actionsCompletedSoFar = 0, chunkIndex = 0; + List allActions = response.Actions.ToList(); + bool isPartial = true; + + while (isPartial) + { + P.OrchestratorResponse chunkedResponse = new() + { + InstanceId = response.InstanceId, + CustomStatus = response.CustomStatus, + CompletionToken = response.CompletionToken, + RequiresHistory = response.RequiresHistory, + NumEventsProcessed = 0, + ChunkIndex = chunkIndex, + }; + + int chunkPayloadSize = 0; + + // Fill the chunk with actions until we reach the size limit + while (actionsCompletedSoFar < allActions.Count && + TryAddAction(chunkedResponse.Actions, allActions[actionsCompletedSoFar], ref chunkPayloadSize, maxChunkBytes)) + { + actionsCompletedSoFar++; + } + + // Determine if this is a partial chunk (more actions remaining) + isPartial = actionsCompletedSoFar < allActions.Count; + chunkedResponse.IsPartial = isPartial; + + if (chunkIndex == 0) + { + // The first chunk preserves the original response's NumEventsProcessed value (null) + // When this is set to null, backend by default handles all the messages in the workitem. + // For subsequent chunks, we set it to 0 since all messages are already handled in first chunk. + chunkedResponse.NumEventsProcessed = null; + chunkedResponse.OrchestrationTraceContext = response.OrchestrationTraceContext; + } + + chunkIndex++; + + // Send the chunk + await this.client.CompleteOrchestratorTaskAsync(chunkedResponse, cancellationToken: cancellationToken); + } + } } } diff --git a/src/Worker/Grpc/GrpcDurableTaskWorkerOptions.cs b/src/Worker/Grpc/GrpcDurableTaskWorkerOptions.cs index 78d520878..52372f65d 100644 --- a/src/Worker/Grpc/GrpcDurableTaskWorkerOptions.cs +++ b/src/Worker/Grpc/GrpcDurableTaskWorkerOptions.cs @@ -10,6 +10,18 @@ namespace Microsoft.DurableTask.Worker.Grpc; /// public sealed class GrpcDurableTaskWorkerOptions : DurableTaskWorkerOptions { + /// + /// The minimum allowed size (in bytes) for complete orchestration work item chunks. + /// + public const int MinCompleteOrchestrationWorkItemChunkSizeInBytes = 1 * 1024 * 1024; // 1 MB + + /// + /// The maximum allowed size (in bytes) for complete orchestration work item chunks. + /// + public const int MaxCompleteOrchestrationWorkItemChunkSizeInBytes = 4_089_446; // 3.9 MB + + int completeOrchestrationWorkItemChunkSizeInBytes = MaxCompleteOrchestrationWorkItemChunkSizeInBytes; + /// /// Gets or sets the address of the gRPC endpoint to connect to. Default is localhost:4001. /// @@ -31,6 +43,38 @@ public sealed class GrpcDurableTaskWorkerOptions : DurableTaskWorkerOptions /// public HashSet Capabilities { get; } = new() { P.WorkerCapability.HistoryStreaming }; + /// + /// Gets or sets the maximum size of all actions in a complete orchestration work item chunk. + /// The default value is 3.9MB. We leave some headroom to account for request size overhead. + /// + /// + /// This value is used to limit the size of the complete orchestration work item request. + /// If the response exceeds this limit, it will be automatically split into multiple chunks of maximum size CompleteOrchestrationWorkItemChunkSizeInBytes + /// + /// + /// Thrown when the value is less than 1 MB or greater than 3.9 MB. + /// + public int CompleteOrchestrationWorkItemChunkSizeInBytes + { + get => this.completeOrchestrationWorkItemChunkSizeInBytes; + set + { + if (value < MinCompleteOrchestrationWorkItemChunkSizeInBytes || + value > MaxCompleteOrchestrationWorkItemChunkSizeInBytes) + { + string message = $"{nameof(CompleteOrchestrationWorkItemChunkSizeInBytes)} must be between " + + $"{MinCompleteOrchestrationWorkItemChunkSizeInBytes} bytes (1 MB) and " + + $"{MaxCompleteOrchestrationWorkItemChunkSizeInBytes} bytes (3.9 MB), inclusive."; + throw new ArgumentOutOfRangeException( + nameof(this.CompleteOrchestrationWorkItemChunkSizeInBytes), + value, + message); + } + + this.completeOrchestrationWorkItemChunkSizeInBytes = value; + } + } + /// /// Gets the internal protocol options. These are used to control backend-dependent features. /// diff --git a/test/Grpc.IntegrationTests/AutochunkTests.cs b/test/Grpc.IntegrationTests/AutochunkTests.cs new file mode 100644 index 000000000..0bab2254f --- /dev/null +++ b/test/Grpc.IntegrationTests/AutochunkTests.cs @@ -0,0 +1,163 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using Microsoft.DurableTask.Client; +using Microsoft.DurableTask.Tests.Logging; +using Microsoft.DurableTask.Worker; +using Microsoft.DurableTask.Worker.Grpc; +using Xunit.Abstractions; + +namespace Microsoft.DurableTask.Grpc.Tests; + +/// +/// Integration tests for validating autochunk functionality when orchestration completion responses +/// exceed the maximum chunk size and are automatically split into multiple chunks. +/// +public class AutochunkTests(ITestOutputHelper output, GrpcSidecarFixture sidecarFixture) : IntegrationTestBase(output, sidecarFixture) +{ + /// + /// Validates that orchestrations complete successfully when the completion response + /// exceeds the chunk size and must be split into multiple chunks. + /// + [Fact] + public async Task Autochunk_MultipleChunks_CompletesSuccessfully() + { + const int ActivityCount = 36; + const int PayloadSizePerActivity = 30 * 1024; + const int ChunkSize = GrpcDurableTaskWorkerOptions.MinCompleteOrchestrationWorkItemChunkSizeInBytes; // 1 MB (minimum allowed) + TaskName orchestratorName = nameof(Autochunk_MultipleChunks_CompletesSuccessfully); + TaskName activityName = "Echo"; + + await using HostTestLifetime server = await this.StartWorkerAsync(b => + { + // Set a small chunk size to force chunking + b.UseGrpc(opt => opt.CompleteOrchestrationWorkItemChunkSizeInBytes = ChunkSize); + b.AddTasks(tasks => tasks + .AddOrchestratorFunc(orchestratorName, async ctx => + { + // Start all activities in parallel so they're all in the same completion response + List> tasks = new List>(); + for (int i = 0; i < ActivityCount; i++) + { + string payload = new string((char)('A' + (i % 26)), PayloadSizePerActivity); + tasks.Add(ctx.CallActivityAsync(activityName, payload)); + } + string[] results = await Task.WhenAll(tasks); + return results.Length; + }) + .AddActivityFunc(activityName, (ctx, input) => Task.FromResult(input))); + }); + + string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); + using CancellationTokenSource cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync( + instanceId, getInputsAndOutputs: true, cts.Token); + + Assert.NotNull(metadata); + Assert.Equal(instanceId, metadata.InstanceId); + Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); + Assert.Equal(ActivityCount, metadata.ReadOutputAs()); + } + + /// + /// Validates autochunking with mixed action types (activities, timers, sub-orchestrations). + /// + [Fact] + public async Task Autochunk_MixedActions_CompletesSuccessfully() + { + // Use minimum allowed chunk size (1 MB) and ensure total payload exceeds it to trigger chunking + const int ActivityCount = 30; + const int TimerCount = 100; + const int SubOrchCount = 50; + const int PayloadSizePerActivity = 20 * 1024; + const int ChunkSize = GrpcDurableTaskWorkerOptions.MinCompleteOrchestrationWorkItemChunkSizeInBytes; // 1 MB (minimum allowed) + TaskName orchestratorName = nameof(Autochunk_MixedActions_CompletesSuccessfully); + TaskName activityName = "Echo"; + TaskName subOrchName = "SubOrch"; + + await using HostTestLifetime server = await this.StartWorkerAsync(b => + { + // Set a small chunk size to force chunking + b.UseGrpc(opt => opt.CompleteOrchestrationWorkItemChunkSizeInBytes = ChunkSize); + b.AddTasks(tasks => tasks + .AddOrchestratorFunc(orchestratorName, async ctx => + { + // Start all actions in parallel so they're all in the same completion response + List allTasks = new List(); + + // Activities + for (int i = 0; i < ActivityCount; i++) + { + string payload = new string('A', PayloadSizePerActivity); + allTasks.Add(ctx.CallActivityAsync(activityName, payload)); + } + + // Timers + for (int i = 0; i < TimerCount; i++) + { + allTasks.Add(ctx.CreateTimer(TimeSpan.FromMilliseconds(10), CancellationToken.None)); + } + + // Sub-orchestrations + for (int i = 0; i < SubOrchCount; i++) + { + allTasks.Add(ctx.CallSubOrchestratorAsync(subOrchName, i)); + } + + await Task.WhenAll(allTasks); + return ActivityCount + TimerCount + SubOrchCount; + }) + .AddOrchestratorFunc(subOrchName, (ctx, input) => Task.FromResult(input)) + .AddActivityFunc(activityName, (ctx, input) => Task.FromResult(input))); + }); + + string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); + using CancellationTokenSource cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync( + instanceId, getInputsAndOutputs: true, cts.Token); + + Assert.NotNull(metadata); + Assert.Equal(instanceId, metadata.InstanceId); + Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); + Assert.Equal(ActivityCount + TimerCount + SubOrchCount, metadata.ReadOutputAs()); + } + + /// + /// Validates that when a single orchestrator action exceeds the CompleteOrchestrationWorkItemChunkSizeInBytes limit, + /// the orchestration completes with a failed status and proper failure details. + /// + [Fact] + public async Task Autochunk_SingleActionExceedsChunkSize_CompletesWithFailedStatus() + { + const int ChunkSize = GrpcDurableTaskWorkerOptions.MinCompleteOrchestrationWorkItemChunkSizeInBytes; // 1 MB + // Create a payload that exceeds the chunk size (1 MB + some overhead) + const int PayloadSize = ChunkSize + 100 * 1024; // 1.1 MB payload + TaskName orchestratorName = nameof(Autochunk_SingleActionExceedsChunkSize_CompletesWithFailedStatus); + TaskName activityName = "Echo"; + + await using HostTestLifetime server = await this.StartWorkerAsync(b => + { + b.UseGrpc(opt => opt.CompleteOrchestrationWorkItemChunkSizeInBytes = ChunkSize); + b.AddTasks(tasks => tasks + .AddOrchestratorFunc(orchestratorName, async ctx => + { + // Attempt to schedule an activity with a payload that exceeds the chunk size + string largePayload = new string('A', PayloadSize); + return await ctx.CallActivityAsync(activityName, largePayload); + }) + .AddActivityFunc(activityName, (ctx, input) => Task.FromResult(input))); + }); + + string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); + using CancellationTokenSource cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync( + instanceId, getInputsAndOutputs: true, cts.Token); + + Assert.NotNull(metadata); + Assert.Equal(instanceId, metadata.InstanceId); + Assert.Equal(OrchestrationRuntimeStatus.Failed, metadata.RuntimeStatus); + Assert.NotNull(metadata.FailureDetails); + Assert.Equal("System.InvalidOperationException: A single orchestrator action of type ScheduleTask with id 0 exceeds the 1.00MB limit: 1.10MB. Enable large-payload externalization to Azure Blob Storage to support oversized actions.", metadata.FailureDetails.ToString()); + } +} + diff --git a/test/Worker/Grpc.Tests/GrpcDurableTaskWorkerOptionsTests.cs b/test/Worker/Grpc.Tests/GrpcDurableTaskWorkerOptionsTests.cs new file mode 100644 index 000000000..8aaeda998 --- /dev/null +++ b/test/Worker/Grpc.Tests/GrpcDurableTaskWorkerOptionsTests.cs @@ -0,0 +1,101 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace Microsoft.DurableTask.Worker.Grpc.Tests; + +/// +/// Tests for validation. +/// +public class GrpcDurableTaskWorkerOptionsTests +{ + [Fact] + public void Default_CompleteOrchestrationWorkItemChunkSizeInBytes_IsWithinRange() + { + // Arrange + var options = new GrpcDurableTaskWorkerOptions(); + + // Act + int value = options.CompleteOrchestrationWorkItemChunkSizeInBytes; + + // Assert + value.Should().BeGreaterOrEqualTo( + GrpcDurableTaskWorkerOptions.MinCompleteOrchestrationWorkItemChunkSizeInBytes); + value.Should().BeLessOrEqualTo( + GrpcDurableTaskWorkerOptions.MaxCompleteOrchestrationWorkItemChunkSizeInBytes); + } + + [Fact] + public void Setting_CompleteOrchestrationWorkItemChunkSizeInBytes_BelowMin_Throws() + { + // Arrange + var options = new GrpcDurableTaskWorkerOptions(); + int belowMin = GrpcDurableTaskWorkerOptions.MinCompleteOrchestrationWorkItemChunkSizeInBytes - 1; + + // Act + Action act = () => options.CompleteOrchestrationWorkItemChunkSizeInBytes = belowMin; + + // Assert + act.Should() + .Throw() + .WithParameterName(nameof(GrpcDurableTaskWorkerOptions.CompleteOrchestrationWorkItemChunkSizeInBytes)); + } + + [Fact] + public void Setting_CompleteOrchestrationWorkItemChunkSizeInBytes_AboveMax_Throws() + { + // Arrange + var options = new GrpcDurableTaskWorkerOptions(); + int aboveMax = GrpcDurableTaskWorkerOptions.MaxCompleteOrchestrationWorkItemChunkSizeInBytes + 1; + + // Act + Action act = () => options.CompleteOrchestrationWorkItemChunkSizeInBytes = aboveMax; + + // Assert + act.Should() + .Throw() + .WithParameterName(nameof(GrpcDurableTaskWorkerOptions.CompleteOrchestrationWorkItemChunkSizeInBytes)); + } + + [Fact] + public void Setting_CompleteOrchestrationWorkItemChunkSizeInBytes_AtMinBoundary_Succeeds() + { + // Arrange + var options = new GrpcDurableTaskWorkerOptions(); + int minValue = GrpcDurableTaskWorkerOptions.MinCompleteOrchestrationWorkItemChunkSizeInBytes; + + // Act + options.CompleteOrchestrationWorkItemChunkSizeInBytes = minValue; + + // Assert + options.CompleteOrchestrationWorkItemChunkSizeInBytes.Should().Be(minValue); + } + + [Fact] + public void Setting_CompleteOrchestrationWorkItemChunkSizeInBytes_AtMaxBoundary_Succeeds() + { + // Arrange + var options = new GrpcDurableTaskWorkerOptions(); + int maxValue = GrpcDurableTaskWorkerOptions.MaxCompleteOrchestrationWorkItemChunkSizeInBytes; + + // Act + options.CompleteOrchestrationWorkItemChunkSizeInBytes = maxValue; + + // Assert + options.CompleteOrchestrationWorkItemChunkSizeInBytes.Should().Be(maxValue); + } + + [Fact] + public void Setting_CompleteOrchestrationWorkItemChunkSizeInBytes_WithinRange_Succeeds() + { + // Arrange + var options = new GrpcDurableTaskWorkerOptions(); + int withinRange = 2 * 1024 * 1024; // 2 MB + + // Act + options.CompleteOrchestrationWorkItemChunkSizeInBytes = withinRange; + + // Assert + options.CompleteOrchestrationWorkItemChunkSizeInBytes.Should().Be(withinRange); + } +} +