diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs index c21d6d1b7..482cb7683 100644 --- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs +++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs @@ -793,7 +793,12 @@ async Task LockNextTaskOrchestrationWorkItemAsync(boo TraceContext = currentRequestTraceContext, }; - string warningMessage = await this.IsExecutableInstance(session.RuntimeState, orchestrationWorkItem.NewMessages, settings.AllowReplayingTerminalInstances); + string warningMessage = await this.IsExecutableInstanceAsync( + session.RuntimeState, + orchestrationWorkItem.NewMessages, + settings.AllowReplayingTerminalInstances, + session.TrackingStoreContext, + cancellationToken); if (!string.IsNullOrEmpty(warningMessage)) { // If all messages belong to the same execution ID, then all of them need to be discarded. @@ -1050,7 +1055,12 @@ internal static void TraceMessageReceived(AzureStorageOrchestrationServiceSettin data.Episode.GetValueOrDefault(-1)); } - async Task IsExecutableInstance(OrchestrationRuntimeState runtimeState, IList newMessages, bool allowReplayingTerminalInstances) + async Task IsExecutableInstanceAsync( + OrchestrationRuntimeState runtimeState, + IList newMessages, + bool allowReplayingTerminalInstances, + object trackingStoreContext, + CancellationToken cancellationToken) { if (runtimeState.ExecutionStartedEvent == null && !newMessages.Any(msg => msg.Event is ExecutionStartedEvent)) { @@ -1085,12 +1095,25 @@ await this.trackingStore.UpdateStatusForTerminationAsync( } if (runtimeState.ExecutionStartedEvent != null && - !allowReplayingTerminalInstances && runtimeState.OrchestrationStatus != OrchestrationStatus.Running && runtimeState.OrchestrationStatus != OrchestrationStatus.Pending && runtimeState.OrchestrationStatus != OrchestrationStatus.Suspended) { - return $"Instance is {runtimeState.OrchestrationStatus}"; + InstanceStatus instanceStatus = await this.trackingStore.FetchInstanceStatusAsync(runtimeState.OrchestrationInstance.InstanceId); + if (instanceStatus.State.OrchestrationInstance.ExecutionId == runtimeState.OrchestrationInstance.ExecutionId + && instanceStatus.State.OrchestrationStatus != runtimeState.OrchestrationStatus) + { + await this.trackingStore.UpdateInstanceStatusAndDeleteOrphanedBlobsForCompletedOrchestrationAsync( + runtimeState.OrchestrationInstance.InstanceId, + runtimeState.OrchestrationInstance.ExecutionId, + runtimeState, + trackingStoreContext, + cancellationToken); + } + if (!allowReplayingTerminalInstances) + { + return $"Instance is {runtimeState.OrchestrationStatus}"; + } } return null; diff --git a/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs b/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs index cb5c2bfb5..2f2dca83b 100644 --- a/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs +++ b/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs @@ -1048,6 +1048,57 @@ public override Task StartAsync(CancellationToken cancellationToken = default) return eTagValue; } + public override async Task UpdateInstanceStatusAndDeleteOrphanedBlobsForCompletedOrchestrationAsync( + string instanceId, + string executionId, + OrchestrationRuntimeState runtimeState, + object trackingStoreContext, + CancellationToken cancellationToken = default) + { + if (runtimeState.OrchestrationStatus != OrchestrationStatus.Completed && + runtimeState.OrchestrationStatus != OrchestrationStatus.Canceled && + runtimeState.OrchestrationStatus != OrchestrationStatus.Failed && + runtimeState.OrchestrationStatus != OrchestrationStatus.Terminated) + { + return; + } + + TrackingStoreContext context = (TrackingStoreContext)trackingStoreContext; + if (context.Blobs.Count > 0) + { + var tasks = new List(context.Blobs.Count); + foreach (string blobName in context.Blobs) + { + tasks.Add(this.messageManager.DeleteBlobAsync(blobName)); + } + await Task.WhenAll(tasks); + } + + string sanitizedInstanceId = KeySanitation.EscapePartitionKey(instanceId); + var instanceEntity = new TableEntity(sanitizedInstanceId, string.Empty) + { + // TODO: Translating null to "null" is a temporary workaround. We should prioritize + // https://github.com/Azure/durabletask/issues/477 so that this is no longer necessary. + ["CustomStatus"] = runtimeState.Status ?? "null", + ["ExecutionId"] = executionId, + ["LastUpdatedTime"] = runtimeState.Events.Last().Timestamp, + ["RuntimeStatus"] = runtimeState.OrchestrationStatus.ToString(), + ["CompletedTime"] = runtimeState.CompletedTime + }; + + Stopwatch orchestrationInstanceUpdateStopwatch = Stopwatch.StartNew(); + await this.InstancesTable.InsertOrMergeEntityAsync(instanceEntity); + + this.settings.Logger.InstanceStatusUpdate( + this.storageAccountName, + this.taskHubName, + instanceId, + executionId, + runtimeState.OrchestrationStatus, + Utils.GetEpisodeNumber(runtimeState), + orchestrationInstanceUpdateStopwatch.ElapsedMilliseconds); + } + static int GetEstimatedByteCount(TableEntity entity) { // Assume at least 1 KB of data per entity to account for static-length properties diff --git a/src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs b/src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs index 19dccd35c..c0ce27831 100644 --- a/src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs +++ b/src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs @@ -103,6 +103,19 @@ interface ITrackingStore /// Returns the instance status or null if none was found. Task FetchInstanceStatusAsync(string instanceId, CancellationToken cancellationToken = default); + /// + /// Updates the instance status of the specified orchestration instance to match that of for a completed orchestration. + /// Also deletes any orphaned blobs of . + /// This method is meant to be called in the case that there is an inconsistency between the instance and history table due to a failure during a call to + /// for a completing orchestration. If the orchestration is not in a terminal state, the method will immediately return and do nothing. + /// + /// The ID of the orchestration. + /// The execution ID of the orchestration. + /// The runtime state of the orchestration. + /// Additional context for the execution that is maintained by the tracking store. + /// The token to monitor for cancellation requests. The default value is . + Task UpdateInstanceStatusAndDeleteOrphanedBlobsForCompletedOrchestrationAsync(string instanceId, string executionId, OrchestrationRuntimeState runtimeState, object trackingStoreContext, CancellationToken cancellationToken = default); + /// /// Get The Orchestration State for querying all orchestration instances /// diff --git a/src/DurableTask.AzureStorage/Tracking/InstanceStoreBackedTrackingStore.cs b/src/DurableTask.AzureStorage/Tracking/InstanceStoreBackedTrackingStore.cs index 0548bfeeb..1e70a98ee 100644 --- a/src/DurableTask.AzureStorage/Tracking/InstanceStoreBackedTrackingStore.cs +++ b/src/DurableTask.AzureStorage/Tracking/InstanceStoreBackedTrackingStore.cs @@ -191,5 +191,31 @@ public override async Task UpdateStatusForTerminationAsync( instanceEntity.Single().State.Output = output; await this.instanceStore.WriteEntitiesAsync(instanceEntity); } + + public override async Task UpdateInstanceStatusAndDeleteOrphanedBlobsForCompletedOrchestrationAsync( + string instanceId, + string executionId, + OrchestrationRuntimeState runtimeState, + object trackingStoreContext, + CancellationToken cancellationToken = default) + { + if (runtimeState.OrchestrationStatus != OrchestrationStatus.Completed && + runtimeState.OrchestrationStatus != OrchestrationStatus.Canceled && + runtimeState.OrchestrationStatus != OrchestrationStatus.Failed && + runtimeState.OrchestrationStatus != OrchestrationStatus.Terminated) + { + return; + } + + // No blobs to delete for this tracking store implementation + await instanceStore.WriteEntitiesAsync(new InstanceEntityBase[] + { + new OrchestrationStateInstanceEntity() + { + State = Core.Common.Utils.BuildOrchestrationState(runtimeState), + SequenceNumber = runtimeState.Events.Count + } + }); + } } } diff --git a/src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs b/src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs index ee8e2012c..e6f55fc59 100644 --- a/src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs +++ b/src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs @@ -108,5 +108,8 @@ public virtual Task UpdateStatusForRewindAsync(string instanceId, CancellationTo /// public abstract Task UpdateStateAsync(OrchestrationRuntimeState newRuntimeState, OrchestrationRuntimeState oldRuntimeState, string instanceId, string executionId, ETag? eTag, object executionData, CancellationToken cancellationToken = default); + + /// + public abstract Task UpdateInstanceStatusAndDeleteOrphanedBlobsForCompletedOrchestrationAsync(string instanceId, string executionId, OrchestrationRuntimeState runtimeState, object trackingStoreContext, CancellationToken cancellationToken = default); } } diff --git a/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs b/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs index f4bf9e096..aa4aa7026 100644 --- a/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs +++ b/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs @@ -2401,6 +2401,7 @@ await Task.WhenAll( [DataRow(false, true, false)] [DataRow(false, false, true)] [DataRow(false, false, false)] + [Ignore("Skipping since this functionality has since changed, see TestWorkerFailingDuringCompleteWorkItemCall")] public async Task TestAllowReplayingTerminalInstances(bool enableExtendedSessions, bool sendTerminateEvent, bool allowReplayingTerminalInstances) { using (TestOrchestrationHost host = TestHelpers.GetTestOrchestrationHost( @@ -2475,6 +2476,79 @@ public async Task TestAllowReplayingTerminalInstances(bool enableExtendedSession } } + /// + /// Confirm that if a worker fails after committing the new history but before updating the instance state in a call to + /// for an orchestration that has + /// reached a terminal state, then storage is brought to consistent state by the call to + /// . + /// Since we cannot simulate a worker failure at this precise point, instead what is done by this test is that we + /// let an orchestration run to completion, and then manually change the instance table state back to "Running". + /// We then send an event to the orchestration, which triggers a call to lock the next task work item, at which point + /// the inconsistent state in storage for the terminal instance is recognized, the instance state is updated, and the work item discarded. + /// Note that this test does not confirm that orphaned blobs are deleted by the call to lock the next orchestration work item + /// in the case of a terminal orchestration with inconsistent state in storage. This is because there is no easy way to mock/inject + /// the tracking store context object that is part of the orchestration session state which keeps track of the blobs. + /// + /// + [DataTestMethod] + [DataRow(true, true)] + [DataRow(false, true)] + [DataRow(true, false)] + [DataRow(false, false)] + public async Task TestWorkerFailingDuringCompleteWorkItemCall(bool enableExtendedSessions, bool terminate) + { + using (TestOrchestrationHost host = TestHelpers.GetTestOrchestrationHost(enableExtendedSessions)) + { + await host.StartAsync(); + + // Run simple orchestrator to completion, this will help us obtain a valid terminal history for the orchestrator + var client = await host.StartOrchestrationAsync(typeof(Orchestrations.Echo), "hello!"); + var status = await client.WaitForCompletionAsync(TimeSpan.FromSeconds(10)); + Assert.AreEqual(OrchestrationStatus.Completed, status?.OrchestrationStatus); + + // Simulate having an "out of date" Instance table, by setting it's runtime status to "Running". + // This simulates the scenario where the History table was updated, but not the Instance table. + var instanceId = client.InstanceId; + AzureStorageOrchestrationServiceSettings settings = TestHelpers.GetTestAzureStorageOrchestrationServiceSettings( + enableExtendedSessions); + AzureStorageClient azureStorageClient = new AzureStorageClient(settings); + + Table instanceTable = azureStorageClient.GetTableReference(azureStorageClient.Settings.InstanceTableName); + TableEntity entity = new TableEntity(instanceId, "") + { + ["RuntimeStatus"] = OrchestrationStatus.Running.ToString("G") + }; + await instanceTable.MergeEntityAsync(entity, Azure.ETag.All); + + // Assert that the status in the Instance table reads "Running" + IList state = await client.GetStateAsync(instanceId); + OrchestrationStatus forcedStatus = state.First().OrchestrationStatus; + Assert.AreEqual(OrchestrationStatus.Running, forcedStatus); + + // The type of event sent should not matter - the event itself should be discarded, and the instance table updated + // to reflect the status in the history table. + if (terminate) + { + await client.TerminateAsync("testing"); + } + else + { + await client.RaiseEventAsync("Foo", "Bar"); + } + await Task.Delay(TimeSpan.FromSeconds(30)); + + // A replay should have occurred, forcing the instance table to be updated with a terminal status + state = await client.GetStateAsync(instanceId); + Assert.AreEqual(1, state.Count); + + status = state.First(); + OrchestrationStatus expectedStatus = OrchestrationStatus.Completed; + Assert.AreEqual(expectedStatus, status.OrchestrationStatus); + + await host.StopAsync(); + } + } + [TestMethod] [DataRow(VersioningSettings.VersionMatchStrategy.Strict)] [DataRow(VersioningSettings.VersionMatchStrategy.CurrentOrOlder)]