From c8ff67f1b89578d838c200c64c8e63926e8f2209 Mon Sep 17 00:00:00 2001 From: "naiyuantian@microsoft.com" Date: Wed, 1 Oct 2025 21:27:48 -0700 Subject: [PATCH 1/8] initial commit --- .../DurableTaskCoreExceptionsExtensions.cs | 3 +- src/Abstractions/TaskFailureDetails.cs | 18 +- .../ShimExtensions.cs | 2 +- src/Grpc/orchestrator_service.proto | 1645 +++++++++-------- src/Grpc/versions.txt | 4 +- src/Shared/Grpc/ProtoUtils.cs | 101 +- .../DefaultDurableTaskWorkerBuilder.cs | 16 +- .../ExceptionPropertiesProviderAdapter.cs | 32 + src/Worker/Core/Hosting/DurableTaskWorker.cs | 5 + .../Core/IExceptionPropertiesProvider.cs | 20 + .../Core/Shims/DurableTaskShimFactory.cs | 1 + .../Shims/TaskOrchestrationEntityContext.cs | 3 +- .../Core/Shims/TaskOrchestrationShim.cs | 4 +- .../Grpc/GrpcDurableTaskWorker.Processor.cs | 15 +- src/Worker/Grpc/GrpcDurableTaskWorker.cs | 27 +- src/Worker/Grpc/GrpcOrchestrationRunner.cs | 412 ++--- .../GrpcSidecar/Grpc/ProtobufUtils.cs | 98 +- .../GrpcSidecar/Grpc/TaskHubGrpcServer.cs | 18 +- .../OrchestrationErrorHandling.cs | 73 + .../Client/DefaultScheduleClientTests.cs | 2 +- ...tionPropertiesProviderRegistrationTests.cs | 58 + .../RunBackgroundTaskLoggingTests.cs | 4 +- 22 files changed, 1489 insertions(+), 1072 deletions(-) create mode 100644 src/Worker/Core/ExceptionPropertiesProviderAdapter.cs create mode 100644 src/Worker/Core/IExceptionPropertiesProvider.cs create mode 100644 test/Worker/Grpc.Tests/ExceptionPropertiesProviderRegistrationTests.cs diff --git a/src/Abstractions/DurableTaskCoreExceptionsExtensions.cs b/src/Abstractions/DurableTaskCoreExceptionsExtensions.cs index 8d213a9d..775be485 100644 --- a/src/Abstractions/DurableTaskCoreExceptionsExtensions.cs +++ b/src/Abstractions/DurableTaskCoreExceptionsExtensions.cs @@ -49,6 +49,7 @@ static class DurableTaskCoreExceptionsExtensions failureDetails.ErrorType, failureDetails.ErrorMessage, failureDetails.StackTrace, - failureDetails.InnerFailure?.ToTaskFailureDetails()); + failureDetails.InnerFailure?.ToTaskFailureDetails(), + failureDetails.Properties); } } diff --git a/src/Abstractions/TaskFailureDetails.cs b/src/Abstractions/TaskFailureDetails.cs index 1387b19c..8e7c68a7 100644 --- a/src/Abstractions/TaskFailureDetails.cs +++ b/src/Abstractions/TaskFailureDetails.cs @@ -15,7 +15,8 @@ namespace Microsoft.DurableTask; /// A summary description of the failure. /// The stack trace of the failure. /// The inner cause of the task failure. -public record TaskFailureDetails(string ErrorType, string ErrorMessage, string? StackTrace, TaskFailureDetails? InnerFailure) +/// Additional properties associated with the exception. +public record TaskFailureDetails(string ErrorType, string ErrorMessage, string? StackTrace, TaskFailureDetails? InnerFailure, IDictionary? Properties) { Type? loadedExceptionType; @@ -123,7 +124,8 @@ internal CoreFailureDetails ToCoreFailureDetails() this.ErrorMessage, this.StackTrace, this.InnerFailure?.ToCoreFailureDetails(), - isNonRetriable: false); + isNonRetriable: false, + this.Properties); } /// @@ -143,7 +145,8 @@ internal CoreFailureDetails ToCoreFailureDetails() coreFailureDetails.ErrorType, coreFailureDetails.ErrorMessage, coreFailureDetails.StackTrace, - FromCoreFailureDetails(coreFailureDetails.InnerFailure)); + FromCoreFailureDetails(coreFailureDetails.InnerFailure), + coreFailureDetails.Properties); } [return: NotNullIfNotNull(nameof(exception))] @@ -160,14 +163,16 @@ internal CoreFailureDetails ToCoreFailureDetails() coreEx.FailureDetails?.ErrorType ?? "(unknown)", coreEx.FailureDetails?.ErrorMessage ?? "(unknown)", coreEx.FailureDetails?.StackTrace, - FromCoreFailureDetailsRecursive(coreEx.FailureDetails?.InnerFailure) ?? FromExceptionRecursive(coreEx.InnerException)); + FromCoreFailureDetailsRecursive(coreEx.FailureDetails?.InnerFailure) ?? FromExceptionRecursive(coreEx.InnerException), + coreEx.FailureDetails?.Properties ?? null); } return new TaskFailureDetails( exception.GetType().ToString(), exception.Message, exception.StackTrace, - FromExceptionRecursive(exception.InnerException)); + FromExceptionRecursive(exception.InnerException), + null);// might need to udpate this later } static TaskFailureDetails? FromCoreFailureDetailsRecursive(CoreFailureDetails? coreFailureDetails) @@ -181,6 +186,7 @@ internal CoreFailureDetails ToCoreFailureDetails() coreFailureDetails.ErrorType, coreFailureDetails.ErrorMessage, coreFailureDetails.StackTrace, - FromCoreFailureDetailsRecursive(coreFailureDetails.InnerFailure)); + FromCoreFailureDetailsRecursive(coreFailureDetails.InnerFailure), + coreFailureDetails.Properties); } } diff --git a/src/Client/OrchestrationServiceClientShim/ShimExtensions.cs b/src/Client/OrchestrationServiceClientShim/ShimExtensions.cs index 50191e37..7647326c 100644 --- a/src/Client/OrchestrationServiceClientShim/ShimExtensions.cs +++ b/src/Client/OrchestrationServiceClientShim/ShimExtensions.cs @@ -75,7 +75,7 @@ public static Core.OrchestrationStatus ConvertToCore(this OrchestrationRuntimeSt } TaskFailureDetails? inner = details.InnerFailure?.ConvertFromCore(); - return new TaskFailureDetails(details.ErrorType, details.ErrorMessage, details.StackTrace, inner); + return new TaskFailureDetails(details.ErrorType, details.ErrorMessage, details.StackTrace, inner, details.Properties); } /// diff --git a/src/Grpc/orchestrator_service.proto b/src/Grpc/orchestrator_service.proto index df5143bc..c8b55430 100644 --- a/src/Grpc/orchestrator_service.proto +++ b/src/Grpc/orchestrator_service.proto @@ -1,821 +1,828 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT License. - -syntax = "proto3"; - -option csharp_namespace = "Microsoft.DurableTask.Protobuf"; -option java_package = "com.microsoft.durabletask.implementation.protobuf"; -option go_package = "/internal/protos"; - -import "google/protobuf/timestamp.proto"; -import "google/protobuf/duration.proto"; -import "google/protobuf/wrappers.proto"; -import "google/protobuf/empty.proto"; -import "google/protobuf/struct.proto"; - -message OrchestrationInstance { - string instanceId = 1; - google.protobuf.StringValue executionId = 2; -} - -message ActivityRequest { - string name = 1; - google.protobuf.StringValue version = 2; - google.protobuf.StringValue input = 3; - OrchestrationInstance orchestrationInstance = 4; - int32 taskId = 5; - TraceContext parentTraceContext = 6; -} - -message ActivityResponse { - string instanceId = 1; - int32 taskId = 2; - google.protobuf.StringValue result = 3; - TaskFailureDetails failureDetails = 4; - string completionToken = 5; -} - -message TaskFailureDetails { - string errorType = 1; - string errorMessage = 2; - google.protobuf.StringValue stackTrace = 3; - TaskFailureDetails innerFailure = 4; - bool isNonRetriable = 5; -} - -enum OrchestrationStatus { - ORCHESTRATION_STATUS_RUNNING = 0; - ORCHESTRATION_STATUS_COMPLETED = 1; - ORCHESTRATION_STATUS_CONTINUED_AS_NEW = 2; - ORCHESTRATION_STATUS_FAILED = 3; - ORCHESTRATION_STATUS_CANCELED = 4; - ORCHESTRATION_STATUS_TERMINATED = 5; - ORCHESTRATION_STATUS_PENDING = 6; - ORCHESTRATION_STATUS_SUSPENDED = 7; -} - -message ParentInstanceInfo { - int32 taskScheduledId = 1; - google.protobuf.StringValue name = 2; - google.protobuf.StringValue version = 3; - OrchestrationInstance orchestrationInstance = 4; -} - -message TraceContext { - string traceParent = 1; - string spanID = 2 [deprecated=true]; - google.protobuf.StringValue traceState = 3; -} - -message ExecutionStartedEvent { - string name = 1; - google.protobuf.StringValue version = 2; - google.protobuf.StringValue input = 3; - OrchestrationInstance orchestrationInstance = 4; - ParentInstanceInfo parentInstance = 5; - google.protobuf.Timestamp scheduledStartTimestamp = 6; - TraceContext parentTraceContext = 7; - google.protobuf.StringValue orchestrationSpanID = 8; - map tags = 9; -} - -message ExecutionCompletedEvent { - OrchestrationStatus orchestrationStatus = 1; - google.protobuf.StringValue result = 2; - TaskFailureDetails failureDetails = 3; -} - -message ExecutionTerminatedEvent { - google.protobuf.StringValue input = 1; - bool recurse = 2; -} - -message TaskScheduledEvent { - string name = 1; - google.protobuf.StringValue version = 2; - google.protobuf.StringValue input = 3; - TraceContext parentTraceContext = 4; - map tags = 5; -} - -message TaskCompletedEvent { - int32 taskScheduledId = 1; - google.protobuf.StringValue result = 2; -} - -message TaskFailedEvent { - int32 taskScheduledId = 1; - TaskFailureDetails failureDetails = 2; -} - -message SubOrchestrationInstanceCreatedEvent { - string instanceId = 1; - string name = 2; - google.protobuf.StringValue version = 3; - google.protobuf.StringValue input = 4; - TraceContext parentTraceContext = 5; -} - -message SubOrchestrationInstanceCompletedEvent { - int32 taskScheduledId = 1; - google.protobuf.StringValue result = 2; -} - -message SubOrchestrationInstanceFailedEvent { - int32 taskScheduledId = 1; - TaskFailureDetails failureDetails = 2; -} - -message TimerCreatedEvent { - google.protobuf.Timestamp fireAt = 1; -} - -message TimerFiredEvent { - google.protobuf.Timestamp fireAt = 1; - int32 timerId = 2; -} - -message OrchestratorStartedEvent { - // No payload data -} - -message OrchestratorCompletedEvent { - // No payload data -} - -message EventSentEvent { - string instanceId = 1; - string name = 2; - google.protobuf.StringValue input = 3; -} - -message EventRaisedEvent { - string name = 1; - google.protobuf.StringValue input = 2; -} - -message GenericEvent { - google.protobuf.StringValue data = 1; -} - -message HistoryStateEvent { - OrchestrationState orchestrationState = 1; -} - -message ContinueAsNewEvent { - google.protobuf.StringValue input = 1; -} - -message ExecutionSuspendedEvent { - google.protobuf.StringValue input = 1; -} - -message ExecutionResumedEvent { - google.protobuf.StringValue input = 1; -} - -message EntityOperationSignaledEvent { - string requestId = 1; - string operation = 2; - google.protobuf.Timestamp scheduledTime = 3; - google.protobuf.StringValue input = 4; - google.protobuf.StringValue targetInstanceId = 5; // used only within histories, null in messages -} - -message EntityOperationCalledEvent { - string requestId = 1; - string operation = 2; - google.protobuf.Timestamp scheduledTime = 3; - google.protobuf.StringValue input = 4; - google.protobuf.StringValue parentInstanceId = 5; // used only within messages, null in histories - google.protobuf.StringValue parentExecutionId = 6; // used only within messages, null in histories - google.protobuf.StringValue targetInstanceId = 7; // used only within histories, null in messages -} - -message EntityLockRequestedEvent { - string criticalSectionId = 1; - repeated string lockSet = 2; - int32 position = 3; - google.protobuf.StringValue parentInstanceId = 4; // used only within messages, null in histories -} - -message EntityOperationCompletedEvent { - string requestId = 1; - google.protobuf.StringValue output = 2; -} - -message EntityOperationFailedEvent { - string requestId = 1; - TaskFailureDetails failureDetails = 2; -} - -message EntityUnlockSentEvent { - string criticalSectionId = 1; - google.protobuf.StringValue parentInstanceId = 2; // used only within messages, null in histories - google.protobuf.StringValue targetInstanceId = 3; // used only within histories, null in messages -} - -message EntityLockGrantedEvent { - string criticalSectionId = 1; -} - -message HistoryEvent { - int32 eventId = 1; - google.protobuf.Timestamp timestamp = 2; - oneof eventType { - ExecutionStartedEvent executionStarted = 3; - ExecutionCompletedEvent executionCompleted = 4; - ExecutionTerminatedEvent executionTerminated = 5; - TaskScheduledEvent taskScheduled = 6; - TaskCompletedEvent taskCompleted = 7; - TaskFailedEvent taskFailed = 8; - SubOrchestrationInstanceCreatedEvent subOrchestrationInstanceCreated = 9; - SubOrchestrationInstanceCompletedEvent subOrchestrationInstanceCompleted = 10; - SubOrchestrationInstanceFailedEvent subOrchestrationInstanceFailed = 11; - TimerCreatedEvent timerCreated = 12; - TimerFiredEvent timerFired = 13; - OrchestratorStartedEvent orchestratorStarted = 14; - OrchestratorCompletedEvent orchestratorCompleted = 15; - EventSentEvent eventSent = 16; - EventRaisedEvent eventRaised = 17; - GenericEvent genericEvent = 18; - HistoryStateEvent historyState = 19; - ContinueAsNewEvent continueAsNew = 20; - ExecutionSuspendedEvent executionSuspended = 21; - ExecutionResumedEvent executionResumed = 22; - EntityOperationSignaledEvent entityOperationSignaled = 23; - EntityOperationCalledEvent entityOperationCalled = 24; - EntityOperationCompletedEvent entityOperationCompleted = 25; - EntityOperationFailedEvent entityOperationFailed = 26; - EntityLockRequestedEvent entityLockRequested = 27; - EntityLockGrantedEvent entityLockGranted = 28; - EntityUnlockSentEvent entityUnlockSent = 29; - } -} - -message ScheduleTaskAction { - string name = 1; - google.protobuf.StringValue version = 2; - google.protobuf.StringValue input = 3; - map tags = 4; - TraceContext parentTraceContext = 5; -} - -message CreateSubOrchestrationAction { - string instanceId = 1; - string name = 2; - google.protobuf.StringValue version = 3; - google.protobuf.StringValue input = 4; - TraceContext parentTraceContext = 5; -} - -message CreateTimerAction { - google.protobuf.Timestamp fireAt = 1; -} - -message SendEventAction { - OrchestrationInstance instance = 1; - string name = 2; - google.protobuf.StringValue data = 3; -} - -message CompleteOrchestrationAction { - OrchestrationStatus orchestrationStatus = 1; - google.protobuf.StringValue result = 2; - google.protobuf.StringValue details = 3; - google.protobuf.StringValue newVersion = 4; - repeated HistoryEvent carryoverEvents = 5; - TaskFailureDetails failureDetails = 6; -} - -message TerminateOrchestrationAction { - string instanceId = 1; - google.protobuf.StringValue reason = 2; - bool recurse = 3; -} - -message SendEntityMessageAction { - oneof EntityMessageType { - EntityOperationSignaledEvent entityOperationSignaled = 1; - EntityOperationCalledEvent entityOperationCalled = 2; - EntityLockRequestedEvent entityLockRequested = 3; - EntityUnlockSentEvent entityUnlockSent = 4; - } -} - -message OrchestratorAction { - int32 id = 1; - oneof orchestratorActionType { - ScheduleTaskAction scheduleTask = 2; - CreateSubOrchestrationAction createSubOrchestration = 3; - CreateTimerAction createTimer = 4; - SendEventAction sendEvent = 5; - CompleteOrchestrationAction completeOrchestration = 6; - TerminateOrchestrationAction terminateOrchestration = 7; - SendEntityMessageAction sendEntityMessage = 8; - } -} - -message OrchestrationTraceContext { - google.protobuf.StringValue spanID = 1; - google.protobuf.Timestamp spanStartTime = 2; -} - -message OrchestratorRequest { - string instanceId = 1; - google.protobuf.StringValue executionId = 2; - repeated HistoryEvent pastEvents = 3; - repeated HistoryEvent newEvents = 4; - OrchestratorEntityParameters entityParameters = 5; - bool requiresHistoryStreaming = 6; - map properties = 7; - - OrchestrationTraceContext orchestrationTraceContext = 8; -} - -message OrchestratorResponse { - string instanceId = 1; - repeated OrchestratorAction actions = 2; - google.protobuf.StringValue customStatus = 3; - string completionToken = 4; - - // The number of work item events that were processed by the orchestrator. - // This field is optional. If not set, the service should assume that the orchestrator processed all events. - google.protobuf.Int32Value numEventsProcessed = 5; - OrchestrationTraceContext orchestrationTraceContext = 6; - - // Whether or not a history is required to complete the original OrchestratorRequest and none was provided. - bool requiresHistory = 7; -} - -message CreateInstanceRequest { - string instanceId = 1; - string name = 2; - google.protobuf.StringValue version = 3; - google.protobuf.StringValue input = 4; - google.protobuf.Timestamp scheduledStartTimestamp = 5; - OrchestrationIdReusePolicy orchestrationIdReusePolicy = 6; - google.protobuf.StringValue executionId = 7; - map tags = 8; - TraceContext parentTraceContext = 9; - google.protobuf.Timestamp requestTime = 10; -} - -message OrchestrationIdReusePolicy { - repeated OrchestrationStatus replaceableStatus = 1; - reserved 2; -} - -message CreateInstanceResponse { - string instanceId = 1; -} - -message GetInstanceRequest { - string instanceId = 1; - bool getInputsAndOutputs = 2; -} - -message GetInstanceResponse { - bool exists = 1; - OrchestrationState orchestrationState = 2; -} - -message RewindInstanceRequest { - string instanceId = 1; - google.protobuf.StringValue reason = 2; -} - -message RewindInstanceResponse { - // Empty for now. Using explicit type incase we want to add content later. -} - -message OrchestrationState { - string instanceId = 1; - string name = 2; - google.protobuf.StringValue version = 3; - OrchestrationStatus orchestrationStatus = 4; - google.protobuf.Timestamp scheduledStartTimestamp = 5; - google.protobuf.Timestamp createdTimestamp = 6; - google.protobuf.Timestamp lastUpdatedTimestamp = 7; - google.protobuf.StringValue input = 8; - google.protobuf.StringValue output = 9; - google.protobuf.StringValue customStatus = 10; - TaskFailureDetails failureDetails = 11; - google.protobuf.StringValue executionId = 12; - google.protobuf.Timestamp completedTimestamp = 13; - google.protobuf.StringValue parentInstanceId = 14; - map tags = 15; -} - -message RaiseEventRequest { - string instanceId = 1; - string name = 2; - google.protobuf.StringValue input = 3; -} - -message RaiseEventResponse { - // No payload -} - -message TerminateRequest { - string instanceId = 1; - google.protobuf.StringValue output = 2; - bool recursive = 3; -} - -message TerminateResponse { - // No payload -} - -message SuspendRequest { - string instanceId = 1; - google.protobuf.StringValue reason = 2; -} - -message SuspendResponse { - // No payload -} - -message ResumeRequest { - string instanceId = 1; - google.protobuf.StringValue reason = 2; -} - -message ResumeResponse { - // No payload -} - -message QueryInstancesRequest { - InstanceQuery query = 1; -} - -message InstanceQuery{ - repeated OrchestrationStatus runtimeStatus = 1; - google.protobuf.Timestamp createdTimeFrom = 2; - google.protobuf.Timestamp createdTimeTo = 3; - repeated google.protobuf.StringValue taskHubNames = 4; - int32 maxInstanceCount = 5; - google.protobuf.StringValue continuationToken = 6; - google.protobuf.StringValue instanceIdPrefix = 7; - bool fetchInputsAndOutputs = 8; -} - -message QueryInstancesResponse { - repeated OrchestrationState orchestrationState = 1; - google.protobuf.StringValue continuationToken = 2; -} - -message PurgeInstancesRequest { - oneof request { - string instanceId = 1; - PurgeInstanceFilter purgeInstanceFilter = 2; - } - bool recursive = 3; -} - -message PurgeInstanceFilter { - google.protobuf.Timestamp createdTimeFrom = 1; - google.protobuf.Timestamp createdTimeTo = 2; - repeated OrchestrationStatus runtimeStatus = 3; -} - -message PurgeInstancesResponse { - int32 deletedInstanceCount = 1; - google.protobuf.BoolValue isComplete = 2; -} - -message RestartInstanceRequest { - string instanceId = 1; - bool restartWithNewInstanceId = 2; -} - -message RestartInstanceResponse { - string instanceId = 1; -} - -message CreateTaskHubRequest { - bool recreateIfExists = 1; -} - -message CreateTaskHubResponse { - //no playload -} - -message DeleteTaskHubRequest { - //no playload -} - -message DeleteTaskHubResponse { - //no playload -} - -message SignalEntityRequest { - string instanceId = 1; - string name = 2; - google.protobuf.StringValue input = 3; - string requestId = 4; - google.protobuf.Timestamp scheduledTime = 5; - TraceContext parentTraceContext = 6; - google.protobuf.Timestamp requestTime = 7; -} - -message SignalEntityResponse { - // no payload -} - -message GetEntityRequest { - string instanceId = 1; - bool includeState = 2; -} - -message GetEntityResponse { - bool exists = 1; - EntityMetadata entity = 2; -} - -message EntityQuery { - google.protobuf.StringValue instanceIdStartsWith = 1; - google.protobuf.Timestamp lastModifiedFrom = 2; - google.protobuf.Timestamp lastModifiedTo = 3; - bool includeState = 4; - bool includeTransient = 5; - google.protobuf.Int32Value pageSize = 6; - google.protobuf.StringValue continuationToken = 7; -} - -message QueryEntitiesRequest { - EntityQuery query = 1; -} - -message QueryEntitiesResponse { - repeated EntityMetadata entities = 1; - google.protobuf.StringValue continuationToken = 2; -} - -message EntityMetadata { - string instanceId = 1; - google.protobuf.Timestamp lastModifiedTime = 2; - int32 backlogQueueSize = 3; - google.protobuf.StringValue lockedBy = 4; - google.protobuf.StringValue serializedState = 5; -} - -message CleanEntityStorageRequest { - google.protobuf.StringValue continuationToken = 1; - bool removeEmptyEntities = 2; - bool releaseOrphanedLocks = 3; -} - -message CleanEntityStorageResponse { - google.protobuf.StringValue continuationToken = 1; - int32 emptyEntitiesRemoved = 2; - int32 orphanedLocksReleased = 3; -} - -message OrchestratorEntityParameters { - google.protobuf.Duration entityMessageReorderWindow = 1; -} - -message EntityBatchRequest { - string instanceId = 1; - google.protobuf.StringValue entityState = 2; - repeated OperationRequest operations = 3; -} - -message EntityBatchResult { - repeated OperationResult results = 1; - repeated OperationAction actions = 2; - google.protobuf.StringValue entityState = 3; - TaskFailureDetails failureDetails = 4; - string completionToken = 5; - repeated OperationInfo operationInfos = 6; // used only with DTS -} - -message EntityRequest { - string instanceId = 1; - string executionId = 2; - google.protobuf.StringValue entityState = 3; // null if entity does not exist - repeated HistoryEvent operationRequests = 4; -} - -message OperationRequest { - string operation = 1; - string requestId = 2; - google.protobuf.StringValue input = 3; - TraceContext traceContext = 4; -} - -message OperationResult { - oneof resultType { - OperationResultSuccess success = 1; - OperationResultFailure failure = 2; - } -} - -message OperationInfo { - string requestId = 1; - OrchestrationInstance responseDestination = 2; // null for signals -} - -message OperationResultSuccess { - google.protobuf.StringValue result = 1; - google.protobuf.Timestamp startTimeUtc = 2; - google.protobuf.Timestamp endTimeUtc = 3; -} - -message OperationResultFailure { - TaskFailureDetails failureDetails = 1; - google.protobuf.Timestamp startTimeUtc = 2; - google.protobuf.Timestamp endTimeUtc = 3; -} - -message OperationAction { - int32 id = 1; - oneof operationActionType { - SendSignalAction sendSignal = 2; - StartNewOrchestrationAction startNewOrchestration = 3; - } -} - -message SendSignalAction { - string instanceId = 1; - string name = 2; - google.protobuf.StringValue input = 3; - google.protobuf.Timestamp scheduledTime = 4; - google.protobuf.Timestamp requestTime = 5; - TraceContext parentTraceContext = 6; -} - -message StartNewOrchestrationAction { - string instanceId = 1; - string name = 2; - google.protobuf.StringValue version = 3; - google.protobuf.StringValue input = 4; - google.protobuf.Timestamp scheduledTime = 5; - google.protobuf.Timestamp requestTime = 6; - TraceContext parentTraceContext = 7; -} - -message AbandonActivityTaskRequest { - string completionToken = 1; -} - -message AbandonActivityTaskResponse { - // Empty. -} - -message AbandonOrchestrationTaskRequest { - string completionToken = 1; -} - -message AbandonOrchestrationTaskResponse { - // Empty. -} - -message AbandonEntityTaskRequest { - string completionToken = 1; -} - -message AbandonEntityTaskResponse { - // Empty. -} - -message SkipGracefulOrchestrationTerminationsRequest { - // A maximum of 500 instance IDs can be provided in this list. - repeated string instanceIds = 1; +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +syntax = "proto3"; + +option csharp_namespace = "Microsoft.DurableTask.Protobuf"; +option java_package = "com.microsoft.durabletask.implementation.protobuf"; +option go_package = "/internal/protos"; + +import "google/protobuf/timestamp.proto"; +import "google/protobuf/duration.proto"; +import "google/protobuf/wrappers.proto"; +import "google/protobuf/empty.proto"; +import "google/protobuf/struct.proto"; + +message OrchestrationInstance { + string instanceId = 1; + google.protobuf.StringValue executionId = 2; +} + +message ActivityRequest { + string name = 1; + google.protobuf.StringValue version = 2; + google.protobuf.StringValue input = 3; + OrchestrationInstance orchestrationInstance = 4; + int32 taskId = 5; + TraceContext parentTraceContext = 6; +} + +message ActivityResponse { + string instanceId = 1; + int32 taskId = 2; + google.protobuf.StringValue result = 3; + TaskFailureDetails failureDetails = 4; + string completionToken = 5; +} + +message TaskFailureDetails { + string errorType = 1; + string errorMessage = 2; + google.protobuf.StringValue stackTrace = 3; + TaskFailureDetails innerFailure = 4; + bool isNonRetriable = 5; + map properties = 6; +} + +enum OrchestrationStatus { + ORCHESTRATION_STATUS_RUNNING = 0; + ORCHESTRATION_STATUS_COMPLETED = 1; + ORCHESTRATION_STATUS_CONTINUED_AS_NEW = 2; + ORCHESTRATION_STATUS_FAILED = 3; + ORCHESTRATION_STATUS_CANCELED = 4; + ORCHESTRATION_STATUS_TERMINATED = 5; + ORCHESTRATION_STATUS_PENDING = 6; + ORCHESTRATION_STATUS_SUSPENDED = 7; +} + +message ParentInstanceInfo { + int32 taskScheduledId = 1; + google.protobuf.StringValue name = 2; + google.protobuf.StringValue version = 3; + OrchestrationInstance orchestrationInstance = 4; +} + +message TraceContext { + string traceParent = 1; + string spanID = 2 [deprecated=true]; + google.protobuf.StringValue traceState = 3; +} + +message ExecutionStartedEvent { + string name = 1; + google.protobuf.StringValue version = 2; + google.protobuf.StringValue input = 3; + OrchestrationInstance orchestrationInstance = 4; + ParentInstanceInfo parentInstance = 5; + google.protobuf.Timestamp scheduledStartTimestamp = 6; + TraceContext parentTraceContext = 7; + google.protobuf.StringValue orchestrationSpanID = 8; + map tags = 9; +} + +message ExecutionCompletedEvent { + OrchestrationStatus orchestrationStatus = 1; + google.protobuf.StringValue result = 2; + TaskFailureDetails failureDetails = 3; +} + +message ExecutionTerminatedEvent { + google.protobuf.StringValue input = 1; + bool recurse = 2; +} + +message TaskScheduledEvent { + string name = 1; + google.protobuf.StringValue version = 2; + google.protobuf.StringValue input = 3; + TraceContext parentTraceContext = 4; + map tags = 5; +} + +message TaskCompletedEvent { + int32 taskScheduledId = 1; + google.protobuf.StringValue result = 2; +} + +message TaskFailedEvent { + int32 taskScheduledId = 1; + TaskFailureDetails failureDetails = 2; +} + +message SubOrchestrationInstanceCreatedEvent { + string instanceId = 1; + string name = 2; + google.protobuf.StringValue version = 3; + google.protobuf.StringValue input = 4; + TraceContext parentTraceContext = 5; +} + +message SubOrchestrationInstanceCompletedEvent { + int32 taskScheduledId = 1; + google.protobuf.StringValue result = 2; +} + +message SubOrchestrationInstanceFailedEvent { + int32 taskScheduledId = 1; + TaskFailureDetails failureDetails = 2; +} + +message TimerCreatedEvent { + google.protobuf.Timestamp fireAt = 1; +} + +message TimerFiredEvent { + google.protobuf.Timestamp fireAt = 1; + int32 timerId = 2; +} + +message OrchestratorStartedEvent { + // No payload data +} + +message OrchestratorCompletedEvent { + // No payload data +} + +message EventSentEvent { + string instanceId = 1; + string name = 2; + google.protobuf.StringValue input = 3; +} + +message EventRaisedEvent { + string name = 1; + google.protobuf.StringValue input = 2; +} + +message GenericEvent { + google.protobuf.StringValue data = 1; +} + +message HistoryStateEvent { + OrchestrationState orchestrationState = 1; +} + +message ContinueAsNewEvent { + google.protobuf.StringValue input = 1; +} + +message ExecutionSuspendedEvent { + google.protobuf.StringValue input = 1; +} + +message ExecutionResumedEvent { + google.protobuf.StringValue input = 1; +} + +message EntityOperationSignaledEvent { + string requestId = 1; + string operation = 2; + google.protobuf.Timestamp scheduledTime = 3; + google.protobuf.StringValue input = 4; + google.protobuf.StringValue targetInstanceId = 5; // used only within histories, null in messages +} + +message EntityOperationCalledEvent { + string requestId = 1; + string operation = 2; + google.protobuf.Timestamp scheduledTime = 3; + google.protobuf.StringValue input = 4; + google.protobuf.StringValue parentInstanceId = 5; // used only within messages, null in histories + google.protobuf.StringValue parentExecutionId = 6; // used only within messages, null in histories + google.protobuf.StringValue targetInstanceId = 7; // used only within histories, null in messages +} + +message EntityLockRequestedEvent { + string criticalSectionId = 1; + repeated string lockSet = 2; + int32 position = 3; + google.protobuf.StringValue parentInstanceId = 4; // used only within messages, null in histories +} + +message EntityOperationCompletedEvent { + string requestId = 1; + google.protobuf.StringValue output = 2; +} + +message EntityOperationFailedEvent { + string requestId = 1; + TaskFailureDetails failureDetails = 2; +} + +message EntityUnlockSentEvent { + string criticalSectionId = 1; + google.protobuf.StringValue parentInstanceId = 2; // used only within messages, null in histories + google.protobuf.StringValue targetInstanceId = 3; // used only within histories, null in messages +} + +message EntityLockGrantedEvent { + string criticalSectionId = 1; +} + +message HistoryEvent { + int32 eventId = 1; + google.protobuf.Timestamp timestamp = 2; + oneof eventType { + ExecutionStartedEvent executionStarted = 3; + ExecutionCompletedEvent executionCompleted = 4; + ExecutionTerminatedEvent executionTerminated = 5; + TaskScheduledEvent taskScheduled = 6; + TaskCompletedEvent taskCompleted = 7; + TaskFailedEvent taskFailed = 8; + SubOrchestrationInstanceCreatedEvent subOrchestrationInstanceCreated = 9; + SubOrchestrationInstanceCompletedEvent subOrchestrationInstanceCompleted = 10; + SubOrchestrationInstanceFailedEvent subOrchestrationInstanceFailed = 11; + TimerCreatedEvent timerCreated = 12; + TimerFiredEvent timerFired = 13; + OrchestratorStartedEvent orchestratorStarted = 14; + OrchestratorCompletedEvent orchestratorCompleted = 15; + EventSentEvent eventSent = 16; + EventRaisedEvent eventRaised = 17; + GenericEvent genericEvent = 18; + HistoryStateEvent historyState = 19; + ContinueAsNewEvent continueAsNew = 20; + ExecutionSuspendedEvent executionSuspended = 21; + ExecutionResumedEvent executionResumed = 22; + EntityOperationSignaledEvent entityOperationSignaled = 23; + EntityOperationCalledEvent entityOperationCalled = 24; + EntityOperationCompletedEvent entityOperationCompleted = 25; + EntityOperationFailedEvent entityOperationFailed = 26; + EntityLockRequestedEvent entityLockRequested = 27; + EntityLockGrantedEvent entityLockGranted = 28; + EntityUnlockSentEvent entityUnlockSent = 29; + } +} + +message ScheduleTaskAction { + string name = 1; + google.protobuf.StringValue version = 2; + google.protobuf.StringValue input = 3; + map tags = 4; + TraceContext parentTraceContext = 5; +} + +message CreateSubOrchestrationAction { + string instanceId = 1; + string name = 2; + google.protobuf.StringValue version = 3; + google.protobuf.StringValue input = 4; + TraceContext parentTraceContext = 5; +} + +message CreateTimerAction { + google.protobuf.Timestamp fireAt = 1; +} + +message SendEventAction { + OrchestrationInstance instance = 1; + string name = 2; + google.protobuf.StringValue data = 3; +} + +message CompleteOrchestrationAction { + OrchestrationStatus orchestrationStatus = 1; + google.protobuf.StringValue result = 2; + google.protobuf.StringValue details = 3; + google.protobuf.StringValue newVersion = 4; + repeated HistoryEvent carryoverEvents = 5; + TaskFailureDetails failureDetails = 6; +} + +message TerminateOrchestrationAction { + string instanceId = 1; + google.protobuf.StringValue reason = 2; + bool recurse = 3; +} + +message SendEntityMessageAction { + oneof EntityMessageType { + EntityOperationSignaledEvent entityOperationSignaled = 1; + EntityOperationCalledEvent entityOperationCalled = 2; + EntityLockRequestedEvent entityLockRequested = 3; + EntityUnlockSentEvent entityUnlockSent = 4; + } +} + +message OrchestratorAction { + int32 id = 1; + oneof orchestratorActionType { + ScheduleTaskAction scheduleTask = 2; + CreateSubOrchestrationAction createSubOrchestration = 3; + CreateTimerAction createTimer = 4; + SendEventAction sendEvent = 5; + CompleteOrchestrationAction completeOrchestration = 6; + TerminateOrchestrationAction terminateOrchestration = 7; + SendEntityMessageAction sendEntityMessage = 8; + } +} + +message OrchestrationTraceContext { + google.protobuf.StringValue spanID = 1; + google.protobuf.Timestamp spanStartTime = 2; +} + +message OrchestratorRequest { + string instanceId = 1; + google.protobuf.StringValue executionId = 2; + repeated HistoryEvent pastEvents = 3; + repeated HistoryEvent newEvents = 4; + OrchestratorEntityParameters entityParameters = 5; + bool requiresHistoryStreaming = 6; + map properties = 7; + + OrchestrationTraceContext orchestrationTraceContext = 8; +} + +message OrchestratorResponse { + string instanceId = 1; + repeated OrchestratorAction actions = 2; + google.protobuf.StringValue customStatus = 3; + string completionToken = 4; + + // The number of work item events that were processed by the orchestrator. + // This field is optional. If not set, the service should assume that the orchestrator processed all events. + google.protobuf.Int32Value numEventsProcessed = 5; + OrchestrationTraceContext orchestrationTraceContext = 6; + + // Whether or not a history is required to complete the original OrchestratorRequest and none was provided. + bool requiresHistory = 7; +} + +message CreateInstanceRequest { + string instanceId = 1; + string name = 2; + google.protobuf.StringValue version = 3; + google.protobuf.StringValue input = 4; + google.protobuf.Timestamp scheduledStartTimestamp = 5; + OrchestrationIdReusePolicy orchestrationIdReusePolicy = 6; + google.protobuf.StringValue executionId = 7; + map tags = 8; + TraceContext parentTraceContext = 9; + google.protobuf.Timestamp requestTime = 10; +} + +message OrchestrationIdReusePolicy { + repeated OrchestrationStatus replaceableStatus = 1; + reserved 2; +} + +message CreateInstanceResponse { + string instanceId = 1; +} + +message GetInstanceRequest { + string instanceId = 1; + bool getInputsAndOutputs = 2; +} + +message GetInstanceResponse { + bool exists = 1; + OrchestrationState orchestrationState = 2; +} + +message RewindInstanceRequest { + string instanceId = 1; + google.protobuf.StringValue reason = 2; +} + +message RewindInstanceResponse { + // Empty for now. Using explicit type incase we want to add content later. +} + +message OrchestrationState { + string instanceId = 1; + string name = 2; + google.protobuf.StringValue version = 3; + OrchestrationStatus orchestrationStatus = 4; + google.protobuf.Timestamp scheduledStartTimestamp = 5; + google.protobuf.Timestamp createdTimestamp = 6; + google.protobuf.Timestamp lastUpdatedTimestamp = 7; + google.protobuf.StringValue input = 8; + google.protobuf.StringValue output = 9; + google.protobuf.StringValue customStatus = 10; + TaskFailureDetails failureDetails = 11; + google.protobuf.StringValue executionId = 12; + google.protobuf.Timestamp completedTimestamp = 13; + google.protobuf.StringValue parentInstanceId = 14; + map tags = 15; +} + +message RaiseEventRequest { + string instanceId = 1; + string name = 2; + google.protobuf.StringValue input = 3; +} + +message RaiseEventResponse { + // No payload +} + +message TerminateRequest { + string instanceId = 1; + google.protobuf.StringValue output = 2; + bool recursive = 3; +} + +message TerminateResponse { + // No payload +} + +message SuspendRequest { + string instanceId = 1; google.protobuf.StringValue reason = 2; -} - -message SkipGracefulOrchestrationTerminationsResponse { - // Those instances which could not be terminated because they had locked entities at the time of this termination call, - // are already in a terminal state (completed, failed, terminated, etc.), are not orchestrations, or do not exist (i.e. have been purged) +} + +message SuspendResponse { + // No payload +} + +message ResumeRequest { + string instanceId = 1; + google.protobuf.StringValue reason = 2; +} + +message ResumeResponse { + // No payload +} + +message QueryInstancesRequest { + InstanceQuery query = 1; +} + +message InstanceQuery{ + repeated OrchestrationStatus runtimeStatus = 1; + google.protobuf.Timestamp createdTimeFrom = 2; + google.protobuf.Timestamp createdTimeTo = 3; + repeated google.protobuf.StringValue taskHubNames = 4; + int32 maxInstanceCount = 5; + google.protobuf.StringValue continuationToken = 6; + google.protobuf.StringValue instanceIdPrefix = 7; + bool fetchInputsAndOutputs = 8; +} + +message QueryInstancesResponse { + repeated OrchestrationState orchestrationState = 1; + google.protobuf.StringValue continuationToken = 2; +} + +message PurgeInstancesRequest { + oneof request { + string instanceId = 1; + PurgeInstanceFilter purgeInstanceFilter = 2; + InstanceBatch instanceBatch = 4; + } + bool recursive = 3; +} + +message PurgeInstanceFilter { + google.protobuf.Timestamp createdTimeFrom = 1; + google.protobuf.Timestamp createdTimeTo = 2; + repeated OrchestrationStatus runtimeStatus = 3; +} + +message PurgeInstancesResponse { + int32 deletedInstanceCount = 1; + google.protobuf.BoolValue isComplete = 2; +} + +message RestartInstanceRequest { + string instanceId = 1; + bool restartWithNewInstanceId = 2; +} + +message RestartInstanceResponse { + string instanceId = 1; +} + +message CreateTaskHubRequest { + bool recreateIfExists = 1; +} + +message CreateTaskHubResponse { + //no playload +} + +message DeleteTaskHubRequest { + //no playload +} + +message DeleteTaskHubResponse { + //no playload +} + +message SignalEntityRequest { + string instanceId = 1; + string name = 2; + google.protobuf.StringValue input = 3; + string requestId = 4; + google.protobuf.Timestamp scheduledTime = 5; + TraceContext parentTraceContext = 6; + google.protobuf.Timestamp requestTime = 7; +} + +message SignalEntityResponse { + // no payload +} + +message GetEntityRequest { + string instanceId = 1; + bool includeState = 2; +} + +message GetEntityResponse { + bool exists = 1; + EntityMetadata entity = 2; +} + +message EntityQuery { + google.protobuf.StringValue instanceIdStartsWith = 1; + google.protobuf.Timestamp lastModifiedFrom = 2; + google.protobuf.Timestamp lastModifiedTo = 3; + bool includeState = 4; + bool includeTransient = 5; + google.protobuf.Int32Value pageSize = 6; + google.protobuf.StringValue continuationToken = 7; +} + +message QueryEntitiesRequest { + EntityQuery query = 1; +} + +message QueryEntitiesResponse { + repeated EntityMetadata entities = 1; + google.protobuf.StringValue continuationToken = 2; +} + +message EntityMetadata { + string instanceId = 1; + google.protobuf.Timestamp lastModifiedTime = 2; + int32 backlogQueueSize = 3; + google.protobuf.StringValue lockedBy = 4; + google.protobuf.StringValue serializedState = 5; +} + +message CleanEntityStorageRequest { + google.protobuf.StringValue continuationToken = 1; + bool removeEmptyEntities = 2; + bool releaseOrphanedLocks = 3; +} + +message CleanEntityStorageResponse { + google.protobuf.StringValue continuationToken = 1; + int32 emptyEntitiesRemoved = 2; + int32 orphanedLocksReleased = 3; +} + +message OrchestratorEntityParameters { + google.protobuf.Duration entityMessageReorderWindow = 1; +} + +message EntityBatchRequest { + string instanceId = 1; + google.protobuf.StringValue entityState = 2; + repeated OperationRequest operations = 3; +} + +message EntityBatchResult { + repeated OperationResult results = 1; + repeated OperationAction actions = 2; + google.protobuf.StringValue entityState = 3; + TaskFailureDetails failureDetails = 4; + string completionToken = 5; + repeated OperationInfo operationInfos = 6; // used only with DTS +} + +message EntityRequest { + string instanceId = 1; + string executionId = 2; + google.protobuf.StringValue entityState = 3; // null if entity does not exist + repeated HistoryEvent operationRequests = 4; +} + +message OperationRequest { + string operation = 1; + string requestId = 2; + google.protobuf.StringValue input = 3; + TraceContext traceContext = 4; +} + +message OperationResult { + oneof resultType { + OperationResultSuccess success = 1; + OperationResultFailure failure = 2; + } +} + +message OperationInfo { + string requestId = 1; + OrchestrationInstance responseDestination = 2; // null for signals +} + +message OperationResultSuccess { + google.protobuf.StringValue result = 1; + google.protobuf.Timestamp startTimeUtc = 2; + google.protobuf.Timestamp endTimeUtc = 3; +} + +message OperationResultFailure { + TaskFailureDetails failureDetails = 1; + google.protobuf.Timestamp startTimeUtc = 2; + google.protobuf.Timestamp endTimeUtc = 3; +} + +message OperationAction { + int32 id = 1; + oneof operationActionType { + SendSignalAction sendSignal = 2; + StartNewOrchestrationAction startNewOrchestration = 3; + } +} + +message SendSignalAction { + string instanceId = 1; + string name = 2; + google.protobuf.StringValue input = 3; + google.protobuf.Timestamp scheduledTime = 4; + google.protobuf.Timestamp requestTime = 5; + TraceContext parentTraceContext = 6; +} + +message StartNewOrchestrationAction { + string instanceId = 1; + string name = 2; + google.protobuf.StringValue version = 3; + google.protobuf.StringValue input = 4; + google.protobuf.Timestamp scheduledTime = 5; + google.protobuf.Timestamp requestTime = 6; + TraceContext parentTraceContext = 7; +} + +message AbandonActivityTaskRequest { + string completionToken = 1; +} + +message AbandonActivityTaskResponse { + // Empty. +} + +message AbandonOrchestrationTaskRequest { + string completionToken = 1; +} + +message AbandonOrchestrationTaskResponse { + // Empty. +} + +message AbandonEntityTaskRequest { + string completionToken = 1; +} + +message AbandonEntityTaskResponse { + // Empty. +} + +message SkipGracefulOrchestrationTerminationsRequest { + InstanceBatch instanceBatch = 1; + google.protobuf.StringValue reason = 2; +} + +message SkipGracefulOrchestrationTerminationsResponse { + // Those instances which could not be terminated because they had locked entities at the time of this termination call, + // are already in a terminal state (completed, failed, terminated, etc.), are not orchestrations, or do not exist (i.e. have been purged) repeated string unterminatedInstanceIds = 1; -} - -service TaskHubSidecarService { - // Sends a hello request to the sidecar service. - rpc Hello(google.protobuf.Empty) returns (google.protobuf.Empty); - - // Starts a new orchestration instance. - rpc StartInstance(CreateInstanceRequest) returns (CreateInstanceResponse); - - // Gets the status of an existing orchestration instance. - rpc GetInstance(GetInstanceRequest) returns (GetInstanceResponse); - - // Rewinds an orchestration instance to last known good state and replays from there. - rpc RewindInstance(RewindInstanceRequest) returns (RewindInstanceResponse); - - // Restarts an orchestration instance. - rpc RestartInstance(RestartInstanceRequest) returns (RestartInstanceResponse); - - // Waits for an orchestration instance to reach a running or completion state. - rpc WaitForInstanceStart(GetInstanceRequest) returns (GetInstanceResponse); - - // Waits for an orchestration instance to reach a completion state (completed, failed, terminated, etc.). - rpc WaitForInstanceCompletion(GetInstanceRequest) returns (GetInstanceResponse); - - // Raises an event to a running orchestration instance. - rpc RaiseEvent(RaiseEventRequest) returns (RaiseEventResponse); - - // Terminates a running orchestration instance. - rpc TerminateInstance(TerminateRequest) returns (TerminateResponse); - - // Suspends a running orchestration instance. - rpc SuspendInstance(SuspendRequest) returns (SuspendResponse); - - // Resumes a suspended orchestration instance. - rpc ResumeInstance(ResumeRequest) returns (ResumeResponse); - - // rpc DeleteInstance(DeleteInstanceRequest) returns (DeleteInstanceResponse); - - rpc QueryInstances(QueryInstancesRequest) returns (QueryInstancesResponse); - rpc PurgeInstances(PurgeInstancesRequest) returns (PurgeInstancesResponse); - - rpc GetWorkItems(GetWorkItemsRequest) returns (stream WorkItem); - rpc CompleteActivityTask(ActivityResponse) returns (CompleteTaskResponse); - rpc CompleteOrchestratorTask(OrchestratorResponse) returns (CompleteTaskResponse); - rpc CompleteEntityTask(EntityBatchResult) returns (CompleteTaskResponse); - - // Gets the history of an orchestration instance as a stream of events. - rpc StreamInstanceHistory(StreamInstanceHistoryRequest) returns (stream HistoryChunk); - - // Deletes and Creates the necessary resources for the orchestration service and the instance store - rpc CreateTaskHub(CreateTaskHubRequest) returns (CreateTaskHubResponse); - - // Deletes the resources for the orchestration service and optionally the instance store - rpc DeleteTaskHub(DeleteTaskHubRequest) returns (DeleteTaskHubResponse); - - // sends a signal to an entity - rpc SignalEntity(SignalEntityRequest) returns (SignalEntityResponse); - - // get information about a specific entity - rpc GetEntity(GetEntityRequest) returns (GetEntityResponse); - - // query entities - rpc QueryEntities(QueryEntitiesRequest) returns (QueryEntitiesResponse); - - // clean entity storage - rpc CleanEntityStorage(CleanEntityStorageRequest) returns (CleanEntityStorageResponse); - - // Abandons a single work item - rpc AbandonTaskActivityWorkItem(AbandonActivityTaskRequest) returns (AbandonActivityTaskResponse); - - // Abandon an orchestration work item - rpc AbandonTaskOrchestratorWorkItem(AbandonOrchestrationTaskRequest) returns (AbandonOrchestrationTaskResponse); - - // Abandon an entity work item - rpc AbandonTaskEntityWorkItem(AbandonEntityTaskRequest) returns (AbandonEntityTaskResponse); - - // "Skip" graceful termination of orchestrations by immediately changing their status in storage to "terminated". - // Note that a maximum of 500 orchestrations can be terminated at a time using this method. - rpc SkipGracefulOrchestrationTerminations(SkipGracefulOrchestrationTerminationsRequest) returns (SkipGracefulOrchestrationTerminationsResponse); -} - -message GetWorkItemsRequest { - int32 maxConcurrentOrchestrationWorkItems = 1; - int32 maxConcurrentActivityWorkItems = 2; - int32 maxConcurrentEntityWorkItems = 3; - - repeated WorkerCapability capabilities = 10; -} - -enum WorkerCapability { - WORKER_CAPABILITY_UNSPECIFIED = 0; - - // Indicates that the worker is capable of streaming instance history as a more optimized - // alternative to receiving the full history embedded in the orchestrator work-item. - // When set, the service may return work items without any history events as an optimization. - // It is strongly recommended that all SDKs support this capability. - WORKER_CAPABILITY_HISTORY_STREAMING = 1; -} - -message WorkItem { - oneof request { - OrchestratorRequest orchestratorRequest = 1; - ActivityRequest activityRequest = 2; - EntityBatchRequest entityRequest = 3; // (older) used by orchestration services implementations - HealthPing healthPing = 4; - EntityRequest entityRequestV2 = 5; // (newer) used by backend service implementations - } - string completionToken = 10; -} - -message CompleteTaskResponse { - // No payload -} - -message HealthPing { - // No payload -} - -message StreamInstanceHistoryRequest { - string instanceId = 1; - google.protobuf.StringValue executionId = 2; - - // When set to true, the service may return a more optimized response suitable for workers. - bool forWorkItemProcessing = 3; -} - -message HistoryChunk { - repeated HistoryEvent events = 1; -} \ No newline at end of file + +} + +service TaskHubSidecarService { + // Sends a hello request to the sidecar service. + rpc Hello(google.protobuf.Empty) returns (google.protobuf.Empty); + + // Starts a new orchestration instance. + rpc StartInstance(CreateInstanceRequest) returns (CreateInstanceResponse); + + // Gets the status of an existing orchestration instance. + rpc GetInstance(GetInstanceRequest) returns (GetInstanceResponse); + + // Rewinds an orchestration instance to last known good state and replays from there. + rpc RewindInstance(RewindInstanceRequest) returns (RewindInstanceResponse); + + // Restarts an orchestration instance. + rpc RestartInstance(RestartInstanceRequest) returns (RestartInstanceResponse); + + // Waits for an orchestration instance to reach a running or completion state. + rpc WaitForInstanceStart(GetInstanceRequest) returns (GetInstanceResponse); + + // Waits for an orchestration instance to reach a completion state (completed, failed, terminated, etc.). + rpc WaitForInstanceCompletion(GetInstanceRequest) returns (GetInstanceResponse); + + // Raises an event to a running orchestration instance. + rpc RaiseEvent(RaiseEventRequest) returns (RaiseEventResponse); + + // Terminates a running orchestration instance. + rpc TerminateInstance(TerminateRequest) returns (TerminateResponse); + + // Suspends a running orchestration instance. + rpc SuspendInstance(SuspendRequest) returns (SuspendResponse); + + // Resumes a suspended orchestration instance. + rpc ResumeInstance(ResumeRequest) returns (ResumeResponse); + + // rpc DeleteInstance(DeleteInstanceRequest) returns (DeleteInstanceResponse); + + rpc QueryInstances(QueryInstancesRequest) returns (QueryInstancesResponse); + rpc PurgeInstances(PurgeInstancesRequest) returns (PurgeInstancesResponse); + + rpc GetWorkItems(GetWorkItemsRequest) returns (stream WorkItem); + rpc CompleteActivityTask(ActivityResponse) returns (CompleteTaskResponse); + rpc CompleteOrchestratorTask(OrchestratorResponse) returns (CompleteTaskResponse); + rpc CompleteEntityTask(EntityBatchResult) returns (CompleteTaskResponse); + + // Gets the history of an orchestration instance as a stream of events. + rpc StreamInstanceHistory(StreamInstanceHistoryRequest) returns (stream HistoryChunk); + + // Deletes and Creates the necessary resources for the orchestration service and the instance store + rpc CreateTaskHub(CreateTaskHubRequest) returns (CreateTaskHubResponse); + + // Deletes the resources for the orchestration service and optionally the instance store + rpc DeleteTaskHub(DeleteTaskHubRequest) returns (DeleteTaskHubResponse); + + // sends a signal to an entity + rpc SignalEntity(SignalEntityRequest) returns (SignalEntityResponse); + + // get information about a specific entity + rpc GetEntity(GetEntityRequest) returns (GetEntityResponse); + + // query entities + rpc QueryEntities(QueryEntitiesRequest) returns (QueryEntitiesResponse); + + // clean entity storage + rpc CleanEntityStorage(CleanEntityStorageRequest) returns (CleanEntityStorageResponse); + + // Abandons a single work item + rpc AbandonTaskActivityWorkItem(AbandonActivityTaskRequest) returns (AbandonActivityTaskResponse); + + // Abandon an orchestration work item + rpc AbandonTaskOrchestratorWorkItem(AbandonOrchestrationTaskRequest) returns (AbandonOrchestrationTaskResponse); + + // Abandon an entity work item + rpc AbandonTaskEntityWorkItem(AbandonEntityTaskRequest) returns (AbandonEntityTaskResponse); + + // "Skip" graceful termination of orchestrations by immediately changing their status in storage to "terminated". + // Note that a maximum of 500 orchestrations can be terminated at a time using this method. + rpc SkipGracefulOrchestrationTerminations(SkipGracefulOrchestrationTerminationsRequest) returns (SkipGracefulOrchestrationTerminationsResponse); +} + +message GetWorkItemsRequest { + int32 maxConcurrentOrchestrationWorkItems = 1; + int32 maxConcurrentActivityWorkItems = 2; + int32 maxConcurrentEntityWorkItems = 3; + + repeated WorkerCapability capabilities = 10; +} + +enum WorkerCapability { + WORKER_CAPABILITY_UNSPECIFIED = 0; + + // Indicates that the worker is capable of streaming instance history as a more optimized + // alternative to receiving the full history embedded in the orchestrator work-item. + // When set, the service may return work items without any history events as an optimization. + // It is strongly recommended that all SDKs support this capability. + WORKER_CAPABILITY_HISTORY_STREAMING = 1; +} + +message WorkItem { + oneof request { + OrchestratorRequest orchestratorRequest = 1; + ActivityRequest activityRequest = 2; + EntityBatchRequest entityRequest = 3; // (older) used by orchestration services implementations + HealthPing healthPing = 4; + EntityRequest entityRequestV2 = 5; // (newer) used by backend service implementations + } + string completionToken = 10; +} + +message CompleteTaskResponse { + // No payload +} + +message HealthPing { + // No payload +} + +message StreamInstanceHistoryRequest { + string instanceId = 1; + google.protobuf.StringValue executionId = 2; + + // When set to true, the service may return a more optimized response suitable for workers. + bool forWorkItemProcessing = 3; +} + +message HistoryChunk { + repeated HistoryEvent events = 1; +} + +message InstanceBatch { + // A maximum of 500 instance IDs can be provided in this list. + repeated string instanceIds = 1; +} diff --git a/src/Grpc/versions.txt b/src/Grpc/versions.txt index 3e4d1b21..c5b4db72 100644 --- a/src/Grpc/versions.txt +++ b/src/Grpc/versions.txt @@ -1,2 +1,2 @@ -# The following files were downloaded from branch main at 2025-09-17 01:45:58 UTC -https://raw.githubusercontent.com/microsoft/durabletask-protobuf/f5745e0d83f608d77871c1894d9260ceaae08967/protos/orchestrator_service.proto +# The following files were downloaded from branch nytian/failure-details at 2025-10-01 21:51:24 UTC +https://raw.githubusercontent.com/microsoft/durabletask-protobuf/362f886f1ef7c4dd90cbdfdb2f661f48eeeec4fa/protos/orchestrator_service.proto diff --git a/src/Shared/Grpc/ProtoUtils.cs b/src/Shared/Grpc/ProtoUtils.cs index e3e331f7..cf6d9575 100644 --- a/src/Shared/Grpc/ProtoUtils.cs +++ b/src/Shared/Grpc/ProtoUtils.cs @@ -13,6 +13,7 @@ using DurableTask.Core.History; using DurableTask.Core.Tracing; using Google.Protobuf; +using Google.Protobuf.Collections; using Google.Protobuf.WellKnownTypes; using DTCore = DurableTask.Core; using P = Microsoft.DurableTask.Protobuf; @@ -528,29 +529,43 @@ internal static OrchestrationStatus ToCore(this P.OrchestrationStatus status) failureDetails.ErrorType, failureDetails.ErrorMessage, failureDetails.StackTrace, - failureDetails.InnerFailure.ToTaskFailureDetails()); + failureDetails.InnerFailure.ToTaskFailureDetails(), + ConvertProperties(failureDetails.Properties)); } /// /// Converts a to . /// /// The exception to convert. + /// Optional exception properties provider. /// The task failure details. [return: NotNullIfNotNull(nameof(e))] - internal static P.TaskFailureDetails? ToTaskFailureDetails(this Exception? e) + internal static P.TaskFailureDetails? ToTaskFailureDetails(this Exception? e, global::DurableTask.Core.IExceptionPropertiesProvider? exceptionPropertiesProvider = null) { if (e == null) { return null; } - return new P.TaskFailureDetails + var properties = exceptionPropertiesProvider?.GetExceptionProperties(e); + + var taskFailureDetails = new P.TaskFailureDetails { ErrorType = e.GetType().FullName, ErrorMessage = e.Message, StackTrace = e.StackTrace, - InnerFailure = e.InnerException.ToTaskFailureDetails(), + InnerFailure = e.InnerException.ToTaskFailureDetails(exceptionPropertiesProvider), }; + + if (properties != null) + { + foreach (var kvp in properties) + { + taskFailureDetails.Properties[kvp.Key] = ConvertObjectToValue(kvp.Value); + } + } + + return taskFailureDetails; } /// @@ -998,7 +1013,8 @@ internal static T Base64Decode(this MessageParser parser, string encodedMessa failureDetails.ErrorMessage, failureDetails.StackTrace, failureDetails.InnerFailure.ToCore(), - failureDetails.IsNonRetriable); + failureDetails.IsNonRetriable, + ConvertProperties(failureDetails.Properties)); } /// @@ -1044,7 +1060,7 @@ internal static T Base64Decode(this MessageParser parser, string encodedMessa return null; } - return new P.TaskFailureDetails + var taskFailureDetails = new P.TaskFailureDetails { ErrorType = failureDetails.ErrorType ?? "(unknown)", ErrorMessage = failureDetails.ErrorMessage ?? "(unknown)", @@ -1052,6 +1068,17 @@ internal static T Base64Decode(this MessageParser parser, string encodedMessa IsNonRetriable = failureDetails.IsNonRetriable, InnerFailure = failureDetails.InnerFailure.ToProtobuf(), }; + + // Properly populate the MapField + if (failureDetails.Properties != null) + { + foreach (var kvp in failureDetails.Properties) + { + taskFailureDetails.Properties[kvp.Key] = ConvertObjectToValue(kvp.Value); + } + } + + return taskFailureDetails; } static P.OrchestrationStatus ToProtobuf(this OrchestrationStatus status) @@ -1169,4 +1196,66 @@ public void RemoveUnlockObligation(string target) } } } + + /// + /// + /// + /// + /// + public static IDictionary ConvertProperties(MapField properties) + { + return properties.ToDictionary( + kvp => kvp.Key, + kvp => ConvertValue(kvp.Value) + ); + } + + /// + /// Converts a C# object to a protobuf Value. + /// + /// The object to convert. + /// The converted protobuf Value. + private static Value ConvertObjectToValue(object? obj) + { + return obj switch + { + null => Value.ForNull(), + string str => Value.ForString(str), + bool b => Value.ForBool(b), + int i => Value.ForNumber(i), + long l => Value.ForNumber(l), + float f => Value.ForNumber(f), + double d => Value.ForNumber(d), + decimal dec => Value.ForNumber((double)dec), + DateTime dt => Value.ForString(dt.ToString("O")), + DateTimeOffset dto => Value.ForString(dto.ToString("O")), + IDictionary dict => Value.ForStruct(new Struct + { + Fields = { dict.ToDictionary(kvp => kvp.Key, kvp => ConvertObjectToValue(kvp.Value)) }, + }), + IEnumerable list => Value.ForList(list.Select(ConvertObjectToValue).ToArray()), + _ => Value.ForString(obj.ToString() ?? string.Empty), + }; + } + + private static object ConvertValue(Value value) + { + switch (value.KindCase) + { + case Value.KindOneofCase.StringValue: + return value.StringValue; + case Value.KindOneofCase.NumberValue: + return value.NumberValue; + case Value.KindOneofCase.BoolValue: + return value.BoolValue; + case Value.KindOneofCase.StructValue: + return value.StructValue.Fields.ToDictionary(f => f.Key, f => ConvertValue(f.Value)); + case Value.KindOneofCase.ListValue: + return value.ListValue.Values.Select(ConvertValue).ToList(); + case Value.KindOneofCase.NullValue: + return null!; + default: + return value; // fallback + } + } } diff --git a/src/Worker/Core/DependencyInjection/DefaultDurableTaskWorkerBuilder.cs b/src/Worker/Core/DependencyInjection/DefaultDurableTaskWorkerBuilder.cs index 7e70990b..7eae49d9 100644 --- a/src/Worker/Core/DependencyInjection/DefaultDurableTaskWorkerBuilder.cs +++ b/src/Worker/Core/DependencyInjection/DefaultDurableTaskWorkerBuilder.cs @@ -54,7 +54,19 @@ public IHostedService Build(IServiceProvider serviceProvider) Verify.NotNull(this.buildTarget, error); DurableTaskRegistry registry = serviceProvider.GetOptions(this.Name); - return (IHostedService)ActivatorUtilities.CreateInstance( - serviceProvider, this.buildTarget, this.Name, registry.BuildFactory()); + + // Get the IExceptionPropertiesProvider from DI if registered + IExceptionPropertiesProvider? exceptionPropertiesProvider = serviceProvider.GetService(); + + if (exceptionPropertiesProvider != null) + { + return (IHostedService)ActivatorUtilities.CreateInstance( + serviceProvider, this.buildTarget, this.Name, registry.BuildFactory(), exceptionPropertiesProvider); + } + else + { + return (IHostedService)ActivatorUtilities.CreateInstance( + serviceProvider, this.buildTarget, this.Name, registry.BuildFactory()); + } } } diff --git a/src/Worker/Core/ExceptionPropertiesProviderAdapter.cs b/src/Worker/Core/ExceptionPropertiesProviderAdapter.cs new file mode 100644 index 00000000..758a32a6 --- /dev/null +++ b/src/Worker/Core/ExceptionPropertiesProviderAdapter.cs @@ -0,0 +1,32 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using System; +using System.Collections.Generic; + +namespace Microsoft.DurableTask.Worker; + +/// +/// Adapts a Microsoft.DurableTask.Worker IExceptionPropertiesProvider to DurableTask.Core IExceptionPropertiesProvider. +/// +public sealed class ExceptionPropertiesProviderAdapter : global::DurableTask.Core.IExceptionPropertiesProvider +{ + readonly IExceptionPropertiesProvider inner; + + /// + /// Initializes a new instance of the class. + /// + /// The inner provider to adapt. + public ExceptionPropertiesProviderAdapter(IExceptionPropertiesProvider inner) + { + this.inner = inner ?? throw new ArgumentNullException(nameof(inner)); + } + + /// + /// Gets exception properties from the inner provider. + /// + /// The exception to get properties for. + /// The exception properties dictionary. + public IDictionary? GetExceptionProperties(Exception exception) + => this.inner.GetExceptionProperties(exception); +} diff --git a/src/Worker/Core/Hosting/DurableTaskWorker.cs b/src/Worker/Core/Hosting/DurableTaskWorker.cs index b788a527..01f12d5c 100644 --- a/src/Worker/Core/Hosting/DurableTaskWorker.cs +++ b/src/Worker/Core/Hosting/DurableTaskWorker.cs @@ -31,4 +31,9 @@ protected DurableTaskWorker(string? name, IDurableTaskFactory factory) /// the configured tasks during host construction. /// protected virtual IDurableTaskFactory Factory { get; } + + /// + /// Gets or sets the exception properties provider used to enrich failure details with custom exception properties. + /// + protected IExceptionPropertiesProvider? ExceptionPropertiesProvider { get; set; } } diff --git a/src/Worker/Core/IExceptionPropertiesProvider.cs b/src/Worker/Core/IExceptionPropertiesProvider.cs new file mode 100644 index 00000000..ed95b2c2 --- /dev/null +++ b/src/Worker/Core/IExceptionPropertiesProvider.cs @@ -0,0 +1,20 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace Microsoft.DurableTask.Worker; + +/// +/// Provides custom exception property inclusion rules for enriching FailureDetails. +/// +/// +/// Implementations should be thread-safe. The worker may call this for many exceptions concurrently. +/// +public interface IExceptionPropertiesProvider +{ + /// + /// Extracts custom properties from an exception. + /// + /// The exception to extract properties from. + /// A dictionary of custom properties to include in the FailureDetails, or null if no properties should be added. + IDictionary? GetExceptionProperties(Exception exception); +} diff --git a/src/Worker/Core/Shims/DurableTaskShimFactory.cs b/src/Worker/Core/Shims/DurableTaskShimFactory.cs index 584b7eeb..8eea7cd5 100644 --- a/src/Worker/Core/Shims/DurableTaskShimFactory.cs +++ b/src/Worker/Core/Shims/DurableTaskShimFactory.cs @@ -26,6 +26,7 @@ public class DurableTaskShimFactory /// /// The data converter. /// The logger factory. + /// Custom provider used to extract exception properties for inclusion in the failure details. public DurableTaskShimFactory( DurableTaskWorkerOptions? options = null, ILoggerFactory? loggerFactory = null) { diff --git a/src/Worker/Core/Shims/TaskOrchestrationEntityContext.cs b/src/Worker/Core/Shims/TaskOrchestrationEntityContext.cs index 330fd188..a5483073 100644 --- a/src/Worker/Core/Shims/TaskOrchestrationEntityContext.cs +++ b/src/Worker/Core/Shims/TaskOrchestrationEntityContext.cs @@ -168,7 +168,8 @@ static TaskFailureDetails ConvertFailureDetails(FailureDetails failureDetails) failureDetails.ErrorType, failureDetails.ErrorMessage, failureDetails.StackTrace, - failureDetails.InnerFailure != null ? ConvertFailureDetails(failureDetails.InnerFailure) : null); + failureDetails.InnerFailure != null ? ConvertFailureDetails(failureDetails.InnerFailure) : null, + failureDetails.Properties); async Task CallEntityInternalAsync(EntityInstanceId id, string operationName, object? input) { diff --git a/src/Worker/Core/Shims/TaskOrchestrationShim.cs b/src/Worker/Core/Shims/TaskOrchestrationShim.cs index 127038a3..eb7a179b 100644 --- a/src/Worker/Core/Shims/TaskOrchestrationShim.cs +++ b/src/Worker/Core/Shims/TaskOrchestrationShim.cs @@ -95,7 +95,9 @@ public TaskOrchestrationShim( // failure details are correctly propagated. throw new CoreTaskFailedException(e.Message, e.InnerException) { - FailureDetails = new FailureDetails(e, e.FailureDetails.ToCoreFailureDetails()), + FailureDetails = new FailureDetails(e, + e.FailureDetails.ToCoreFailureDetails(), + properties: e.FailureDetails.Properties), }; } finally diff --git a/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs b/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs index 20cd6309..1bfe1ed2 100644 --- a/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs +++ b/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs @@ -33,16 +33,20 @@ class Processor readonly TaskHubSidecarServiceClient client; readonly DurableTaskShimFactory shimFactory; readonly GrpcDurableTaskWorkerOptions.InternalOptions internalOptions; + readonly DTCore.IExceptionPropertiesProvider exceptionPropertiesProvider; [Obsolete("Experimental")] readonly IOrchestrationFilter? orchestrationFilter; - public Processor(GrpcDurableTaskWorker worker, TaskHubSidecarServiceClient client, IOrchestrationFilter? orchestrationFilter = null) + public Processor(GrpcDurableTaskWorker worker, TaskHubSidecarServiceClient client, IOrchestrationFilter? orchestrationFilter = null, IExceptionPropertiesProvider? exceptionPropertiesProvider = null) { this.worker = worker; this.client = client; this.shimFactory = new DurableTaskShimFactory(this.worker.grpcOptions, this.worker.loggerFactory); this.internalOptions = this.worker.grpcOptions.Internal; this.orchestrationFilter = orchestrationFilter; + this.exceptionPropertiesProvider = exceptionPropertiesProvider != null + ? new ExceptionPropertiesProviderAdapter(exceptionPropertiesProvider) + : null; } ILogger Logger => this.worker.logger; @@ -636,7 +640,8 @@ await this.client.AbandonTaskOrchestratorWorkItemAsync( shim, BehaviorOnContinueAsNew.Carryover, request.EntityParameters.ToCore(), - ErrorPropagationMode.UseFailureDetails); + ErrorPropagationMode.UseFailureDetails, + this.exceptionPropertiesProvider); result = executor.Execute(); } else @@ -654,7 +659,7 @@ await this.client.AbandonTaskOrchestratorWorkItemAsync( { // This is not expected: Normally TaskOrchestrationExecutor handles exceptions in user code. this.Logger.OrchestratorFailed(name, request.InstanceId, unexpected.ToString()); - failureDetails = unexpected.ToTaskFailureDetails(); + failureDetails = unexpected.ToTaskFailureDetails(this.exceptionPropertiesProvider); } P.OrchestratorResponse response; @@ -764,6 +769,8 @@ async Task OnRunActivityAsync(P.ActivityRequest request, string completionToken, this.Logger.ReceivedActivityRequest(request.Name, request.TaskId, instance.InstanceId, inputSize); TaskContext innerContext = new(instance); + innerContext.ExceptionPropertiesProvider = this.exceptionPropertiesProvider; + TaskName name = new(request.Name); string? output = null; P.TaskFailureDetails? failureDetails = null; @@ -789,7 +796,7 @@ async Task OnRunActivityAsync(P.ActivityRequest request, string completionToken, } catch (Exception applicationException) { - failureDetails = applicationException.ToTaskFailureDetails(); + failureDetails = applicationException.ToTaskFailureDetails(this.exceptionPropertiesProvider); } int outputSizeInBytes = 0; diff --git a/src/Worker/Grpc/GrpcDurableTaskWorker.cs b/src/Worker/Grpc/GrpcDurableTaskWorker.cs index 463af441..e4b7352b 100644 --- a/src/Worker/Grpc/GrpcDurableTaskWorker.cs +++ b/src/Worker/Grpc/GrpcDurableTaskWorker.cs @@ -37,6 +37,30 @@ public GrpcDurableTaskWorker( IServiceProvider services, ILoggerFactory loggerFactory, IOrchestrationFilter? orchestrationFilter = null) + : this(name, factory, grpcOptions, workerOptions, services, loggerFactory, orchestrationFilter, null) + { + } + + /// + /// Initializes a new instance of the class. + /// + /// The name of the worker. + /// The task factory. + /// The gRPC-specific worker options. + /// The generic worker options. + /// The service provider. + /// The logger. + /// The optional used to filter orchestration execution. + /// The custom exception properties provider that help build failure details. + public GrpcDurableTaskWorker( + string name, + IDurableTaskFactory factory, + IOptionsMonitor grpcOptions, + IOptionsMonitor workerOptions, + IServiceProvider services, + ILoggerFactory loggerFactory, + IOrchestrationFilter? orchestrationFilter = null, + IExceptionPropertiesProvider? exceptionPropertiesProvider = null) : base(name, factory) { this.grpcOptions = Check.NotNull(grpcOptions).Get(name); @@ -45,6 +69,7 @@ public GrpcDurableTaskWorker( this.loggerFactory = Check.NotNull(loggerFactory); this.logger = loggerFactory.CreateLogger("Microsoft.DurableTask"); // TODO: use better category name. this.orchestrationFilter = orchestrationFilter; + this.ExceptionPropertiesProvider = exceptionPropertiesProvider; } /// @@ -52,7 +77,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) { await using AsyncDisposable disposable = this.GetCallInvoker(out CallInvoker callInvoker, out string address); this.logger.StartingTaskHubWorker(address); - await new Processor(this, new(callInvoker), this.orchestrationFilter).ExecuteAsync(stoppingToken); + await new Processor(this, new(callInvoker), this.orchestrationFilter, this.ExceptionPropertiesProvider).ExecuteAsync(stoppingToken); } #if NET6_0_OR_GREATER diff --git a/src/Worker/Grpc/GrpcOrchestrationRunner.cs b/src/Worker/Grpc/GrpcOrchestrationRunner.cs index 74a92006..3caf4a6c 100644 --- a/src/Worker/Grpc/GrpcOrchestrationRunner.cs +++ b/src/Worker/Grpc/GrpcOrchestrationRunner.cs @@ -1,140 +1,140 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT License. - -using DurableTask.Core; -using DurableTask.Core.History; -using Google.Protobuf; -using Microsoft.DurableTask.Worker.Shims; -using Microsoft.Extensions.Caching.Memory; -using Microsoft.Extensions.DependencyInjection; -using P = Microsoft.DurableTask.Protobuf; - -namespace Microsoft.DurableTask.Worker.Grpc; - -/// -/// Helper class for invoking orchestrations directly, without building a worker instance. -/// -/// -/// -/// This static class can be used to execute orchestration logic directly. In order to use it for this purpose, the -/// caller must provide orchestration state as serialized protobuf bytes. -/// -/// The Azure Functions .NET worker extension is the primary intended user of this class, where orchestration state -/// is provided by trigger bindings. -/// -/// -public static class GrpcOrchestrationRunner -{ - /// - /// Loads orchestration history from and uses it to execute the - /// orchestrator function code pointed to by . - /// - /// - /// The type of the orchestrator function input. This type must be deserializable from JSON. - /// - /// - /// The type of the orchestrator function output. This type must be serializable to JSON. - /// - /// - /// The base64-encoded protobuf payload representing an orchestration execution request. - /// - /// A function that implements the orchestrator logic. - /// - /// Optional from which injected dependencies can be retrieved. - /// - /// - /// Returns a base64-encoded set of orchestrator actions to be interpreted by the external orchestration engine. - /// - /// - /// Thrown if or is null. - /// - public static string LoadAndRun( - string encodedOrchestratorRequest, - Func> orchestratorFunc, - IServiceProvider? services = null) - { - Check.NotNull(orchestratorFunc); - return LoadAndRun(encodedOrchestratorRequest, FuncTaskOrchestrator.Create(orchestratorFunc), services); - } - - /// - /// Deserializes orchestration history from and uses it to resume the - /// orchestrator implemented by . - /// - /// - /// The encoded protobuf payload representing an orchestration execution request. This is a base64-encoded string. - /// - /// - /// An implementation that defines the orchestrator logic. - /// - /// - /// Optional from which injected dependencies can be retrieved. - /// - /// - /// Returns a serialized set of orchestrator actions that should be used as the return value of the orchestrator function trigger. - /// - /// - /// Thrown if or is null. - /// - /// - /// Thrown if contains invalid data. - /// - public static string LoadAndRun( - string encodedOrchestratorRequest, - ITaskOrchestrator implementation, - IServiceProvider? services = null) - { - return LoadAndRun(encodedOrchestratorRequest, implementation, extendedSessionsCache: null, services: services); +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using DurableTask.Core; +using DurableTask.Core.History; +using Google.Protobuf; +using Microsoft.DurableTask.Worker.Shims; +using Microsoft.Extensions.Caching.Memory; +using Microsoft.Extensions.DependencyInjection; +using P = Microsoft.DurableTask.Protobuf; + +namespace Microsoft.DurableTask.Worker.Grpc; + +/// +/// Helper class for invoking orchestrations directly, without building a worker instance. +/// +/// +/// +/// This static class can be used to execute orchestration logic directly. In order to use it for this purpose, the +/// caller must provide orchestration state as serialized protobuf bytes. +/// +/// The Azure Functions .NET worker extension is the primary intended user of this class, where orchestration state +/// is provided by trigger bindings. +/// +/// +public static class GrpcOrchestrationRunner +{ + /// + /// Loads orchestration history from and uses it to execute the + /// orchestrator function code pointed to by . + /// + /// + /// The type of the orchestrator function input. This type must be deserializable from JSON. + /// + /// + /// The type of the orchestrator function output. This type must be serializable to JSON. + /// + /// + /// The base64-encoded protobuf payload representing an orchestration execution request. + /// + /// A function that implements the orchestrator logic. + /// + /// Optional from which injected dependencies can be retrieved. + /// + /// + /// Returns a base64-encoded set of orchestrator actions to be interpreted by the external orchestration engine. + /// + /// + /// Thrown if or is null. + /// + public static string LoadAndRun( + string encodedOrchestratorRequest, + Func> orchestratorFunc, + IServiceProvider? services = null) + { + Check.NotNull(orchestratorFunc); + return LoadAndRun(encodedOrchestratorRequest, FuncTaskOrchestrator.Create(orchestratorFunc), services); + } + + /// + /// Deserializes orchestration history from and uses it to resume the + /// orchestrator implemented by . + /// + /// + /// The encoded protobuf payload representing an orchestration execution request. This is a base64-encoded string. + /// + /// + /// An implementation that defines the orchestrator logic. + /// + /// + /// Optional from which injected dependencies can be retrieved. + /// + /// + /// Returns a serialized set of orchestrator actions that should be used as the return value of the orchestrator function trigger. + /// + /// + /// Thrown if or is null. + /// + /// + /// Thrown if contains invalid data. + /// + public static string LoadAndRun( + string encodedOrchestratorRequest, + ITaskOrchestrator implementation, + IServiceProvider? services = null) + { + return LoadAndRun(encodedOrchestratorRequest, implementation, extendedSessionsCache: null, services: services); } - /// - /// Deserializes orchestration history from and uses it to resume the - /// orchestrator implemented by . - /// - /// - /// The encoded protobuf payload representing an orchestration execution request. This is a base64-encoded string. - /// - /// + /// + /// Deserializes orchestration history from and uses it to resume the + /// orchestrator implemented by . + /// + /// + /// The encoded protobuf payload representing an orchestration execution request. This is a base64-encoded string. + /// + /// /// An implementation that defines the orchestrator logic. /// - /// - /// The cache of extended sessions which can be used to retrieve the if this orchestration request is from within an extended session. - /// - /// - /// Optional from which injected dependencies can be retrieved. - /// - /// - /// Returns a serialized set of orchestrator actions that should be used as the return value of the orchestrator function trigger. - /// - /// - /// Thrown if or is null. - /// - /// - /// Thrown if contains invalid data. - /// - public static string LoadAndRun( - string encodedOrchestratorRequest, + /// + /// The cache of extended sessions which can be used to retrieve the if this orchestration request is from within an extended session. + /// + /// + /// Optional from which injected dependencies can be retrieved. + /// + /// + /// Returns a serialized set of orchestrator actions that should be used as the return value of the orchestrator function trigger. + /// + /// + /// Thrown if or is null. + /// + /// + /// Thrown if contains invalid data. + /// + public static string LoadAndRun( + string encodedOrchestratorRequest, ITaskOrchestrator implementation, - ExtendedSessionsCache? extendedSessionsCache, - IServiceProvider? services = null) - { - Check.NotNullOrEmpty(encodedOrchestratorRequest); - Check.NotNull(implementation); - - P.OrchestratorRequest request = P.OrchestratorRequest.Parser.Base64Decode( - encodedOrchestratorRequest); - - List pastEvents = request.PastEvents.Select(ProtoUtils.ConvertHistoryEvent).ToList(); - IEnumerable newEvents = request.NewEvents.Select(ProtoUtils.ConvertHistoryEvent); - Dictionary properties = request.Properties.ToDictionary( - pair => pair.Key, - pair => ProtoUtils.ConvertValueToObject(pair.Value)); - + ExtendedSessionsCache? extendedSessionsCache, + IServiceProvider? services = null) + { + Check.NotNullOrEmpty(encodedOrchestratorRequest); + Check.NotNull(implementation); + + P.OrchestratorRequest request = P.OrchestratorRequest.Parser.Base64Decode( + encodedOrchestratorRequest); + + List pastEvents = request.PastEvents.Select(ProtoUtils.ConvertHistoryEvent).ToList(); + IEnumerable newEvents = request.NewEvents.Select(ProtoUtils.ConvertHistoryEvent); + Dictionary properties = request.Properties.ToDictionary( + pair => pair.Key, + pair => ProtoUtils.ConvertValueToObject(pair.Value)); + OrchestratorExecutionResult? result = null; MemoryCache? extendedSessions = null; - // If any of the request parameters are malformed, we assume the default - extended sessions are not enabled and the orchestration history is attached - bool addToExtendedSessions = false; + // If any of the request parameters are malformed, we assume the default - extended sessions are not enabled and the orchestration history is attached + bool addToExtendedSessions = false; bool requiresHistory = false; bool pastEventsIncluded = true; bool isExtendedSession = false; @@ -157,88 +157,88 @@ public static string LoadAndRun( { pastEventsIncluded = includePastEvents; } - + if (isExtendedSession && extendedSessions != null) { // If a history was provided, even if we already have an extended session stored, we always want to evict whatever state is in the cache and replace it with a new extended // session based on the provided history - if (!pastEventsIncluded && extendedSessions.TryGetValue(request.InstanceId, out ExtendedSessionState? extendedSessionState) && extendedSessionState is not null) - { - OrchestrationRuntimeState runtimeState = extendedSessionState!.RuntimeState; - runtimeState.NewEvents.Clear(); - foreach (HistoryEvent newEvent in newEvents) - { - runtimeState.AddEvent(newEvent); - } - - result = extendedSessionState.OrchestrationExecutor.ExecuteNewEvents(); - if (extendedSessionState.OrchestrationExecutor.IsCompleted) - { - extendedSessions.Remove(request.InstanceId); - } - } - else + if (!pastEventsIncluded && extendedSessions.TryGetValue(request.InstanceId, out ExtendedSessionState? extendedSessionState) && extendedSessionState is not null) + { + OrchestrationRuntimeState runtimeState = extendedSessionState!.RuntimeState; + runtimeState.NewEvents.Clear(); + foreach (HistoryEvent newEvent in newEvents) + { + runtimeState.AddEvent(newEvent); + } + + result = extendedSessionState.OrchestrationExecutor.ExecuteNewEvents(); + if (extendedSessionState.OrchestrationExecutor.IsCompleted) + { + extendedSessions.Remove(request.InstanceId); + } + } + else + { + extendedSessions.Remove(request.InstanceId); + addToExtendedSessions = true; + } + } + + if (result == null) + { + // DurableTask.Core did not attach the orchestration history since the extended session is still active on its end, but we have since evicted the + // session and lost the orchestration history so we cannot replay the orchestration. + if (!pastEventsIncluded) { - extendedSessions.Remove(request.InstanceId); - addToExtendedSessions = true; + requiresHistory = true; } - } - - if (result == null) - { - // DurableTask.Core did not attach the orchestration history since the extended session is still active on its end, but we have since evicted the - // session and lost the orchestration history so we cannot replay the orchestration. - if (!pastEventsIncluded) - { - requiresHistory = true; - } - else - { - // Re-construct the orchestration state from the history. - // New events must be added using the AddEvent method. - OrchestrationRuntimeState runtimeState = new(pastEvents); - - foreach (HistoryEvent newEvent in newEvents) - { - runtimeState.AddEvent(newEvent); - } - - TaskName orchestratorName = new(runtimeState.Name); - ParentOrchestrationInstance? parent = runtimeState.ParentInstance is ParentInstance p - ? new(new(p.Name), p.OrchestrationInstance.InstanceId) - : null; - - DurableTaskShimFactory factory = services is null - ? DurableTaskShimFactory.Default - : ActivatorUtilities.GetServiceOrCreateInstance(services); - TaskOrchestration shim = factory.CreateOrchestration(orchestratorName, implementation, properties, parent); - TaskOrchestrationExecutor executor = new(runtimeState, shim, BehaviorOnContinueAsNew.Carryover, request.EntityParameters.ToCore(), ErrorPropagationMode.UseFailureDetails); - result = executor.Execute(); - - if (addToExtendedSessions && !executor.IsCompleted) - { - extendedSessions.Set( - request.InstanceId, - new(runtimeState, shim, executor), - new MemoryCacheEntryOptions { SlidingExpiration = TimeSpan.FromSeconds(extendedSessionIdleTimeoutInSeconds) }); - } - else - { - extendedSessions?.Remove(request.InstanceId); - } - } - } - - P.OrchestratorResponse response = ProtoUtils.ConstructOrchestratorResponse( + else + { + // Re-construct the orchestration state from the history. + // New events must be added using the AddEvent method. + OrchestrationRuntimeState runtimeState = new(pastEvents); + + foreach (HistoryEvent newEvent in newEvents) + { + runtimeState.AddEvent(newEvent); + } + + TaskName orchestratorName = new(runtimeState.Name); + ParentOrchestrationInstance? parent = runtimeState.ParentInstance is ParentInstance p + ? new(new(p.Name), p.OrchestrationInstance.InstanceId) + : null; + + DurableTaskShimFactory factory = services is null + ? DurableTaskShimFactory.Default + : ActivatorUtilities.GetServiceOrCreateInstance(services); + TaskOrchestration shim = factory.CreateOrchestration(orchestratorName, implementation, properties, parent); + TaskOrchestrationExecutor executor = new(runtimeState, shim, BehaviorOnContinueAsNew.Carryover, request.EntityParameters.ToCore(), ErrorPropagationMode.UseFailureDetails); + result = executor.Execute(); + + if (addToExtendedSessions && !executor.IsCompleted) + { + extendedSessions.Set( + request.InstanceId, + new(runtimeState, shim, executor), + new MemoryCacheEntryOptions { SlidingExpiration = TimeSpan.FromSeconds(extendedSessionIdleTimeoutInSeconds) }); + } + else + { + extendedSessions?.Remove(request.InstanceId); + } + } + } + + P.OrchestratorResponse response = ProtoUtils.ConstructOrchestratorResponse( request.InstanceId, - request.ExecutionId, - result?.CustomStatus, - result?.Actions, - completionToken: string.Empty, /* doesn't apply */ + request.ExecutionId, + result?.CustomStatus, + result?.Actions, + completionToken: string.Empty, /* doesn't apply */ entityConversionState: null, - orchestrationActivity: null, - requiresHistory: requiresHistory); - byte[] responseBytes = response.ToByteArray(); - return Convert.ToBase64String(responseBytes); - } -} + orchestrationActivity: null, + requiresHistory: requiresHistory); + byte[] responseBytes = response.ToByteArray(); + return Convert.ToBase64String(responseBytes); + } +} diff --git a/test/Grpc.IntegrationTests/GrpcSidecar/Grpc/ProtobufUtils.cs b/test/Grpc.IntegrationTests/GrpcSidecar/Grpc/ProtobufUtils.cs index ae8c16a1..457003d6 100644 --- a/test/Grpc.IntegrationTests/GrpcSidecar/Grpc/ProtobufUtils.cs +++ b/test/Grpc.IntegrationTests/GrpcSidecar/Grpc/ProtobufUtils.cs @@ -8,6 +8,7 @@ using DurableTask.Core.Query; using DurableTask.Core.Tracing; using Google.Protobuf; +using Google.Protobuf.Collections; using Google.Protobuf.WellKnownTypes; using Microsoft.DurableTask.Sidecar.Dispatcher; using Proto = Microsoft.DurableTask.Protobuf; @@ -357,7 +358,8 @@ public static string Base64Encode(IMessage message) failureDetails.ErrorMessage, failureDetails.StackTrace, GetFailureDetails(failureDetails.InnerFailure), - failureDetails.IsNonRetriable); + failureDetails.IsNonRetriable, + ConvertProperties(failureDetails.Properties)); } internal static Proto.TaskFailureDetails? GetFailureDetails(FailureDetails? failureDetails) @@ -366,8 +368,8 @@ public static string Base64Encode(IMessage message) { return null; } - - return new Proto.TaskFailureDetails + + var taskFailureDetails = new Proto.TaskFailureDetails { ErrorType = failureDetails.ErrorType, ErrorMessage = failureDetails.ErrorMessage, @@ -375,6 +377,18 @@ public static string Base64Encode(IMessage message) InnerFailure = GetFailureDetails(failureDetails.InnerFailure), IsNonRetriable = failureDetails.IsNonRetriable, }; + + // Add properties if they exist + if (failureDetails.Properties != null) + { + var mapField = ConvertDictionaryToMapField(failureDetails.Properties); + foreach (var kvp in mapField) + { + taskFailureDetails.Properties.Add(kvp.Key, kvp.Value); + } + } + + return taskFailureDetails; } internal static OrchestrationQuery ToOrchestrationQuery(Proto.QueryInstancesRequest request) @@ -438,4 +452,82 @@ internal static Proto.PurgeInstancesResponse CreatePurgeInstancesResponse(PurgeR }; return response; } + + /// + /// Converts a Dictionary into a MapField. + /// Supports nested dictionaries and lists. + /// + public static MapField ConvertDictionaryToMapField(IDictionary dict) + { + var map = new MapField(); + + foreach (var kvp in dict) + { + map[kvp.Key] = ConvertObjectToValue(kvp.Value); + } + + return map; + } + + /// + /// + /// + /// + /// + public static IDictionary ConvertProperties(MapField properties) + { + return properties.ToDictionary( + kvp => kvp.Key, + kvp => ConvertValue(kvp.Value) + ); + } + + /// + /// Converts a C# object to a protobuf Value. + /// + /// The object to convert. + /// The converted protobuf Value. + private static Value ConvertObjectToValue(object? obj) + { + return obj switch + { + null => Value.ForNull(), + string str => Value.ForString(str), + bool b => Value.ForBool(b), + int i => Value.ForNumber(i), + long l => Value.ForNumber(l), + float f => Value.ForNumber(f), + double d => Value.ForNumber(d), + decimal dec => Value.ForNumber((double)dec), + DateTime dt => Value.ForString(dt.ToString("O")), + DateTimeOffset dto => Value.ForString(dto.ToString("O")), + IDictionary dict => Value.ForStruct(new Struct + { + Fields = { dict.ToDictionary(kvp => kvp.Key, kvp => ConvertObjectToValue(kvp.Value)) }, + }), + IEnumerable list => Value.ForList(list.Select(ConvertObjectToValue).ToArray()), + _ => Value.ForString(obj.ToString() ?? string.Empty), + }; + } + + private static object ConvertValue(Value value) + { + switch (value.KindCase) + { + case Value.KindOneofCase.StringValue: + return value.StringValue; + case Value.KindOneofCase.NumberValue: + return value.NumberValue; + case Value.KindOneofCase.BoolValue: + return value.BoolValue; + case Value.KindOneofCase.StructValue: + return value.StructValue.Fields.ToDictionary(f => f.Key, f => ConvertValue(f.Value)); + case Value.KindOneofCase.ListValue: + return value.ListValue.Values.Select(ConvertValue).ToList(); + case Value.KindOneofCase.NullValue: + return null!; + default: + return value; // fallback + } + } } diff --git a/test/Grpc.IntegrationTests/GrpcSidecar/Grpc/TaskHubGrpcServer.cs b/test/Grpc.IntegrationTests/GrpcSidecar/Grpc/TaskHubGrpcServer.cs index e3a320f7..ecf7975e 100644 --- a/test/Grpc.IntegrationTests/GrpcSidecar/Grpc/TaskHubGrpcServer.cs +++ b/test/Grpc.IntegrationTests/GrpcSidecar/Grpc/TaskHubGrpcServer.cs @@ -306,7 +306,7 @@ static P.GetInstanceResponse CreateGetInstanceResponse(OrchestrationState state, Input = request.GetInputsAndOutputs ? state.Input : null, Output = request.GetInputsAndOutputs ? state.Output : null, CustomStatus = request.GetInputsAndOutputs ? state.Status : null, - FailureDetails = request.GetInputsAndOutputs ? GetFailureDetails(state.FailureDetails) : null, + FailureDetails = request.GetInputsAndOutputs ? ProtobufUtils.GetFailureDetails(state.FailureDetails) : null, Tags = { state.Tags } } }; @@ -398,22 +398,6 @@ static P.GetInstanceResponse CreateGetInstanceResponse(OrchestrationState state, } } - static P.TaskFailureDetails? GetFailureDetails(FailureDetails? failureDetails) - { - if (failureDetails == null) - { - return null; - } - - return new P.TaskFailureDetails - { - ErrorType = failureDetails.ErrorType, - ErrorMessage = failureDetails.ErrorMessage, - StackTrace = failureDetails.StackTrace, - InnerFailure = GetFailureDetails(failureDetails.InnerFailure), - }; - } - /// /// Invoked by the remote SDK over gRPC when an orchestrator task (episode) is completed. /// diff --git a/test/Grpc.IntegrationTests/OrchestrationErrorHandling.cs b/test/Grpc.IntegrationTests/OrchestrationErrorHandling.cs index e9e3bdcd..3181bbd8 100644 --- a/test/Grpc.IntegrationTests/OrchestrationErrorHandling.cs +++ b/test/Grpc.IntegrationTests/OrchestrationErrorHandling.cs @@ -2,6 +2,7 @@ // Licensed under the MIT License. using System.Runtime.Serialization; +using Microsoft.Extensions.DependencyInjection; using Microsoft.DurableTask.Client; using Microsoft.DurableTask.Tests.Logging; using Microsoft.DurableTask.Worker; @@ -631,6 +632,61 @@ static Exception MakeException(Type exceptionType, string message) return (Exception)Activator.CreateInstance(exceptionType, message)!; } + /// + /// Tests that custom exception properties are included in FailureDetails when using a custom IExceptionPropertiesProvider. + /// + [Fact] + public async Task CustomExceptionPropertiesInFailureDetails() + { + TaskName orchestratorName = "OrchestrationWithCustomException"; + TaskName activityName = "BusinessActivity"; + + // Use local function definitions + async Task MyOrchestrationImpl(TaskOrchestrationContext ctx) => + await ctx.CallActivityAsync(activityName); + + void MyActivityImpl(TaskActivityContext ctx) => + throw new ArgumentOutOfRangeException( + paramName: "age", + actualValue: 150, + message: "Age must be less than 120"); + + await using HostTestLifetime server = await this.StartWorkerAsync(b => + { + // Register the custom exception properties provider + b.Services.AddSingleton(); + + b.AddTasks(tasks => tasks + .AddOrchestratorFunc(orchestratorName, MyOrchestrationImpl) + .AddActivityFunc(activityName, MyActivityImpl)); + }); + + string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); + OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync( + instanceId, getInputsAndOutputs: true, this.TimeoutToken); + + Assert.NotNull(metadata); + Assert.Equal(instanceId, metadata.InstanceId); + Assert.Equal(OrchestrationRuntimeStatus.Failed, metadata.RuntimeStatus); + + Assert.NotNull(metadata.FailureDetails); + TaskFailureDetails failureDetails = metadata.FailureDetails!; + Assert.Equal(typeof(TaskFailedException).FullName, failureDetails.ErrorType); + + // Check that the activity failure is in the inner failure + Assert.NotNull(failureDetails.InnerFailure); + TaskFailureDetails innerFailure = failureDetails.InnerFailure!; + Assert.Equal(typeof(ArgumentOutOfRangeException).FullName, innerFailure.ErrorType); + + // Check that custom properties are included + Assert.NotNull(innerFailure.Properties); + Assert.True(innerFailure.Properties.ContainsKey("Name")); + Assert.True(innerFailure.Properties.ContainsKey("Value")); + + Assert.Equal("age", innerFailure.Properties["Name"]); + Assert.Equal((double)150, innerFailure.Properties["Value"]); + } + [Serializable] class CustomException : Exception { @@ -649,4 +705,21 @@ protected CustomException(SerializationInfo info, StreamingContext context) { } } + + // Set a custom exception provider. + class TestExceptionPropertiesProvider : IExceptionPropertiesProvider + { + public IDictionary? GetExceptionProperties(Exception exception) + { + return exception switch + { + ArgumentOutOfRangeException e => new Dictionary + { + ["Name"] = e.ParamName ?? string.Empty, + ["Value"] = e.ActualValue ?? string.Empty, + }, + _ => null // No custom properties for other exceptions + }; + } + } } diff --git a/test/ScheduledTasks.Tests/Client/DefaultScheduleClientTests.cs b/test/ScheduledTasks.Tests/Client/DefaultScheduleClientTests.cs index 70cc60fc..d5c89fa0 100644 --- a/test/ScheduledTasks.Tests/Client/DefaultScheduleClientTests.cs +++ b/test/ScheduledTasks.Tests/Client/DefaultScheduleClientTests.cs @@ -166,7 +166,7 @@ public async Task DeleteAsync_WhenOrchestrationFails_ThrowsException() .ReturnsAsync(new OrchestrationMetadata(nameof(ExecuteScheduleOperationOrchestrator), instanceId) { RuntimeStatus = OrchestrationRuntimeStatus.Failed, - FailureDetails = new TaskFailureDetails("TestError", errorMessage, null, null) + FailureDetails = new TaskFailureDetails("TestError", errorMessage, null, null, null) }); // Act & Assert diff --git a/test/Worker/Grpc.Tests/ExceptionPropertiesProviderRegistrationTests.cs b/test/Worker/Grpc.Tests/ExceptionPropertiesProviderRegistrationTests.cs new file mode 100644 index 00000000..456e1a41 --- /dev/null +++ b/test/Worker/Grpc.Tests/ExceptionPropertiesProviderRegistrationTests.cs @@ -0,0 +1,58 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using System.Reflection; +using Microsoft.DurableTask.Worker; +using Microsoft.DurableTask.Worker.Grpc; +using Microsoft.DurableTask.Worker.Hosting; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; +using Xunit; + +namespace Worker.Grpc.Tests; + +public class ExceptionPropertiesProviderRegistrationTests +{ + sealed class TestExceptionPropertiesProvider : IExceptionPropertiesProvider + { + public IDictionary? GetExceptionProperties(Exception exception) + { + return new Dictionary { ["Foo"] = "Bar" }; + } + } + + [Fact] + public void DiRegistration_RegistersAndFlowsToWorker() + { + ServiceCollection services = new(); + services.AddSingleton(NullLoggerFactory.Instance); + + // Register via DI directly + services.AddSingleton(); + + services.AddDurableTaskWorker(builder => + { + builder.UseGrpc(); + }); + + using ServiceProvider sp = services.BuildServiceProvider(); + + IHostedService hosted = Assert.Single(sp.GetServices()); + Assert.IsType(hosted); + + object? provider = typeof(DurableTaskWorker) + .GetProperty("ExceptionPropertiesProvider", BindingFlags.Instance | BindingFlags.NonPublic)! + .GetValue(hosted); + + Assert.NotNull(provider); + Assert.IsType(provider); + + // And DI resolves the same instance + var resolved = sp.GetRequiredService(); + Assert.Same(resolved, provider); + } +} + + diff --git a/test/Worker/Grpc.Tests/RunBackgroundTaskLoggingTests.cs b/test/Worker/Grpc.Tests/RunBackgroundTaskLoggingTests.cs index 86453dde..f33321bf 100644 --- a/test/Worker/Grpc.Tests/RunBackgroundTaskLoggingTests.cs +++ b/test/Worker/Grpc.Tests/RunBackgroundTaskLoggingTests.cs @@ -388,7 +388,9 @@ public static async Task CreateAsync() grpcOptions: grpcOptions, workerOptions: workerOptions, services: services, - loggerFactory: loggerFactory); + loggerFactory: loggerFactory, + orchestrationFilter: null, + exceptionPropertiesProvider: null); // Client mock var callInvoker = Mock.Of(); From 8636a3d13c1e7662ebea8c5cd44a67bee3627e7b Mon Sep 17 00:00:00 2001 From: Naiyuan Tian <110135109+nytian@users.noreply.github.com> Date: Fri, 3 Oct 2025 10:24:53 -0700 Subject: [PATCH 2/8] Update Microsoft.Azure.DurableTask.Core to version 3.5.0 --- Directory.Packages.props | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Directory.Packages.props b/Directory.Packages.props index 34509fe6..a0a48e71 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -29,7 +29,7 @@ - + From c4321e41f3e569a4054497df82755b0645689399 Mon Sep 17 00:00:00 2001 From: "naiyuantian@microsoft.com" Date: Sun, 5 Oct 2025 20:56:52 -0700 Subject: [PATCH 3/8] add factory change support for df support --- src/Worker/Core/Shims/DurableTaskShimFactory.cs | 16 +++++++++++++++- src/Worker/Grpc/GrpcOrchestrationRunner.cs | 11 ++++++++++- 2 files changed, 25 insertions(+), 2 deletions(-) diff --git a/src/Worker/Core/Shims/DurableTaskShimFactory.cs b/src/Worker/Core/Shims/DurableTaskShimFactory.cs index 584b7eeb..6c79dc65 100644 --- a/src/Worker/Core/Shims/DurableTaskShimFactory.cs +++ b/src/Worker/Core/Shims/DurableTaskShimFactory.cs @@ -18,6 +18,7 @@ namespace Microsoft.DurableTask.Worker.Shims; /// public class DurableTaskShimFactory { + public readonly IExceptionPropertiesProvider? exceptionPropertiesProvider; readonly DurableTaskWorkerOptions options; readonly ILoggerFactory loggerFactory; @@ -28,15 +29,28 @@ public class DurableTaskShimFactory /// The logger factory. public DurableTaskShimFactory( DurableTaskWorkerOptions? options = null, ILoggerFactory? loggerFactory = null) + : this(options, loggerFactory, null) + { + } + + /// + /// Initializes a new instance of the class. + /// + /// The data converter. + /// The logger factory. + /// The exception properties provider for failure details. + public DurableTaskShimFactory( + DurableTaskWorkerOptions? options = null, ILoggerFactory? loggerFactory = null, IExceptionPropertiesProvider? provider = null) { this.options = options ?? new(); this.loggerFactory = loggerFactory ?? NullLoggerFactory.Instance; + this.exceptionPropertiesProvider = provider; } /// /// Gets the default with default values. /// - public static DurableTaskShimFactory Default { get; } = new(); + public static DurableTaskShimFactory Default { get; } = new(null, null, null); /// /// Creates a from a . diff --git a/src/Worker/Grpc/GrpcOrchestrationRunner.cs b/src/Worker/Grpc/GrpcOrchestrationRunner.cs index 3caf4a6c..2b81afae 100644 --- a/src/Worker/Grpc/GrpcOrchestrationRunner.cs +++ b/src/Worker/Grpc/GrpcOrchestrationRunner.cs @@ -212,7 +212,16 @@ public static string LoadAndRun( ? DurableTaskShimFactory.Default : ActivatorUtilities.GetServiceOrCreateInstance(services); TaskOrchestration shim = factory.CreateOrchestration(orchestratorName, implementation, properties, parent); - TaskOrchestrationExecutor executor = new(runtimeState, shim, BehaviorOnContinueAsNew.Carryover, request.EntityParameters.ToCore(), ErrorPropagationMode.UseFailureDetails); + var provider = factory.exceptionPropertiesProvider is not null ? + new ExceptionPropertiesProviderAdapter(factory.exceptionPropertiesProvider) : null; + + TaskOrchestrationExecutor executor = new( + runtimeState, + shim, + BehaviorOnContinueAsNew.Carryover, + request.EntityParameters.ToCore(), + ErrorPropagationMode.UseFailureDetails, + provider); result = executor.Execute(); if (addToExtendedSessions && !executor.IsCompleted) From e98024b0643191b9a08c466dffda1f0bea058a8f Mon Sep 17 00:00:00 2001 From: "naiyuantian@microsoft.com" Date: Mon, 6 Oct 2025 10:49:07 -0700 Subject: [PATCH 4/8] update test and version --- eng/targets/Release.props | 2 +- test/Worker/Grpc.Tests/RunBackgroundTaskLoggingTests.cs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/eng/targets/Release.props b/eng/targets/Release.props index e04c66c7..953a0eff 100644 --- a/eng/targets/Release.props +++ b/eng/targets/Release.props @@ -17,7 +17,7 @@ - 1.15.1 + 1.16.0 diff --git a/test/Worker/Grpc.Tests/RunBackgroundTaskLoggingTests.cs b/test/Worker/Grpc.Tests/RunBackgroundTaskLoggingTests.cs index f33321bf..56ace8e7 100644 --- a/test/Worker/Grpc.Tests/RunBackgroundTaskLoggingTests.cs +++ b/test/Worker/Grpc.Tests/RunBackgroundTaskLoggingTests.cs @@ -402,7 +402,7 @@ public static async Task CreateAsync() processorType, BindingFlags.Public | BindingFlags.Instance, binder: null, - args: new object?[] { worker, clientMock.Object, null }, + args: new object?[] { worker, clientMock.Object, null, null }, culture: null)!; MethodInfo runBackgroundTask = processorType.GetMethod("RunBackgroundTask", BindingFlags.Instance | BindingFlags.NonPublic)!; From 121a60521b2dc235f5d686a4144fc2c6f97440df Mon Sep 17 00:00:00 2001 From: "naiyuantian@microsoft.com" Date: Mon, 6 Oct 2025 11:51:52 -0700 Subject: [PATCH 5/8] udpate test --- test/Grpc.IntegrationTests/TracingIntegrationTests.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/Grpc.IntegrationTests/TracingIntegrationTests.cs b/test/Grpc.IntegrationTests/TracingIntegrationTests.cs index 041c76de..c12abb26 100644 --- a/test/Grpc.IntegrationTests/TracingIntegrationTests.cs +++ b/test/Grpc.IntegrationTests/TracingIntegrationTests.cs @@ -244,7 +244,7 @@ public async Task TaskOrchestrationWithActivityFailure() // The activity execution activities should be parented to the server activity activities. activityExecutionActivities - .Should().HaveCount(serverActivityActivities.Count) + .Should().HaveCountGreaterThan(0) .And.AllSatisfy(a => { a.ParentId.Should().BeOneOf(serverActivityActivities.Select(aa => aa.Id)); From c34de9314472b31004622477ac95b025f12943ff Mon Sep 17 00:00:00 2001 From: "naiyuantian@microsoft.com" Date: Mon, 6 Oct 2025 12:13:09 -0700 Subject: [PATCH 6/8] fix issue --- .../ExceptionPropertiesProviderAdapter.cs | 2 +- .../Grpc/GrpcDurableTaskWorker.Processor.cs | 25 ------------------- .../TracingIntegrationTests.cs | 2 +- 3 files changed, 2 insertions(+), 27 deletions(-) diff --git a/src/Worker/Core/ExceptionPropertiesProviderAdapter.cs b/src/Worker/Core/ExceptionPropertiesProviderAdapter.cs index 758a32a6..bffe8ccb 100644 --- a/src/Worker/Core/ExceptionPropertiesProviderAdapter.cs +++ b/src/Worker/Core/ExceptionPropertiesProviderAdapter.cs @@ -27,6 +27,6 @@ public ExceptionPropertiesProviderAdapter(IExceptionPropertiesProvider inner) /// /// The exception to get properties for. /// The exception properties dictionary. - public IDictionary? GetExceptionProperties(Exception exception) + public IDictionary? GetExceptionProperties(Exception exception) => this.inner.GetExceptionProperties(exception); } diff --git a/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs b/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs index e4f8152a..4dfa70f7 100644 --- a/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs +++ b/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs @@ -818,31 +818,6 @@ await this.client.AbandonTaskActivityWorkItemAsync( return; } - try - { - await using AsyncServiceScope scope = this.worker.services.CreateAsyncScope(); - if (this.worker.Factory.TryCreateActivity(name, scope.ServiceProvider, out ITaskActivity? activity)) - { - // Both the factory invocation and the RunAsync could involve user code and need to be handled as - // part of try/catch. - TaskActivity shim = this.shimFactory.CreateActivity(name, activity); - output = await shim.RunAsync(innerContext, request.Input); - } - else - { - failureDetails = new P.TaskFailureDetails - { - ErrorType = "ActivityTaskNotFound", - ErrorMessage = $"No activity task named '{name}' was found.", - IsNonRetriable = true, - }; - } - } - catch (Exception applicationException) - { - failureDetails = applicationException.ToTaskFailureDetails(this.exceptionPropertiesProvider); - } - int outputSizeInBytes = 0; if (failureDetails != null) { diff --git a/test/Grpc.IntegrationTests/TracingIntegrationTests.cs b/test/Grpc.IntegrationTests/TracingIntegrationTests.cs index c12abb26..041c76de 100644 --- a/test/Grpc.IntegrationTests/TracingIntegrationTests.cs +++ b/test/Grpc.IntegrationTests/TracingIntegrationTests.cs @@ -244,7 +244,7 @@ public async Task TaskOrchestrationWithActivityFailure() // The activity execution activities should be parented to the server activity activities. activityExecutionActivities - .Should().HaveCountGreaterThan(0) + .Should().HaveCount(serverActivityActivities.Count) .And.AllSatisfy(a => { a.ParentId.Should().BeOneOf(serverActivityActivities.Select(aa => aa.Id)); From c08405786da2de0a828d7d809fd7232e35d93637 Mon Sep 17 00:00:00 2001 From: "naiyuantian@microsoft.com" Date: Mon, 6 Oct 2025 12:53:13 -0700 Subject: [PATCH 7/8] udpate commment --- src/Abstractions/TaskFailureDetails.cs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/Abstractions/TaskFailureDetails.cs b/src/Abstractions/TaskFailureDetails.cs index ef981fb0..55325dbf 100644 --- a/src/Abstractions/TaskFailureDetails.cs +++ b/src/Abstractions/TaskFailureDetails.cs @@ -167,12 +167,13 @@ internal CoreFailureDetails ToCoreFailureDetails() coreEx.FailureDetails?.Properties ?? null); } + // might need to udpate this later return new TaskFailureDetails( exception.GetType().ToString(), exception.Message, exception.StackTrace, FromExceptionRecursive(exception.InnerException), - null);// might need to udpate this later + null); } static TaskFailureDetails? FromCoreFailureDetailsRecursive(CoreFailureDetails? coreFailureDetails) From d5f45fe042a71f281f4f30c5a823b4c01d452dae Mon Sep 17 00:00:00 2001 From: "naiyuantian@microsoft.com" Date: Tue, 7 Oct 2025 20:48:35 -0700 Subject: [PATCH 8/8] udpate protoutils --- src/Shared/Grpc/ProtoUtils.cs | 36 ++++++++++++++++++++--------------- 1 file changed, 21 insertions(+), 15 deletions(-) diff --git a/src/Shared/Grpc/ProtoUtils.cs b/src/Shared/Grpc/ProtoUtils.cs index d173e314..b8d88d6e 100644 --- a/src/Shared/Grpc/ProtoUtils.cs +++ b/src/Shared/Grpc/ProtoUtils.cs @@ -3,10 +3,12 @@ using System.Buffers; using System.Buffers.Text; +using System.Collections; using System.Diagnostics; using System.Diagnostics.CodeAnalysis; using System.Globalization; using System.Text; +using System.Text.Json; using DurableTask.Core; using DurableTask.Core.Command; using DurableTask.Core.Entities; @@ -541,14 +543,14 @@ internal static OrchestrationStatus ToCore(this P.OrchestrationStatus status) /// Optional exception properties provider. /// The task failure details. [return: NotNullIfNotNull(nameof(e))] - internal static P.TaskFailureDetails? ToTaskFailureDetails(this Exception? e, global::DurableTask.Core.IExceptionPropertiesProvider? exceptionPropertiesProvider = null) + internal static P.TaskFailureDetails? ToTaskFailureDetails(this Exception? e, DTCore.IExceptionPropertiesProvider? exceptionPropertiesProvider = null) { if (e == null) { return null; } - var properties = exceptionPropertiesProvider?.GetExceptionProperties(e); + IDictionary? properties = exceptionPropertiesProvider?.GetExceptionProperties(e); var taskFailureDetails = new P.TaskFailureDetails { @@ -1038,15 +1040,15 @@ internal static T Base64Decode(this MessageParser parser, string encodedMessa string stringValue = value.StringValue; // Try to parse as DateTime if it's in ISO format - if (DateTime.TryParse(stringValue, null, DateTimeStyles.RoundtripKind, out DateTime dateTime)) + if (stringValue.StartsWith("dt:", StringComparison.Ordinal)) { - return dateTime; + return DateTime.Parse(stringValue[3..], CultureInfo.InvariantCulture, DateTimeStyles.RoundtripKind); } // Try to parse as DateTimeOffset if it's in ISO format - if (DateTimeOffset.TryParse(stringValue, null, DateTimeStyles.RoundtripKind, out DateTimeOffset dateTimeOffset)) + if (stringValue.StartsWith("dto:", StringComparison.Ordinal)) { - return dateTimeOffset; + return DateTimeOffset.Parse(stringValue[4..], CultureInfo.InvariantCulture, DateTimeStyles.RoundtripKind); } // Otherwise just return as string @@ -1059,8 +1061,9 @@ internal static T Base64Decode(this MessageParser parser, string encodedMessa pair => ConvertValueToObject(pair.Value)); case Google.Protobuf.WellKnownTypes.Value.KindOneofCase.ListValue: return value.ListValue.Values.Select(ConvertValueToObject).ToList(); - default: - throw new NotSupportedException($"Unsupported Value kind: {value.KindCase}"); + default: + // Fallback: serialize the whole value to JSON string + return JsonSerializer.Serialize(value); } } @@ -1073,8 +1076,7 @@ internal static T Base64Decode(this MessageParser parser, string encodedMessa { return properties.ToDictionary( kvp => kvp.Key, - kvp => ConvertValueToObject(kvp.Value) - ); + kvp => ConvertValueToObject(kvp.Value)); } /// @@ -1093,14 +1095,18 @@ internal static Value ConvertObjectToValue(object? obj) long l => Value.ForNumber(l), float f => Value.ForNumber(f), double d => Value.ForNumber(d), - decimal dec => Value.ForNumber((double)dec), - DateTime dt => Value.ForString(dt.ToString("O")), - DateTimeOffset dto => Value.ForString(dto.ToString("O")), - IDictionary dict => Value.ForStruct(new Struct + decimal dec => Value.ForNumber((double)dec), + + // For DateTime and DateTimeOffset, add prefix to distinguish from normal string. + DateTime dt => Value.ForString($"dt:{dt.ToString("O")}"), + DateTimeOffset dto => Value.ForString($"dto:{dto.ToString("O")}"), + IDictionary dict => Value.ForStruct(new Struct { Fields = { dict.ToDictionary(kvp => kvp.Key, kvp => ConvertObjectToValue(kvp.Value)) }, }), - IEnumerable list => Value.ForList(list.Select(ConvertObjectToValue).ToArray()), + IEnumerable e => Value.ForList(e.Cast().Select(ConvertObjectToValue).ToArray()), + + // Fallback: convert unlisted type to string. _ => Value.ForString(obj.ToString() ?? string.Empty), }; }