diff --git a/src/Client/Core/DurableTaskClient.cs b/src/Client/Core/DurableTaskClient.cs
index 16d6d8426..0d2de3892 100644
--- a/src/Client/Core/DurableTaskClient.cs
+++ b/src/Client/Core/DurableTaskClient.cs
@@ -290,6 +290,32 @@ public virtual Task ResumeInstanceAsync(string instanceId, CancellationToken can
public abstract Task ResumeInstanceAsync(
string instanceId, string? reason = null, CancellationToken cancellation = default);
+ /// <summary>
+ /// Rewinds the specified orchestration instance to a previous, non-failed state.
+ /// </summary>
+ /// <remarks>
+ /// <para>
+ /// Only orchestrations in a failed state can be rewound. Attempting to rewind an orchestration in a non-failed
+ /// state may result in either a no-op or a failure depending on the backend implementation.
+ /// </para><para>
+ /// Rewind works by rewriting an orchestration's history to remove the most recent failure records, and then
+ /// re-executing the orchestration with the modified history. This effectively "rewinds" the orchestration to a
+ /// previous "good" state, allowing it to re-execute the logic that caused the original failure.
+ /// </para><para>
+ /// Rewinding an orchestration is intended to be used in cases where a failure is caused by a transient issue that
+ /// has since been resolved. It is not intended to be used as a general-purpose retry mechanism.
+ /// </para>
+ /// </remarks>
+ /// <param name="instanceId">The instance ID of the orchestration to rewind.</param>
+ /// <param name="reason">The optional rewind reason, which is recorded in the orchestration history.</param>
+ /// <param name="cancellation">
+ /// A <see cref="CancellationToken"/> that can be used to cancel the rewind API call. Note that cancelling this
+ /// token does not cancel the rewind operation once it has been successfully enqueued.
+ /// </param>
+ /// <returns>A task that completes when the rewind operation has been successfully enqueued.</returns>
+ public abstract Task RewindInstanceAsync(
+ string instanceId, string? reason = null, CancellationToken cancellation = default);
+
///
public virtual Task GetInstanceAsync(
string instanceId, CancellationToken cancellation)
diff --git a/src/Client/Grpc/GrpcDurableTaskClient.cs b/src/Client/Grpc/GrpcDurableTaskClient.cs
index f0d6e6e8b..ca592dbbb 100644
--- a/src/Client/Grpc/GrpcDurableTaskClient.cs
+++ b/src/Client/Grpc/GrpcDurableTaskClient.cs
@@ -191,6 +191,28 @@ public override async Task ResumeInstanceAsync(
}
}
+ /// <inheritdoc/>
+ public override async Task RewindInstanceAsync(
+ string instanceId, string? reason = null, CancellationToken cancellation = default)
+ {
+ if (string.IsNullOrEmpty(instanceId))
+ {
+ throw new ArgumentNullException(nameof(instanceId));
+ }
+
+ try
+ {
+ await this.sidecarClient.RewindInstanceAsync(
+ new P.RewindInstanceRequest { InstanceId = instanceId, Reason = reason },
+ cancellationToken: cancellation);
+ }
+ catch (RpcException e) when (e.StatusCode == StatusCode.Cancelled)
+ {
+ throw new OperationCanceledException(
+ $"The {nameof(this.RewindInstanceAsync)} operation was canceled.", e, cancellation);
+ }
+ }
+
///
public override async Task GetInstancesAsync(
string instanceId, bool getInputsAndOutputs = false, CancellationToken cancellation = default)
diff --git a/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs b/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs
index 3f3c52116..6ad0db528 100644
--- a/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs
+++ b/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs
@@ -184,6 +184,16 @@ public override Task TerminateInstanceAsync(
return this.Client.ForceTerminateTaskOrchestrationAsync(instanceId, reason);
}
+ /// <inheritdoc/>
+ public override Task RewindInstanceAsync(
+ string instanceId, string? reason = null, CancellationToken cancellation = default)
+ {
+ // At the time of writing, there is no IOrchestrationXXXClient interface that supports rewind.
+ // Rather, it's only supported by specific backend implementations like Azure Storage. Once an interface
+ // with rewind is added to DurableTask.Core, we can add support for it here.
+ throw new NotSupportedException("Rewind is not supported by the current client.");
+ }
+
///
public override async Task WaitForInstanceCompletionAsync(
string instanceId, bool getInputsAndOutputs = false, CancellationToken cancellation = default)
@@ -219,7 +229,7 @@ public override async Task WaitForInstanceStartAsync(
}
}
- [return: NotNullIfNotNull("state")]
+ [return: NotNullIfNotNull(nameof(state))]
OrchestrationMetadata? ToMetadata(Core.OrchestrationState? state, bool getInputsAndOutputs)
{
if (state is null)
diff --git a/test/Grpc.IntegrationTests/OrchestrationErrorHandling.cs b/test/Grpc.IntegrationTests/OrchestrationErrorHandling.cs
index c8fb31181..373c0e19d 100644
--- a/test/Grpc.IntegrationTests/OrchestrationErrorHandling.cs
+++ b/test/Grpc.IntegrationTests/OrchestrationErrorHandling.cs
@@ -2,6 +2,7 @@
// Licensed under the MIT License.
using System.Runtime.Serialization;
+using DurableTask.Core;
using Microsoft.DurableTask.Client;
using Microsoft.DurableTask.Worker;
using Xunit.Abstractions;
@@ -589,6 +590,273 @@ static void ValidateInnermostFailureDetailsChain(TaskFailureDetails? failureDeta
ValidateInnermostFailureDetailsChain(metadata.FailureDetails.InnerFailure.InnerFailure);
}
+ /// <summary>
+ /// Tests the behavior of a failed orchestration when it is rewound and re-executed.
+ /// </summary>
+ [Fact]
+ public async Task RewindSingleFailedActivity()
+ {
+ bool isBusted = true;
+
+ TaskName orchestratorName = "BustedOrchestration";
+ TaskName activityName = "BustedActivity";
+
+ await using HostTestLifetime server = await this.StartWorkerAsync(b =>
+ {
+ b.AddTasks(tasks =>
+ tasks.AddOrchestratorFunc(orchestratorName, async ctx =>
+ {
+ await ctx.CallActivityAsync(activityName);
+ })
+ .AddActivityFunc(activityName, (TaskActivityContext context) =>
+ {
+ if (isBusted)
+ {
+ throw new Exception("Kah-BOOOOOM!!!");
+ }
+ }));
+ });
+
+ DurableTaskClient client = server.Client;
+
+ // Start the orchestration and wait for it to fail.
+ string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(orchestratorName);
+ OrchestrationMetadata metadata = await client.WaitForInstanceCompletionAsync(instanceId, this.TimeoutToken);
+ Assert.Equal(OrchestrationRuntimeStatus.Failed, metadata.RuntimeStatus);
+
+ // Simulate "fixing" the original problem by setting the flag to false.
+ isBusted = false;
+
+ // Rewind the orchestration to put it back into a running state. It should complete successfully this time.
+ await client.RewindInstanceAsync(instanceId, "Rewind failed orchestration");
+ metadata = await client.WaitForInstanceCompletionAsync(instanceId, this.TimeoutToken);
+ Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus);
+ }
+
+ /// <summary>
+ /// Tests the behavior of a failed orchestration when it is rewound and re-executed multiple times.
+ /// </summary>
+ [Fact]
+ public async Task RewindMultipleFailedActivities_Serial()
+ {
+ bool isBusted1 = true;
+ bool isBusted2 = true;
+
+ TaskName orchestratorName = "BustedOrchestration";
+ TaskName activityName1 = "BustedActivity1";
+ TaskName activityName2 = "BustedActivity2";
+
+ int activity1CompletionCount = 0;
+ int activity2CompletionCount = 0;
+
+ await using HostTestLifetime server = await this.StartWorkerAsync(b =>
+ {
+ b.AddTasks(tasks =>
+ tasks.AddOrchestratorFunc(orchestratorName, async ctx =>
+ {
+ // Take the result of the first activity and pass it to the second activity
+ int result = await ctx.CallActivityAsync<int>(activityName1);
+ return await ctx.CallActivityAsync<int>(activityName2, input: result);
+ })
+ .AddActivityFunc(activityName1, (TaskActivityContext context) =>
+ {
+ if (isBusted1)
+ {
+ throw new Exception("Failure1");
+ }
+
+ activity1CompletionCount++;
+ return 1;
+ })
+ .AddActivityFunc(activityName2, (TaskActivityContext context, int input) =>
+ {
+ if (isBusted2)
+ {
+ throw new Exception("Failure2");
+ }
+
+ activity2CompletionCount++;
+ return input + 1;
+ }));
+ });
+
+ DurableTaskClient client = server.Client;
+
+ // Start the orchestration and wait for it to fail with the first activity's error ("Failure1").
+ string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(orchestratorName);
+ OrchestrationMetadata metadata = await client.WaitForInstanceCompletionAsync(instanceId, this.TimeoutToken);
+ Assert.Equal(OrchestrationRuntimeStatus.Failed, metadata.RuntimeStatus);
+ Assert.Equal("Failure1", metadata.FailureDetails?.ErrorMessage);
+
+ // Simulate "fixing" just the first problem by setting the first flag to false.
+ isBusted1 = false;
+
+ // Rewind the orchestration. It should fail again, but this time with a different error message.
+ await client.RewindInstanceAsync(instanceId, "Rewind failed orchestration");
+ metadata = await client.WaitForInstanceCompletionAsync(instanceId, this.TimeoutToken);
+ Assert.Equal(OrchestrationRuntimeStatus.Failed, metadata.RuntimeStatus);
+ Assert.Equal("Failure2", metadata.FailureDetails?.ErrorMessage);
+
+ // Simulate "fixing" the second problem by setting the second flag to false.
+ isBusted2 = false;
+
+ // Rewind the orchestration again to put it back into a running state. It should now complete successfully.
+ await client.RewindInstanceAsync(instanceId, "Rewind failed orchestration");
+ metadata = await client.WaitForInstanceCompletionAsync(instanceId, this.TimeoutToken);
+ Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus);
+ Assert.Equal(2, metadata.ReadOutputAs<int>());
+
+ // Confirm that each activity completed exactly once (i.e. successful activity calls aren't rewound).
+ Assert.Equal(1, activity1CompletionCount);
+ Assert.Equal(1, activity2CompletionCount);
+ }
+
+ /// <summary>
+ /// Tests the behavior of a failed orchestration when multiple failures occur as part of a fan-out/fan-in, and is
+ /// subsequently rewound and re-executed.
+ /// </summary>
+ [Fact]
+ public async Task RewindMultipleFailedActivities_Parallel()
+ {
+ bool isBusted = true;
+
+ TaskName orchestratorName = "BustedOrchestration";
+ TaskName activityName = "BustedActivity";
+
+ await using HostTestLifetime server = await this.StartWorkerAsync(b =>
+ {
+ b.AddTasks(tasks =>
+ tasks.AddOrchestratorFunc(orchestratorName, async ctx =>
+ {
+ // Run the activity function multiple times in parallel
+ IList<Task> activityTasks = Enumerable.Range(0, 10)
+ .Select(i => ctx.CallActivityAsync(activityName))
+ .ToList();
+
+ // Wait for all the activity functions to complete
+ await Task.WhenAll(activityTasks);
+ return "Done";
+ })
+ .AddActivityFunc(activityName, (TaskActivityContext context) =>
+ {
+ if (isBusted)
+ {
+ throw new Exception("Kah-BOOOOOM!!!");
+ }
+ }));
+ });
+
+ DurableTaskClient client = server.Client;
+
+ // Start the orchestration and wait for it to fail.
+ string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(orchestratorName);
+ OrchestrationMetadata metadata = await client.WaitForInstanceCompletionAsync(instanceId, this.TimeoutToken);
+ Assert.Equal(OrchestrationRuntimeStatus.Failed, metadata.RuntimeStatus);
+
+ // Simulate "fixing" the original problem by setting the flag to false.
+ isBusted = false;
+
+ // Rewind the orchestration to put it back into a running state. It should complete successfully this time.
+ await client.RewindInstanceAsync(instanceId, "Rewind failed orchestration");
+ metadata = await client.WaitForInstanceCompletionAsync(instanceId, this.TimeoutToken);
+ Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus);
+ Assert.Equal("Done", metadata.ReadOutputAs<string>());
+ }
+
+ /// <summary>
+ /// Tests rewinding an orchestration that failed due to a failed sub-orchestration. The sub-orchestration is fixed
+ /// and the parent orchestration is rewound to allow the entire chain to complete successfully.
+ /// </summary>
+ [Fact]
+ public async Task RewindFailedSubOrchestration()
+ {
+ bool isBusted = true;
+
+ TaskName orchestratorName = "BustedOrchestrator";
+ TaskName subOrchestratorName = "BustedSubOrchestrator";
+
+ await using HostTestLifetime server = await this.StartWorkerAsync(b =>
+ {
+ b.AddTasks(tasks => tasks
+ .AddOrchestratorFunc(orchestratorName, async ctx =>
+ {
+ await ctx.CallSubOrchestratorAsync(subOrchestratorName);
+ })
+ .AddOrchestratorFunc(subOrchestratorName, ctx =>
+ {
+ if (isBusted)
+ {
+ throw new Exception("Kah-BOOOOOM!!!");
+ }
+ }));
+ });
+
+ DurableTaskClient client = server.Client;
+
+ // Start the orchestration and wait for it to fail.
+ string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(orchestratorName);
+ OrchestrationMetadata metadata = await client.WaitForInstanceCompletionAsync(instanceId, this.TimeoutToken);
+ Assert.Equal(OrchestrationRuntimeStatus.Failed, metadata.RuntimeStatus);
+
+ // Simulate "fixing" the original problem by setting the flag to false.
+ isBusted = false;
+
+ // Rewind the orchestration to put it back into a running state. It should complete successfully this time.
+ await client.RewindInstanceAsync(instanceId, "Rewind failed orchestration");
+ metadata = await client.WaitForInstanceCompletionAsync(instanceId, this.TimeoutToken);
+ Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus);
+ }
+
+ /// <summary>
+ /// Tests rewinding an orchestration that failed due to a failed sub-orchestration, which itself failed due to an
+ /// activity. The entire orchestration chain is expected to fail, and rewinding the parent orchestration should
+ /// allow the entire chain to complete successfully.
+ /// </summary>
+ [Fact]
+ public async Task RewindFailedSubOrchestrationWithActivity()
+ {
+ bool isBusted = true;
+
+ TaskName orchestratorName = "BustedOrchestrator";
+ TaskName subOrchestratorName = "BustedSubOrchestrator";
+ TaskName activityName = "BustedActivity";
+
+ await using HostTestLifetime server = await this.StartWorkerAsync(b =>
+ {
+ b.AddTasks(tasks => tasks
+ .AddOrchestratorFunc(orchestratorName, async ctx =>
+ {
+ await ctx.CallSubOrchestratorAsync(subOrchestratorName);
+ })
+ .AddOrchestratorFunc(subOrchestratorName, async ctx =>
+ {
+ await ctx.CallActivityAsync(activityName);
+ })
+ .AddActivityFunc(activityName, (TaskActivityContext _) =>
+ {
+ if (isBusted)
+ {
+ throw new Exception("Kah-BOOOOOM!!!");
+ }
+ }));
+ });
+
+ DurableTaskClient client = server.Client;
+
+ // Start the orchestration and wait for it to fail.
+ string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(orchestratorName);
+ OrchestrationMetadata metadata = await client.WaitForInstanceCompletionAsync(instanceId, this.TimeoutToken);
+ Assert.Equal(OrchestrationRuntimeStatus.Failed, metadata.RuntimeStatus);
+
+ // Simulate "fixing" the original problem by setting the flag to false.
+ isBusted = false;
+
+ // Rewind the orchestration to put it back into a running state. It should complete successfully this time.
+ await client.RewindInstanceAsync(instanceId, "Rewind failed orchestration");
+ metadata = await client.WaitForInstanceCompletionAsync(instanceId, this.TimeoutToken);
+ Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus);
+ }
+
static Exception MakeException(Type exceptionType, string message)
{
// We assume the contructor of the exception type takes a single string argument