diff --git a/src/DurableTask.ApplicationInsights/DurableTask.ApplicationInsights.csproj b/src/DurableTask.ApplicationInsights/DurableTask.ApplicationInsights.csproj index 1c168a38c..02f79492c 100644 --- a/src/DurableTask.ApplicationInsights/DurableTask.ApplicationInsights.csproj +++ b/src/DurableTask.ApplicationInsights/DurableTask.ApplicationInsights.csproj @@ -11,8 +11,8 @@ 0 - 7 - 1 + 8 + 0 $(MajorVersion).$(MinorVersion).$(PatchVersion) $(VersionPrefix).0 diff --git a/src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj b/src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj index 09d6cc84f..8ff135fbd 100644 --- a/src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj +++ b/src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj @@ -21,8 +21,8 @@ 2 - 6 - 1 + 7 + 0 $(MajorVersion).$(MinorVersion).$(PatchVersion) $(VersionPrefix).0 diff --git a/src/DurableTask.Core/Command/OrchestratorActionType.cs b/src/DurableTask.Core/Command/OrchestratorActionType.cs index 7e9cbdc69..34d256aaa 100644 --- a/src/DurableTask.Core/Command/OrchestratorActionType.cs +++ b/src/DurableTask.Core/Command/OrchestratorActionType.cs @@ -42,5 +42,10 @@ public enum OrchestratorActionType /// The orchestrator completed. /// OrchestrationComplete, + + /// + /// The orchestration was rewound. + /// + RewindOrchestration, } } \ No newline at end of file diff --git a/src/DurableTask.Core/Command/RewindOrchestrationAction.cs b/src/DurableTask.Core/Command/RewindOrchestrationAction.cs new file mode 100644 index 000000000..6913726c7 --- /dev/null +++ b/src/DurableTask.Core/Command/RewindOrchestrationAction.cs @@ -0,0 +1,25 @@ +// ---------------------------------------------------------------------------------- +// Copyright Microsoft Corporation +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ---------------------------------------------------------------------------------- +#nullable enable +namespace DurableTask.Core.Command +{ + + /// + /// Orchestrator action for rewinding orchestrations. + /// + public class RewindOrchestrationAction : OrchestratorAction + { + /// + public override OrchestratorActionType OrchestratorActionType => OrchestratorActionType.RewindOrchestration; + } +} \ No newline at end of file diff --git a/src/DurableTask.Core/DurableTask.Core.csproj b/src/DurableTask.Core/DurableTask.Core.csproj index b7acf7eec..1327f7492 100644 --- a/src/DurableTask.Core/DurableTask.Core.csproj +++ b/src/DurableTask.Core/DurableTask.Core.csproj @@ -17,8 +17,8 @@ 3 - 5 - 1 + 6 + 0 $(MajorVersion).$(MinorVersion).$(PatchVersion) $(VersionPrefix).0 diff --git a/src/DurableTask.Core/History/EventType.cs b/src/DurableTask.Core/History/EventType.cs index f9412d0f2..52c44ad11 100644 --- a/src/DurableTask.Core/History/EventType.cs +++ b/src/DurableTask.Core/History/EventType.cs @@ -125,5 +125,10 @@ public enum EventType /// Orchestration was resumed event /// ExecutionResumed, + + /// + /// Orchestration was rewound event. 
+ /// + ExecutionRewound, } } \ No newline at end of file diff --git a/src/DurableTask.Core/History/ExecutionRewoundEvent.cs b/src/DurableTask.Core/History/ExecutionRewoundEvent.cs new file mode 100644 index 000000000..c838e9eb2 --- /dev/null +++ b/src/DurableTask.Core/History/ExecutionRewoundEvent.cs @@ -0,0 +1,71 @@ +// ---------------------------------------------------------------------------------- +// Copyright Microsoft Corporation +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ---------------------------------------------------------------------------------- +#nullable enable +namespace DurableTask.Core.History +{ + using DurableTask.Core.Tracing; + using System.Runtime.Serialization; + + /// + /// History event representing the rewind of an orchestration. + /// + [DataContract] + public class ExecutionRewoundEvent : HistoryEvent, ISupportsDurableTraceContext + { + /// + /// Creates a new ExecutionRewoundEvent with the supplied event id and no reason (Reason is left null). + /// + /// The integer event id + public ExecutionRewoundEvent(int eventId) : base(eventId) { } + + /// + /// Creates a new ExecutionRewoundEvent with the supplied event id and reason. + /// + /// The integer event id + /// The reason for the rewind event + public ExecutionRewoundEvent(int eventId, string? reason) + : base(eventId) + { + this.Reason = reason; + } + + /// + /// Gets the event type + /// + public override EventType EventType => EventType.ExecutionRewound; + + /// + /// Gets or sets the reason for the rewind event. 
+ /// + [DataMember] + public string? Reason { get; set; } + + /// + /// Gets or sets the parent execution id of the rewound suborchestration. + /// + [DataMember] + public string? ParentExecutionId { get; set; } + + /// + /// Gets or sets the instance ID of the rewound orchestration. + /// + [DataMember] + public string? InstanceId { get; set; } + + /// + /// Gets or sets the parent trace context of the rewound suborchestration. + /// + [DataMember] + public DistributedTraceContext? ParentTraceContext { get; set; } + } +} \ No newline at end of file diff --git a/src/DurableTask.Core/History/ExecutionStartedEvent.cs b/src/DurableTask.Core/History/ExecutionStartedEvent.cs index 6ae593359..59c6b8202 100644 --- a/src/DurableTask.Core/History/ExecutionStartedEvent.cs +++ b/src/DurableTask.Core/History/ExecutionStartedEvent.cs @@ -48,6 +48,31 @@ internal ExecutionStartedEvent() { } + /// + /// Creates a new ExecutionStartedEvent with the same fields as the supplied event. + /// OrchestrationInstance, ParentInstance and ParentTraceContext are cloned and Tags is copied into a new dictionary; all other members are assigned directly. + /// + internal ExecutionStartedEvent(ExecutionStartedEvent other) + { + // Copy base class fields + EventId = other.EventId; + Timestamp = other.Timestamp; + ExtensionData = other.ExtensionData; + IsPlayed = other.IsPlayed; + + // Copy all other fields, cloning those that expose Clone() and re-wrapping Tags + OrchestrationInstance = other.OrchestrationInstance?.Clone(); + ParentInstance = other.ParentInstance?.Clone(); + ParentTraceContext = other.ParentTraceContext?.Clone(); + Input = other.Input; + Name = other.Name; + Version = other.Version; + Tags = other.Tags != null ? 
new Dictionary(other.Tags) : null; + Correlation = other.Correlation; + ScheduledStartTime = other.ScheduledStartTime; + Generation = other.Generation; + } + /// /// Gets the event type /// diff --git a/src/DurableTask.Core/History/SubOrchestrationInstanceCreatedEvent.cs b/src/DurableTask.Core/History/SubOrchestrationInstanceCreatedEvent.cs index e611eb99e..646070933 100644 --- a/src/DurableTask.Core/History/SubOrchestrationInstanceCreatedEvent.cs +++ b/src/DurableTask.Core/History/SubOrchestrationInstanceCreatedEvent.cs @@ -30,6 +30,25 @@ public SubOrchestrationInstanceCreatedEvent(int eventId) { } + /// + /// Creates a new SubOrchestrationInstanceCreatedEvent with the same fields as the supplied event. + /// + internal SubOrchestrationInstanceCreatedEvent(SubOrchestrationInstanceCreatedEvent other) + { + // Copy base class fields + EventId = other.EventId; + Timestamp = other.Timestamp; + ExtensionData = other.ExtensionData; + IsPlayed = other.IsPlayed; + + // Copy all other fields + Name = other.Name; + Version = other.Version; + InstanceId = other.InstanceId; + Input = other.Input; + ClientSpanId = other.ClientSpanId; + } + /// /// Gets the event type /// diff --git a/src/DurableTask.Core/OrchestrationStatus.cs b/src/DurableTask.Core/OrchestrationStatus.cs index 098aecec1..1e26747d8 100644 --- a/src/DurableTask.Core/OrchestrationStatus.cs +++ b/src/DurableTask.Core/OrchestrationStatus.cs @@ -56,6 +56,6 @@ public enum OrchestrationStatus /// /// Orchestration state of suspended /// - Suspended, + Suspended } } \ No newline at end of file diff --git a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs index e11330367..b81cbae50 100644 --- a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs +++ b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs @@ -320,6 +320,7 @@ protected async Task OnProcessWorkItemAsync(TaskOrchestrationWorkItem work var isCompleted = false; var continuedAsNew = false; var isInterrupted = false; + var isRewinding = 
false; // correlation CorrelationTraceClient.Propagate(() => CorrelationTraceContext.Current = workItem.TraceContext); @@ -342,6 +343,21 @@ protected async Task OnProcessWorkItemAsync(TaskOrchestrationWorkItem work ExecutionStartedEvent startEvent = runtimeState.ExecutionStartedEvent ?? workItem.NewMessages.Select(msg => msg.Event).OfType().FirstOrDefault(); + ExecutionRewoundEvent rewindEvent = workItem.NewMessages.Select(msg => msg.Event).OfType().LastOrDefault(); + + if (rewindEvent is not null && runtimeState.OrchestrationStatus != OrchestrationStatus.Running) + { + isRewinding = true; + var rewoundTraceContext = rewindEvent.ParentTraceContext ?? startEvent?.ParentTraceContext; + if (startEvent is not null && rewoundTraceContext is not null) + { + // We set these to null here so that a new Activity is created to represent the execution of the rewound orchestration. The context carried by the rewind request wins over the one already on the start event; both are nullable, so the reset is skipped when neither exists instead of dereferencing null. + rewoundTraceContext.SpanId = null; + rewoundTraceContext.Id = null; + rewoundTraceContext.ActivityStartTime = null; + startEvent.ParentTraceContext = rewoundTraceContext; + } + } Activity? traceActivity = TraceHelper.StartTraceActivityForOrchestrationExecution(startEvent); OrchestrationState? instanceState = null; @@ -427,15 +443,25 @@ protected async Task OnProcessWorkItemAsync(TaskOrchestrationWorkItem work if (!versioningFailed) { - if (workItem.Cursor == null) + // In this case we skip the orchestration's execution since all tasks have been completed and it is in a terminal state. + // Instead we "rewind" its execution by removing all failed tasks (see ProcessRewindOrchestrationDecision). + // Upon receiving the next work item for the rewound orchestration, the failed tasks will be re-executed. 
+ if (isRewinding) { - workItem.Cursor = await this.ExecuteOrchestrationAsync(runtimeState, workItem); + decisions = new List { new RewindOrchestrationAction() }; } else { - await this.ResumeOrchestrationAsync(workItem); + if (workItem.Cursor == null) + { + workItem.Cursor = await this.ExecuteOrchestrationAsync(runtimeState, workItem); + } + else + { + await this.ResumeOrchestrationAsync(workItem); + } + decisions = workItem.Cursor.LatestDecisions.ToList(); } - decisions = workItem.Cursor.LatestDecisions.ToList(); } this.logHelper.OrchestrationExecuted( @@ -519,6 +545,19 @@ protected async Task OnProcessWorkItemAsync(TaskOrchestrationWorkItem work isCompleted = !continuedAsNew; break; + case OrchestratorActionType.RewindOrchestration: + this.ProcessRewindOrchestrationDecision( + runtimeState, + out List subOrchestrationRewindMessages, + out OrchestrationRuntimeState newRuntimeState); + orchestratorMessages.AddRange(subOrchestrationRewindMessages); + workItem.OrchestrationRuntimeState = newRuntimeState; + runtimeState = newRuntimeState; + // Setting this to true here will end an extended session if it is in progress. + // We don't want to save the state across executions, since we essentially manually modify + // the orchestration history here and so that stored by the extended session is incorrect. 
+ isRewinding = true; + break; default: throw TraceHelper.TraceExceptionInstance( TraceEventType.Error, @@ -679,7 +718,7 @@ await this.orchestrationService.CompleteTaskOrchestrationWorkItemAsync( workItem.OrchestrationRuntimeState = runtimeState; } - return isCompleted || continuedAsNew || isInterrupted; + return isCompleted || continuedAsNew || isInterrupted || isRewinding; } static OrchestrationExecutionContext GetOrchestrationExecutionContext(OrchestrationRuntimeState runtimeState) @@ -855,6 +894,22 @@ internal static bool ReconcileMessagesWithState(TaskOrchestrationWorkItem workIt return false; } + if (message.Event.EventType == EventType.ExecutionRewound + && workItem.OrchestrationRuntimeState.OrchestrationStatus != OrchestrationStatus.Running + && workItem.NewMessages.Count > 1) + { + foreach (TaskMessage droppedMessage in workItem.NewMessages) + { + if (droppedMessage.Event.EventType != EventType.ExecutionRewound) + { + logHelper.DroppingOrchestrationMessage(workItem, droppedMessage, "Multiple messages sent to an instance " + + "that is attempting to rewind from a terminal state. The only message that can be sent in " + + "this case is the rewind request."); + } + } + return false; + } + logHelper.ProcessingOrchestrationMessage(workItem, message); TraceHelper.TraceInstance( TraceEventType.Information, @@ -935,7 +990,12 @@ internal static bool ReconcileMessagesWithState(TaskOrchestrationWorkItem workIt TraceHelper.EmitTraceActivityForTaskFailed(workItem.OrchestrationRuntimeState.OrchestrationInstance, taskScheduledEvent, taskFailedEvent, errorPropagationMode); } - workItem.OrchestrationRuntimeState.AddEvent(message.Event); + // In this case, the ExecutionRewoundEvent has already been added to the history and is just sent as a way to trigger the failed deepest suborchestrations to rerun. + // We do not redundantly add it to the history in this situation. 
+ if (!(message.Event is ExecutionRewoundEvent executionRewoundEvent && workItem.OrchestrationRuntimeState.OrchestrationStatus == OrchestrationStatus.Running)) + { + workItem.OrchestrationRuntimeState.AddEvent(message.Event); + } } return true; @@ -1165,7 +1225,7 @@ TaskMessage ProcessCreateSubOrchestrationInstanceDecision( { Name = createSubOrchestrationAction.Name, Version = createSubOrchestrationAction.Version, - InstanceId = createSubOrchestrationAction.InstanceId + InstanceId = createSubOrchestrationAction.InstanceId, }; if (includeParameters) { @@ -1261,6 +1321,124 @@ TaskMessage ProcessSendEventDecision( }; } + void ProcessRewindOrchestrationDecision( + OrchestrationRuntimeState runtimeState, + out List subOrchestrationRewindMessages, + out OrchestrationRuntimeState newRuntimeState) + { + HashSet failedTaskIds = new(); + subOrchestrationRewindMessages = new(); + + newRuntimeState = new() + { + Status = runtimeState.Status + }; + + // Determine the task IDs of the failed tasks and suborchestrations + foreach (var evt in runtimeState.Events) + { + if (evt is TaskFailedEvent taskFailedEvent) + { + failedTaskIds.Add(taskFailedEvent.TaskScheduledId); + } + else if (evt is SubOrchestrationInstanceFailedEvent subOrchestrationInstanceFailedEvent) + { + failedTaskIds.Add(subOrchestrationInstanceFailedEvent.TaskScheduledId); + } + } + + ExecutionRewoundEvent executionRewoundEvent = (runtimeState.NewEvents.Last(e => e is ExecutionRewoundEvent) as ExecutionRewoundEvent)!; + string newExecutionId = Guid.NewGuid().ToString("N"); + + // Copy the existing history, removing the failed task/suborchestration events and generating rewind events for each of the failed suborchestrations. + foreach (var evt in runtimeState.Events) + { + // Do not add the TaskScheduledEvents for the failed tasks so that they get rescheduled, and do not add any of + // the failed task/suborchestration/execution events to the new history. 
+ if (!(evt is TaskScheduledEvent taskScheduledEvent && failedTaskIds.Contains(taskScheduledEvent.EventId)) + && evt is not TaskFailedEvent + && evt is not SubOrchestrationInstanceFailedEvent + && evt is not ExecutionCompletedEvent) + { + HistoryEvent eventToAdd = evt; + + if (evt is ExecutionStartedEvent executionStartedEvent) + { + // Copy all information from the old ExecutionStartedEvent except for the ExecutionId, since we create a new one + var newExecutionStartedEvent = new ExecutionStartedEvent(executionStartedEvent); + newExecutionStartedEvent.OrchestrationInstance.ExecutionId = newExecutionId; + + // If this is a suborchestration, we also need to update the ParentInstance's ExecutionId to match the new ExecutionId of the rewinding parent orchestration + if (!string.IsNullOrEmpty(executionRewoundEvent.ParentExecutionId)) + { + newExecutionStartedEvent.ParentInstance.OrchestrationInstance.ExecutionId = executionRewoundEvent.ParentExecutionId; + } + eventToAdd = newExecutionStartedEvent; + } + + // For each of the failed suborchestrations, generate a rewind event + else if (evt is SubOrchestrationInstanceCreatedEvent subOrchestrationInstanceCreatedEvent + && failedTaskIds.Contains(subOrchestrationInstanceCreatedEvent.EventId)) + { + var childExecutionRewoundEvent = new ExecutionRewoundEvent(-1, executionRewoundEvent!.Reason) + { + ParentExecutionId = newExecutionId, + InstanceId = subOrchestrationInstanceCreatedEvent.InstanceId + }; + + if (runtimeState.ExecutionStartedEvent.TryGetParentTraceContext(out ActivityContext parentTraceContext)) + { + // We set a new client span ID here so that the execution of the rewound suborchestration is not tied to the + // old parent. 
+ var newClientSpanId = ActivitySpanId.CreateRandom(); + var newSubOrchestrationInstanceCreatedEvent = new SubOrchestrationInstanceCreatedEvent(subOrchestrationInstanceCreatedEvent) + { + ClientSpanId = newClientSpanId.ToString() + }; + eventToAdd = newSubOrchestrationInstanceCreatedEvent; + + ActivityContext childActivityContext = new( + parentTraceContext.TraceId, + newClientSpanId, + parentTraceContext.TraceFlags, + parentTraceContext.TraceState); + childExecutionRewoundEvent.SetParentTraceContext(childActivityContext); + } + + subOrchestrationRewindMessages.Add + ( + new TaskMessage + { + Event = childExecutionRewoundEvent, + OrchestrationInstance = new OrchestrationInstance + { + InstanceId = subOrchestrationInstanceCreatedEvent.InstanceId + }, + } + ); + } + + // Finally, add the event to the new history + newRuntimeState.AddEvent(eventToAdd); + } + } + + // If this is a "terminal leaf" with no suborchestrations, we need to add an outbound message to it to force it to rerun. + // This will trigger the orchestration to rerun with the altered history, so it will only rerun the failed tasks. + // Once it finishes, it will send a completion message to its parent orchestration, which will trigger the parents to rerun as well. + if (subOrchestrationRewindMessages.Count == 0) + { + subOrchestrationRewindMessages.Add( + new TaskMessage + { + // This is a "dummy event" that will not be added to the history and is used just to trigger the rerun. + Event = new ExecutionRewoundEvent(-1, string.Empty), + OrchestrationInstance = newRuntimeState.OrchestrationInstance, + } + ); + } + } + internal class NonBlockingCountdownLock { int available; diff --git a/src/DurableTask.Core/Tracing/DistributedTraceContext.cs b/src/DurableTask.Core/Tracing/DistributedTraceContext.cs index b69b1f5f6..612caa386 100644 --- a/src/DurableTask.Core/Tracing/DistributedTraceContext.cs +++ b/src/DurableTask.Core/Tracing/DistributedTraceContext.cs @@ -80,5 +80,15 @@ public string? 
TraceState /// [DataMember] public DateTimeOffset? ActivityStartTime { get; set; } + + internal DistributedTraceContext Clone() + { + return new DistributedTraceContext(this.TraceParent, this.TraceState) + { + Id = Id, + SpanId = SpanId, + ActivityStartTime = ActivityStartTime + }; + } } }