Skip to content

Commit 575772b

Browse files
authored
Enable suspend-resume of orchestrators. (#2227)
1 parent 50f4ba3 commit 575772b

14 files changed

+575
-11
lines changed

src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableClient.cs

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -405,6 +405,48 @@ async Task IDurableOrchestrationClient.TerminateAsync(string instanceId, string
405405
}
406406
}
407407

408+
async Task IDurableOrchestrationClient.SuspendAsync(string instanceId, string reason)
409+
{
410+
OrchestrationState state = await this.GetOrchestrationInstanceStateAsync(instanceId);
411+
if (IsOrchestrationSuspendable(state))
412+
{
413+
this.traceHelper.SuspendingOrchestration(this.TaskHubName, state.Name, instanceId, reason);
414+
415+
var instance = new OrchestrationInstance { InstanceId = instanceId };
416+
await this.client.SuspendInstanceAsync(instance, reason);
417+
}
418+
else
419+
{
420+
this.traceHelper.ExtensionWarningEvent(
421+
hubName: this.TaskHubName,
422+
functionName: state.Name,
423+
instanceId: instanceId,
424+
message: $"Cannot suspend orchestration instance in the {state.OrchestrationStatus} state.");
425+
throw new InvalidOperationException($"Cannot suspend the orchestration instance {instanceId} because instance is in the {state.OrchestrationStatus} state.");
426+
}
427+
}
428+
429+
async Task IDurableOrchestrationClient.ResumeAsync(string instanceId, string reason)
430+
{
431+
OrchestrationState state = await this.GetOrchestrationInstanceStateAsync(instanceId);
432+
if (IsOrchestrationSuspended(state))
433+
{
434+
this.traceHelper.ResumingOrchestration(this.TaskHubName, state.Name, instanceId, reason);
435+
436+
var instance = new OrchestrationInstance { InstanceId = instanceId };
437+
await this.client.ResumeInstanceAsync(instance, reason);
438+
}
439+
else
440+
{
441+
this.traceHelper.ExtensionWarningEvent(
442+
hubName: this.TaskHubName,
443+
functionName: state.Name,
444+
instanceId: instanceId,
445+
message: $"Cannot resume orchestration instance in the {state.OrchestrationStatus} state.");
446+
throw new InvalidOperationException($"Cannot resume the orchestration instance {instanceId} because instance is in the {state.OrchestrationStatus} state.");
447+
}
448+
}
449+
408450
/// <inheritdoc />
409451
async Task IDurableOrchestrationClient.RewindAsync(string instanceId, string reason)
410452
{
@@ -743,13 +785,26 @@ internal async Task<HttpResponseMessage> WaitForCompletionOrCreateCheckStatusRes
743785
returnInternalServerErrorOnFailure);
744786
}
745787

788+
private static bool IsOrchestrationSuspendable(OrchestrationState status)
789+
{
790+
return status.OrchestrationStatus == OrchestrationStatus.Running ||
791+
status.OrchestrationStatus == OrchestrationStatus.Pending ||
792+
status.OrchestrationStatus == OrchestrationStatus.ContinuedAsNew;
793+
}
794+
746795
private static bool IsOrchestrationRunning(OrchestrationState status)
747796
{
748797
return status.OrchestrationStatus == OrchestrationStatus.Running ||
798+
status.OrchestrationStatus == OrchestrationStatus.Suspended ||
749799
status.OrchestrationStatus == OrchestrationStatus.Pending ||
750800
status.OrchestrationStatus == OrchestrationStatus.ContinuedAsNew;
751801
}
752802

