Skip to content

Commit b93b187

Browse files
sophiatevSophia Tevosyan
andauthored
Fix Terminating Pending Orchestrations (#1256)
* first commit * better implementation --------- Co-authored-by: Sophia Tevosyan <[email protected]>
1 parent 29321fb commit b93b187

File tree

6 files changed

+95
-12
lines changed

6 files changed

+95
-12
lines changed

src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -793,7 +793,8 @@ async Task<TaskOrchestrationWorkItem> LockNextTaskOrchestrationWorkItemAsync(boo
793793
TraceContext = currentRequestTraceContext,
794794
};
795795

796-
if (!this.IsExecutableInstance(session.RuntimeState, orchestrationWorkItem.NewMessages, settings.AllowReplayingTerminalInstances, out string warningMessage))
796+
string warningMessage = await this.IsExecutableInstance(session.RuntimeState, orchestrationWorkItem.NewMessages, settings.AllowReplayingTerminalInstances);
797+
if (!string.IsNullOrEmpty(warningMessage))
797798
{
798799
// If all messages belong to the same execution ID, then all of them need to be discarded.
799800
// However, it's also possible to have messages for *any* execution ID batched together with messages
@@ -1049,28 +1050,33 @@ internal static void TraceMessageReceived(AzureStorageOrchestrationServiceSettin
10491050
data.Episode.GetValueOrDefault(-1));
10501051
}
10511052

1052-
bool IsExecutableInstance(OrchestrationRuntimeState runtimeState, IList<TaskMessage> newMessages, bool allowReplayingTerminalInstances, out string message)
1053+
async Task<string> IsExecutableInstance(OrchestrationRuntimeState runtimeState, IList<TaskMessage> newMessages, bool allowReplayingTerminalInstances)
10531054
{
10541055
if (runtimeState.ExecutionStartedEvent == null && !newMessages.Any(msg => msg.Event is ExecutionStartedEvent))
10551056
{
10561057
var instanceId = newMessages[0].OrchestrationInstance.InstanceId;
10571058

10581059
if (DurableTask.Core.Common.Entities.AutoStart(instanceId, newMessages))
10591060
{
1060-
message = null;
1061-
return true;
1061+
return null;
10621062
}
10631063
else
10641064
{
1065+
TaskMessage executionTerminatedEventMessage = newMessages.LastOrDefault(msg => msg.Event is ExecutionTerminatedEvent);
1066+
if (executionTerminatedEventMessage is not null)
1067+
{
1068+
await this.trackingStore.UpdateStatusForTerminationAsync(instanceId, ((ExecutionTerminatedEvent)executionTerminatedEventMessage.Event).Input);
1069+
return $"Instance is {OrchestrationStatus.Terminated}";
1070+
}
1071+
10651072
// A non-zero event count usually happens when an instance's history is overwritten by a
10661073
// new instance or by a ContinueAsNew. When history is overwritten by new instances, we
10671074
// overwrite the old history with new history (with a new execution ID), but this is done
10681075
// gradually as we build up the new history over time. If we haven't yet overwritten *all*
10691076
// the old history and we receive a message from the old instance (this happens frequently
10701077
// with canceled durable timer messages) we'll end up loading just the history that hasn't
10711078
// been fully overwritten. We know it's invalid because it's missing the ExecutionStartedEvent.
1072-
message = runtimeState.Events.Count == 0 ? "No such instance" : "Invalid history (may have been overwritten by a newer instance)";
1073-
return false;
1079+
return runtimeState.Events.Count == 0 ? "No such instance" : "Invalid history (may have been overwritten by a newer instance)";
10741080
}
10751081
}
10761082

@@ -1080,12 +1086,10 @@ bool IsExecutableInstance(OrchestrationRuntimeState runtimeState, IList<TaskMess
10801086
runtimeState.OrchestrationStatus != OrchestrationStatus.Pending &&
10811087
runtimeState.OrchestrationStatus != OrchestrationStatus.Suspended)
10821088
{
1083-
message = $"Instance is {runtimeState.OrchestrationStatus}";
1084-
return false;
1089+
return $"Instance is {runtimeState.OrchestrationStatus}";
10851090
}
10861091

1087-
message = null;
1088-
return true;
1092+
return null;
10891093
}
10901094

10911095
async Task AbandonAndReleaseSessionAsync(OrchestrationSession session)
@@ -1909,15 +1913,15 @@ public async Task<IList<OrchestrationState>> GetOrchestrationStateAsync(string i
19091913
/// </summary>
19101914
/// <param name="instanceId">Instance ID of the orchestration to terminate.</param>
19111915
/// <param name="reason">The user-friendly reason for terminating.</param>
1912-
public Task ForceTerminateTaskOrchestrationAsync(string instanceId, string reason)
1916+
public async Task ForceTerminateTaskOrchestrationAsync(string instanceId, string reason)
19131917
{
19141918
var taskMessage = new TaskMessage
19151919
{
19161920
OrchestrationInstance = new OrchestrationInstance { InstanceId = instanceId },
19171921
Event = new ExecutionTerminatedEvent(-1, reason)
19181922
};
19191923

1920-
return SendTaskOrchestrationMessageAsync(taskMessage);
1924+
await SendTaskOrchestrationMessageAsync(taskMessage);
19211925
}
19221926

19231927
/// <summary>

src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -795,6 +795,30 @@ public override async Task UpdateStatusForRewindAsync(string instanceId, Cancell
795795
stopwatch.ElapsedMilliseconds);
796796
}
797797

798+
/// <inheritdoc />
799+
public override async Task UpdateStatusForTerminationAsync(string instanceId, string output, CancellationToken cancellationToken = default)
800+
{
801+
string sanitizedInstanceId = KeySanitation.EscapePartitionKey(instanceId);
802+
TableEntity entity = new TableEntity(sanitizedInstanceId, "")
803+
{
804+
["RuntimeStatus"] = OrchestrationStatus.Terminated.ToString("G"),
805+
["LastUpdatedTime"] = DateTime.UtcNow,
806+
[OutputProperty] = output
807+
};
808+
809+
Stopwatch stopwatch = Stopwatch.StartNew();
810+
await this.InstancesTable.MergeEntityAsync(entity, ETag.All, cancellationToken);
811+
812+
this.settings.Logger.InstanceStatusUpdate(
813+
this.storageAccountName,
814+
this.taskHubName,
815+
instanceId,
816+
string.Empty,
817+
OrchestrationStatus.Terminated,
818+
episode: 0,
819+
stopwatch.ElapsedMilliseconds);
820+
}
821+
798822

799823
/// <inheritdoc />
800824
public override Task StartAsync(CancellationToken cancellationToken = default)

src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,14 @@ interface ITrackingStore
152152
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
153153
Task UpdateStatusForRewindAsync(string instanceId, CancellationToken cancellationToken = default);
154154

155+
/// <summary>
156+
/// Used to update the instance status to "Terminated" whend a pending orchestration is terminated.
157+
/// </summary>
158+
/// <param name="instanceId">The instance being terminated</param>
159+
/// <param name="output">The output of the orchestration</param>
160+
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
161+
Task UpdateStatusForTerminationAsync(string instanceId, string output, CancellationToken cancellationToken = default);
162+
155163
/// <summary>
156164
/// Purge The History and state which is older than thresholdDateTimeUtc based on the timestamp type specified by timeRangeFilterType
157165
/// </summary>

src/DurableTask.AzureStorage/Tracking/InstanceStoreBackedTrackingStore.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,5 +176,15 @@ await instanceStore.WriteEntitiesAsync(new InstanceEntityBase[]
176176

177177
return null;
178178
}
179+
180+
public override async Task UpdateStatusForTerminationAsync(string instanceId, string output, CancellationToken cancellationToken = default)
181+
{
182+
// Get the most recent execution and update its status to terminated
183+
IEnumerable<OrchestrationStateInstanceEntity> instanceEntity = await this.instanceStore.GetOrchestrationStateAsync(instanceId, allInstances: false);
184+
instanceEntity.Single().State.OrchestrationStatus = OrchestrationStatus.Terminated;
185+
instanceEntity.Single().State.LastUpdatedTime = DateTime.UtcNow;
186+
instanceEntity.Single().State.Output = output;
187+
await this.instanceStore.WriteEntitiesAsync(instanceEntity);
188+
}
179189
}
180190
}

