Skip to content

Commit a6834d1

Browse files
authored
Partial orchestration workitem completion support (Merge after next dts dp release) (#514)
1 parent e944e3c commit a6834d1

File tree

5 files changed

+521
-4
lines changed

5 files changed

+521
-4
lines changed

src/InProcessTestHost/Sidecar/Grpc/TaskHubGrpcServer.cs

Lines changed: 73 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
using System.Globalization;
77
using System.Linq;
88
using DurableTask.Core;
9+
using DurableTask.Core.Command;
910
using DurableTask.Core.Exceptions;
1011
using DurableTask.Core.History;
1112
using DurableTask.Core.Query;
@@ -33,6 +34,29 @@ public class TaskHubGrpcServer : P.TaskHubSidecarService.TaskHubSidecarServiceBa
3334

3435
readonly ConcurrentDictionary<string, TaskCompletionSource<GrpcOrchestratorExecutionResult>> pendingOrchestratorTasks = new(StringComparer.OrdinalIgnoreCase);
3536
readonly ConcurrentDictionary<string, TaskCompletionSource<ActivityExecutionResult>> pendingActivityTasks = new(StringComparer.OrdinalIgnoreCase);
37+
readonly ConcurrentDictionary<string, PartialOrchestratorChunk> partialOrchestratorChunks = new(StringComparer.OrdinalIgnoreCase);
38+
39+
/// <summary>
40+
/// Helper class to accumulate partial orchestrator chunks.
41+
/// </summary>
42+
sealed class PartialOrchestratorChunk
43+
{
44+
readonly object lockObject = new();
45+
46+
public TaskCompletionSource<GrpcOrchestratorExecutionResult> TaskCompletionSource { get; set; } = null!;
47+
public List<OrchestratorAction> AccumulatedActions { get; } = new();
48+
49+
/// <summary>
50+
/// Thread-safely adds actions to the accumulated actions list.
51+
/// </summary>
52+
public void AddActions(IEnumerable<OrchestratorAction> actions)
53+
{
54+
lock (this.lockObject)
55+
{
56+
this.AccumulatedActions.AddRange(actions);
57+
}
58+
}
59+
}
3660

3761
readonly ILogger log;
3862
readonly IOrchestrationService service;
@@ -194,7 +218,7 @@ async Task WaitForWorkItemClientConnection()
194218
/// <returns>A create instance response.</returns>
195219
public override async Task<P.CreateInstanceResponse> StartInstance(P.CreateInstanceRequest request, ServerCallContext context)
196220
{
197-
var instance = new OrchestrationInstance
221+
OrchestrationInstance instance = new OrchestrationInstance
198222
{
199223
InstanceId = request.InstanceId ?? Guid.NewGuid().ToString("N"),
200224
ExecutionId = Guid.NewGuid().ToString(),
@@ -557,6 +581,51 @@ static P.GetInstanceResponse CreateGetInstanceResponse(OrchestrationState state,
557581
/// <returns>Returns an empty ack back to the remote SDK that we've received the completion.</returns>
558582
public override Task<P.CompleteTaskResponse> CompleteOrchestratorTask(P.OrchestratorResponse request, ServerCallContext context)
559583
{
584+
if (request.IsPartial)
585+
{
586+
// This is a partial chunk - accumulate actions but don't complete yet
587+
PartialOrchestratorChunk partialChunk = this.partialOrchestratorChunks.GetOrAdd(
588+
request.InstanceId,
589+
_ =>
590+
{
591+
// First chunk - get the TCS and initialize the partial chunk
592+
if (!this.pendingOrchestratorTasks.TryGetValue(request.InstanceId, out TaskCompletionSource<GrpcOrchestratorExecutionResult>? tcs))
593+
{
594+
throw new RpcException(new Status(StatusCode.NotFound, $"Orchestration with instance ID '{request.InstanceId}' not found"));
595+
}
596+
597+
return new PartialOrchestratorChunk
598+
{
599+
TaskCompletionSource = tcs,
600+
};
601+
});
602+
603+
// Accumulate actions from this chunk (thread-safe)
604+
partialChunk.AddActions(request.Actions.Select(ProtobufUtils.ToOrchestratorAction));
605+
606+
return EmptyCompleteTaskResponse;
607+
}
608+
609+
// This is the final chunk (or a single non-chunked response)
610+
if (this.partialOrchestratorChunks.TryRemove(request.InstanceId, out PartialOrchestratorChunk? existingPartialChunk))
611+
{
612+
// We've been accumulating chunks - combine with final chunk (thread-safe)
613+
existingPartialChunk.AddActions(request.Actions.Select(ProtobufUtils.ToOrchestratorAction));
614+
615+
GrpcOrchestratorExecutionResult res = new()
616+
{
617+
Actions = existingPartialChunk.AccumulatedActions,
618+
CustomStatus = request.CustomStatus,
619+
};
620+
621+
// Remove the TCS from pending tasks and complete it
622+
this.pendingOrchestratorTasks.TryRemove(request.InstanceId, out _);
623+
existingPartialChunk.TaskCompletionSource.TrySetResult(res);
624+
625+
return EmptyCompleteTaskResponse;
626+
}
627+
628+
// Single non-chunked response (no partial chunks)
560629
if (!this.pendingOrchestratorTasks.TryRemove(
561630
request.InstanceId,
562631
out TaskCompletionSource<GrpcOrchestratorExecutionResult>? tcs))
@@ -713,7 +782,7 @@ async Task<GrpcOrchestratorExecutionResult> ITaskExecutor.ExecuteOrchestrator(
713782
IEnumerable<HistoryEvent> pastEvents,
714783
IEnumerable<HistoryEvent> newEvents)
715784
{
716-
var executionStartedEvent = pastEvents.OfType<ExecutionStartedEvent>().FirstOrDefault();
785+
ExecutionStartedEvent? executionStartedEvent = pastEvents.OfType<ExecutionStartedEvent>().FirstOrDefault();
717786

718787
P.OrchestrationTraceContext? orchestrationTraceContext = executionStartedEvent?.ParentTraceContext?.SpanId is not null
719788
? new P.OrchestrationTraceContext
@@ -730,7 +799,7 @@ async Task<GrpcOrchestratorExecutionResult> ITaskExecutor.ExecuteOrchestrator(
730799

731800
try
732801
{
733-
var orkRequest = new P.OrchestratorRequest
802+
P.OrchestratorRequest orkRequest = new P.OrchestratorRequest
734803
{
735804
InstanceId = instance.InstanceId,
736805
ExecutionId = instance.ExecutionId,
@@ -858,6 +927,7 @@ TaskCompletionSource<GrpcOrchestratorExecutionResult> CreateTaskCompletionSource
858927
void RemoveOrchestratorTaskCompletionSource(string instanceId)
859928
{
860929
this.pendingOrchestratorTasks.TryRemove(instanceId, out _);
930+
this.partialOrchestratorChunks.TryRemove(instanceId, out _);
861931
}
862932

863933
TaskCompletionSource<ActivityExecutionResult> CreateTaskCompletionSourceForActivity(string instanceId, int taskId)

src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs

Lines changed: 140 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,13 @@
22
// Licensed under the MIT License.
33

44
using System.Diagnostics;
5+
using System.Linq;
56
using System.Text;
67
using DurableTask.Core;
78
using DurableTask.Core.Entities;
89
using DurableTask.Core.Entities.OperationFormat;
910
using DurableTask.Core.History;
11+
using Google.Protobuf;
1012
using Microsoft.DurableTask.Abstractions;
1113
using Microsoft.DurableTask.Entities;
1214
using Microsoft.DurableTask.Tracing;
@@ -755,7 +757,10 @@ await this.client.AbandonTaskOrchestratorWorkItemAsync(
755757
response.Actions.Count,
756758
GetActionsListForLogging(response.Actions));
757759

758-
await this.client.CompleteOrchestratorTaskAsync(response, cancellationToken: cancellationToken);
760+
await this.CompleteOrchestratorTaskWithChunkingAsync(
761+
response,
762+
this.worker.grpcOptions.CompleteOrchestrationWorkItemChunkSizeInBytes,
763+
cancellationToken);
759764
}
760765

761766
async Task OnRunActivityAsync(P.ActivityRequest request, string completionToken, CancellationToken cancellation)
@@ -914,5 +919,139 @@ async Task OnRunEntityBatchAsync(
914919

915920
await this.client.CompleteEntityTaskAsync(response, cancellationToken: cancellation);
916921
}
922+
923+
/// <summary>
924+
/// Completes an orchestration task with automatic chunking if the response exceeds the maximum size.
925+
/// </summary>
926+
/// <param name="response">The orchestrator response to send.</param>
927+
/// <param name="maxChunkBytes">The maximum size in bytes for each chunk.</param>
928+
/// <param name="cancellationToken">The cancellation token.</param>
929+
async Task CompleteOrchestratorTaskWithChunkingAsync(
930+
P.OrchestratorResponse response,
931+
int maxChunkBytes,
932+
CancellationToken cancellationToken)
933+
{
934+
// Validate that no single action exceeds the maximum chunk size
935+
static P.TaskFailureDetails? ValidateActionsSize(IEnumerable<P.OrchestratorAction> actions, int maxChunkBytes)
936+
{
937+
foreach (P.OrchestratorAction action in actions)
938+
{
939+
int actionSize = action.CalculateSize();
940+
if (actionSize > maxChunkBytes)
941+
{
942+
// TODO: large payload doc is not available yet on aka.ms, add doc link to below error message
943+
string errorMessage = $"A single orchestrator action of type {action.OrchestratorActionTypeCase} with id {action.Id} " +
944+
$"exceeds the {maxChunkBytes / 1024.0 / 1024.0:F2}MB limit: {actionSize / 1024.0 / 1024.0:F2}MB. " +
945+
"Enable large-payload externalization to Azure Blob Storage to support oversized actions.";
946+
return new P.TaskFailureDetails
947+
{
948+
ErrorType = typeof(InvalidOperationException).FullName,
949+
ErrorMessage = errorMessage,
950+
IsNonRetriable = true,
951+
};
952+
}
953+
}
954+
955+
return null;
956+
}
957+
958+
P.TaskFailureDetails? validationFailure = ValidateActionsSize(response.Actions, maxChunkBytes);
959+
if (validationFailure != null)
960+
{
961+
// Complete the orchestration with a failed status and failure details
962+
P.OrchestratorResponse failureResponse = new()
963+
{
964+
InstanceId = response.InstanceId,
965+
CompletionToken = response.CompletionToken,
966+
OrchestrationTraceContext = response.OrchestrationTraceContext,
967+
Actions =
968+
{
969+
new P.OrchestratorAction
970+
{
971+
CompleteOrchestration = new P.CompleteOrchestrationAction
972+
{
973+
OrchestrationStatus = P.OrchestrationStatus.Failed,
974+
FailureDetails = validationFailure,
975+
},
976+
},
977+
},
978+
};
979+
980+
await this.client.CompleteOrchestratorTaskAsync(failureResponse, cancellationToken: cancellationToken);
981+
return;
982+
}
983+
984+
// Helper to add an action to the current chunk if it fits
985+
static bool TryAddAction(
986+
Google.Protobuf.Collections.RepeatedField<P.OrchestratorAction> dest,
987+
P.OrchestratorAction action,
988+
ref int currentSize,
989+
int maxChunkBytes)
990+
{
991+
int actionSize = action.CalculateSize();
992+
if (currentSize + actionSize > maxChunkBytes)
993+
{
994+
return false;
995+
}
996+
997+
dest.Add(action);
998+
currentSize += actionSize;
999+
return true;
1000+
}
1001+
1002+
// Check if the entire response fits in one chunk
1003+
int totalSize = response.CalculateSize();
1004+
if (totalSize <= maxChunkBytes)
1005+
{
1006+
// Response fits in one chunk, send it directly (isPartial defaults to false)
1007+
await this.client.CompleteOrchestratorTaskAsync(response, cancellationToken: cancellationToken);
1008+
return;
1009+
}
1010+
1011+
// Response is too large, split into multiple chunks
1012+
int actionsCompletedSoFar = 0, chunkIndex = 0;
1013+
List<P.OrchestratorAction> allActions = response.Actions.ToList();
1014+
bool isPartial = true;
1015+
1016+
while (isPartial)
1017+
{
1018+
P.OrchestratorResponse chunkedResponse = new()
1019+
{
1020+
InstanceId = response.InstanceId,
1021+
CustomStatus = response.CustomStatus,
1022+
CompletionToken = response.CompletionToken,
1023+
RequiresHistory = response.RequiresHistory,
1024+
NumEventsProcessed = 0,
1025+
ChunkIndex = chunkIndex,
1026+
};
1027+
1028+
int chunkPayloadSize = 0;
1029+
1030+
// Fill the chunk with actions until we reach the size limit
1031+
while (actionsCompletedSoFar < allActions.Count &&
1032+
TryAddAction(chunkedResponse.Actions, allActions[actionsCompletedSoFar], ref chunkPayloadSize, maxChunkBytes))
1033+
{
1034+
actionsCompletedSoFar++;
1035+
}
1036+
1037+
// Determine if this is a partial chunk (more actions remaining)
1038+
isPartial = actionsCompletedSoFar < allActions.Count;
1039+
chunkedResponse.IsPartial = isPartial;
1040+
1041+
if (chunkIndex == 0)
1042+
{
1043+
// The first chunk preserves the original response's NumEventsProcessed value (null)
1044+
// When this is set to null, backend by default handles all the messages in the workitem.
1045+
// For subsequent chunks, we set it to 0 since all messages are already handled in first chunk.
1046+
chunkedResponse.NumEventsProcessed = null;
1047+
chunkedResponse.OrchestrationTraceContext = response.OrchestrationTraceContext;
1048+
}
1049+
1050+
chunkIndex++;
1051+
1052+
// Send the chunk
1053+
await this.client.CompleteOrchestratorTaskAsync(chunkedResponse, cancellationToken: cancellationToken);
1054+
}
1055+
}
9171056
}
9181057
}

src/Worker/Grpc/GrpcDurableTaskWorkerOptions.cs

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,18 @@ namespace Microsoft.DurableTask.Worker.Grpc;
1010
/// </summary>
1111
public sealed class GrpcDurableTaskWorkerOptions : DurableTaskWorkerOptions
1212
{
13+
/// <summary>
14+
/// The minimum allowed size (in bytes) for complete orchestration work item chunks.
15+
/// </summary>
16+
public const int MinCompleteOrchestrationWorkItemChunkSizeInBytes = 1 * 1024 * 1024; // 1 MB
17+
18+
/// <summary>
19+
/// The maximum allowed size (in bytes) for complete orchestration work item chunks.
20+
/// </summary>
21+
public const int MaxCompleteOrchestrationWorkItemChunkSizeInBytes = 4_089_446; // 3.9 MB
22+
23+
int completeOrchestrationWorkItemChunkSizeInBytes = MaxCompleteOrchestrationWorkItemChunkSizeInBytes;
24+
1325
/// <summary>
1426
/// Gets or sets the address of the gRPC endpoint to connect to. Default is localhost:4001.
1527
/// </summary>
@@ -31,6 +43,38 @@ public sealed class GrpcDurableTaskWorkerOptions : DurableTaskWorkerOptions
3143
/// </summary>
3244
public HashSet<P.WorkerCapability> Capabilities { get; } = new() { P.WorkerCapability.HistoryStreaming };
3345

46+
/// <summary>
47+
/// Gets or sets the maximum size of all actions in a complete orchestration work item chunk.
48+
/// The default value is 3.9MB. We leave some headroom to account for request size overhead.
49+
/// </summary>
50+
/// <remarks>
51+
/// This value is used to limit the size of the complete orchestration work item request.
52+
/// If the response exceeds this limit, it will be automatically split into multiple chunks of maximum size CompleteOrchestrationWorkItemChunkSizeInBytes
53+
/// </remarks>
54+
/// <exception cref="ArgumentOutOfRangeException">
55+
/// Thrown when the value is less than 1 MB or greater than 3.9 MB.
56+
/// </exception>
57+
public int CompleteOrchestrationWorkItemChunkSizeInBytes
58+
{
59+
get => this.completeOrchestrationWorkItemChunkSizeInBytes;
60+
set
61+
{
62+
if (value < MinCompleteOrchestrationWorkItemChunkSizeInBytes ||
63+
value > MaxCompleteOrchestrationWorkItemChunkSizeInBytes)
64+
{
65+
string message = $"{nameof(CompleteOrchestrationWorkItemChunkSizeInBytes)} must be between " +
66+
$"{MinCompleteOrchestrationWorkItemChunkSizeInBytes} bytes (1 MB) and " +
67+
$"{MaxCompleteOrchestrationWorkItemChunkSizeInBytes} bytes (3.9 MB), inclusive.";
68+
throw new ArgumentOutOfRangeException(
69+
nameof(this.CompleteOrchestrationWorkItemChunkSizeInBytes),
70+
value,
71+
message);
72+
}
73+
74+
this.completeOrchestrationWorkItemChunkSizeInBytes = value;
75+
}
76+
}
77+
3478
/// <summary>
3579
/// Gets the internal protocol options. These are used to control backend-dependent features.
3680
/// </summary>

0 commit comments

Comments
 (0)