803+
private static bool IsOrchestrationSuspended(OrchestrationState status)
804+
{
805+
return status.OrchestrationStatus == OrchestrationStatus.Suspended;
806+
}
807+
753808
private static HttpRequestMessage ConvertHttpRequestMessage(HttpRequest request)
754809
{
755810
return new HttpRequestMessageFeature(request.HttpContext).HttpRequestMessage;

src/WebJobs.Extensions.DurableTask/ContextInterfaces/IDurableOrchestrationClient.cs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,22 @@ Task<string> StartNewAsync<T>(
226226
/// <returns>A task that completes when the terminate message is enqueued if necessary.</returns>
227227
Task TerminateAsync(string instanceId, string reason);
228228

229+
/// <summary>
230+
/// Suspends a running orchestration instance.
231+
/// </summary>
232+
/// <param name="instanceId">The ID of the orchestration instance to suspend.</param>
233+
/// <param name="reason">The reason for suspending the orchestration instance.</param>
234+
/// <returns>A task that completes when the suspend message is enqueued if necessary.</returns>
235+
Task SuspendAsync(string instanceId, string reason);
236+
237+
/// <summary>
238+
/// Resumes a suspended orchestration instance.
239+
/// </summary>
240+
/// <param name="instanceId">The ID of the orchestration instance to resume.</param>
241+
/// <param name="reason">The reason for resuming the orchestration instance.</param>
242+
/// <returns>A task that completes when the resume message is enqueued if necessary.</returns>
243+
Task ResumeAsync(string instanceId, string reason);
244+
229245
/// <summary>
230246
/// Rewinds the specified failed orchestration instance with a reason.
231247
/// </summary>

src/WebJobs.Extensions.DurableTask/DurabilityProvider.cs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
using System.Threading;
77
using System.Threading.Tasks;
88
using DurableTask.Core;
9+
using DurableTask.Core.History;
910
using DurableTask.Core.Query;
1011
using Newtonsoft.Json;
1112
using Newtonsoft.Json.Linq;
@@ -409,6 +410,40 @@ public Task ForceTerminateTaskOrchestrationAsync(string instanceId, string reaso
409410
return this.GetOrchestrationServiceClient().ForceTerminateTaskOrchestrationAsync(instanceId, reason);
410411
}
411412

413+
/// <summary>
414+
/// Suspend the specified orchestration instance with a reason.
415+
/// </summary>
416+
/// <param name="instanceId">Instance to suspend.</param>
417+
/// <param name="reason">Reason for suspending the instance.</param>
418+
/// <returns>A task that completes when the suspend message is enqueued.</returns>
419+
public Task SuspendTaskOrchestrationAsync(string instanceId, string reason)
420+
{
421+
var taskMessage = new TaskMessage
422+
{
423+
OrchestrationInstance = new OrchestrationInstance { InstanceId = instanceId },
424+
Event = new ExecutionSuspendedEvent(-1, reason),
425+
};
426+
427+
return this.GetOrchestrationServiceClient().SendTaskOrchestrationMessageAsync(taskMessage);
428+
}
429+
430+
/// <summary>
431+
/// Resume the specified orchestration instance with a reason.
432+
/// </summary>
433+
/// <param name="instanceId">Instance to resume.</param>
434+
/// <param name="reason">Reason for resuming the instance.</param>
435+
/// <returns>A task that completes when the resume message is enqueued.</returns>
436+
public Task ResumeTaskOrchestrationAsync(string instanceId, string reason)
437+
{
438+
var taskMessage = new TaskMessage
439+
{
440+
OrchestrationInstance = new OrchestrationInstance { InstanceId = instanceId },
441+
Event = new ExecutionResumedEvent(-1, reason),
442+
};
443+
444+
return this.GetOrchestrationServiceClient().SendTaskOrchestrationMessageAsync(taskMessage);
445+
}
446+
412447
/// <inheritdoc />
413448
public Task<IList<OrchestrationState>> GetOrchestrationStateAsync(string instanceId, bool allExecutions)
414449
{

src/WebJobs.Extensions.DurableTask/EndToEndTraceHelper.cs

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,56 @@ public void FunctionTerminated(
284284
LocalSlotName, ExtensionVersion, this.sequenceNumber++);
285285
}
286286

287+
public void SuspendingOrchestration(
288+
string hubName,
289+
string functionName,
290+
string instanceId,
291+
string reason)
292+
{
293+
FunctionType functionType = FunctionType.Orchestrator;
294+
295+
EtwEventSource.Instance.SuspendingOrchestration(
296+
hubName,
297+
LocalAppName,
298+
LocalSlotName,
299+
functionName,
300+
instanceId,
301+
reason,
302+
functionType.ToString(),
303+
ExtensionVersion,
304+
IsReplay: false);
305+
306+
this.logger.LogInformation(
307+
"{instanceId}: Suspending function '{functionName} ({functionType})'. Reason: {reason}. State: {state}. HubName: {hubName}. AppName: {appName}. SlotName: {slotName}. ExtensionVersion: {extensionVersion}. SequenceNumber: {sequenceNumber}.",
308+
instanceId, functionName, functionType, reason, FunctionState.Suspended, hubName, LocalAppName,
309+
LocalSlotName, ExtensionVersion, this.sequenceNumber++);
310+
}
311+
312+
public void ResumingOrchestration(
313+
string hubName,
314+
string functionName,
315+
string instanceId,
316+
string reason)
317+
{
318+
FunctionType functionType = FunctionType.Orchestrator;
319+
320+
EtwEventSource.Instance.ResumingOrchestration(
321+
hubName,
322+
LocalAppName,
323+
LocalSlotName,
324+
functionName,
325+
instanceId,
326+
reason,
327+
functionType.ToString(),
328+
ExtensionVersion,
329+
IsReplay: false);
330+
331+
this.logger.LogInformation(
332+
"{instanceId}: Resuming function '{functionName} ({functionType})'. Reason: {reason}. State: {state}. HubName: {hubName}. AppName: {appName}. SlotName: {slotName}. ExtensionVersion: {extensionVersion}. SequenceNumber: {sequenceNumber}.",
333+
instanceId, functionName, functionType, reason, FunctionState.Scheduled, hubName, LocalAppName,
334+
LocalSlotName, ExtensionVersion, this.sequenceNumber++);
335+
}
336+
287337
public void FunctionRewound(
288338
string hubName,
289339
string functionName,

src/WebJobs.Extensions.DurableTask/EtwEventSource.cs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -484,6 +484,36 @@ public void TokenRenewalFailed(
484484
this.WriteEvent(230, TaskHub, AppName, SlotName, Resource, Attempt, DelayMs, Details, ExtensionVersion);
485485
}
486486

487+
[Event(231, Level = EventLevel.Informational, Version = 1)]
488+
public void SuspendingOrchestration(
489+
string TaskHub,
490+
string AppName,
491+
string SlotName,
492+
string FunctionName,
493+
string InstanceId,
494+
string Reason,
495+
string FunctionType,
496+
string ExtensionVersion,
497+
bool IsReplay)
498+
{
499+
this.WriteEvent(231, TaskHub, AppName, SlotName, FunctionName, InstanceId, Reason, FunctionType, ExtensionVersion, IsReplay);
500+
}
501+
502+
[Event(232, Level = EventLevel.Informational, Version = 1)]
503+
public void ResumingOrchestration(
504+
string TaskHub,
505+
string AppName,
506+
string SlotName,
507+
string FunctionName,
508+
string InstanceId,
509+
string Reason,
510+
string FunctionType,
511+
string ExtensionVersion,
512+
bool IsReplay)
513+
{
514+
this.WriteEvent(232, TaskHub, AppName, SlotName, FunctionName, InstanceId, Reason, FunctionType, ExtensionVersion, IsReplay);
515+
}
516+
487517
#pragma warning restore SA1313 // Parameter names should begin with lower-case letter
488518
}
489519
}

src/WebJobs.Extensions.DurableTask/FunctionState.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,5 +20,6 @@ internal enum FunctionState
2020
Rewound,
2121
EntityStateCreated,
2222
EntityStateDeleted,
23+
Suspended,
2324
}
2425
}

0 commit comments

Comments
 (0)