From 80e1322be518551bf7f6ff0e209fada007e7a4f6 Mon Sep 17 00:00:00 2001 From: Chris Gillum Date: Sat, 2 Nov 2024 10:13:40 -0700 Subject: [PATCH] Initial commit: API surface area and non-passing tests --- src/Client/Core/DurableTaskClient.cs | 26 ++ src/Client/Grpc/GrpcDurableTaskClient.cs | 22 ++ .../ShimDurableTaskClient.cs | 12 +- .../OrchestrationErrorHandling.cs | 268 ++++++++++++++++++ 4 files changed, 327 insertions(+), 1 deletion(-) diff --git a/src/Client/Core/DurableTaskClient.cs b/src/Client/Core/DurableTaskClient.cs index 16d6d8426..0d2de3892 100644 --- a/src/Client/Core/DurableTaskClient.cs +++ b/src/Client/Core/DurableTaskClient.cs @@ -290,6 +290,32 @@ public virtual Task ResumeInstanceAsync(string instanceId, CancellationToken can public abstract Task ResumeInstanceAsync( string instanceId, string? reason = null, CancellationToken cancellation = default); + /// + /// Rewinds the specified orchestration instance to a previous, non-failed state. + /// + /// + /// + /// Only orchestrations in a failed state can be rewound. Attempting to rewind an orchestration in a non-failed + /// state may result in either a no-op or a failure depending on the backend implementation. + /// + /// Rewind works by rewriting an orchestration's history to remove the most recent failure records, and then + /// re-executing the orchestration with the modified history. This effectively "rewinds" the orchestration to a + /// previous "good" state, allowing it to re-execute the logic that caused the original failure. + /// + /// Rewinding an orchestration is intended to be used in cases where a failure is caused by a transient issue that + /// has since been resolved. It is not intended to be used as a general-purpose retry mechanism. + /// + /// + /// The instance ID of the orchestration to rewind. + /// The optional rewind reason, which is recorded in the orchestration history. + /// + /// A <see cref="CancellationToken"/> that can be used to cancel the rewind API call. 
Note that cancelling this + /// token does not cancel the rewind operation once it has been successfully enqueued. + /// + /// A task that completes when the rewind operation has been successfully enqueued. + public abstract Task RewindInstanceAsync( + string instanceId, string? reason = null, CancellationToken cancellation = default); + /// public virtual Task GetInstanceAsync( string instanceId, CancellationToken cancellation) diff --git a/src/Client/Grpc/GrpcDurableTaskClient.cs b/src/Client/Grpc/GrpcDurableTaskClient.cs index f0d6e6e8b..ca592dbbb 100644 --- a/src/Client/Grpc/GrpcDurableTaskClient.cs +++ b/src/Client/Grpc/GrpcDurableTaskClient.cs @@ -191,6 +191,28 @@ public override async Task ResumeInstanceAsync( } } + /// + public override async Task RewindInstanceAsync( + string instanceId, string? reason = null, CancellationToken cancellation = default) + { + if (string.IsNullOrEmpty(instanceId)) + { + throw new ArgumentNullException(nameof(instanceId)); + } + + try + { + await this.sidecarClient.RewindInstanceAsync( + new P.RewindInstanceRequest { InstanceId = instanceId, Reason = reason }, + cancellationToken: cancellation); + } + catch (RpcException e) when (e.StatusCode == StatusCode.Cancelled) + { + throw new OperationCanceledException( + $"The {nameof(this.RewindInstanceAsync)} operation was canceled.", e, cancellation); + } + } + /// public override async Task GetInstancesAsync( string instanceId, bool getInputsAndOutputs = false, CancellationToken cancellation = default) diff --git a/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs b/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs index 3f3c52116..6ad0db528 100644 --- a/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs +++ b/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs @@ -184,6 +184,16 @@ public override Task TerminateInstanceAsync( return this.Client.ForceTerminateTaskOrchestrationAsync(instanceId, reason); } + /// + public override 
Task RewindInstanceAsync( + string instanceId, string? reason = null, CancellationToken cancellation = default) + { + // At the time of writing, there is no IOrchestrationXXXClient interface that supports rewind. + // Rather, it's only supported by specific backend implementations like Azure Storage. Once an interface + // with rewind is added to DurableTask.Core, we can add support for it here. + throw new NotSupportedException("Rewind is not supported by the current client."); + } + /// public override async Task WaitForInstanceCompletionAsync( string instanceId, bool getInputsAndOutputs = false, CancellationToken cancellation = default) @@ -219,7 +229,7 @@ public override async Task WaitForInstanceStartAsync( } } - [return: NotNullIfNotNull("state")] + [return: NotNullIfNotNull(nameof(state))] OrchestrationMetadata? ToMetadata(Core.OrchestrationState? state, bool getInputsAndOutputs) { if (state is null) diff --git a/test/Grpc.IntegrationTests/OrchestrationErrorHandling.cs b/test/Grpc.IntegrationTests/OrchestrationErrorHandling.cs index c8fb31181..373c0e19d 100644 --- a/test/Grpc.IntegrationTests/OrchestrationErrorHandling.cs +++ b/test/Grpc.IntegrationTests/OrchestrationErrorHandling.cs @@ -2,6 +2,7 @@ // Licensed under the MIT License. using System.Runtime.Serialization; +using DurableTask.Core; using Microsoft.DurableTask.Client; using Microsoft.DurableTask.Worker; using Xunit.Abstractions; @@ -589,6 +590,273 @@ static void ValidateInnermostFailureDetailsChain(TaskFailureDetails? failureDeta ValidateInnermostFailureDetailsChain(metadata.FailureDetails.InnerFailure.InnerFailure); } + /// + /// Tests the behavior of a failed orchestration when it is rewound and re-executed. 
+ /// + [Fact] + public async Task RewindSingleFailedActivity() + { + bool isBusted = true; + + TaskName orchestratorName = "BustedOrchestration"; + TaskName activityName = "BustedActivity"; + + await using HostTestLifetime server = await this.StartWorkerAsync(b => + { + b.AddTasks(tasks => + tasks.AddOrchestratorFunc(orchestratorName, async ctx => + { + await ctx.CallActivityAsync(activityName); + }) + .AddActivityFunc(activityName, (TaskActivityContext context) => + { + if (isBusted) + { + throw new Exception("Kah-BOOOOOM!!!"); + } + })); + }); + + DurableTaskClient client = server.Client; + + // Start the orchestration and wait for it to fail. + string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); + OrchestrationMetadata metadata = await client.WaitForInstanceCompletionAsync(instanceId, this.TimeoutToken); + Assert.Equal(OrchestrationRuntimeStatus.Failed, metadata.RuntimeStatus); + + // Simulate "fixing" the original problem by setting the flag to false. + isBusted = false; + + // Rewind the orchestration to put it back into a running state. It should complete successfully this time. + await client.RewindInstanceAsync(instanceId, "Rewind failed orchestration"); + metadata = await client.WaitForInstanceCompletionAsync(instanceId, this.TimeoutToken); + Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); + } + + /// + /// Tests the behavior of a failed orchestration when it is rewound and re-executed multiple times. 
+ /// + [Fact] + public async Task RewindMultipleFailedActivities_Serial() + { + bool isBusted1 = true; + bool isBusted2 = true; + + TaskName orchestratorName = "BustedOrchestration"; + TaskName activityName1 = "BustedActivity1"; + TaskName activityName2 = "BustedActivity2"; + + int activity1CompletionCount = 0; + int activity2CompletionCount = 0; + + await using HostTestLifetime server = await this.StartWorkerAsync(b => + { + b.AddTasks(tasks => + tasks.AddOrchestratorFunc(orchestratorName, async ctx => + { + // Take the result of the first activity and pass it to the second activity + int result = await ctx.CallActivityAsync(activityName1); + return await ctx.CallActivityAsync(activityName2, input: result); + }) + .AddActivityFunc(activityName1, (TaskActivityContext context) => + { + if (isBusted1) + { + throw new Exception("Failure1"); + } + + activity1CompletionCount++; + return 1; + }) + .AddActivityFunc(activityName2, (TaskActivityContext context, int input) => + { + if (isBusted2) + { + throw new Exception("Failure2"); + } + + activity2CompletionCount++; + return input + 1; + })); + }); + + DurableTaskClient client = server.Client; + + // Start the orchestration and wait for it to fail with the first activity's "Failure1" error. + string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); + OrchestrationMetadata metadata = await client.WaitForInstanceCompletionAsync(instanceId, this.TimeoutToken); + Assert.Equal(OrchestrationRuntimeStatus.Failed, metadata.RuntimeStatus); + Assert.Equal("Failure1", metadata.FailureDetails?.ErrorMessage); + + // Simulate "fixing" just the first problem by setting the first flag to false. + isBusted1 = false; + + // Rewind the orchestration. It should fail again, but this time with a different error message. 
+ await client.RewindInstanceAsync(instanceId, "Rewind failed orchestration"); + metadata = await client.WaitForInstanceCompletionAsync(instanceId, this.TimeoutToken); + Assert.Equal(OrchestrationRuntimeStatus.Failed, metadata.RuntimeStatus); + Assert.Equal("Failure2", metadata.FailureDetails?.ErrorMessage); + + // Simulate "fixing" the second problem by setting the second flag to false. + isBusted2 = false; + + // Rewind the orchestration again to put it back into a running state. It should now complete successfully. + await client.RewindInstanceAsync(instanceId, "Rewind failed orchestration"); + metadata = await client.WaitForInstanceCompletionAsync(instanceId, this.TimeoutToken); + Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); + Assert.Equal(2, metadata.ReadOutputAs()); + + // Confirm that each activity completed exactly once (i.e. successful activity calls aren't rewound). + Assert.Equal(1, activity1CompletionCount); + Assert.Equal(1, activity2CompletionCount); + } + + /// + /// Tests the behavior of a failed orchestration when multiple failures occur as part of a fan-out/fan-in, and is + /// subsequently rewound and re-executed. 
+ /// + [Fact] + public async Task RewindMultipleFailedActivities_Parallel() + { + bool isBusted = true; + + TaskName orchestratorName = "BustedOrchestration"; + TaskName activityName = "BustedActivity"; + + await using HostTestLifetime server = await this.StartWorkerAsync(b => + { + b.AddTasks(tasks => + tasks.AddOrchestratorFunc(orchestratorName, async ctx => + { + // Run the activity function multiple times in parallel + IList tasks = Enumerable.Range(0, 10) + .Select(i => ctx.CallActivityAsync(activityName)) + .ToList(); + + // Wait for all the activity functions to complete + await Task.WhenAll(tasks); + return "Done"; + }) + .AddActivityFunc(activityName, (TaskActivityContext context) => + { + if (isBusted) + { + throw new Exception("Kah-BOOOOOM!!!"); + } + })); + }); + + DurableTaskClient client = server.Client; + + // Start the orchestration and wait for it to fail. + string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); + OrchestrationMetadata metadata = await client.WaitForInstanceCompletionAsync(instanceId, this.TimeoutToken); + Assert.Equal(OrchestrationRuntimeStatus.Failed, metadata.RuntimeStatus); + + // Simulate "fixing" the original problem by setting the flag to false. + isBusted = false; + + // Rewind the orchestration to put it back into a running state. It should complete successfully this time. + await client.RewindInstanceAsync(instanceId, "Rewind failed orchestration"); + metadata = await client.WaitForInstanceCompletionAsync(instanceId, this.TimeoutToken); + Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); + Assert.Equal("Done", metadata.ReadOutputAs()); + } + + /// + /// Tests rewinding an orchestration that failed due to a failed sub-orchestration. The sub-orchestration is fixed + /// and the parent orchestration is rewound to allow the entire chain to complete successfully. 
+ /// + [Fact] + public async Task RewindFailedSubOrchestration() + { + bool isBusted = true; + + TaskName orchestratorName = "BustedOrchestrator"; + TaskName subOrchestratorName = "BustedSubOrchestrator"; + + await using HostTestLifetime server = await this.StartWorkerAsync(b => + { + b.AddTasks(tasks => tasks + .AddOrchestratorFunc(orchestratorName, async ctx => + { + await ctx.CallSubOrchestratorAsync(subOrchestratorName); + }) + .AddOrchestratorFunc(subOrchestratorName, ctx => + { + if (isBusted) + { + throw new Exception("Kah-BOOOOOM!!!"); + } + })); + }); + + DurableTaskClient client = server.Client; + + // Start the orchestration and wait for it to fail. + string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); + OrchestrationMetadata metadata = await client.WaitForInstanceCompletionAsync(instanceId, this.TimeoutToken); + Assert.Equal(OrchestrationRuntimeStatus.Failed, metadata.RuntimeStatus); + + // Simulate "fixing" the original problem by setting the flag to false. + isBusted = false; + + // Rewind the orchestration to put it back into a running state. It should complete successfully this time. + await client.RewindInstanceAsync(instanceId, "Rewind failed orchestration"); + metadata = await client.WaitForInstanceCompletionAsync(instanceId, this.TimeoutToken); + Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); + } + + /// + /// Tests rewinding an orchestration that failed due to a failed sub-orchestration, which itself failed due to an + /// activity. The entire orchestration chain is expected to fail, and rewinding the parent orchestration should + /// allow the entire chain to complete successfully. 
+ /// + [Fact] + public async Task RewindFailedSubOrchestrationWithActivity() + { + bool isBusted = true; + + TaskName orchestratorName = "BustedOrchestrator"; + TaskName subOrchestratorName = "BustedSubOrchestrator"; + TaskName activityName = "BustedActivity"; + + await using HostTestLifetime server = await this.StartWorkerAsync(b => + { + b.AddTasks(tasks => tasks + .AddOrchestratorFunc(orchestratorName, async ctx => + { + await ctx.CallSubOrchestratorAsync(subOrchestratorName); + }) + .AddOrchestratorFunc(subOrchestratorName, async ctx => + { + await ctx.CallActivityAsync(activityName); + }) + .AddActivityFunc(activityName, (TaskActivityContext _) => + { + if (isBusted) + { + throw new Exception("Kah-BOOOOOM!!!"); + } + })); + }); + + DurableTaskClient client = server.Client; + + // Start the orchestration and wait for it to fail. + string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); + OrchestrationMetadata metadata = await client.WaitForInstanceCompletionAsync(instanceId, this.TimeoutToken); + Assert.Equal(OrchestrationRuntimeStatus.Failed, metadata.RuntimeStatus); + + // Simulate "fixing" the original problem by setting the flag to false. + isBusted = false; + + // Rewind the orchestration to put it back into a running state. It should complete successfully this time. + await client.RewindInstanceAsync(instanceId, "Rewind failed orchestration"); + metadata = await client.WaitForInstanceCompletionAsync(instanceId, this.TimeoutToken); + Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); + } + static Exception MakeException(Type exceptionType, string message) { // We assume the contructor of the exception type takes a single string argument