Skip to content

Commit 5abde15

Browse files
authored
Add RestartAsync API Support at DurableTaskClient (#456)
* initial commit * udpate shimdurabletaskclient and test * update test * updatestest * update taskhubgrpcservice and test * update in memory orch service * update restart logic at inmemoryorchestrationservice * add status check if restart with same instace * update bycoments * remove tab * pull latest from protobuf
1 parent 9997696 commit 5abde15

File tree

11 files changed

+372
-3
lines changed

11 files changed

+372
-3
lines changed

src/Client/Core/DurableTaskClient.cs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -399,6 +399,47 @@ public virtual Task<PurgeResult> PurgeAllInstancesAsync(
399399
throw new NotSupportedException($"{this.GetType()} does not support purging of orchestration instances.");
400400
}
401401

402+
/// <summary>
403+
/// Restarts an orchestration instance with the same or a new instance ID.
404+
/// </summary>
405+
/// <remarks>
406+
/// <para>
407+
/// This method restarts an existing orchestration instance. If <paramref name="restartWithNewInstanceId"/> is <c>true</c>,
408+
/// a new instance ID will be generated for the restarted orchestration. If <c>false</c>, the original instance ID will be reused.
409+
/// </para><para>
410+
/// The restarted orchestration will use the same input data as the original instance. If the original orchestration
411+
/// instance is not found, an <see cref="ArgumentException"/> will be thrown.
412+
/// </para><para>
413+
/// Note that this operation is backend-specific and may not be supported by all durable task backends.
414+
/// If the backend does not support restart operations, a <see cref="NotSupportedException"/> will be thrown.
415+
/// </para>
416+
/// </remarks>
417+
/// <param name="instanceId">The ID of the orchestration instance to restart.</param>
418+
/// <param name="restartWithNewInstanceId">
419+
/// If <c>true</c>, a new instance ID will be generated for the restarted orchestration.
420+
/// If <c>false</c>, the original instance ID will be reused.
421+
/// </param>
422+
/// <param name="cancellation">
423+
/// The cancellation token. This only cancels enqueueing the restart request to the backend.
424+
/// Does not abort restarting the orchestration once enqueued.
425+
/// </param>
426+
/// <returns>
427+
/// A task that completes when the orchestration instance is successfully restarted.
428+
/// The value of this task is the instance ID of the restarted orchestration instance.
429+
/// </returns>
430+
/// <exception cref="ArgumentException">
431+
/// Thrown if an orchestration with the specified <paramref name="instanceId"/> was not found. </exception>
432+
/// <exception cref="InvalidOperationException">
433+
/// Thrown when attempting to restart an instance using the same instance Id
434+
/// while the instance has not yet reached a completed or terminal state. </exception>
435+
/// <exception cref="NotSupportedException">
436+
/// Thrown if the backend does not support restart operations. </exception>
437+
public virtual Task<string> RestartAsync(
438+
string instanceId,
439+
bool restartWithNewInstanceId = false,
440+
CancellationToken cancellation = default)
441+
=> throw new NotSupportedException($"{this.GetType()} does not support orchestration restart.");
442+
402443
// TODO: Create task hub
403444

404445
// TODO: Delete task hub

src/Client/Grpc/GrpcDurableTaskClient.cs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -395,6 +395,42 @@ public override Task<PurgeResult> PurgeAllInstancesAsync(
395395
return this.PurgeInstancesCoreAsync(request, cancellation);
396396
}
397397

398+
/// <inheritdoc/>
399+
public override async Task<string> RestartAsync(
400+
string instanceId,
401+
bool restartWithNewInstanceId = false,
402+
CancellationToken cancellation = default)
403+
{
404+
Check.NotNullOrEmpty(instanceId);
405+
Check.NotEntity(this.options.EnableEntitySupport, instanceId);
406+
407+
var request = new P.RestartInstanceRequest
408+
{
409+
InstanceId = instanceId,
410+
RestartWithNewInstanceId = restartWithNewInstanceId,
411+
};
412+
413+
try
414+
{
415+
P.RestartInstanceResponse result = await this.sidecarClient.RestartInstanceAsync(
416+
request, cancellationToken: cancellation);
417+
return result.InstanceId;
418+
}
419+
catch (RpcException e) when (e.StatusCode == StatusCode.NotFound)
420+
{
421+
throw new ArgumentException($"An orchestration with the instanceId {instanceId} was not found.", e);
422+
}
423+
catch (RpcException e) when (e.StatusCode == StatusCode.FailedPrecondition)
424+
{
425+
throw new InvalidOperationException($"An orchestration with the instanceId {instanceId} cannot be restarted.", e);
426+
}
427+
catch (RpcException e) when (e.StatusCode == StatusCode.Cancelled)
428+
{
429+
throw new OperationCanceledException(
430+
$"The {nameof(this.RestartAsync)} operation was canceled.", e, cancellation);
431+
}
432+
}
433+
398434
static AsyncDisposable GetCallInvoker(GrpcDurableTaskClientOptions options, out CallInvoker callInvoker)
399435
{
400436
if (options.Channel is GrpcChannel c)

src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,59 @@ public override async Task<OrchestrationMetadata> WaitForInstanceStartAsync(
254254
}
255255
}
256256

257+
/// <inheritdoc/>
258+
public override async Task<string> RestartAsync(
259+
string instanceId,
260+
bool restartWithNewInstanceId = false,
261+
CancellationToken cancellation = default)
262+
{
263+
Check.NotNullOrEmpty(instanceId);
264+
cancellation.ThrowIfCancellationRequested();
265+
266+
// Get the current orchestration status to retrieve the name and input
267+
OrchestrationMetadata? status = await this.GetInstanceAsync(instanceId, getInputsAndOutputs: true, cancellation);
268+
269+
if (status == null)
270+
{
271+
throw new ArgumentException($"An orchestration with the instanceId {instanceId} was not found.");
272+
}
273+
274+
bool isInstaceNotCompleted = status.RuntimeStatus == OrchestrationRuntimeStatus.Running ||
275+
status.RuntimeStatus == OrchestrationRuntimeStatus.Pending ||
276+
status.RuntimeStatus == OrchestrationRuntimeStatus.Suspended;
277+
278+
if (isInstaceNotCompleted && !restartWithNewInstanceId)
279+
{
280+
throw new InvalidOperationException($"Instance '{instanceId}' cannot be restarted while it is in state '{status.RuntimeStatus}'. " +
281+
"Wait until it has completed, or restart with a new instance ID.");
282+
}
283+
284+
// Determine the instance ID for the restarted orchestration
285+
string newInstanceId = restartWithNewInstanceId ? Guid.NewGuid().ToString("N") : instanceId;
286+
287+
OrchestrationInstance instance = new()
288+
{
289+
InstanceId = newInstanceId,
290+
ExecutionId = Guid.NewGuid().ToString("N"),
291+
};
292+
293+
// Use the original serialized input directly to avoid double serialization
294+
// TODO: OrchestrationMetada doesn't have version property so we don't support version here.
295+
// Issue link: https://github.com/microsoft/durabletask-dotnet/issues/463
296+
TaskMessage message = new()
297+
{
298+
OrchestrationInstance = instance,
299+
Event = new ExecutionStartedEvent(-1, status.SerializedInput)
300+
{
301+
Name = status.Name,
302+
OrchestrationInstance = instance,
303+
},
304+
};
305+
306+
await this.Client.CreateTaskOrchestrationAsync(message);
307+
return newInstanceId;
308+
}
309+
257310
[return: NotNullIfNotNull("state")]
258311
OrchestrationMetadata? ToMetadata(Core.OrchestrationState? state, bool getInputsAndOutputs)
259312
{

src/Grpc/orchestrator_service.proto

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -482,6 +482,15 @@ message PurgeInstancesResponse {
482482
google.protobuf.BoolValue isComplete = 2;
483483
}
484484

485+
message RestartInstanceRequest {
486+
string instanceId = 1;
487+
bool restartWithNewInstanceId = 2;
488+
}
489+
490+
message RestartInstanceResponse {
491+
string instanceId = 1;
492+
}
493+
485494
message CreateTaskHubRequest {
486495
bool recreateIfExists = 1;
487496
}
@@ -682,6 +691,9 @@ service TaskHubSidecarService {
682691
// Rewinds an orchestration instance to last known good state and replays from there.
683692
rpc RewindInstance(RewindInstanceRequest) returns (RewindInstanceResponse);
684693

694+
// Restarts an orchestration instance.
695+
rpc RestartInstance(RestartInstanceRequest) returns (RestartInstanceResponse);
696+
685697
// Waits for an orchestration instance to reach a running or completion state.
686698
rpc WaitForInstanceStart(GetInstanceRequest) returns (GetInstanceResponse);
687699

src/Grpc/versions.txt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
# The following files were downloaded from branch main at 2025-08-08 16:46:11 UTC
2-
https://raw.githubusercontent.com/microsoft/durabletask-protobuf/e88acbd07ae38b499dbe8c4e333e9e3feeb2a9cc/protos/orchestrator_service.proto
1+
# The following files were downloaded from branch main at 2025-09-10 22:50:45 UTC
2+
https://raw.githubusercontent.com/microsoft/durabletask-protobuf/985035a0890575ae18be0eb2a3ac93c10824498a/protos/orchestrator_service.proto

test/Client/Core.Tests/DependencyInjection/DefaultDurableTaskClientBuilderTests.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,14 @@ public override Task<OrchestrationMetadata> WaitForInstanceStartAsync(
144144
{
145145
throw new NotImplementedException();
146146
}
147+
148+
public override Task<string> RestartAsync(
149+
string instanceId,
150+
bool restartWithNewInstanceId = false,
151+
CancellationToken cancellation = default)
152+
{
153+
throw new NotImplementedException();
154+
}
147155
}
148156

149157
class CustomDataConverter : DataConverter

test/Client/Core.Tests/DependencyInjection/DurableTaskClientBuilderExtensionsTests.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,14 @@ public override Task<OrchestrationMetadata> WaitForInstanceStartAsync(
174174
{
175175
throw new NotImplementedException();
176176
}
177+
178+
public override Task<string> RestartAsync(
179+
string instanceId,
180+
bool restartWithNewInstanceId = false,
181+
CancellationToken cancellation = default)
182+
{
183+
throw new NotImplementedException();
184+
}
177185
}
178186

179187
class GoodBuildTargetOptions : DurableTaskClientOptions

test/Client/OrchestrationServiceClientShim.Tests/ShimDurableTaskClientTests.cs

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -326,6 +326,79 @@ public async Task ScheduleNewOrchestrationInstance_IdProvided_TagsProvided()
326326
await this.RunScheduleNewOrchestrationInstanceAsync("test", "input", options);
327327
}
328328

329+
[Theory]
330+
[InlineData(false)]
331+
[InlineData(true)]
332+
public async Task RestartAsync_EndToEnd(bool restartWithNewInstanceId)
333+
{
334+
string originalInstanceId = "test-instance-id";
335+
string orchestratorName = "TestOrchestrator";
336+
object input = "test-input";
337+
string serializedInput = "\"test-input\"";
338+
339+
// Create a completed orchestration state
340+
Core.OrchestrationState originalState = CreateState(input, "test-output");
341+
originalState.OrchestrationInstance.InstanceId = originalInstanceId;
342+
originalState.Name = orchestratorName;
343+
originalState.OrchestrationStatus = Core.OrchestrationStatus.Completed;
344+
345+
// Setup the mock to return the original orchestration state
346+
this.orchestrationClient
347+
.Setup(x => x.GetOrchestrationStateAsync(originalInstanceId, false))
348+
.ReturnsAsync(new List<Core.OrchestrationState> { originalState });
349+
350+
// Capture the TaskMessage for verification becasue we will create this message at RestartAsync.
351+
TaskMessage? capturedMessage = null;
352+
this.orchestrationClient
353+
.Setup(x => x.CreateTaskOrchestrationAsync(It.IsAny<TaskMessage>()))
354+
.Callback<TaskMessage>(msg => capturedMessage = msg)
355+
.Returns(Task.CompletedTask);
356+
357+
string restartedInstanceId = await this.client.RestartAsync(originalInstanceId, restartWithNewInstanceId);
358+
359+
if (restartWithNewInstanceId)
360+
{
361+
restartedInstanceId.Should().NotBe(originalInstanceId);
362+
}
363+
else
364+
{
365+
restartedInstanceId.Should().Be(originalInstanceId);
366+
}
367+
368+
// Verify that CreateTaskOrchestrationAsync was called
369+
this.orchestrationClient.Verify(
370+
x => x.CreateTaskOrchestrationAsync(It.IsAny<TaskMessage>()),
371+
Times.Once);
372+
373+
// Verify the captured message details
374+
capturedMessage.Should().NotBeNull();
375+
capturedMessage!.Event.Should().BeOfType<ExecutionStartedEvent>();
376+
377+
var startedEvent = (ExecutionStartedEvent)capturedMessage.Event;
378+
startedEvent.Name.Should().Be(orchestratorName);
379+
startedEvent.Input.Should().Be(serializedInput);
380+
// TODO: once we support version at ShimDurableTaskClient, we should check version here.
381+
startedEvent.OrchestrationInstance.InstanceId.Should().Be(restartedInstanceId);
382+
startedEvent.OrchestrationInstance.ExecutionId.Should().NotBeNullOrEmpty();
383+
startedEvent.OrchestrationInstance.ExecutionId.Should().NotBe(originalState.OrchestrationInstance.ExecutionId);
384+
}
385+
386+
[Fact]
387+
public async Task RestartAsync_InstanceNotFound_ThrowsArgumentException()
388+
{
389+
string nonExistentInstanceId = "non-existent-instance-id";
390+
391+
// Setup the mock to client return empty orchestration state (instance not found)
392+
this.orchestrationClient
393+
.Setup(x => x.GetOrchestrationStateAsync(nonExistentInstanceId, false))
394+
.ReturnsAsync(new List<Core.OrchestrationState>());
395+
396+
// RestartAsync should throw an ArgumentException since the instance is not found
397+
Func<Task> restartAction = () => this.client.RestartAsync(nonExistentInstanceId);
398+
399+
await restartAction.Should().ThrowAsync<ArgumentException>()
400+
.WithMessage($"*An orchestration with the instanceId {nonExistentInstanceId} was not found*");
401+
}
329402

330403
static Core.OrchestrationState CreateState(
331404
object input, object? output = null, DateTimeOffset start = default)

test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,70 @@ public async Task PurgeInstances_WithFilter_EndToEnd()
224224
}
225225
}
226226

