From 0a4f5ec34cd929aa9f792db74ecdb49e21d0fd8f Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Mon, 20 Oct 2025 22:37:56 -0700 Subject: [PATCH 1/7] first commit --- .../AzureStorageOrchestrationService.cs | 41 ++++-- .../Tracking/AzureTableTrackingStore.cs | 107 ++++++++++++++++ .../Tracking/ITrackingStore.cs | 13 ++ .../InstanceStoreBackedTrackingStore.cs | 18 +++ .../Tracking/TrackingStoreBase.cs | 3 + .../AzureStorageScaleTests.cs | 120 ++++++++++++++++++ 6 files changed, 291 insertions(+), 11 deletions(-) diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs index 9fdee5f8f..d7c8d797e 100644 --- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs +++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs @@ -793,7 +793,13 @@ async Task LockNextTaskOrchestrationWorkItemAsync(boo TraceContext = currentRequestTraceContext, }; - if (!this.IsExecutableInstance(session.RuntimeState, orchestrationWorkItem.NewMessages, settings.AllowReplayingTerminalInstances, out string warningMessage)) + string warningMessage = await this.IsExecutableInstanceAsync( + session.RuntimeState, + orchestrationWorkItem.NewMessages, + settings.AllowReplayingTerminalInstances, + session.TrackingStoreContext, + cancellationToken); + if (!string.IsNullOrEmpty(warningMessage)) { // If all messages belong to the same execution ID, then all of them need to be discarded. 
// However, it's also possible to have messages for *any* execution ID batched together with messages @@ -1049,7 +1055,12 @@ internal static void TraceMessageReceived(AzureStorageOrchestrationServiceSettin data.Episode.GetValueOrDefault(-1)); } - bool IsExecutableInstance(OrchestrationRuntimeState runtimeState, IList newMessages, bool allowReplayingTerminalInstances, out string message) + async Task IsExecutableInstanceAsync( + OrchestrationRuntimeState runtimeState, + IList newMessages, + bool allowReplayingTerminalInstances, + object trackingStoreContext, + CancellationToken cancellationToken) { if (runtimeState.ExecutionStartedEvent == null && !newMessages.Any(msg => msg.Event is ExecutionStartedEvent)) { @@ -1057,8 +1068,7 @@ bool IsExecutableInstance(OrchestrationRuntimeState runtimeState, IList 0) + { + var tasks = new List(context.Blobs.Count); + foreach (var blobName in context.Blobs) + { + tasks.Add(this.messageManager.DeleteBlobAsync(blobName)); + } + await Task.WhenAll(tasks); + } + } + + /// + /// This method is meant to be used only for testing. It updates the history state in storage based on the new events in + /// , mimicking what does in a "light-weight" fashion + /// (for example, it does not handle large message compression and uploading to blob storage, etc.) + /// + /// The instance ID of the orchestration + /// The execution ID of the orchestration + /// The new runtime state of the orchestration with the new history events. 
+ /// + internal async Task UpdateHistoryAsync(string instanceId, string executionId, OrchestrationRuntimeState runtimeState) + { + int estimatedBytes = 0; + IList newEvents = runtimeState.NewEvents; + IList allEvents = runtimeState.Events; + + int episodeNumber = Utils.GetEpisodeNumber(runtimeState); + + var newEventListBuffer = new StringBuilder(4000); + var historyEventBatch = new List(); + + string sanitizedInstanceId = KeySanitation.EscapePartitionKey(instanceId); + + for (int i = 0; i < newEvents.Count; i++) + { + bool isFinalEvent = i == newEvents.Count - 1; + + HistoryEvent historyEvent = newEvents[i]; + // For backwards compatibility, we convert timer timestamps to UTC prior to persisting to Azure Storage + // see: https://github.com/Azure/durabletask/pull/1138 + Utils.ConvertDateTimeInHistoryEventsToUTC(historyEvent); + var historyEntity = TableEntityConverter.Serialize(historyEvent); + historyEntity.PartitionKey = sanitizedInstanceId; + + newEventListBuffer.Append(historyEvent.EventType.ToString()).Append(','); + + // The row key is the sequence number, which represents the chronological ordinal of the event. + long sequenceNumber = i + (allEvents.Count - newEvents.Count); + historyEntity.RowKey = sequenceNumber.ToString("X16"); + historyEntity["ExecutionId"] = executionId; + + // Replacement can happen if the orchestration episode gets replayed due to a commit failure in one of the steps below. 
+ historyEventBatch.Add(new TableTransactionAction(TableTransactionActionType.UpsertReplace, historyEntity)); + + // Keep track of the byte count to ensure we don't hit the 4 MB per-batch maximum + estimatedBytes += GetEstimatedByteCount(historyEntity); + } + + if (historyEventBatch.Count > 0) + { + await this.UploadHistoryBatch( + instanceId, + sanitizedInstanceId, + executionId, + historyEventBatch, + newEventListBuffer, + allEvents.Count, + episodeNumber, + estimatedBytes, + null, + isFinalBatch: true, + cancellationToken: CancellationToken.None); + } + } + static int GetEstimatedByteCount(TableEntity entity) { // Assume at least 1 KB of data per entity to account for static-length properties diff --git a/src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs b/src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs index db6338bfb..188422f20 100644 --- a/src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs +++ b/src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs @@ -103,6 +103,19 @@ interface ITrackingStore /// Returns the instance status or null if none was found. Task FetchInstanceStatusAsync(string instanceId, CancellationToken cancellationToken = default); + /// + /// Updates the instance status of the specified orchestration instance to match that of . + /// Also deletes any orphaned blobs of . + /// This method is meant to be called in the case that there is an inconsistency between the instance and history table due to a + /// failure during a call to . + /// + /// The ID of the orchestration. + /// The execution ID of the orchestration. + /// The runtime state of the orchestration. + /// Additional context for the execution that is maintained by the tracking store. + /// The token to monitor for cancellation requests. The default value is . 
+ Task UpdateInstanceStatusAndDeleteOrphanedBlobsAsync(string instanceId, string executionId, OrchestrationRuntimeState runtimeState, object trackingStoreContext, CancellationToken cancellationToken = default); + /// /// Get The Orchestration State for querying all orchestration instances /// diff --git a/src/DurableTask.AzureStorage/Tracking/InstanceStoreBackedTrackingStore.cs b/src/DurableTask.AzureStorage/Tracking/InstanceStoreBackedTrackingStore.cs index 4b816aacd..d79cf273f 100644 --- a/src/DurableTask.AzureStorage/Tracking/InstanceStoreBackedTrackingStore.cs +++ b/src/DurableTask.AzureStorage/Tracking/InstanceStoreBackedTrackingStore.cs @@ -176,5 +176,23 @@ await instanceStore.WriteEntitiesAsync(new InstanceEntityBase[] return null; } + + public override Task UpdateInstanceStatusAndDeleteOrphanedBlobsAsync( + string instanceId, + string executionId, + OrchestrationRuntimeState runtimeState, + object trackingStoreContext, + CancellationToken cancellationToken = default) + { + // No blobs to delete for this tracking store implementation + await instanceStore.WriteEntitiesAsync(new InstanceEntityBase[] +{ + new OrchestrationStateInstanceEntity() + { + State = Core.Common.Utils.BuildOrchestrationState(runtimeState), + SequenceNumber = runtimeState.Events.Count + } + }); + } } } diff --git a/src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs b/src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs index 95465461f..b99e9a555 100644 --- a/src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs +++ b/src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs @@ -105,5 +105,8 @@ public virtual Task UpdateStatusForRewindAsync(string instanceId, CancellationTo /// public abstract Task UpdateStateAsync(OrchestrationRuntimeState newRuntimeState, OrchestrationRuntimeState oldRuntimeState, string instanceId, string executionId, ETag? 
eTag, object executionData, CancellationToken cancellationToken = default); + + /// + public abstract Task UpdateInstanceStatusAndDeleteOrphanedBlobsAsync(string instanceId, string executionId, OrchestrationRuntimeState runtimeState, object trackingStoreContext, CancellationToken cancellationToken = default); } } diff --git a/test/DurableTask.AzureStorage.Tests/AzureStorageScaleTests.cs b/test/DurableTask.AzureStorage.Tests/AzureStorageScaleTests.cs index 64b9eb22b..b5255a7e1 100644 --- a/test/DurableTask.AzureStorage.Tests/AzureStorageScaleTests.cs +++ b/test/DurableTask.AzureStorage.Tests/AzureStorageScaleTests.cs @@ -842,6 +842,126 @@ public async Task MonitorIncreasingControlQueueLoadDisconnected() } } + /// + /// Confirm that if a worker fails after committing the new history but before updating the instance state in a call to + /// for an orchestration that has + /// reached a terminal state, then storage is brought to consistent state by the call to + /// . + /// Since we cannot simulate a worker failure at this precise point, instead what is done by this test is that the + /// history is updated by a call to , but the instance state + /// is not. Then we confirm that after a call to lock the next task work item, the inconsistent state in storage for + /// the terminal instance is recognized, the instance state is updated, and the work item discarded. + /// The test also confirms that this is *not* done for an orchestration in a non-terminal state, since the call to + /// complete the work item after finishing it will naturally bring both the instance and history tables to a consistent state. + /// Note that this test does not confirm that orphaned blobs are deleted by the call to lock the next orchestration work item + /// in the case of a terminal orchestration with inconsistent state in storage. 
This is because there is no easy way to mock/inject + /// the tracking store context object that is part of the orchestration session state which keeps track of the blobs. + /// + /// + [TestMethod] + public async Task WorkerFailingDuringCompleteWorkItemCallAsync() + { + var orchestrationInstance = new OrchestrationInstance + { + InstanceId = "instance_id", + ExecutionId = "execution_id", + }; + + ExecutionStartedEvent startedEvent = new(-1, string.Empty) + { + Name = "orchestration", + Version = string.Empty, + OrchestrationInstance = orchestrationInstance, + ScheduledStartTime = DateTime.UtcNow, + }; + + var settings = new AzureStorageOrchestrationServiceSettings + { + ControlQueueBufferThreshold = 100, + LeaseRenewInterval = TimeSpan.FromMilliseconds(500), + PartitionCount = 1, + StorageAccountClientProvider = new StorageAccountClientProvider(TestHelpers.GetTestStorageAccountConnectionString()), + TaskHubName = TestHelpers.GetTestTaskHubName(), + }; + this.SetPartitionManagerType(settings, PartitionManagerType.V2Safe); + + var service = new AzureStorageOrchestrationService(settings); + await service.CreateAsync(); + await service.StartAsync(); + + // Create an orchestration, confirm that its status in the instance table is "pending" + await service.CreateTaskOrchestrationAsync( + new TaskMessage() + { + OrchestrationInstance = orchestrationInstance, + Event = startedEvent + }); + var trackingStore = service.TrackingStore as AzureTableTrackingStore; + var instanceStatus = await trackingStore.FetchInstanceStatusAsync(orchestrationInstance.InstanceId); + Assert.AreEqual(instanceStatus.State.OrchestrationStatus, OrchestrationStatus.Pending); + + // Now create a new terminal runtime state and update just the history to simulate the worker failing after updating the + // history table but before updating the instance table upon completing a work item + var runtimeState = new OrchestrationRuntimeState(); + runtimeState.AddEvent(new OrchestratorStartedEvent(-1)); + 
runtimeState.AddEvent(startedEvent); + runtimeState.AddEvent(new ExecutionTerminatedEvent(-1, "terminated")); + runtimeState.AddEvent(new ExecutionCompletedEvent(0, "terminated", OrchestrationStatus.Terminated)); + runtimeState.AddEvent(new OrchestratorCompletedEvent(-1)); + await trackingStore.UpdateHistoryAsync(orchestrationInstance.InstanceId, orchestrationInstance.ExecutionId, runtimeState); + + // Confirm that the call to lock the next work item recognizes and fixes the inconsistency while discarding the work item + var workItem = await service.LockNextTaskOrchestrationWorkItemAsync( + TimeSpan.FromSeconds(30), + CancellationToken.None); + Assert.IsNull(workItem); + instanceStatus = await trackingStore.FetchInstanceStatusAsync(orchestrationInstance.InstanceId); + Assert.AreEqual(instanceStatus.State.OrchestrationStatus, OrchestrationStatus.Terminated); + + // Now create a new orchestration and confirm its status is pending + orchestrationInstance = new OrchestrationInstance + { + InstanceId = "instance_id2", + ExecutionId = "execution_id2", + }; + startedEvent.OrchestrationInstance = orchestrationInstance; + await service.CreateTaskOrchestrationAsync( + new TaskMessage() + { + OrchestrationInstance = orchestrationInstance, + Event = startedEvent + }); + instanceStatus = await trackingStore.FetchInstanceStatusAsync(orchestrationInstance.InstanceId); + Assert.AreEqual(instanceStatus.State.OrchestrationStatus, OrchestrationStatus.Pending); + + // Now create a new non-terminal runtime state and confirm and update just the history the history to simulate the worker failing after updating the + // history table but before updating the instance table upon completing a work item + runtimeState = new OrchestrationRuntimeState(); + runtimeState.AddEvent(new OrchestratorStartedEvent(-1)); + runtimeState.AddEvent(startedEvent); + runtimeState.AddEvent(new TaskScheduledEvent(0, "task")); + runtimeState.AddEvent(new OrchestratorCompletedEvent(-1)); + await 
trackingStore.UpdateHistoryAsync(orchestrationInstance.InstanceId, orchestrationInstance.ExecutionId, runtimeState); + + // Confirm that the call to lock the next work item does *not* update the instance table, since it will be updated by the call + // to complete the work item + workItem = await service.LockNextTaskOrchestrationWorkItemAsync( + TimeSpan.FromSeconds(30), + CancellationToken.None); + Assert.IsNotNull(workItem); + instanceStatus = await trackingStore.FetchInstanceStatusAsync(orchestrationInstance.InstanceId); + Assert.AreEqual(instanceStatus.State.OrchestrationStatus, OrchestrationStatus.Pending); + + // Confirm that the call to complete the work item does update the instance table to the correct state + runtimeState.AddEvent(new OrchestratorStartedEvent(-1)); + runtimeState.AddEvent(new TaskCompletedEvent(-1, 0, string.Empty)); + runtimeState.AddEvent(new ExecutionCompletedEvent(1, string.Empty, OrchestrationStatus.Completed)); + runtimeState.AddEvent(new OrchestratorCompletedEvent(-1)); + await service.CompleteTaskOrchestrationWorkItemAsync(workItem, runtimeState, new List(), new List(), new List(), null, DurableTask.Core.Common.Utils.BuildOrchestrationState(runtimeState)); + instanceStatus = await trackingStore.FetchInstanceStatusAsync(orchestrationInstance.InstanceId); + Assert.AreEqual(instanceStatus.State.OrchestrationStatus, OrchestrationStatus.Completed); + } + #region Work Item Queue Scaling [TestMethod] public async Task ScaleDecision_WorkItemLatency_High() From 00896bc15e19b1cc1708d268d3301cfab30c4fae Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Tue, 21 Oct 2025 10:51:07 -0700 Subject: [PATCH 2/7] fixing build error --- .../Tracking/InstanceStoreBackedTrackingStore.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/DurableTask.AzureStorage/Tracking/InstanceStoreBackedTrackingStore.cs b/src/DurableTask.AzureStorage/Tracking/InstanceStoreBackedTrackingStore.cs index d79cf273f..4d1111a3d 100644 --- 
a/src/DurableTask.AzureStorage/Tracking/InstanceStoreBackedTrackingStore.cs +++ b/src/DurableTask.AzureStorage/Tracking/InstanceStoreBackedTrackingStore.cs @@ -177,7 +177,7 @@ await instanceStore.WriteEntitiesAsync(new InstanceEntityBase[] return null; } - public override Task UpdateInstanceStatusAndDeleteOrphanedBlobsAsync( + public override async Task UpdateInstanceStatusAndDeleteOrphanedBlobsAsync( string instanceId, string executionId, OrchestrationRuntimeState runtimeState, From 24e22280667a944205a230708c7bef9e86487fdb Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Tue, 21 Oct 2025 16:20:57 -0700 Subject: [PATCH 3/7] updating to a better integration test --- .../Tracking/AzureTableTrackingStore.cs | 64 ---------- .../AzureStorageScaleTests.cs | 120 ------------------ .../AzureStorageScenarioTests.cs | 73 +++++++++++ 3 files changed, 73 insertions(+), 184 deletions(-) diff --git a/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs b/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs index 5b816ccf9..56e030c4b 100644 --- a/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs +++ b/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs @@ -1062,70 +1062,6 @@ public override async Task UpdateInstanceStatusAndDeleteOrphanedBlobsAsync( } } - /// - /// This method is meant to be used only for testing. It updates the history state in storage based on the new events in - /// , mimicking what does in a "light-weight" fashion - /// (for example, it does not handle large message compression and uploading to blob storage, etc.) - /// - /// The instance ID of the orchestration - /// The execution ID of the orchestration - /// The new runtime state of the orchestration with the new history events. 
- /// - internal async Task UpdateHistoryAsync(string instanceId, string executionId, OrchestrationRuntimeState runtimeState) - { - int estimatedBytes = 0; - IList newEvents = runtimeState.NewEvents; - IList allEvents = runtimeState.Events; - - int episodeNumber = Utils.GetEpisodeNumber(runtimeState); - - var newEventListBuffer = new StringBuilder(4000); - var historyEventBatch = new List(); - - string sanitizedInstanceId = KeySanitation.EscapePartitionKey(instanceId); - - for (int i = 0; i < newEvents.Count; i++) - { - bool isFinalEvent = i == newEvents.Count - 1; - - HistoryEvent historyEvent = newEvents[i]; - // For backwards compatibility, we convert timer timestamps to UTC prior to persisting to Azure Storage - // see: https://github.com/Azure/durabletask/pull/1138 - Utils.ConvertDateTimeInHistoryEventsToUTC(historyEvent); - var historyEntity = TableEntityConverter.Serialize(historyEvent); - historyEntity.PartitionKey = sanitizedInstanceId; - - newEventListBuffer.Append(historyEvent.EventType.ToString()).Append(','); - - // The row key is the sequence number, which represents the chronological ordinal of the event. - long sequenceNumber = i + (allEvents.Count - newEvents.Count); - historyEntity.RowKey = sequenceNumber.ToString("X16"); - historyEntity["ExecutionId"] = executionId; - - // Replacement can happen if the orchestration episode gets replayed due to a commit failure in one of the steps below. 
- historyEventBatch.Add(new TableTransactionAction(TableTransactionActionType.UpsertReplace, historyEntity)); - - // Keep track of the byte count to ensure we don't hit the 4 MB per-batch maximum - estimatedBytes += GetEstimatedByteCount(historyEntity); - } - - if (historyEventBatch.Count > 0) - { - await this.UploadHistoryBatch( - instanceId, - sanitizedInstanceId, - executionId, - historyEventBatch, - newEventListBuffer, - allEvents.Count, - episodeNumber, - estimatedBytes, - null, - isFinalBatch: true, - cancellationToken: CancellationToken.None); - } - } - static int GetEstimatedByteCount(TableEntity entity) { // Assume at least 1 KB of data per entity to account for static-length properties diff --git a/test/DurableTask.AzureStorage.Tests/AzureStorageScaleTests.cs b/test/DurableTask.AzureStorage.Tests/AzureStorageScaleTests.cs index b5255a7e1..64b9eb22b 100644 --- a/test/DurableTask.AzureStorage.Tests/AzureStorageScaleTests.cs +++ b/test/DurableTask.AzureStorage.Tests/AzureStorageScaleTests.cs @@ -842,126 +842,6 @@ public async Task MonitorIncreasingControlQueueLoadDisconnected() } } - /// - /// Confirm that if a worker fails after committing the new history but before updating the instance state in a call to - /// for an orchestration that has - /// reached a terminal state, then storage is brought to consistent state by the call to - /// . - /// Since we cannot simulate a worker failure at this precise point, instead what is done by this test is that the - /// history is updated by a call to , but the instance state - /// is not. Then we confirm that after a call to lock the next task work item, the inconsistent state in storage for - /// the terminal instance is recognized, the instance state is updated, and the work item discarded. 
- /// The test also confirms that this is *not* done for an orchestration in a non-terminal state, since the call to - /// complete the work item after finishing it will naturally bring both the instance and history tables to a consistent state. - /// Note that this test does not confirm that orphaned blobs are deleted by the call to lock the next orchestration work item - /// in the case of a terminal orchestration with inconsistent state in storage. This is because there is no easy way to mock/inject - /// the tracking store context object that is part of the orchestration session state which keeps track of the blobs. - /// - /// - [TestMethod] - public async Task WorkerFailingDuringCompleteWorkItemCallAsync() - { - var orchestrationInstance = new OrchestrationInstance - { - InstanceId = "instance_id", - ExecutionId = "execution_id", - }; - - ExecutionStartedEvent startedEvent = new(-1, string.Empty) - { - Name = "orchestration", - Version = string.Empty, - OrchestrationInstance = orchestrationInstance, - ScheduledStartTime = DateTime.UtcNow, - }; - - var settings = new AzureStorageOrchestrationServiceSettings - { - ControlQueueBufferThreshold = 100, - LeaseRenewInterval = TimeSpan.FromMilliseconds(500), - PartitionCount = 1, - StorageAccountClientProvider = new StorageAccountClientProvider(TestHelpers.GetTestStorageAccountConnectionString()), - TaskHubName = TestHelpers.GetTestTaskHubName(), - }; - this.SetPartitionManagerType(settings, PartitionManagerType.V2Safe); - - var service = new AzureStorageOrchestrationService(settings); - await service.CreateAsync(); - await service.StartAsync(); - - // Create an orchestration, confirm that its status in the instance table is "pending" - await service.CreateTaskOrchestrationAsync( - new TaskMessage() - { - OrchestrationInstance = orchestrationInstance, - Event = startedEvent - }); - var trackingStore = service.TrackingStore as AzureTableTrackingStore; - var instanceStatus = await 
trackingStore.FetchInstanceStatusAsync(orchestrationInstance.InstanceId); - Assert.AreEqual(instanceStatus.State.OrchestrationStatus, OrchestrationStatus.Pending); - - // Now create a new terminal runtime state and update just the history to simulate the worker failing after updating the - // history table but before updating the instance table upon completing a work item - var runtimeState = new OrchestrationRuntimeState(); - runtimeState.AddEvent(new OrchestratorStartedEvent(-1)); - runtimeState.AddEvent(startedEvent); - runtimeState.AddEvent(new ExecutionTerminatedEvent(-1, "terminated")); - runtimeState.AddEvent(new ExecutionCompletedEvent(0, "terminated", OrchestrationStatus.Terminated)); - runtimeState.AddEvent(new OrchestratorCompletedEvent(-1)); - await trackingStore.UpdateHistoryAsync(orchestrationInstance.InstanceId, orchestrationInstance.ExecutionId, runtimeState); - - // Confirm that the call to lock the next work item recognizes and fixes the inconsistency while discarding the work item - var workItem = await service.LockNextTaskOrchestrationWorkItemAsync( - TimeSpan.FromSeconds(30), - CancellationToken.None); - Assert.IsNull(workItem); - instanceStatus = await trackingStore.FetchInstanceStatusAsync(orchestrationInstance.InstanceId); - Assert.AreEqual(instanceStatus.State.OrchestrationStatus, OrchestrationStatus.Terminated); - - // Now create a new orchestration and confirm its status is pending - orchestrationInstance = new OrchestrationInstance - { - InstanceId = "instance_id2", - ExecutionId = "execution_id2", - }; - startedEvent.OrchestrationInstance = orchestrationInstance; - await service.CreateTaskOrchestrationAsync( - new TaskMessage() - { - OrchestrationInstance = orchestrationInstance, - Event = startedEvent - }); - instanceStatus = await trackingStore.FetchInstanceStatusAsync(orchestrationInstance.InstanceId); - Assert.AreEqual(instanceStatus.State.OrchestrationStatus, OrchestrationStatus.Pending); - - // Now create a new non-terminal 
runtime state and confirm and update just the history the history to simulate the worker failing after updating the - // history table but before updating the instance table upon completing a work item - runtimeState = new OrchestrationRuntimeState(); - runtimeState.AddEvent(new OrchestratorStartedEvent(-1)); - runtimeState.AddEvent(startedEvent); - runtimeState.AddEvent(new TaskScheduledEvent(0, "task")); - runtimeState.AddEvent(new OrchestratorCompletedEvent(-1)); - await trackingStore.UpdateHistoryAsync(orchestrationInstance.InstanceId, orchestrationInstance.ExecutionId, runtimeState); - - // Confirm that the call to lock the next work item does *not* update the instance table, since it will be updated by the call - // to complete the work item - workItem = await service.LockNextTaskOrchestrationWorkItemAsync( - TimeSpan.FromSeconds(30), - CancellationToken.None); - Assert.IsNotNull(workItem); - instanceStatus = await trackingStore.FetchInstanceStatusAsync(orchestrationInstance.InstanceId); - Assert.AreEqual(instanceStatus.State.OrchestrationStatus, OrchestrationStatus.Pending); - - // Confirm that the call to complete the work item does update the instance table to the correct state - runtimeState.AddEvent(new OrchestratorStartedEvent(-1)); - runtimeState.AddEvent(new TaskCompletedEvent(-1, 0, string.Empty)); - runtimeState.AddEvent(new ExecutionCompletedEvent(1, string.Empty, OrchestrationStatus.Completed)); - runtimeState.AddEvent(new OrchestratorCompletedEvent(-1)); - await service.CompleteTaskOrchestrationWorkItemAsync(workItem, runtimeState, new List(), new List(), new List(), null, DurableTask.Core.Common.Utils.BuildOrchestrationState(runtimeState)); - instanceStatus = await trackingStore.FetchInstanceStatusAsync(orchestrationInstance.InstanceId); - Assert.AreEqual(instanceStatus.State.OrchestrationStatus, OrchestrationStatus.Completed); - } - #region Work Item Queue Scaling [TestMethod] public async Task ScaleDecision_WorkItemLatency_High() diff --git 
a/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs b/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs index aee05e945..dcbfd9f63 100644 --- a/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs +++ b/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs @@ -2441,6 +2441,79 @@ public async Task TestAllowReplayingTerminalInstances(bool enableExtendedSession } } + /// + /// Confirm that if a worker fails after committing the new history but before updating the instance state in a call to + /// for an orchestration that has + /// reached a terminal state, then storage is brought to consistent state by the call to + /// . + /// Since we cannot simulate a worker failure at this precise point, instead what is done by this test is that we + /// let an orchestration run to completion, and then manually change the instance table state back to "Running". + /// We then send an event to the orchestration, which triggers a call to lock the next task work item, at which point + /// the inconsistent state in storage for the terminal instance is recognized, the instance state is updated, and the work item discarded. + /// Note that this test does not confirm that orphaned blobs are deleted by the call to lock the next orchestration work item + /// in the case of a terminal orchestration with inconsistent state in storage. This is because there is no easy way to mock/inject + /// the tracking store context object that is part of the orchestration session state which keeps track of the blobs. 
+ /// + /// + [DataTestMethod] + [DataRow(true, true)] + [DataRow(false, true)] + [DataRow(true, false)] + [DataRow(false, false)] + public async Task TestWorkerFailingDuringCompleteWorkItemCall(bool enableExtendedSessions, bool terminate) + { + using (TestOrchestrationHost host = TestHelpers.GetTestOrchestrationHost(enableExtendedSessions)) + { + await host.StartAsync(); + + // Run simple orchestrator to completion, this will help us obtain a valid terminal history for the orchestrator + var client = await host.StartOrchestrationAsync(typeof(Orchestrations.Echo), "hello!"); + var status = await client.WaitForCompletionAsync(TimeSpan.FromSeconds(10)); + Assert.AreEqual(OrchestrationStatus.Completed, status?.OrchestrationStatus); + + // Simulate having an "out of date" Instance table, by setting it's runtime status to "Running". + // This simulates the scenario where the History table was updated, but not the Instance table. + var instanceId = client.InstanceId; + AzureStorageOrchestrationServiceSettings settings = TestHelpers.GetTestAzureStorageOrchestrationServiceSettings( + enableExtendedSessions); + AzureStorageClient azureStorageClient = new AzureStorageClient(settings); + + Table instanceTable = azureStorageClient.GetTableReference(azureStorageClient.Settings.InstanceTableName); + TableEntity entity = new TableEntity(instanceId, "") + { + ["RuntimeStatus"] = OrchestrationStatus.Running.ToString("G") + }; + await instanceTable.MergeEntityAsync(entity, Azure.ETag.All); + + // Assert that the status in the Instance table reads "Running" + IList state = await client.GetStateAsync(instanceId); + OrchestrationStatus forcedStatus = state.First().OrchestrationStatus; + Assert.AreEqual(OrchestrationStatus.Running, forcedStatus); + + // The type of event sent should not matter - the event itself should be discarded, and the instance table updated + // to reflect the status in the history table. 
+ if (terminate) + { + await client.TerminateAsync("testing"); + } + else + { + await client.RaiseEventAsync("Foo", "Bar"); + } + await Task.Delay(TimeSpan.FromSeconds(30)); + + // A replay should have occurred, forcing the instance table to be updated with a terminal status + state = await client.GetStateAsync(instanceId); + Assert.AreEqual(1, state.Count); + + status = state.First(); + OrchestrationStatus expectedStatus = OrchestrationStatus.Completed; + Assert.AreEqual(expectedStatus, status.OrchestrationStatus); + + await host.StopAsync(); + } + } + [TestMethod] [DataRow(VersioningSettings.VersionMatchStrategy.Strict)] [DataRow(VersioningSettings.VersionMatchStrategy.CurrentOrOlder)] From 69215c19a6cdcb194a2e459dcb00b1c6f2a52b2f Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Tue, 21 Oct 2025 16:29:07 -0700 Subject: [PATCH 4/7] fixed spacing --- .../Tracking/InstanceStoreBackedTrackingStore.cs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/DurableTask.AzureStorage/Tracking/InstanceStoreBackedTrackingStore.cs b/src/DurableTask.AzureStorage/Tracking/InstanceStoreBackedTrackingStore.cs index a19e15f2d..259fa558e 100644 --- a/src/DurableTask.AzureStorage/Tracking/InstanceStoreBackedTrackingStore.cs +++ b/src/DurableTask.AzureStorage/Tracking/InstanceStoreBackedTrackingStore.cs @@ -197,11 +197,11 @@ public override async Task UpdateInstanceStatusAndDeleteOrphanedBlobsAsync( // No blobs to delete for this tracking store implementation await instanceStore.WriteEntitiesAsync(new InstanceEntityBase[] { - new OrchestrationStateInstanceEntity() - { - State = Core.Common.Utils.BuildOrchestrationState(runtimeState), - SequenceNumber = runtimeState.Events.Count - } + new OrchestrationStateInstanceEntity() + { + State = Core.Common.Utils.BuildOrchestrationState(runtimeState), + SequenceNumber = runtimeState.Events.Count + } }); } } From 7cd8c05440be580afb964ef2537354f1cffe2d71 Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Thu, 23 
Oct 2025 12:29:16 -0700 Subject: [PATCH 5/7] ignoring the old allowreplay test --- test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs | 1 + 1 file changed, 1 insertion(+) diff --git a/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs b/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs index f1520f0f6..aa4aa7026 100644 --- a/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs +++ b/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs @@ -2401,6 +2401,7 @@ await Task.WhenAll( [DataRow(false, true, false)] [DataRow(false, false, true)] [DataRow(false, false, false)] + [Ignore("Skipping since this functionality has since changed, see TestWorkerFailingDuringCompleteWorkItemCall")] public async Task TestAllowReplayingTerminalInstances(bool enableExtendedSessions, bool sendTerminateEvent, bool allowReplayingTerminalInstances) { using (TestOrchestrationHost host = TestHelpers.GetTestOrchestrationHost( From fb1d3e70981f6de7ab5492b027466d2a47fa2873 Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Thu, 23 Oct 2025 13:24:43 -0700 Subject: [PATCH 6/7] fixing a bug where i wasnt checking that the execution id is the same --- .../AzureStorageOrchestrationService.cs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs index e837c9ce6..c500e4cc1 100644 --- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs +++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs @@ -1096,7 +1096,8 @@ async Task IsExecutableInstanceAsync( runtimeState.OrchestrationStatus != OrchestrationStatus.Suspended) { InstanceStatus instanceStatus = await this.trackingStore.FetchInstanceStatusAsync(runtimeState.OrchestrationInstance.InstanceId); - if (instanceStatus.State.OrchestrationStatus != runtimeState.OrchestrationStatus) + if 
(instanceStatus.State.OrchestrationInstance.ExecutionId == runtimeState.OrchestrationInstance.ExecutionId + && instanceStatus.State.OrchestrationStatus != runtimeState.OrchestrationStatus) { await this.trackingStore.UpdateInstanceStatusAndDeleteOrphanedBlobsAsync( runtimeState.OrchestrationInstance.InstanceId, From 2c03f34e41e0a24ef248cf89c2818b562ec767ea Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Thu, 23 Oct 2025 15:58:27 -0700 Subject: [PATCH 7/7] addressing PR comments --- .../AzureStorageOrchestrationService.cs | 2 +- .../Tracking/AzureTableTrackingStore.cs | 34 ++++++++++++------- .../Tracking/ITrackingStore.cs | 8 ++--- .../InstanceStoreBackedTrackingStore.cs | 10 +++++- .../Tracking/TrackingStoreBase.cs | 2 +- 5 files changed, 36 insertions(+), 20 deletions(-) diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs index c500e4cc1..c8fefe04e 100644 --- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs +++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs @@ -1099,7 +1099,7 @@ async Task IsExecutableInstanceAsync( if (instanceStatus.State.OrchestrationInstance.ExecutionId == runtimeState.OrchestrationInstance.ExecutionId && instanceStatus.State.OrchestrationStatus != runtimeState.OrchestrationStatus) { - await this.trackingStore.UpdateInstanceStatusAndDeleteOrphanedBlobsAsync( + await this.trackingStore.UpdateInstanceStatusAndDeleteOrphanedBlobsForCompletedOrchestrationAsync( runtimeState.OrchestrationInstance.InstanceId, runtimeState.OrchestrationInstance.ExecutionId, runtimeState, diff --git a/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs b/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs index 9713fcb22..60a71de74 100644 --- a/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs +++ b/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs @@ -1043,13 +1043,32 @@ public 
override Task StartAsync(CancellationToken cancellationToken = default) return eTagValue; } - public override async Task UpdateInstanceStatusAndDeleteOrphanedBlobsAsync( + public override async Task UpdateInstanceStatusAndDeleteOrphanedBlobsForCompletedOrchestrationAsync( string instanceId, string executionId, OrchestrationRuntimeState runtimeState, object trackingStoreContext, CancellationToken cancellationToken = default) { + if (runtimeState.OrchestrationStatus != OrchestrationStatus.Completed && + runtimeState.OrchestrationStatus != OrchestrationStatus.Canceled && + runtimeState.OrchestrationStatus != OrchestrationStatus.Failed && + runtimeState.OrchestrationStatus != OrchestrationStatus.Terminated) + { + return; + } + + TrackingStoreContext context = (TrackingStoreContext)trackingStoreContext; + if (context.Blobs.Count > 0) + { + var tasks = new List(context.Blobs.Count); + foreach (string blobName in context.Blobs) + { + tasks.Add(this.messageManager.DeleteBlobAsync(blobName)); + } + await Task.WhenAll(tasks); + } + string sanitizedInstanceId = KeySanitation.EscapePartitionKey(instanceId); var instanceEntity = new TableEntity(sanitizedInstanceId, string.Empty) { @@ -1059,7 +1078,7 @@ public override async Task UpdateInstanceStatusAndDeleteOrphanedBlobsAsync( ["ExecutionId"] = executionId, ["LastUpdatedTime"] = runtimeState.Events.Last().Timestamp, ["RuntimeStatus"] = runtimeState.OrchestrationStatus.ToString(), - ["CompletedTime"] = runtimeState.Events.Last().Timestamp // do we want to do this as a rough proxy or DateTime.UtcNow? 
+ ["CompletedTime"] = runtimeState.CompletedTime }; Stopwatch orchestrationInstanceUpdateStopwatch = Stopwatch.StartNew(); @@ -1073,17 +1092,6 @@ public override async Task UpdateInstanceStatusAndDeleteOrphanedBlobsAsync( runtimeState.OrchestrationStatus, Utils.GetEpisodeNumber(runtimeState), orchestrationInstanceUpdateStopwatch.ElapsedMilliseconds); - - TrackingStoreContext context = (TrackingStoreContext)trackingStoreContext; - if (context.Blobs.Count > 0) - { - var tasks = new List(context.Blobs.Count); - foreach (var blobName in context.Blobs) - { - tasks.Add(this.messageManager.DeleteBlobAsync(blobName)); - } - await Task.WhenAll(tasks); - } } static int GetEstimatedByteCount(TableEntity entity) diff --git a/src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs b/src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs index 7099af6d8..2b1e001bf 100644 --- a/src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs +++ b/src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs @@ -104,17 +104,17 @@ interface ITrackingStore Task FetchInstanceStatusAsync(string instanceId, CancellationToken cancellationToken = default); /// - /// Updates the instance status of the specified orchestration instance to match that of . + /// Updates the instance status of the specified orchestration instance to match that of for a completed orchestration. /// Also deletes any orphaned blobs of . - /// This method is meant to be called in the case that there is an inconsistency between the instance and history table due to a - /// failure during a call to . + /// This method is meant to be called in the case that there is an inconsistency between the instance and history table due to a failure during a call to + /// for a completing orchestration. If the orchestration is not in a terminal state, the method will immediately return and do nothing. /// /// The ID of the orchestration. /// The execution ID of the orchestration. /// The runtime state of the orchestration. 
/// Additional context for the execution that is maintained by the tracking store. /// The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>. - Task UpdateInstanceStatusAndDeleteOrphanedBlobsAsync(string instanceId, string executionId, OrchestrationRuntimeState runtimeState, object trackingStoreContext, CancellationToken cancellationToken = default); + Task UpdateInstanceStatusAndDeleteOrphanedBlobsForCompletedOrchestrationAsync(string instanceId, string executionId, OrchestrationRuntimeState runtimeState, object trackingStoreContext, CancellationToken cancellationToken = default); /// /// Get The Orchestration State for querying all orchestration instances diff --git a/src/DurableTask.AzureStorage/Tracking/InstanceStoreBackedTrackingStore.cs b/src/DurableTask.AzureStorage/Tracking/InstanceStoreBackedTrackingStore.cs index 259fa558e..fdcf119db 100644 --- a/src/DurableTask.AzureStorage/Tracking/InstanceStoreBackedTrackingStore.cs +++ b/src/DurableTask.AzureStorage/Tracking/InstanceStoreBackedTrackingStore.cs @@ -187,13 +187,21 @@ public override async Task UpdateStatusForTerminationAsync(string instanceId, st await this.instanceStore.WriteEntitiesAsync(instanceEntity); } - public override async Task UpdateInstanceStatusAndDeleteOrphanedBlobsAsync( + public override async Task UpdateInstanceStatusAndDeleteOrphanedBlobsForCompletedOrchestrationAsync( string instanceId, string executionId, OrchestrationRuntimeState runtimeState, object trackingStoreContext, CancellationToken cancellationToken = default) { + if (runtimeState.OrchestrationStatus != OrchestrationStatus.Completed && + runtimeState.OrchestrationStatus != OrchestrationStatus.Canceled && + runtimeState.OrchestrationStatus != OrchestrationStatus.Failed && + runtimeState.OrchestrationStatus != OrchestrationStatus.Terminated) + { + return; + } + // No blobs to delete for this tracking store implementation await instanceStore.WriteEntitiesAsync(new InstanceEntityBase[] { diff --git
a/src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs b/src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs index ae4f4cc18..d1bb19c52 100644 --- a/src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs +++ b/src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs @@ -110,6 +110,6 @@ public virtual Task UpdateStatusForRewindAsync(string instanceId, CancellationTo public abstract Task UpdateStateAsync(OrchestrationRuntimeState newRuntimeState, OrchestrationRuntimeState oldRuntimeState, string instanceId, string executionId, ETag? eTag, object executionData, CancellationToken cancellationToken = default); /// <inheritdoc /> - public abstract Task UpdateInstanceStatusAndDeleteOrphanedBlobsAsync(string instanceId, string executionId, OrchestrationRuntimeState runtimeState, object trackingStoreContext, CancellationToken cancellationToken = default); + public abstract Task UpdateInstanceStatusAndDeleteOrphanedBlobsForCompletedOrchestrationAsync(string instanceId, string executionId, OrchestrationRuntimeState runtimeState, object trackingStoreContext, CancellationToken cancellationToken = default); } }