src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,9 @@ public virtual Task UpdateStatusForRewindAsync(string instanceId, CancellationTo
100100
throw new NotSupportedException();
101101
}
102102

103+
/// <inheritdoc />
104+
public abstract Task UpdateStatusForTerminationAsync(string instanceId, string output, CancellationToken cancellationToken = default);
105+
103106
/// <inheritdoc />
104107
public abstract Task StartAsync(CancellationToken cancellationToken = default);
105108

test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -965,6 +965,40 @@ public async Task TerminateSuspendedOrchestration(bool enableExtendedSessions)
965965
}
966966
}
967967

968+
/// <summary>
969+
/// Test that a pending orchestration can be terminated.
970+
/// </summary>
971+
[DataTestMethod]
972+
[DataRow(true)]
973+
[DataRow(false)]
974+
public async Task TerminatePendingOrchestration(bool enableExtendedSessions)
975+
{
976+
using (TestOrchestrationHost host = TestHelpers.GetTestOrchestrationHost(enableExtendedSessions))
977+
{
978+
await host.StartAsync();
979+
// Schedule a start time to ensure that the orchestration is in a Pending state when we attempt to terminate.
980+
var client = await host.StartOrchestrationAsync(typeof(Orchestrations.Counter), 0, startAt: DateTime.UtcNow.AddMinutes(1));
981+
await client.WaitForStatusChange(TimeSpan.FromSeconds(5), OrchestrationStatus.Pending);
982+
983+
await client.TerminateAsync("terminate");
984+
985+
var status = await client.WaitForCompletionAsync(TimeSpan.FromSeconds(10));
986+
987+
// Confirm the pending orchestration was terminated.
988+
Assert.AreEqual(OrchestrationStatus.Terminated, status?.OrchestrationStatus);
989+
Assert.AreEqual("terminate", status?.Output);
990+
991+
// Now sleep for a minute and confirm that the orchestration does not start after its scheduled time.
992+
Thread.Sleep(TimeSpan.FromMinutes(1));
993+
994+
status = await client.GetStatusAsync();
995+
Assert.AreEqual(OrchestrationStatus.Terminated, status?.OrchestrationStatus);
996+
Assert.AreEqual("terminate", status?.Output);
997+
998+
await host.StopAsync();
999+
}
1000+
}
1001+
9681002
/// <summary>
9691003
/// End-to-end test which validates the Rewind functionality on more than one orchestration.
9701004
/// </summary>

0 commit comments

Comments
 (0)