From 7e8221663627a1d35c71755b074b3e16f8bc5d45 Mon Sep 17 00:00:00 2001 From: "naiyuantian@microsoft.com" Date: Mon, 4 Aug 2025 21:01:48 -0700 Subject: [PATCH 01/11] initial commit --- src/Client/Core/DurableTaskClient.cs | 40 +++++++++++ src/Client/Grpc/GrpcDurableTaskClient.cs | 32 +++++++++ .../ShimDurableTaskClient.cs | 45 +++++++++++++ src/Grpc/orchestrator_service.proto | 12 ++++ .../DefaultDurableTaskClientBuilderTests.cs | 8 +++ ...DurableTaskClientBuilderExtensionsTests.cs | 8 +++ .../ShimDurableTaskClientTests.cs | 65 ++++++++++++++++++ .../GrpcDurableTaskClientIntegrationTests.cs | 66 +++++++++++++++++++ 8 files changed, 276 insertions(+) diff --git a/src/Client/Core/DurableTaskClient.cs b/src/Client/Core/DurableTaskClient.cs index 16d6d8426..bf1394ae9 100644 --- a/src/Client/Core/DurableTaskClient.cs +++ b/src/Client/Core/DurableTaskClient.cs @@ -399,6 +399,46 @@ public virtual Task PurgeAllInstancesAsync( throw new NotSupportedException($"{this.GetType()} does not support purging of orchestration instances."); } + /// + /// Restarts an orchestration instance with the same or a new instance ID. + /// + /// + /// + /// This method restarts an existing orchestration instance. If is true, + /// a new instance ID will be generated for the restarted orchestration. If false, the original instance ID will be reused. + /// + /// The restarted orchestration will use the same input data as the original instance. If the original orchestration + /// instance is not found, an will be thrown. + /// + /// Note that this operation is backend-specific and may not be supported by all durable task backends. + /// If the backend does not support restart operations, a will be thrown. + /// + /// + /// The ID of the orchestration instance to restart. + /// + /// If true, a new instance ID will be generated for the restarted orchestration. + /// If false, the original instance ID will be reused. + /// + /// + /// The cancellation token. This only cancels enqueueing the restart request to the backend. + /// Does not abort restarting the orchestration once enqueued. + /// + /// + /// A task that completes when the orchestration instance is successfully restarted. + /// The value of this task is the instance ID of the restarted orchestration instance. + /// + /// + /// Thrown if an orchestration with the specified was not found. + /// + /// + /// Thrown if the backend does not support restart operations. + /// + public virtual Task RestartAsync( + string instanceId, + bool restartWithNewInstanceId = false, + CancellationToken cancellation = default) + => throw new NotSupportedException($"{this.GetType()} does not support orchestration restart."); + // TODO: Create task hub // TODO: Delete task hub diff --git a/src/Client/Grpc/GrpcDurableTaskClient.cs b/src/Client/Grpc/GrpcDurableTaskClient.cs index e1caea2f5..591f9199d 100644 --- a/src/Client/Grpc/GrpcDurableTaskClient.cs +++ b/src/Client/Grpc/GrpcDurableTaskClient.cs @@ -408,6 +408,38 @@ public override Task PurgeAllInstancesAsync( return this.PurgeInstancesCoreAsync(request, cancellation); } + /// + public override async Task RestartAsync( + string instanceId, + bool restartWithNewInstanceId = false, + CancellationToken cancellation = default) + { + Check.NotNullOrEmpty(instanceId); + Check.NotEntity(this.options.EnableEntitySupport, instanceId); + + var request = new P.RestartInstanceRequest + { + InstanceId = instanceId, + RestartWithNewInstanceId = restartWithNewInstanceId, + }; + + try + { + P.RestartInstanceResponse result = await this.sidecarClient.RestartInstanceAsync( + request, cancellationToken: cancellation); + return result.InstanceId; + } + catch (RpcException e) when (e.StatusCode == StatusCode.NotFound) + { + throw new ArgumentException($"An orchestration with the instanceId {instanceId} was not found.", e); + } + catch (RpcException e) when (e.StatusCode == StatusCode.Cancelled) + { + throw new OperationCanceledException( + $"The {nameof(this.RestartAsync)} operation was canceled.", e, cancellation); + } + } + static AsyncDisposable GetCallInvoker(GrpcDurableTaskClientOptions options, out CallInvoker callInvoker) { if (options.Channel is GrpcChannel c) diff --git a/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs b/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs index bb77aab21..c72cfc2f6 100644 --- a/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs +++ b/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs @@ -254,6 +254,51 @@ public override async Task WaitForInstanceStartAsync( } } + /// + public override async Task RestartAsync( + string instanceId, + bool restartWithNewInstanceId = false, + CancellationToken cancellation = default) + { + Check.NotNullOrEmpty(instanceId); + cancellation.ThrowIfCancellationRequested(); + + // Get the current orchestration status to retrieve the name and input + OrchestrationMetadata? status = await this.GetInstanceAsync(instanceId, getInputsAndOutputs: true, cancellation); + + if (status == null) + { + throw new ArgumentException($"An orchestration with the instanceId {instanceId} was not found."); + } + + // Create options for the new orchestration + StartOrchestrationOptions options; + if (!restartWithNewInstanceId) + { + options = new StartOrchestrationOptions() + { + InstanceId = instanceId, + }; + } + else + { + options = new StartOrchestrationOptions(); + } + + object? input = null; + if (!string.IsNullOrEmpty(status.SerializedInput)) + { + input = this.DataConverter.Deserialize(status.SerializedInput, typeof(object)); + } + + // Start a new orchestration with the same name and input + return await this.ScheduleNewOrchestrationInstanceAsync( + new TaskName(status.Name), + input, + options, + cancellation); + } + [return: NotNullIfNotNull("state")] OrchestrationMetadata? ToMetadata(Core.OrchestrationState? state, bool getInputsAndOutputs) { diff --git a/src/Grpc/orchestrator_service.proto b/src/Grpc/orchestrator_service.proto index b2ca147b5..e1bf3ec09 100644 --- a/src/Grpc/orchestrator_service.proto +++ b/src/Grpc/orchestrator_service.proto @@ -471,6 +471,15 @@ message PurgeInstancesResponse { google.protobuf.BoolValue isComplete = 2; } +message RestartInstanceRequest { + string instanceId = 1; + bool restartWithNewInstanceId = 2; +} + +message RestartInstanceResponse { + string instanceId = 1; +} + message CreateTaskHubRequest { bool recreateIfExists = 1; } @@ -689,6 +698,9 @@ service TaskHubSidecarService { // Resumes a suspended orchestration instance. rpc ResumeInstance(ResumeRequest) returns (ResumeResponse); + // Restarts an orchestration instance with the same or a new instance ID. + rpc RestartInstance(RestartInstanceRequest) returns (RestartInstanceResponse); + // rpc DeleteInstance(DeleteInstanceRequest) returns (DeleteInstanceResponse); rpc QueryInstances(QueryInstancesRequest) returns (QueryInstancesResponse); diff --git a/test/Client/Core.Tests/DependencyInjection/DefaultDurableTaskClientBuilderTests.cs b/test/Client/Core.Tests/DependencyInjection/DefaultDurableTaskClientBuilderTests.cs index b2b184cad..d5b20a120 100644 --- a/test/Client/Core.Tests/DependencyInjection/DefaultDurableTaskClientBuilderTests.cs +++ b/test/Client/Core.Tests/DependencyInjection/DefaultDurableTaskClientBuilderTests.cs @@ -144,6 +144,14 @@ public override Task WaitForInstanceStartAsync( { throw new NotImplementedException(); } + + public override Task RestartAsync( + string instanceId, + bool restartWithNewInstanceId = false, + CancellationToken cancellation = default) + { + throw new NotImplementedException(); + } } class CustomDataConverter : DataConverter diff --git a/test/Client/Core.Tests/DependencyInjection/DurableTaskClientBuilderExtensionsTests.cs b/test/Client/Core.Tests/DependencyInjection/DurableTaskClientBuilderExtensionsTests.cs index 9af6469ec..f028fa13f 100644 --- a/test/Client/Core.Tests/DependencyInjection/DurableTaskClientBuilderExtensionsTests.cs +++ b/test/Client/Core.Tests/DependencyInjection/DurableTaskClientBuilderExtensionsTests.cs @@ -174,6 +174,14 @@ public override Task WaitForInstanceStartAsync( { throw new NotImplementedException(); } + + public override Task RestartAsync( + string instanceId, + bool restartWithNewInstanceId = false, + CancellationToken cancellation = default) + { + throw new NotImplementedException(); + } } class GoodBuildTargetOptions : DurableTaskClientOptions diff --git a/test/Client/OrchestrationServiceClientShim.Tests/ShimDurableTaskClientTests.cs b/test/Client/OrchestrationServiceClientShim.Tests/ShimDurableTaskClientTests.cs index 5eba17ede..978cbf97e 100644 --- a/test/Client/OrchestrationServiceClientShim.Tests/ShimDurableTaskClientTests.cs +++ b/test/Client/OrchestrationServiceClientShim.Tests/ShimDurableTaskClientTests.cs @@ -326,6 +326,71 @@ public async Task ScheduleNewOrchestrationInstance_IdProvided_TagsProvided() await this.RunScheduleNewOrchestrationInstanceAsync("test", "input", options); } + [Theory] + [InlineData(false)] + [InlineData(true)] + public async Task RestartAsync_EndToEnd(bool restartWithNewInstanceId) + { + string originalInstanceId = "test-instance-id"; + string orchestratorName = "TestOrchestrator"; + object input = "test-input"; + string serializedInput = "\"test-input\""; + + // Create a completed orchestration state + Core.OrchestrationState originalState = CreateState(input, "test-output"); + originalState.OrchestrationInstance.InstanceId = originalInstanceId; + originalState.Name = orchestratorName; + originalState.OrchestrationStatus = Core.OrchestrationStatus.Completed; + + // Setup the mock to return the original orchestration state + this.orchestrationClient + .Setup(x => x.GetOrchestrationStateAsync(originalInstanceId, false)) + .ReturnsAsync(new List { originalState }); + + // Setup the mock for the new orchestration creation + this.orchestrationClient + .Setup(x => x.CreateTaskOrchestrationAsync(It.IsAny())) + .Returns(Task.CompletedTask); + + string restartedInstanceId = await this.client.RestartAsync(originalInstanceId, restartWithNewInstanceId); + + if (restartWithNewInstanceId) + { + restartedInstanceId.Should().NotBe(originalInstanceId); + } + else + { + restartedInstanceId.Should().Be(originalInstanceId); + } + + // Verify that CreateTaskOrchestrationAsync was called with the correct parameters + this.orchestrationClient.Verify( + x => x.CreateTaskOrchestrationAsync(It.Is(msg => + msg.Event is ExecutionStartedEvent startedEvent && + startedEvent.Name == orchestratorName && + startedEvent.Input == serializedInput && + (restartWithNewInstanceId ? + msg.OrchestrationInstance.InstanceId != originalInstanceId : + msg.OrchestrationInstance.InstanceId == originalInstanceId))), + Times.Once); + } + + [Fact] + public async Task RestartAsync_InstanceNotFound_ThrowsArgumentException() + { + string nonExistentInstanceId = "non-existent-instance-id"; + + // Setup the mock to client return empty orchestration state (instance not found) + this.orchestrationClient + .Setup(x => x.GetOrchestrationStateAsync(nonExistentInstanceId, false)) + .ReturnsAsync(new List()); + + // RestartAsync should throw an ArgumentException since the instance is not found + Func restartAction = () => this.client.RestartAsync(nonExistentInstanceId); + + await restartAction.Should().ThrowAsync() + .WithMessage($"*An orchestration with the instanceId {nonExistentInstanceId} was not found*"); + } static Core.OrchestrationState CreateState( object input, object? output = null, DateTimeOffset start = default) diff --git a/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs b/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs index 50fe6e070..6590a3854 100644 --- a/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs +++ b/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs @@ -224,6 +224,72 @@ public async Task PurgeInstances_WithFilter_EndToEnd() } } + [Theory] + [InlineData(false)] + [InlineData(true)] + public async Task RestartAsync_EndToEnd(bool restartWithNewInstanceId) + { + await using HostTestLifetime server = await this.StartAsync(); + + // Start an initial orchestration + string originalInstanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync( + OrchestrationName, input: "test-input"); + + // Wait for it to start and then complete + await server.Client.WaitForInstanceStartAsync(originalInstanceId, default); + await server.Client.RaiseEventAsync(originalInstanceId, "event", default); + await server.Client.WaitForInstanceCompletionAsync(originalInstanceId, default); + + // Verify the original orchestration completed + OrchestrationMetadata? originalMetadata = await server.Client.GetInstanceAsync(originalInstanceId, true); + originalMetadata.Should().NotBeNull(); + originalMetadata!.RuntimeStatus.Should().Be(OrchestrationRuntimeStatus.Completed); + + // Restart the orchestration + string restartedInstanceId = await server.Client.RestartAsync(originalInstanceId, restartWithNewInstanceId); + + // Verify the restart behavior + if (restartWithNewInstanceId) + { + restartedInstanceId.Should().NotBe(originalInstanceId); + } + else + { + restartedInstanceId.Should().Be(originalInstanceId); + } + + // Wait for the restarted orchestration to start + await server.Client.WaitForInstanceStartAsync(restartedInstanceId, default); + + // Verify the restarted orchestration has the same input + OrchestrationMetadata? restartedMetadata = await server.Client.GetInstanceAsync(restartedInstanceId, true); + restartedMetadata.Should().NotBeNull(); + restartedMetadata!.Name.Should().Be(OrchestrationName); + restartedMetadata.SerializedInput.Should().Be("\"test-input\""); + + // Complete the restarted orchestration + await server.Client.RaiseEventAsync(restartedInstanceId, "event", default); + await server.Client.WaitForInstanceCompletionAsync(restartedInstanceId, default); + + // Verify the restarted orchestration completed + restartedMetadata = await server.Client.GetInstanceAsync(restartedInstanceId, true); + restartedMetadata.Should().NotBeNull(); + restartedMetadata!.RuntimeStatus.Should().Be(OrchestrationRuntimeStatus.Completed); + } + + [Fact] + public async Task RestartAsync_InstanceNotFound_ThrowsArgumentException() + { + await using HostTestLifetime server = await this.StartAsync(); + + // Try to restart a non-existent orchestration + Func restartAction = () => server.Client.RestartAsync("non-existent-instance-id"); + + // Should throw ArgumentException + await restartAction.Should().ThrowAsync() + .WithMessage("*An orchestration with the instanceId non-existent-instance-id was not found*"); + } + Task StartAsync() { static async Task Orchestration(TaskOrchestrationContext context, bool shouldThrow) From f9fa4b654a7d130dd6c13b4e972f1b72683d4d21 Mon Sep 17 00:00:00 2001 From: "naiyuantian@microsoft.com" Date: Mon, 4 Aug 2025 21:17:50 -0700 Subject: [PATCH 02/11] udpate shimdurabletaskclient and test --- .../ShimDurableTaskClient.cs | 40 +++++++++---------- .../ShimDurableTaskClientTests.cs | 25 +++++++----- 2 files changed, 34 insertions(+), 31 deletions(-) diff --git a/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs b/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs index c72cfc2f6..5062066dc 100644 --- a/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs +++ b/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs @@ -271,32 +271,28 @@ public override async Task RestartAsync( throw new ArgumentException($"An orchestration with the instanceId {instanceId} was not found."); } - // Create options for the new orchestration - StartOrchestrationOptions options; - if (!restartWithNewInstanceId) - { - options = new StartOrchestrationOptions() - { - InstanceId = instanceId, - }; - } - else + // Determine the instance ID for the restarted orchestration + string newInstanceId = restartWithNewInstanceId ? Guid.NewGuid().ToString("N") : instanceId; + + OrchestrationInstance instance = new() { - options = new StartOrchestrationOptions(); - } + InstanceId = newInstanceId, + ExecutionId = Guid.NewGuid().ToString("N"), + }; - object? input = null; - if (!string.IsNullOrEmpty(status.SerializedInput)) + // Use the original serialized input directly to avoid double serialization + TaskMessage message = new() { - input = this.DataConverter.Deserialize(status.SerializedInput, typeof(object)); - } + OrchestrationInstance = instance, + Event = new ExecutionStartedEvent(-1, status.SerializedInput) + { + Name = status.Name, + OrchestrationInstance = instance, + }, + }; - // Start a new orchestration with the same name and input - return await this.ScheduleNewOrchestrationInstanceAsync( - new TaskName(status.Name), - input, - options, - cancellation); + await this.Client.CreateTaskOrchestrationAsync(message); + return newInstanceId; } [return: NotNullIfNotNull("state")] diff --git a/test/Client/OrchestrationServiceClientShim.Tests/ShimDurableTaskClientTests.cs b/test/Client/OrchestrationServiceClientShim.Tests/ShimDurableTaskClientTests.cs index 978cbf97e..85930f068 100644 --- a/test/Client/OrchestrationServiceClientShim.Tests/ShimDurableTaskClientTests.cs +++ b/test/Client/OrchestrationServiceClientShim.Tests/ShimDurableTaskClientTests.cs @@ -347,9 +347,11 @@ public async Task RestartAsync_EndToEnd(bool restartWithNewInstanceId) .Setup(x => x.GetOrchestrationStateAsync(originalInstanceId, false)) .ReturnsAsync(new List { originalState }); - // Setup the mock for the new orchestration creation + // Capture the TaskMessage for verification becasue we will create this message at RestartAsync. + TaskMessage? capturedMessage = null; this.orchestrationClient .Setup(x => x.CreateTaskOrchestrationAsync(It.IsAny())) + .Callback(msg => capturedMessage = msg) .Returns(Task.CompletedTask); string restartedInstanceId = await this.client.RestartAsync(originalInstanceId, restartWithNewInstanceId); @@ -363,16 +365,21 @@ public async Task RestartAsync_EndToEnd(bool restartWithNewInstanceId) restartedInstanceId.Should().Be(originalInstanceId); } - // Verify that CreateTaskOrchestrationAsync was called with the correct parameters + // Verify that CreateTaskOrchestrationAsync was called this.orchestrationClient.Verify( - x => x.CreateTaskOrchestrationAsync(It.Is(msg => - msg.Event is ExecutionStartedEvent startedEvent && - startedEvent.Name == orchestratorName && - startedEvent.Input == serializedInput && - (restartWithNewInstanceId ? - msg.OrchestrationInstance.InstanceId != originalInstanceId : - msg.OrchestrationInstance.InstanceId == originalInstanceId))), + x => x.CreateTaskOrchestrationAsync(It.IsAny()), Times.Once); + + // Verify the captured message details + capturedMessage.Should().NotBeNull(); + capturedMessage!.Event.Should().BeOfType(); + + var startedEvent = (ExecutionStartedEvent)capturedMessage.Event; + startedEvent.Name.Should().Be(orchestratorName); + startedEvent.Input.Should().Be(serializedInput); + startedEvent.OrchestrationInstance.InstanceId.Should().Be(restartedInstanceId); + startedEvent.OrchestrationInstance.ExecutionId.Should().NotBeNullOrEmpty(); + startedEvent.OrchestrationInstance.ExecutionId.Should().NotBe(originalState.OrchestrationInstance.ExecutionId); } [Fact] From 315dc3de588307edd860d9b20794f6a11658362a Mon Sep 17 00:00:00 2001 From: "naiyuantian@microsoft.com" Date: Wed, 6 Aug 2025 12:02:40 -0700 Subject: [PATCH 03/11] update test --- .../GrpcDurableTaskClientIntegrationTests.cs | 6 +-- .../GrpcSidecar/Grpc/TaskHubGrpcServer.cs | 54 +++++++++++++++++++ 2 files changed, 57 insertions(+), 3 deletions(-) diff --git a/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs b/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs index 6590a3854..291696099 100644 --- a/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs +++ b/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs @@ -231,9 +231,9 @@ public async Task RestartAsync_EndToEnd(bool restartWithNewInstanceId) { await using HostTestLifetime server = await this.StartAsync(); - // Start an initial orchestration + // Start an initial orchestration with shouldThrow = false to ensure it completes successfully string originalInstanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync( - OrchestrationName, input: "test-input"); + OrchestrationName, input: false); // Wait for it to start and then complete await server.Client.WaitForInstanceStartAsync(originalInstanceId, default); @@ -265,7 +265,7 @@ public async Task RestartAsync_EndToEnd(bool restartWithNewInstanceId) OrchestrationMetadata? restartedMetadata = await server.Client.GetInstanceAsync(restartedInstanceId, true); restartedMetadata.Should().NotBeNull(); restartedMetadata!.Name.Should().Be(OrchestrationName); - restartedMetadata.SerializedInput.Should().Be("\"test-input\""); + restartedMetadata.SerializedInput.Should().Be("false"); // Complete the restarted orchestration await server.Client.RaiseEventAsync(restartedInstanceId, "event", default); diff --git a/test/Grpc.IntegrationTests/GrpcSidecar/Grpc/TaskHubGrpcServer.cs b/test/Grpc.IntegrationTests/GrpcSidecar/Grpc/TaskHubGrpcServer.cs index 192a5a77f..b5c5a5dd2 100644 --- a/test/Grpc.IntegrationTests/GrpcSidecar/Grpc/TaskHubGrpcServer.cs +++ b/test/Grpc.IntegrationTests/GrpcSidecar/Grpc/TaskHubGrpcServer.cs @@ -333,6 +333,60 @@ static P.GetInstanceResponse CreateGetInstanceResponse(OrchestrationState state, return new P.ResumeResponse(); } + public override async Task RestartInstance(P.RestartInstanceRequest request, ServerCallContext context) + { + try + { + // Get the original orchestration state + IList states = await this.client.GetOrchestrationStateAsync(request.InstanceId, false); + if (states == null || states.Count == 0) + { + throw new RpcException(new Status(StatusCode.NotFound, $"An orchestration with the instanceId {request.InstanceId} was not found.")); + } + + OrchestrationState state = states[0]; + string newInstanceId = request.RestartWithNewInstanceId ? Guid.NewGuid().ToString("N") : request.InstanceId; + + // Create a new orchestration instance + OrchestrationInstance newInstance = new() + { + InstanceId = newInstanceId, + ExecutionId = Guid.NewGuid().ToString("N"), + }; + + // Create an ExecutionStartedEvent with the original input + ExecutionStartedEvent startedEvent = new(-1, state.Name) + { + Input = state.Input, // Use the original serialized input + Version = state.Version ?? string.Empty, + OrchestrationInstance = newInstance, + }; + + TaskMessage taskMessage = new() + { + OrchestrationInstance = newInstance, + Event = startedEvent, + }; + + await this.client.CreateTaskOrchestrationAsync(taskMessage); + + return new P.RestartInstanceResponse + { + InstanceId = newInstanceId, + }; + } + catch (RpcException) + { + // Re-throw RpcException as-is + throw; + } + catch (Exception ex) + { + this.log.LogError(ex, "Error restarting orchestration instance {InstanceId}", request.InstanceId); + throw new RpcException(new Status(StatusCode.Internal, ex.Message)); + } + } + static P.TaskFailureDetails? GetFailureDetails(FailureDetails? failureDetails) { if (failureDetails == null) From 6bc59c8324d315734b29b80f48f0387c4d4d41ae Mon Sep 17 00:00:00 2001 From: "naiyuantian@microsoft.com" Date: Wed, 6 Aug 2025 16:08:00 -0700 Subject: [PATCH 04/11] updatestest --- eng/targets/Release.props | 2 +- .../GrpcDurableTaskClientIntegrationTests.cs | 28 +++++++++---------- .../GrpcSidecar/Grpc/TaskHubGrpcServer.cs | 8 ++++++ .../InMemoryOrchestrationService.cs | 26 +++++++++++------ 4 files changed, 40 insertions(+), 24 deletions(-) diff --git a/eng/targets/Release.props b/eng/targets/Release.props index bf40e2a27..3a9b61267 100644 --- a/eng/targets/Release.props +++ b/eng/targets/Release.props @@ -17,7 +17,7 @@ - 1.12.0 + 1.13.0 diff --git a/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs b/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs index 291696099..19dc63652 100644 --- a/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs +++ b/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs @@ -229,6 +229,7 @@ public async Task PurgeInstances_WithFilter_EndToEnd() [InlineData(true)] public async Task RestartAsync_EndToEnd(bool restartWithNewInstanceId) { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(1)); // Reduced timeout to 1 minute await using HostTestLifetime server = await this.StartAsync(); // Start an initial orchestration with shouldThrow = false to ensure it completes successfully @@ -238,8 +239,8 @@ public async Task RestartAsync_EndToEnd(bool restartWithNewInstanceId) // Wait for it to start and then complete await server.Client.WaitForInstanceStartAsync(originalInstanceId, default); await server.Client.RaiseEventAsync(originalInstanceId, "event", default); - await server.Client.WaitForInstanceCompletionAsync(originalInstanceId, default); - + await server.Client.WaitForInstanceCompletionAsync(originalInstanceId, cts.Token); + // Verify the original orchestration completed OrchestrationMetadata? originalMetadata = await server.Client.GetInstanceAsync(originalInstanceId, true); originalMetadata.Should().NotBeNull(); @@ -258,32 +259,29 @@ public async Task RestartAsync_EndToEnd(bool restartWithNewInstanceId) restartedInstanceId.Should().Be(originalInstanceId); } - // Wait for the restarted orchestration to start - await server.Client.WaitForInstanceStartAsync(restartedInstanceId, default); - - // Verify the restarted orchestration has the same input - OrchestrationMetadata? restartedMetadata = await server.Client.GetInstanceAsync(restartedInstanceId, true); - restartedMetadata.Should().NotBeNull(); - restartedMetadata!.Name.Should().Be(OrchestrationName); - restartedMetadata.SerializedInput.Should().Be("false"); - // Complete the restarted orchestration - await server.Client.RaiseEventAsync(restartedInstanceId, "event", default); - await server.Client.WaitForInstanceCompletionAsync(restartedInstanceId, default); + await server.Client.RaiseEventAsync(restartedInstanceId, "event"); + + // Wait for completion (with shorter timeout) + using var completionCts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + await server.Client.WaitForInstanceCompletionAsync(restartedInstanceId, completionCts.Token); // Verify the restarted orchestration completed - restartedMetadata = await server.Client.GetInstanceAsync(restartedInstanceId, true); + var restartedMetadata = await server.Client.GetInstanceAsync(restartedInstanceId, true); restartedMetadata.Should().NotBeNull(); + restartedMetadata!.Name.Should().Be(OrchestrationName); + restartedMetadata.SerializedInput.Should().Be("false"); restartedMetadata!.RuntimeStatus.Should().Be(OrchestrationRuntimeStatus.Completed); } [Fact] public async Task RestartAsync_InstanceNotFound_ThrowsArgumentException() { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(1)); // 1-minute timeout await using HostTestLifetime server = await this.StartAsync(); // Try to restart a non-existent orchestration - Func restartAction = () => server.Client.RestartAsync("non-existent-instance-id"); + Func restartAction = () => server.Client.RestartAsync("non-existent-instance-id", cancellation: cts.Token); // Should throw ArgumentException await restartAction.Should().ThrowAsync() diff --git a/test/Grpc.IntegrationTests/GrpcSidecar/Grpc/TaskHubGrpcServer.cs b/test/Grpc.IntegrationTests/GrpcSidecar/Grpc/TaskHubGrpcServer.cs index b5c5a5dd2..f1624f8b4 100644 --- a/test/Grpc.IntegrationTests/GrpcSidecar/Grpc/TaskHubGrpcServer.cs +++ b/test/Grpc.IntegrationTests/GrpcSidecar/Grpc/TaskHubGrpcServer.cs @@ -339,12 +339,20 @@ static P.GetInstanceResponse CreateGetInstanceResponse(OrchestrationState state, { // Get the original orchestration state IList states = await this.client.GetOrchestrationStateAsync(request.InstanceId, false); + if (states == null || states.Count == 0) { throw new RpcException(new Status(StatusCode.NotFound, $"An orchestration with the instanceId {request.InstanceId} was not found.")); } OrchestrationState state = states[0]; + + // Check if the state is null + if (state == null) + { + throw new RpcException(new Status(StatusCode.NotFound, $"An orchestration with the instanceId {request.InstanceId} was not found.")); + } + string newInstanceId = request.RestartWithNewInstanceId ? Guid.NewGuid().ToString("N") : request.InstanceId; // Create a new orchestration instance diff --git a/test/Grpc.IntegrationTests/GrpcSidecar/InMemoryOrchestrationService.cs b/test/Grpc.IntegrationTests/GrpcSidecar/InMemoryOrchestrationService.cs index 481fdc3ff..9c6e989a3 100644 --- a/test/Grpc.IntegrationTests/GrpcSidecar/InMemoryOrchestrationService.cs +++ b/test/Grpc.IntegrationTests/GrpcSidecar/InMemoryOrchestrationService.cs @@ -444,8 +444,21 @@ public void AddMessage(TaskMessage message) SerializedInstanceState state = this.store.GetOrAdd(instanceId, id => new SerializedInstanceState(id, executionId)); lock (state) { + bool isRestart = state.ExecutionId != null && state.ExecutionId != executionId; + if (message.Event is ExecutionStartedEvent startEvent) { + // For restart scenarios, clear the history and reset the state + if (isRestart && state.IsCompleted) + { + state.HistoryEventsJson.Clear(); + state.ExecutionId = executionId; // Always update the execution ID + state.IsCompleted = false; + } + + // Add the ExecutionStartedEvent to history so the orchestration worker has proper context + state.HistoryEventsJson.Add(startEvent); + OrchestrationState newStatusRecord = new() { OrchestrationInstance = startEvent.OrchestrationInstance, @@ -460,7 +473,6 @@ public void AddMessage(TaskMessage message) }; state.StatusRecordJson = JsonValue.Create(newStatusRecord); - state.HistoryEventsJson.Clear(); state.IsCompleted = false; } else if (state.IsCompleted) @@ -661,13 +673,11 @@ public async ValueTask TakeNextAsync(CancellationToken public void Schedule(SerializedInstanceState state) { - // TODO: There is a race condition here. If another thread is calling TakeNextAsync - // and removed the queue item before updating the dictionary, then we'll fail - // to update the readyToRunQueue and the orchestration will get stuck. - if (this.readyInstances.TryAdd(state.InstanceId, state)) - { - this.readyToRunQueue.Writer.TryWrite(state); - } + // Always update the dictionary entry (for restart scenarios) + this.readyInstances[state.InstanceId] = state; + + // Always try to write to the queue + this.readyToRunQueue.Writer.TryWrite(state); } } From 9dce30fbe551a5e3e1a9bd9be7f106db10471d41 Mon Sep 17 00:00:00 2001 From: "naiyuantian@microsoft.com" Date: Thu, 7 Aug 2025 12:08:39 -0700 Subject: [PATCH 05/11] update taskhubgrpcservice and test --- .../GrpcDurableTaskClientIntegrationTests.cs | 3 +-- .../GrpcSidecar/Grpc/TaskHubGrpcServer.cs | 4 ++-- .../InMemoryOrchestrationService.cs | 21 +++++++++++-------- 3 files changed, 15 insertions(+), 13 deletions(-) diff --git a/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs b/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs index 19dc63652..a32235b7f 100644 --- a/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs +++ b/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs @@ -229,7 +229,7 @@ public async Task PurgeInstances_WithFilter_EndToEnd() [InlineData(true)] public async Task RestartAsync_EndToEnd(bool restartWithNewInstanceId) { - using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(1)); // Reduced timeout to 1 minute + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(1)); await using HostTestLifetime server = await this.StartAsync(); // Start an initial orchestration with shouldThrow = false to ensure it completes successfully @@ -262,7 +262,6 @@ public async Task RestartAsync_EndToEnd(bool restartWithNewInstanceId) // Complete the restarted orchestration await server.Client.RaiseEventAsync(restartedInstanceId, "event"); - // Wait for completion (with shorter timeout) using var completionCts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); await server.Client.WaitForInstanceCompletionAsync(restartedInstanceId, completionCts.Token); diff --git a/test/Grpc.IntegrationTests/GrpcSidecar/Grpc/TaskHubGrpcServer.cs b/test/Grpc.IntegrationTests/GrpcSidecar/Grpc/TaskHubGrpcServer.cs index f1624f8b4..d24fc41a4 100644 --- a/test/Grpc.IntegrationTests/GrpcSidecar/Grpc/TaskHubGrpcServer.cs +++ b/test/Grpc.IntegrationTests/GrpcSidecar/Grpc/TaskHubGrpcServer.cs @@ -363,9 +363,9 @@ static P.GetInstanceResponse CreateGetInstanceResponse(OrchestrationState state, }; // Create an ExecutionStartedEvent with the original input - ExecutionStartedEvent startedEvent = new(-1, state.Name) + ExecutionStartedEvent startedEvent = new(-1, state.Input) { - Input = state.Input, // Use the original serialized input + Name = state.Name, Version = state.Version ?? string.Empty, OrchestrationInstance = newInstance, }; diff --git a/test/Grpc.IntegrationTests/GrpcSidecar/InMemoryOrchestrationService.cs b/test/Grpc.IntegrationTests/GrpcSidecar/InMemoryOrchestrationService.cs index 9c6e989a3..9c0ce304e 100644 --- a/test/Grpc.IntegrationTests/GrpcSidecar/InMemoryOrchestrationService.cs +++ b/test/Grpc.IntegrationTests/GrpcSidecar/InMemoryOrchestrationService.cs @@ -449,16 +449,14 @@ public void AddMessage(TaskMessage message) if (message.Event is ExecutionStartedEvent startEvent) { // For restart scenarios, clear the history and reset the state - if (isRestart && state.IsCompleted) + if (isRestart) { state.HistoryEventsJson.Clear(); state.ExecutionId = executionId; // Always update the execution ID state.IsCompleted = false; + state.IsLoaded = false; // Reset the loaded state for restart scenarios } - // Add the ExecutionStartedEvent to history so the orchestration worker has proper context - state.HistoryEventsJson.Add(startEvent); - OrchestrationState newStatusRecord = new() { OrchestrationInstance = startEvent.OrchestrationInstance, @@ -673,11 +671,16 @@ public async ValueTask TakeNextAsync(CancellationToken public void Schedule(SerializedInstanceState state) { - // Always update the dictionary entry (for restart scenarios) - this.readyInstances[state.InstanceId] = state; - - // Always try to write to the queue - this.readyToRunQueue.Writer.TryWrite(state); + // TODO: There is a race condition here. If another thread is calling TakeNextAsync + // and removed the queue item before updating the dictionary, then we'll fail + // to update the readyToRunQueue and the orchestration will get stuck. + if (this.readyInstances.TryAdd(state.InstanceId, state)) + { + if (!this.readyToRunQueue.Writer.TryWrite(state)) + { + throw new InvalidOperationException($"unable to write to queue for {state.InstanceId}"); + } + } } } From dd6f8f4b2ce4587d2a7544156f2e76713a932d35 Mon Sep 17 00:00:00 2001 From: "naiyuantian@microsoft.com" Date: Thu, 7 Aug 2025 12:10:32 -0700 Subject: [PATCH 06/11] update in memory orch service --- .../GrpcSidecar/InMemoryOrchestrationService.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/Grpc.IntegrationTests/GrpcSidecar/InMemoryOrchestrationService.cs b/test/Grpc.IntegrationTests/GrpcSidecar/InMemoryOrchestrationService.cs index 9c0ce304e..41fe3523a 100644 --- a/test/Grpc.IntegrationTests/GrpcSidecar/InMemoryOrchestrationService.cs +++ b/test/Grpc.IntegrationTests/GrpcSidecar/InMemoryOrchestrationService.cs @@ -449,7 +449,7 @@ public void AddMessage(TaskMessage message) if (message.Event is ExecutionStartedEvent startEvent) { // For restart scenarios, clear the history and reset the state - if (isRestart) + if (isRestart && state.IsCompleted) { state.HistoryEventsJson.Clear(); state.ExecutionId = executionId; // Always update the execution ID From 977d8cd9d11f59d361bfb4a8974f36223ab6c21f Mon Sep 17 00:00:00 2001 From: "naiyuantian@microsoft.com" Date: Thu, 7 Aug 2025 12:19:50 -0700 Subject: [PATCH 07/11] update restart logic at inmemoryorchestrationservice --- .../GrpcDurableTaskClientIntegrationTests.cs | 3 ++- .../GrpcSidecar/InMemoryOrchestrationService.cs | 7 +++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs b/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs index a32235b7f..63f92ef31 100644 --- a/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs +++ b/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs @@ -265,7 +265,8 @@ public async Task RestartAsync_EndToEnd(bool restartWithNewInstanceId) using var completionCts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); await server.Client.WaitForInstanceCompletionAsync(restartedInstanceId, completionCts.Token); - // Verify the restarted orchestration completed + // Verify the restarted orchestration completed. + // Also verify input and orchestrator name are matched. var restartedMetadata = await server.Client.GetInstanceAsync(restartedInstanceId, true); restartedMetadata.Should().NotBeNull(); restartedMetadata!.Name.Should().Be(OrchestrationName); diff --git a/test/Grpc.IntegrationTests/GrpcSidecar/InMemoryOrchestrationService.cs b/test/Grpc.IntegrationTests/GrpcSidecar/InMemoryOrchestrationService.cs index 41fe3523a..3494c64b2 100644 --- a/test/Grpc.IntegrationTests/GrpcSidecar/InMemoryOrchestrationService.cs +++ b/test/Grpc.IntegrationTests/GrpcSidecar/InMemoryOrchestrationService.cs @@ -451,10 +451,8 @@ public void AddMessage(TaskMessage message) // For restart scenarios, clear the history and reset the state if (isRestart && state.IsCompleted) { - state.HistoryEventsJson.Clear(); - state.ExecutionId = executionId; // Always update the execution ID - state.IsCompleted = false; - state.IsLoaded = false; // Reset the loaded state for restart scenarios + state.ExecutionId = executionId; + state.IsLoaded = false; } OrchestrationState newStatusRecord = new() @@ -471,6 +469,7 @@ public void AddMessage(TaskMessage message) }; state.StatusRecordJson = JsonValue.Create(newStatusRecord); + state.HistoryEventsJson.Clear(); state.IsCompleted = false; } else if (state.IsCompleted) From 6ee7fe59723e0cb2b2e085f6218abc5a5c76ff41 Mon Sep 17 00:00:00 2001 From: "naiyuantian@microsoft.com" Date: Mon, 18 Aug 2025 15:38:07 -0700 Subject: [PATCH 08/11] add status check if restart with same instace --- src/Client/Core/DurableTaskClient.cs | 3 +++ src/Client/Grpc/GrpcDurableTaskClient.cs | 4 ++++ .../ShimDurableTaskClient.cs | 11 +++++++++++ 3 files changed, 18 insertions(+) diff --git a/src/Client/Core/DurableTaskClient.cs b/src/Client/Core/DurableTaskClient.cs index bf1394ae9..61f4663e5 100644 --- a/src/Client/Core/DurableTaskClient.cs +++ b/src/Client/Core/DurableTaskClient.cs @@ -430,6 +430,9 @@ public virtual Task PurgeAllInstancesAsync( /// /// Thrown if an orchestration with the specified was not found. /// + /// + /// Thrown when attempting to restart an instance using the same instance Id + /// while the instance has not yet reached a completed or terminal state. /// /// Thrown if the backend does not support restart operations. /// diff --git a/src/Client/Grpc/GrpcDurableTaskClient.cs b/src/Client/Grpc/GrpcDurableTaskClient.cs index 591f9199d..763c7182f 100644 --- a/src/Client/Grpc/GrpcDurableTaskClient.cs +++ b/src/Client/Grpc/GrpcDurableTaskClient.cs @@ -433,6 +433,10 @@ public override async Task RestartAsync( { throw new ArgumentException($"An orchestration with the instanceId {instanceId} was not found.", e); } + catch (RpcException e) when (e.StatusCode == StatusCode.FailedPrecondition) + { + throw new InvalidOperationException($"An orchestration with the instanceId {instanceId} cannot be restarted.", e); + } catch (RpcException e) when (e.StatusCode == StatusCode.Cancelled) { throw new OperationCanceledException( diff --git a/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs b/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs index 5062066dc..9249437fa 100644 --- a/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs +++ b/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs @@ -271,6 +271,16 @@ public override async Task RestartAsync( throw new ArgumentException($"An orchestration with the instanceId {instanceId} was not found."); } + bool isInstaceNotCompleted = status.RuntimeStatus == OrchestrationRuntimeStatus.Running || + status.RuntimeStatus == OrchestrationRuntimeStatus.Pending || + status.RuntimeStatus == OrchestrationRuntimeStatus.Suspended; + + if (isInstaceNotCompleted && !restartWithNewInstanceId) + { + throw new InvalidOperationException($"Instance '{instanceId}' cannot be restarted while it is in state '{status.RuntimeStatus}'. " + + "Wait until it has completed, or restart with a new instance ID."); + } + // Determine the instance ID for the restarted orchestration string newInstanceId = restartWithNewInstanceId ? Guid.NewGuid().ToString("N") : instanceId; @@ -281,6 +291,7 @@ public override async Task RestartAsync( }; // Use the original serialized input directly to avoid double serialization + // Note: OrchestrationMetada doesn't have version property so we don't support version here. TaskMessage message = new() { OrchestrationInstance = instance, From 73cfab79b1ed03d6b843bc4c5b01c36ed7d9b45b Mon Sep 17 00:00:00 2001 From: "naiyuantian@microsoft.com" Date: Thu, 28 Aug 2025 09:34:48 -0700 Subject: [PATCH 09/11] update bycoments --- src/Client/Core/DurableTaskClient.cs | 9 ++++----- src/Client/Grpc/GrpcDurableTaskClient.cs | 1 + .../ShimDurableTaskClient.cs | 4 +++- .../ShimDurableTaskClientTests.cs | 1 + 4 files changed, 9 insertions(+), 6 deletions(-) diff --git a/src/Client/Core/DurableTaskClient.cs b/src/Client/Core/DurableTaskClient.cs index 61f4663e5..05cf60317 100644 --- a/src/Client/Core/DurableTaskClient.cs +++ b/src/Client/Core/DurableTaskClient.cs @@ -428,14 +428,13 @@ public virtual Task PurgeAllInstancesAsync( /// The value of this task is the instance ID of the restarted orchestration instance. /// /// - /// Thrown if an orchestration with the specified was not found. - /// + /// Thrown if an orchestration with the specified was not found. /// /// Thrown when attempting to restart an instance using the same instance Id - /// while the instance has not yet reached a completed or terminal state. + /// while the instance has not yet reached a completed or terminal state. /// - /// Thrown if the backend does not support restart operations. - /// + /// Thrown if the backend does not support restart operations. + [Obsolete("Experimental")] public virtual Task RestartAsync( string instanceId, bool restartWithNewInstanceId = false, diff --git a/src/Client/Grpc/GrpcDurableTaskClient.cs b/src/Client/Grpc/GrpcDurableTaskClient.cs index c57b22f96..77b7afb70 100644 --- a/src/Client/Grpc/GrpcDurableTaskClient.cs +++ b/src/Client/Grpc/GrpcDurableTaskClient.cs @@ -396,6 +396,7 @@ public override Task PurgeAllInstancesAsync( } /// + [Obsolete("Experimental")] public override async Task RestartAsync( string instanceId, bool restartWithNewInstanceId = false, diff --git a/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs b/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs index 9249437fa..267926a66 100644 --- a/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs +++ b/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs @@ -255,6 +255,7 @@ public override async Task WaitForInstanceStartAsync( } /// + [Obsolete("Experimental")] public override async Task RestartAsync( string instanceId, bool restartWithNewInstanceId = false, @@ -291,7 +292,8 @@ public override async Task RestartAsync( }; // Use the original serialized input directly to avoid double serialization - // Note: OrchestrationMetada doesn't have version property so we don't support version here. + // TODO: OrchestrationMetada doesn't have version property so we don't support version here. + // Issue link: https://github.com/microsoft/durabletask-dotnet/issues/463 TaskMessage message = new() { OrchestrationInstance = instance, diff --git a/test/Client/OrchestrationServiceClientShim.Tests/ShimDurableTaskClientTests.cs b/test/Client/OrchestrationServiceClientShim.Tests/ShimDurableTaskClientTests.cs index 85930f068..588f59ac3 100644 --- a/test/Client/OrchestrationServiceClientShim.Tests/ShimDurableTaskClientTests.cs +++ b/test/Client/OrchestrationServiceClientShim.Tests/ShimDurableTaskClientTests.cs @@ -377,6 +377,7 @@ public async Task RestartAsync_EndToEnd(bool restartWithNewInstanceId) var startedEvent = (ExecutionStartedEvent)capturedMessage.Event; startedEvent.Name.Should().Be(orchestratorName); startedEvent.Input.Should().Be(serializedInput); + // TODO: once we support version at ShimDurableTaskClient, we should check version here. startedEvent.OrchestrationInstance.InstanceId.Should().Be(restartedInstanceId); startedEvent.OrchestrationInstance.ExecutionId.Should().NotBeNullOrEmpty(); startedEvent.OrchestrationInstance.ExecutionId.Should().NotBe(originalState.OrchestrationInstance.ExecutionId); From d9a00ed975a49921c70794950abc4b152f39873f Mon Sep 17 00:00:00 2001 From: "naiyuantian@microsoft.com" Date: Thu, 28 Aug 2025 11:06:14 -0700 Subject: [PATCH 10/11] remove tab --- src/Client/Core/DurableTaskClient.cs | 1 - src/Client/Grpc/GrpcDurableTaskClient.cs | 1 - .../OrchestrationServiceClientShim/ShimDurableTaskClient.cs | 1 - 3 files changed, 3 deletions(-) diff --git a/src/Client/Core/DurableTaskClient.cs b/src/Client/Core/DurableTaskClient.cs index 05cf60317..439670cfc 100644 --- a/src/Client/Core/DurableTaskClient.cs +++ b/src/Client/Core/DurableTaskClient.cs @@ -434,7 +434,6 @@ public virtual Task PurgeAllInstancesAsync( /// while the instance has not yet reached a completed or terminal state. /// /// Thrown if the backend does not support restart operations. - [Obsolete("Experimental")] public virtual Task RestartAsync( string instanceId, bool restartWithNewInstanceId = false, diff --git a/src/Client/Grpc/GrpcDurableTaskClient.cs b/src/Client/Grpc/GrpcDurableTaskClient.cs index 77b7afb70..c57b22f96 100644 --- a/src/Client/Grpc/GrpcDurableTaskClient.cs +++ b/src/Client/Grpc/GrpcDurableTaskClient.cs @@ -396,7 +396,6 @@ public override Task PurgeAllInstancesAsync( } /// - [Obsolete("Experimental")] public override async Task RestartAsync( string instanceId, bool restartWithNewInstanceId = false, diff --git a/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs b/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs index 267926a66..4fbf828d0 100644 --- a/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs +++ b/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs @@ -255,7 +255,6 @@ public override async Task WaitForInstanceStartAsync( } /// - [Obsolete("Experimental")] public override async Task RestartAsync( string instanceId, bool restartWithNewInstanceId = false, From 138eb958f07bb1517506526c9509da5a0b955ba1 Mon Sep 17 00:00:00 2001 From: "naiyuantian@microsoft.com" Date: Wed, 10 Sep 2025 15:52:43 -0700 Subject: [PATCH 11/11] pull latest from protobuf --- src/Grpc/orchestrator_service.proto | 6 +++--- src/Grpc/versions.txt | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/Grpc/orchestrator_service.proto b/src/Grpc/orchestrator_service.proto index cb3e6856c..3b9c4f408 100644 --- a/src/Grpc/orchestrator_service.proto +++ b/src/Grpc/orchestrator_service.proto @@ -691,6 +691,9 @@ service TaskHubSidecarService { // Rewinds an orchestration instance to last known good state and replays from there. rpc RewindInstance(RewindInstanceRequest) returns (RewindInstanceResponse); + // Restarts an orchestration instance. + rpc RestartInstance(RestartInstanceRequest) returns (RestartInstanceResponse); + // Waits for an orchestration instance to reach a running or completion state. rpc WaitForInstanceStart(GetInstanceRequest) returns (GetInstanceResponse); @@ -709,9 +712,6 @@ service TaskHubSidecarService { // Resumes a suspended orchestration instance. rpc ResumeInstance(ResumeRequest) returns (ResumeResponse); - // Restarts an orchestration instance with the same or a new instance ID. - rpc RestartInstance(RestartInstanceRequest) returns (RestartInstanceResponse); - // rpc DeleteInstance(DeleteInstanceRequest) returns (DeleteInstanceResponse); rpc QueryInstances(QueryInstancesRequest) returns (QueryInstancesResponse); diff --git a/src/Grpc/versions.txt b/src/Grpc/versions.txt index 5c0de3577..e9f651378 100644 --- a/src/Grpc/versions.txt +++ b/src/Grpc/versions.txt @@ -1,2 +1,2 @@ -# The following files were downloaded from branch main at 2025-08-08 16:46:11 UTC -https://raw.githubusercontent.com/microsoft/durabletask-protobuf/e88acbd07ae38b499dbe8c4e333e9e3feeb2a9cc/protos/orchestrator_service.proto +# The following files were downloaded from branch main at 2025-09-10 22:50:45 UTC +https://raw.githubusercontent.com/microsoft/durabletask-protobuf/985035a0890575ae18be0eb2a3ac93c10824498a/protos/orchestrator_service.proto