diff --git a/CHANGELOG.md b/CHANGELOG.md index d803e73..3c39470 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog +## v1.4.1 (Unreleased) + +### Updates + +* Fix issue where scheduled orchestrations aren't immediately terminated ([#178](https://github.com/microsoft/durabletask-mssql/issues/178)) + ## v1.4.0 ### New diff --git a/src/DurableTask.SqlServer/DTUtils.cs b/src/DurableTask.SqlServer/DTUtils.cs index 3516fc4..739bf57 100644 --- a/src/DurableTask.SqlServer/DTUtils.cs +++ b/src/DurableTask.SqlServer/DTUtils.cs @@ -196,6 +196,7 @@ public static bool HasPayload(HistoryEvent e) return historyEvent.EventType switch { EventType.ExecutionStarted => ((ExecutionStartedEvent)historyEvent).Version, + EventType.SubOrchestrationInstanceCreated => ((SubOrchestrationInstanceCreatedEvent)historyEvent).Version, EventType.TaskScheduled => ((TaskScheduledEvent)historyEvent).Version, _ => null, }; diff --git a/src/DurableTask.SqlServer/Scripts/logic.sql b/src/DurableTask.SqlServer/Scripts/logic.sql index 51ea4e3..ccdbbe9 100644 --- a/src/DurableTask.SqlServer/Scripts/logic.sql +++ b/src/DurableTask.SqlServer/Scripts/logic.sql @@ -469,45 +469,70 @@ BEGIN DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__.CurrentTaskHub() - DECLARE @existingStatus varchar(30) = ( - SELECT TOP 1 existing.[RuntimeStatus] - FROM Instances existing WITH (HOLDLOCK) - WHERE [TaskHub] = @TaskHub AND [InstanceID] = @InstanceID - ) + DECLARE @existingStatus varchar(30) + DECLARE @existingLockExpiration datetime2(7) + + -- Get the status of an existing orchestration + SELECT TOP 1 + @existingStatus = existing.[RuntimeStatus], + @existingLockExpiration = existing.[LockExpiration] + FROM Instances existing WITH (HOLDLOCK) + WHERE [TaskHub] = @TaskHub AND [InstanceID] = @InstanceID IF @existingStatus IS NULL BEGIN ROLLBACK TRANSACTION; THROW 50000, 'The instance does not exist.', 1; END - -- If the instance is already completed, no need to terminate it. - IF @existingStatus IN ('Pending', 'Running') + + DECLARE @now datetime2(7) = SYSUTCDATETIME() + + IF @existingStatus IN ('Running', 'Pending') BEGIN - IF NOT EXISTS ( - SELECT TOP (1) 1 FROM NewEvents - WHERE [TaskHub] = @TaskHub AND [InstanceID] = @InstanceID AND [EventType] = 'ExecutionTerminated' - ) + -- Create a payload to store the reason, if any + DECLARE @PayloadID uniqueidentifier = NULL + IF @Reason IS NOT NULL BEGIN - -- Payloads are stored separately from the events - DECLARE @PayloadID uniqueidentifier = NULL - IF @Reason IS NOT NULL + -- Note that we don't use the Reason column for the Reason with terminate events + SET @PayloadID = NEWID() + INSERT INTO Payloads ([TaskHub], [InstanceID], [PayloadID], [Text]) + VALUES (@TaskHub, @InstanceID, @PayloadID, @Reason) + END + + -- Check the status of the orchestration to determine which termination path to take + IF @existingStatus = 'Pending' AND (@existingLockExpiration IS NULL OR @existingLockExpiration <= @now) + BEGIN + -- The orchestration hasn't started yet - transition it directly to the Terminated state and delete + -- any pending messages + UPDATE Instances SET + [RuntimeStatus] = 'Terminated', + [LastUpdatedTime] = @now, + [CompletedTime] = @now, + [OutputPayloadID] = @PayloadID, + [LockExpiration] = NULL -- release the lock, if any + WHERE [TaskHub] = @TaskHub AND [InstanceID] = @InstanceID + + DELETE FROM NewEvents WHERE [TaskHub] = @TaskHub AND [InstanceID] = @InstanceID + END + ELSE + BEGIN + -- The orchestration has actually started running in this case + IF NOT EXISTS ( + SELECT TOP (1) 1 FROM NewEvents + WHERE [TaskHub] = @TaskHub AND [InstanceID] = @InstanceID AND [EventType] = 'ExecutionTerminated' + ) BEGIN - -- Note that we don't use the Reason column for the Reason with terminate events - SET @PayloadID = NEWID() - INSERT INTO Payloads ([TaskHub], [InstanceID], [PayloadID], [Text]) - VALUES (@TaskHub, @InstanceID, @PayloadID, @Reason) + INSERT INTO NewEvents ( + [TaskHub], + [InstanceID], + [EventType], + [PayloadID] + ) VALUES ( + @TaskHub, + @InstanceID, + 'ExecutionTerminated', + @PayloadID) END - - INSERT INTO NewEvents ( - [TaskHub], - [InstanceID], - [EventType], - [PayloadID] - ) VALUES ( - @TaskHub, - @InstanceID, - 'ExecutionTerminated', - @PayloadID) END END @@ -1444,7 +1469,7 @@ BEGIN -- Instance IDs can be overwritten only if the orchestration is in a terminal state IF @existingStatus NOT IN ('Failed') BEGIN - DECLARE @msg nvarchar(4000) = FORMATMESSAGE('Cannot rewing instance with ID ''%s'' because it is not in a ''Failed'' state, but in ''%s'' state.', @InstanceID, @existingStatus); + DECLARE @msg nvarchar(4000) = FORMATMESSAGE('Cannot rewind instance with ID ''%s'' because it is not in a ''Failed'' state, but in ''%s'' state.', @InstanceID, @existingStatus); THROW 50001, @msg, 1; END diff --git a/src/DurableTask.SqlServer/SqlOrchestrationService.cs b/src/DurableTask.SqlServer/SqlOrchestrationService.cs index 47f56a2..3dfe161 100644 --- a/src/DurableTask.SqlServer/SqlOrchestrationService.cs +++ b/src/DurableTask.SqlServer/SqlOrchestrationService.cs @@ -335,8 +335,14 @@ public override async Task CompleteTaskOrchestrationWorkItemAsync( IList orchestratorMessages, IList timerMessages, TaskMessage continuedAsNewMessage, - OrchestrationState orchestrationState) + OrchestrationState? orchestrationState) { + if (orchestrationState is null || !newRuntimeState.IsValid) + { + // The work item was invalid. We can't do anything with it so we ignore it. + return; + } + ExtendedOrchestrationWorkItem currentWorkItem = (ExtendedOrchestrationWorkItem)workItem; this.traceHelper.CheckpointStarting(orchestrationState); diff --git a/src/common.props b/src/common.props index 6ee3a62..8adc7e2 100644 --- a/src/common.props +++ b/src/common.props @@ -17,7 +17,7 @@ 1 4 - 0 + 1 $(MajorVersion).$(MinorVersion).$(PatchVersion) $(MajorVersion).$(MinorVersion).0.0 diff --git a/test/DurableTask.SqlServer.Tests/Integration/DatabaseManagement.cs b/test/DurableTask.SqlServer.Tests/Integration/DatabaseManagement.cs index 40e1692..60b0f82 100644 --- a/test/DurableTask.SqlServer.Tests/Integration/DatabaseManagement.cs +++ b/test/DurableTask.SqlServer.Tests/Integration/DatabaseManagement.cs @@ -504,7 +504,7 @@ async Task ValidateDatabaseSchemaAsync(TestDatabase database, string schemaName schemaName); Assert.Equal(1, currentSchemaVersion.Major); Assert.Equal(4, currentSchemaVersion.Minor); - Assert.Equal(0, currentSchemaVersion.Patch); + Assert.Equal(1, currentSchemaVersion.Patch); } sealed class TestDatabase : IDisposable diff --git a/test/DurableTask.SqlServer.Tests/Integration/Orchestrations.cs b/test/DurableTask.SqlServer.Tests/Integration/Orchestrations.cs index 0c9713e..3a8367b 100644 --- a/test/DurableTask.SqlServer.Tests/Integration/Orchestrations.cs +++ b/test/DurableTask.SqlServer.Tests/Integration/Orchestrations.cs @@ -862,5 +862,29 @@ await Assert.ThrowsAnyAsync( // Now the orchestration should complete immediately await instance.WaitForCompletion(timeout: TimeSpan.FromSeconds(3), expectedOutput: EventCount); } + + [Fact] + public async Task TerminateScheduledOrchestration() + { + string orchestrationName = "ScheduledOrchestration"; + + // Does nothing except return the original input + TestInstance instance = await this.testService.RunOrchestration( + input: (object)null, + orchestrationName, + version: null, + instanceId: null, + scheduledStartTime: DateTime.UtcNow.AddSeconds(30), + implementation: (ctx, input) => Task.FromResult("done")); + + // Terminate the orchestration before it starts + await instance.TerminateAsync("Bye!"); + + await instance.WaitForCompletion( + expectedStatus: OrchestrationStatus.Terminated, + expectedOutput: "Bye!"); + + LogAssert.NoWarningsOrErrors(this.testService.LogProvider); + } } } diff --git a/test/DurableTask.SqlServer.Tests/Utils/TestService.cs b/test/DurableTask.SqlServer.Tests/Utils/TestService.cs index a9a9664..6545e6a 100644 --- a/test/DurableTask.SqlServer.Tests/Utils/TestService.cs +++ b/test/DurableTask.SqlServer.Tests/Utils/TestService.cs @@ -132,11 +132,32 @@ public Task> RunOrchestration( activities); } + public Task> RunOrchestration( + TInput input, + string orchestrationName, + string version, + string instanceId, + Func> implementation, + Action onEvent = null, + params (string name, TaskActivity activity)[] activities) + { + return this.RunOrchestration( + input, + orchestrationName, + version, + instanceId, + scheduledStartTime: null, + implementation, + onEvent, + activities); + } + public async Task> RunOrchestration( TInput input, string orchestrationName, string version, string instanceId, + DateTime? scheduledStartTime, Func> implementation, Action onEvent = null, params (string name, TaskActivity activity)[] activities) @@ -147,6 +168,7 @@ public async Task> RunOrchestration( inputGenerator: i => input, orchestrationName: orchestrationName, version: version, + scheduledStartTime: scheduledStartTime, implementation, onEvent, activities); @@ -154,12 +176,35 @@ public async Task> RunOrchestration( return instances[0]; } + public Task>> RunOrchestrations( + int count, + Func instanceIdGenerator, + Func inputGenerator, + string orchestrationName, + string version, + Func> implementation, + Action onEvent = null, + params (string name, TaskActivity activity)[] activities) + { + return this.RunOrchestrations( + count, + instanceIdGenerator, + inputGenerator, + orchestrationName, + version, + scheduledStartTime: null, + implementation, + onEvent, + activities); + } + public async Task>> RunOrchestrations( int count, Func instanceIdGenerator, Func inputGenerator, string orchestrationName, string version, + DateTime? scheduledStartTime, Func> implementation, Action onEvent = null, params (string name, TaskActivity activity)[] activities) @@ -178,11 +223,25 @@ public async Task>> RunOrchestrations( this.client,