diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs index 9fdee5f8f..c30a79288 100644 --- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs +++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs @@ -793,7 +793,8 @@ async Task LockNextTaskOrchestrationWorkItemAsync(boo TraceContext = currentRequestTraceContext, }; - if (!this.IsExecutableInstance(session.RuntimeState, orchestrationWorkItem.NewMessages, settings.AllowReplayingTerminalInstances, out string warningMessage)) + string warningMessage = await this.IsExecutableInstance(session.RuntimeState, orchestrationWorkItem.NewMessages, settings.AllowReplayingTerminalInstances); + if (!string.IsNullOrEmpty(warningMessage)) { // If all messages belong to the same execution ID, then all of them need to be discarded. // However, it's also possible to have messages for *any* execution ID batched together with messages @@ -1049,7 +1050,7 @@ internal static void TraceMessageReceived(AzureStorageOrchestrationServiceSettin data.Episode.GetValueOrDefault(-1)); } - bool IsExecutableInstance(OrchestrationRuntimeState runtimeState, IList newMessages, bool allowReplayingTerminalInstances, out string message) + async Task IsExecutableInstance(OrchestrationRuntimeState runtimeState, IList newMessages, bool allowReplayingTerminalInstances) { if (runtimeState.ExecutionStartedEvent == null && !newMessages.Any(msg => msg.Event is ExecutionStartedEvent)) { @@ -1057,11 +1058,17 @@ bool IsExecutableInstance(OrchestrationRuntimeState runtimeState, IList msg.Event is ExecutionTerminatedEvent); + if (executionTerminatedEventMessage is not null) + { + await this.trackingStore.UpdateStatusForTerminationAsync(instanceId, ((ExecutionTerminatedEvent)executionTerminatedEventMessage.Event).Input); + return $"Instance is {OrchestrationStatus.Terminated}"; + } + // A non-zero event count usually happens when an instance's history is overwritten by a // new instance or by a ContinueAsNew. When history is overwritten by new instances, we // overwrite the old history with new history (with a new execution ID), but this is done @@ -1069,8 +1076,7 @@ bool IsExecutableInstance(OrchestrationRuntimeState runtimeState, IList> GetOrchestrationStateAsync(string i /// /// Instance ID of the orchestration to terminate. /// The user-friendly reason for terminating. - public Task ForceTerminateTaskOrchestrationAsync(string instanceId, string reason) + public async Task ForceTerminateTaskOrchestrationAsync(string instanceId, string reason) { var taskMessage = new TaskMessage { @@ -1917,7 +1921,7 @@ public Task ForceTerminateTaskOrchestrationAsync(string instanceId, string reaso Event = new ExecutionTerminatedEvent(-1, reason) }; - return SendTaskOrchestrationMessageAsync(taskMessage); + await SendTaskOrchestrationMessageAsync(taskMessage); } /// diff --git a/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs b/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs index 17824a511..d48ef21bf 100644 --- a/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs +++ b/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs @@ -795,6 +795,30 @@ public override async Task UpdateStatusForRewindAsync(string instanceId, Cancell stopwatch.ElapsedMilliseconds); } + /// + public override async Task UpdateStatusForTerminationAsync(string instanceId, string output, CancellationToken cancellationToken = default) + { + string sanitizedInstanceId = KeySanitation.EscapePartitionKey(instanceId); + TableEntity entity = new TableEntity(sanitizedInstanceId, "") + { + ["RuntimeStatus"] = OrchestrationStatus.Terminated.ToString("G"), + ["LastUpdatedTime"] = DateTime.UtcNow, + [OutputProperty] = output + }; + + Stopwatch stopwatch = Stopwatch.StartNew(); + await this.InstancesTable.MergeEntityAsync(entity, ETag.All, cancellationToken); + + this.settings.Logger.InstanceStatusUpdate( + this.storageAccountName, + this.taskHubName, + instanceId, + string.Empty, + OrchestrationStatus.Terminated, + episode: 0, + stopwatch.ElapsedMilliseconds); + } + /// public override Task StartAsync(CancellationToken cancellationToken = default) diff --git a/src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs b/src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs index db6338bfb..3ce82cf37 100644 --- a/src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs +++ b/src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs @@ -152,6 +152,14 @@ interface ITrackingStore /// The token to monitor for cancellation requests. The default value is . Task UpdateStatusForRewindAsync(string instanceId, CancellationToken cancellationToken = default); + /// + /// Used to update the instance status to "Terminated" whend a pending orchestration is terminated. + /// + /// The instance being terminated + /// The output of the orchestration + /// The token to monitor for cancellation requests. The default value is . + Task UpdateStatusForTerminationAsync(string instanceId, string output, CancellationToken cancellationToken = default); + /// /// Purge The History and state which is older than thresholdDateTimeUtc based on the timestamp type specified by timeRangeFilterType /// diff --git a/src/DurableTask.AzureStorage/Tracking/InstanceStoreBackedTrackingStore.cs b/src/DurableTask.AzureStorage/Tracking/InstanceStoreBackedTrackingStore.cs index 4b816aacd..53c133e7b 100644 --- a/src/DurableTask.AzureStorage/Tracking/InstanceStoreBackedTrackingStore.cs +++ b/src/DurableTask.AzureStorage/Tracking/InstanceStoreBackedTrackingStore.cs @@ -176,5 +176,15 @@ await instanceStore.WriteEntitiesAsync(new InstanceEntityBase[] return null; } + + public override async Task UpdateStatusForTerminationAsync(string instanceId, string output, CancellationToken cancellationToken = default) + { + // Get the most recent execution and update its status to terminated + IEnumerable instanceEntity = await this.instanceStore.GetOrchestrationStateAsync(instanceId, allInstances: false); + instanceEntity.Single().State.OrchestrationStatus = OrchestrationStatus.Terminated; + instanceEntity.Single().State.LastUpdatedTime = DateTime.UtcNow; + instanceEntity.Single().State.Output = output; + await this.instanceStore.WriteEntitiesAsync(instanceEntity); + } } } diff --git a/src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs b/src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs index 95465461f..4317e42f0 100644 --- a/src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs +++ b/src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs @@ -100,6 +100,9 @@ public virtual Task UpdateStatusForRewindAsync(string instanceId, CancellationTo throw new NotSupportedException(); } + /// + public abstract Task UpdateStatusForTerminationAsync(string instanceId, string output, CancellationToken cancellationToken = default); + /// public abstract Task StartAsync(CancellationToken cancellationToken = default); diff --git a/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs b/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs index aee05e945..f4bf9e096 100644 --- a/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs +++ b/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs @@ -965,6 +965,40 @@ public async Task TerminateSuspendedOrchestration(bool enableExtendedSessions) } } + /// + /// Test that a pending orchestration can be terminated. + /// + [DataTestMethod] + [DataRow(true)] + [DataRow(false)] + public async Task TerminatePendingOrchestration(bool enableExtendedSessions) + { + using (TestOrchestrationHost host = TestHelpers.GetTestOrchestrationHost(enableExtendedSessions)) + { + await host.StartAsync(); + // Schedule a start time to ensure that the orchestration is in a Pending state when we attempt to terminate. + var client = await host.StartOrchestrationAsync(typeof(Orchestrations.Counter), 0, startAt: DateTime.UtcNow.AddMinutes(1)); + await client.WaitForStatusChange(TimeSpan.FromSeconds(5), OrchestrationStatus.Pending); + + await client.TerminateAsync("terminate"); + + var status = await client.WaitForCompletionAsync(TimeSpan.FromSeconds(10)); + + // Confirm the pending orchestration was terminated. + Assert.AreEqual(OrchestrationStatus.Terminated, status?.OrchestrationStatus); + Assert.AreEqual("terminate", status?.Output); + + // Now sleep for a minute and confirm that the orchestration does not start after its scheduled time. + Thread.Sleep(TimeSpan.FromMinutes(1)); + + status = await client.GetStatusAsync(); + Assert.AreEqual(OrchestrationStatus.Terminated, status?.OrchestrationStatus); + Assert.AreEqual("terminate", status?.Output); + + await host.StopAsync(); + } + } + /// /// End-to-end test which validates the Rewind functionality on more than one orchestration. ///