227+
[Theory]
228+
[InlineData(false)]
229+
[InlineData(true)]
230+
public async Task RestartAsync_EndToEnd(bool restartWithNewInstanceId)
231+
{
232+
using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(1));
233+
await using HostTestLifetime server = await this.StartAsync();
234+
235+
// Start an initial orchestration with shouldThrow = false to ensure it completes successfully
236+
string originalInstanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(
237+
OrchestrationName, input: false);
238+
239+
// Wait for it to start and then complete
240+
await server.Client.WaitForInstanceStartAsync(originalInstanceId, default);
241+
await server.Client.RaiseEventAsync(originalInstanceId, "event", default);
242+
await server.Client.WaitForInstanceCompletionAsync(originalInstanceId, cts.Token);
243+
244+
// Verify the original orchestration completed
245+
OrchestrationMetadata? originalMetadata = await server.Client.GetInstanceAsync(originalInstanceId, true);
246+
originalMetadata.Should().NotBeNull();
247+
originalMetadata!.RuntimeStatus.Should().Be(OrchestrationRuntimeStatus.Completed);
248+
249+
// Restart the orchestration
250+
string restartedInstanceId = await server.Client.RestartAsync(originalInstanceId, restartWithNewInstanceId);
251+
252+
// Verify the restart behavior
253+
if (restartWithNewInstanceId)
254+
{
255+
restartedInstanceId.Should().NotBe(originalInstanceId);
256+
}
257+
else
258+
{
259+
restartedInstanceId.Should().Be(originalInstanceId);
260+
}
261+
262+
// Complete the restarted orchestration
263+
await server.Client.RaiseEventAsync(restartedInstanceId, "event");
264+
265+
using var completionCts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
266+
await server.Client.WaitForInstanceCompletionAsync(restartedInstanceId, completionCts.Token);
267+
268+
// Verify the restarted orchestration completed.
269+
// Also verify input and orchestrator name are matched.
270+
var restartedMetadata = await server.Client.GetInstanceAsync(restartedInstanceId, true);
271+
restartedMetadata.Should().NotBeNull();
272+
restartedMetadata!.Name.Should().Be(OrchestrationName);
273+
restartedMetadata.SerializedInput.Should().Be("false");
274+
restartedMetadata!.RuntimeStatus.Should().Be(OrchestrationRuntimeStatus.Completed);
275+
}
276+
277+
[Fact]
278+
public async Task RestartAsync_InstanceNotFound_ThrowsArgumentException()
279+
{
280+
using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(1)); // 1-minute timeout
281+
await using HostTestLifetime server = await this.StartAsync();
282+
283+
// Try to restart a non-existent orchestration
284+
Func<Task> restartAction = () => server.Client.RestartAsync("non-existent-instance-id", cancellation: cts.Token);
285+
286+
// Should throw ArgumentException
287+
await restartAction.Should().ThrowAsync<ArgumentException>()
288+
.WithMessage("*An orchestration with the instanceId non-existent-instance-id was not found*");
289+
}
290+
227291
Task<HostTestLifetime> StartAsync()
228292
{
229293
static async Task<string> Orchestration(TaskOrchestrationContext context, bool shouldThrow)

0 commit comments

Comments
 (0)