From 0df9ae60769ebecec45ee7e49afb522e7a354f3b Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Mon, 8 Dec 2025 23:06:24 -0800 Subject: [PATCH 1/8] first commit --- .../AzureStorageOrchestrationService.cs | 2 +- src/DurableTask.AzureStorage/MessageData.cs | 2 + .../Messaging/OrchestrationSession.cs | 6 +- .../OrchestrationETags.cs | 12 + .../OrchestrationSessionManager.cs | 25 ++- .../Tracking/AzureTableTrackingStore.cs | 86 +++++-- .../Tracking/ITrackingStore.cs | 6 +- .../InstanceStoreBackedTrackingStore.cs | 10 +- .../Tracking/TrackingStoreBase.cs | 4 +- src/DurableTask.Core/OrchestrationState.cs | 2 + .../AzureStorageScaleTests.cs | 212 +++++++++++++++++- 11 files changed, 312 insertions(+), 55 deletions(-) create mode 100644 src/DurableTask.AzureStorage/OrchestrationETags.cs diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs index 22e0e80da..1c87113b7 100644 --- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs +++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs @@ -1230,7 +1230,7 @@ await this.CommitOutboundQueueMessages( // will result in a duplicate replay of the orchestration with no side-effects. try { - session.ETag = await this.trackingStore.UpdateStateAsync(runtimeState, workItem.OrchestrationRuntimeState, instanceId, executionId, session.ETag, session.TrackingStoreContext); + await this.trackingStore.UpdateStateAsync(runtimeState, workItem.OrchestrationRuntimeState, instanceId, executionId, session.ETags, session.TrackingStoreContext); // update the runtime state and execution id stored in the session session.UpdateRuntimeState(runtimeState); diff --git a/src/DurableTask.AzureStorage/MessageData.cs b/src/DurableTask.AzureStorage/MessageData.cs index d89e7c686..4715043c7 100644 --- a/src/DurableTask.AzureStorage/MessageData.cs +++ b/src/DurableTask.AzureStorage/MessageData.cs @@ -111,6 +111,8 @@ internal void Update(UpdateReceipt receipt) { this.OriginalQueueMessage = this.OriginalQueueMessage.Update(receipt); } + + internal object MessageMetadata { get; set; } } /// diff --git a/src/DurableTask.AzureStorage/Messaging/OrchestrationSession.cs b/src/DurableTask.AzureStorage/Messaging/OrchestrationSession.cs index 719694e97..1b2e4a20e 100644 --- a/src/DurableTask.AzureStorage/Messaging/OrchestrationSession.cs +++ b/src/DurableTask.AzureStorage/Messaging/OrchestrationSession.cs @@ -37,7 +37,7 @@ public OrchestrationSession( ControlQueue controlQueue, List initialMessageBatch, OrchestrationRuntimeState runtimeState, - ETag? eTag, + OrchestrationETags eTags, DateTime lastCheckpointTime, object trackingStoreContext, TimeSpan idleTimeout, @@ -48,7 +48,7 @@ public OrchestrationSession( this.ControlQueue = controlQueue ?? throw new ArgumentNullException(nameof(controlQueue)); this.CurrentMessageBatch = initialMessageBatch ?? throw new ArgumentNullException(nameof(initialMessageBatch)); this.RuntimeState = runtimeState ?? throw new ArgumentNullException(nameof(runtimeState)); - this.ETag = eTag; + this.ETags = eTags; this.LastCheckpointTime = lastCheckpointTime; this.TrackingStoreContext = trackingStoreContext; @@ -66,7 +66,7 @@ public OrchestrationSession( public OrchestrationRuntimeState RuntimeState { get; private set; } - public ETag? 
ETag { get; set; } + public OrchestrationETags ETags { get; set; } public DateTime LastCheckpointTime { get; } diff --git a/src/DurableTask.AzureStorage/OrchestrationETags.cs b/src/DurableTask.AzureStorage/OrchestrationETags.cs new file mode 100644 index 000000000..9e7f67531 --- /dev/null +++ b/src/DurableTask.AzureStorage/OrchestrationETags.cs @@ -0,0 +1,12 @@ +using Azure; + +#nullable enable +namespace DurableTask.AzureStorage +{ + class OrchestrationETags + { + internal ETag? InstanceETag { get; set; } + + internal ETag? HistoryETag { get; set; } + } +} diff --git a/src/DurableTask.AzureStorage/OrchestrationSessionManager.cs b/src/DurableTask.AzureStorage/OrchestrationSessionManager.cs index 51a6e07ba..c583f39e0 100644 --- a/src/DurableTask.AzureStorage/OrchestrationSessionManager.cs +++ b/src/DurableTask.AzureStorage/OrchestrationSessionManager.cs @@ -271,21 +271,22 @@ async Task> DedupeExecutionStartedMessagesAsync( // Terminology: // "Local" -> the instance ID info comes from the local copy of the message we're examining // "Remote" -> the instance ID info comes from the Instances table that we're querying - IAsyncEnumerable instances = this.trackingStore.GetStateAsync(instanceIds, cancellationToken); - IDictionary remoteOrchestrationsById = - await instances.ToDictionaryAsync(o => o.OrchestrationInstance.InstanceId, cancellationToken); + IAsyncEnumerable instances = this.trackingStore.FetchInstanceStatusAsync(instanceIds, cancellationToken); + IDictionary remoteOrchestrationsById = + await instances.ToDictionaryAsync(o => o.State.OrchestrationInstance.InstanceId, cancellationToken); foreach (MessageData message in executionStartedMessages) { OrchestrationInstance localInstance = message.TaskMessage.OrchestrationInstance; var expectedGeneration = ((ExecutionStartedEvent)message.TaskMessage.Event).Generation; - if (remoteOrchestrationsById.TryGetValue(localInstance.InstanceId, out OrchestrationState remoteInstance) && - (remoteInstance.OrchestrationInstance.ExecutionId == null || string.Equals(localInstance.ExecutionId, remoteInstance.OrchestrationInstance.ExecutionId, StringComparison.OrdinalIgnoreCase))) + if (remoteOrchestrationsById.TryGetValue(localInstance.InstanceId, out InstanceStatus remoteInstance) && + (remoteInstance.State.OrchestrationInstance.ExecutionId == null || string.Equals(localInstance.ExecutionId, remoteInstance.State.OrchestrationInstance.ExecutionId, StringComparison.OrdinalIgnoreCase))) { // Happy path: The message matches the table status. Alternatively, if the table doesn't have an ExecutionId field (older clients, pre-v1.8.5), // then we have no way of knowing if it's a duplicate. Either way, allow it to run. + message.MessageMetadata = remoteInstance.ETag; } - else if (expectedGeneration == remoteInstance?.Generation && this.IsScheduledAfterInstanceUpdate(message, remoteInstance)) + else if (expectedGeneration == remoteInstance?.State.Generation && this.IsScheduledAfterInstanceUpdate(message, remoteInstance?.State)) { // The message was scheduled after the Instances table was updated with the orchestration info. 
// We know almost certainly that this is a redundant message and can be safely discarded because @@ -476,6 +477,10 @@ internal void AddMessageToPendingOrchestration( if (targetBatch == null) { targetBatch = new PendingMessageBatch(controlQueue, instanceId, executionId); + if (data.MessageMetadata is ETag instanceEtag) + { + targetBatch.ETags.InstanceETag = instanceEtag; + } node = this.pendingOrchestrationMessageBatches.AddLast(targetBatch); // Before the batch of messages can be processed, we need to download the latest execution state. @@ -519,7 +524,7 @@ async Task ScheduleOrchestrationStatePrefetch( cancellationToken); batch.OrchestrationState = new OrchestrationRuntimeState(history.Events); - batch.ETag = history.ETag; + batch.ETags.HistoryETag = history.ETag; batch.LastCheckpointTime = history.LastCheckpointTime; batch.TrackingStoreContext = history.TrackingStoreContext; } @@ -590,7 +595,7 @@ async Task ScheduleOrchestrationStatePrefetch( nextBatch.ControlQueue, nextBatch.Messages, nextBatch.OrchestrationState, - nextBatch.ETag, + nextBatch.ETags, nextBatch.LastCheckpointTime, nextBatch.TrackingStoreContext, this.settings.ExtendedSessionIdleTimeout, @@ -737,8 +742,10 @@ public OrchestrationRuntimeState? OrchestrationState } } - public ETag? ETag { get; set; } + public OrchestrationETags ETags { get; } = new OrchestrationETags(); + public DateTime LastCheckpointTime { get; set; } + public object? TrackingStoreContext { get; set; } } } diff --git a/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs b/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs index c6f63838a..6f04da182 100644 --- a/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs +++ b/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs @@ -510,19 +510,19 @@ async Task ConvertFromAsync(OrchestrationInstanceStatus orch } /// - public override async IAsyncEnumerable GetStateAsync(IEnumerable instanceIds, [EnumeratorCancellation] CancellationToken cancellationToken = default) + public override async IAsyncEnumerable FetchInstanceStatusAsync(IEnumerable instanceIds, [EnumeratorCancellation] CancellationToken cancellationToken = default) { if (instanceIds == null) { yield break; } - IEnumerable> instanceQueries = instanceIds.Select(instance => this.GetStateAsync(instance, allExecutions: true, fetchInput: false, cancellationToken).SingleOrDefaultAsync().AsTask()); - foreach (OrchestrationState state in await Task.WhenAll(instanceQueries)) + IEnumerable> instanceQueries = instanceIds.Select(instance => this.FetchInstanceStatusAsync(instance, cancellationToken)); + foreach (InstanceStatus status in await Task.WhenAll(instanceQueries)) { - if (state != null) + if (status != null) { - yield return state; + yield return status; } } } @@ -839,12 +839,12 @@ public override Task StartAsync(CancellationToken cancellationToken = default) } /// - public override async Task UpdateStateAsync( + public override async Task UpdateStateAsync( OrchestrationRuntimeState newRuntimeState, OrchestrationRuntimeState oldRuntimeState, string instanceId, string executionId, - ETag? eTagValue, + OrchestrationETags eTags, object trackingStoreContext, CancellationToken cancellationToken = default) { @@ -991,7 +991,7 @@ public override Task StartAsync(CancellationToken cancellationToken = default) // Table storage only supports inserts of up to 100 entities at a time or 4 MB at a time. 
if (historyEventBatch.Count == 99 || estimatedBytes > 3 * 1024 * 1024 /* 3 MB */) { - eTagValue = await this.UploadHistoryBatch( + eTags.HistoryETag = await this.UploadHistoryBatch( instanceId, sanitizedInstanceId, executionId, @@ -1000,7 +1000,7 @@ public override Task StartAsync(CancellationToken cancellationToken = default) allEvents.Count, episodeNumber, estimatedBytes, - eTagValue, + eTags.HistoryETag, isFinalBatch: isFinalEvent, cancellationToken: cancellationToken); @@ -1014,7 +1014,7 @@ public override Task StartAsync(CancellationToken cancellationToken = default) // First persistence step is to commit history to the history table. Messages must come after. if (historyEventBatch.Count > 0) { - eTagValue = await this.UploadHistoryBatch( + eTags.HistoryETag = await this.UploadHistoryBatch( instanceId, sanitizedInstanceId, executionId, @@ -1023,22 +1023,12 @@ public override Task StartAsync(CancellationToken cancellationToken = default) allEvents.Count, episodeNumber, estimatedBytes, - eTagValue, + eTags.HistoryETag, isFinalBatch: true, cancellationToken: cancellationToken); } - Stopwatch orchestrationInstanceUpdateStopwatch = Stopwatch.StartNew(); - await this.InstancesTable.InsertOrMergeEntityAsync(instanceEntity); - - this.settings.Logger.InstanceStatusUpdate( - this.storageAccountName, - this.taskHubName, - instanceId, - executionId, - runtimeStatus, - episodeNumber, - orchestrationInstanceUpdateStopwatch.ElapsedMilliseconds); + await this.TryUpdateInstanceTableAsync(instanceEntity, eTags.InstanceETag, instanceId, executionId, runtimeStatus, episodeNumber); // finally, delete orphaned blobs from the previous execution history. // We had to wait until the new history has committed to make sure the blobs are no longer necessary. @@ -1051,8 +1041,6 @@ public override Task StartAsync(CancellationToken cancellationToken = default) } await Task.WhenAll(tasks); } - - return eTagValue; } public override async Task UpdateInstanceStatusForCompletedOrchestrationAsync( @@ -1424,6 +1412,56 @@ bool ExceedsMaxTablePropertySize(string data) return false; } + async Task TryUpdateInstanceTableAsync(TableEntity instanceEntity, ETag? eTag, string instanceId, string executionId, OrchestrationStatus runtimeStatus, int episodeNumber) + { + Stopwatch orchestrationInstanceUpdateStopwatch = Stopwatch.StartNew(); + + try + { + if (eTag == null) + { + await this.InstancesTable.InsertEntityAsync(instanceEntity); + } + else + { + await this.InstancesTable.MergeEntityAsync(instanceEntity, eTag.Value); + } + + this.settings.Logger.InstanceStatusUpdate( + this.storageAccountName, + this.taskHubName, + instanceId, + executionId, + runtimeStatus, + episodeNumber, + orchestrationInstanceUpdateStopwatch.ElapsedMilliseconds); + } + catch (DurableTaskStorageException ex) + { + // Handle the case where the the history has already been updated by another caller. + // Common case: the resulting code is 'PreconditionFailed', which means we are trying to update an existing instance entity and "eTag" no longer matches the one stored. + // Edge case: the resulting code is 'Conflict'. This is the case when eTag is null, and we are trying to insert a new instance entity, in which case the exception + // indicates that the table entity we are trying to "add" already exists. 
+ if (ex.HttpStatusCode == (int)HttpStatusCode.Conflict || ex.HttpStatusCode == (int)HttpStatusCode.PreconditionFailed) + { + this.settings.Logger.SplitBrainDetected( + this.storageAccountName, + this.taskHubName, + instanceId, + executionId, + newEventCount: 0, + totalEventCount: 1, // TODO: these count fields don't map cleanly to the instance table case; passing "InstanceEntity" + // in the new-events field below keeps these occurrences detectable, but a dedicated log event may be clearer. + "InstanceEntity", + orchestrationInstanceUpdateStopwatch.ElapsedMilliseconds, + eTag is null ? string.Empty : eTag.ToString()); + } + + throw; + } + } + class TrackingStoreContext { public List<string> Blobs { get; set; } = new List<string>(); diff --git a/src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs b/src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs index 4a1b91fae..a1fc52e9f 100644 --- a/src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs +++ b/src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs @@ -72,10 +72,10 @@ interface ITrackingStore /// The RuntimeState for an olderExecution /// InstanceId for the Orchestration Update /// ExecutionId for the Orchestration Update - /// The ETag value to use for safe updates + /// The ETag values for the instance and history tables to use for safe updates /// Additional context for the execution that is maintained by the tracking store. /// The token to monitor for cancellation requests. The default value is . - Task<ETag?> UpdateStateAsync(OrchestrationRuntimeState newRuntimeState, OrchestrationRuntimeState oldRuntimeState, string instanceId, string executionId, ETag? eTag, object trackingStoreContext, CancellationToken cancellationToken = default); + Task UpdateStateAsync(OrchestrationRuntimeState newRuntimeState, OrchestrationRuntimeState oldRuntimeState, string instanceId, string executionId, OrchestrationETags eTags, object trackingStoreContext, CancellationToken cancellationToken = default); /// /// Get The Orchestration State for the Latest or All Executions /// @@ -127,7 +127,7 @@ interface ITrackingStore /// /// The list of instances to query for. /// The token to monitor for cancellation requests. The default value is . - IAsyncEnumerable<OrchestrationState> GetStateAsync(IEnumerable<string> instanceIds, CancellationToken cancellationToken = default); + IAsyncEnumerable<InstanceStatus> FetchInstanceStatusAsync(IEnumerable<string> instanceIds, CancellationToken cancellationToken = default); /// /// Get The Orchestration State for querying orchestration instances by the condition
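Editorial aside: because UpdateStateAsync now returns a plain Task, the caller keeps one mutable carrier that the store refreshes in place as it commits. A minimal sketch of that pattern with stand-in types (ETagCarrier and ICheckpointStore are illustrative and not part of this change):

using System.Threading.Tasks;
using Azure;

// Stand-in for OrchestrationETags: one carrier object holds both If-Match values.
class ETagCarrier
{
    public ETag? InstanceETag { get; set; }
    public ETag? HistoryETag { get; set; }
}

// Stand-in for the tracking store: no ETag return value; the carrier is mutated.
interface ICheckpointStore
{
    Task UpdateStateAsync(string instanceId, ETagCarrier eTags);
}

class SessionSketch
{
    readonly ETagCarrier eTags = new ETagCarrier();

    public async Task CheckpointAsync(ICheckpointStore store, string instanceId)
    {
        // The store refreshes eTags.HistoryETag/InstanceETag as it commits, so the
        // next checkpoint automatically sends the freshest If-Match values and a
        // concurrent writer surfaces as 412 (stale update) or 409 (duplicate insert).
        await store.UpdateStateAsync(instanceId, this.eTags);
    }
}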
diff --git a/src/DurableTask.AzureStorage/Tracking/InstanceStoreBackedTrackingStore.cs b/src/DurableTask.AzureStorage/Tracking/InstanceStoreBackedTrackingStore.cs index ca3da07bd..f719c92dd 100644 --- a/src/DurableTask.AzureStorage/Tracking/InstanceStoreBackedTrackingStore.cs +++ b/src/DurableTask.AzureStorage/Tracking/InstanceStoreBackedTrackingStore.cs @@ -136,20 +136,20 @@ public override Task StartAsync(CancellationToken cancellationToken = default) } /// - public override async Task<ETag?> UpdateStateAsync(OrchestrationRuntimeState newRuntimeState, OrchestrationRuntimeState oldRuntimeState, string instanceId, string executionId, ETag? eTag, object executionData, CancellationToken cancellationToken = default) + public override async Task UpdateStateAsync(OrchestrationRuntimeState newRuntimeState, OrchestrationRuntimeState oldRuntimeState, string instanceId, string executionId, OrchestrationETags eTags, object executionData, CancellationToken cancellationToken = default) { //In case there is a runtime state for an older execution/iteration as well that needs to be committed, commit it. //This may be the case if a ContinueAsNew was executed on the orchestration if (newRuntimeState != oldRuntimeState) { - eTag = await UpdateStateAsync(oldRuntimeState, instanceId, oldRuntimeState.OrchestrationInstance.ExecutionId, eTag, cancellationToken); + await UpdateStateAsync(oldRuntimeState, instanceId, oldRuntimeState.OrchestrationInstance.ExecutionId, eTags, cancellationToken); } - return await UpdateStateAsync(newRuntimeState, instanceId, executionId, eTag, cancellationToken); + await UpdateStateAsync(newRuntimeState, instanceId, executionId, eTags, cancellationToken); } /// - private async Task<ETag?> UpdateStateAsync(OrchestrationRuntimeState runtimeState, string instanceId, string executionId, ETag? eTag, CancellationToken cancellationToken = default) + private async Task UpdateStateAsync(OrchestrationRuntimeState runtimeState, string instanceId, string executionId, OrchestrationETags eTags, CancellationToken cancellationToken = default) { int oldEventsCount = (runtimeState.Events.Count - runtimeState.NewEvents.Count); await instanceStore.WriteEntitiesAsync(runtimeState.NewEvents.Select((x, i) => @@ -173,8 +173,6 @@ await instanceStore.WriteEntitiesAsync(new InstanceEntityBase[] { SequenceNumber = runtimeState.Events.Count } }); - - return null; } public override async Task UpdateStatusForTerminationAsync( diff --git a/src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs b/src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs index 7879537e4..d02a729c0 100644 --- a/src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs +++ b/src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs @@ -60,7 +60,7 @@ public virtual IAsyncEnumerable<OrchestrationState> GetStateAsync(CancellationTo } /// - public virtual IAsyncEnumerable<OrchestrationState> GetStateAsync(IEnumerable<string> instanceIds, CancellationToken cancellationToken = default) + public virtual IAsyncEnumerable<InstanceStatus> FetchInstanceStatusAsync(IEnumerable<string> instanceIds, CancellationToken cancellationToken = default) { throw new NotSupportedException(); } @@ -107,7 +107,7 @@ public virtual Task UpdateStatusForRewindAsync(string instanceId, CancellationTo public abstract Task StartAsync(CancellationToken cancellationToken = default); /// - public abstract Task<ETag?> UpdateStateAsync(OrchestrationRuntimeState newRuntimeState, OrchestrationRuntimeState oldRuntimeState, string instanceId, string executionId, ETag?
eTag, object executionData, CancellationToken cancellationToken = default); + public abstract Task UpdateStateAsync(OrchestrationRuntimeState newRuntimeState, OrchestrationRuntimeState oldRuntimeState, string instanceId, string executionId, OrchestrationETags eTags, object executionData, CancellationToken cancellationToken = default); /// public abstract Task UpdateInstanceStatusForCompletedOrchestrationAsync(string instanceId, string executionId, OrchestrationRuntimeState runtimeState, bool instanceEntityExists, CancellationToken cancellationToken = default); diff --git a/src/DurableTask.Core/OrchestrationState.cs b/src/DurableTask.Core/OrchestrationState.cs index 359b9dc61..c65dc78e3 100644 --- a/src/DurableTask.Core/OrchestrationState.cs +++ b/src/DurableTask.Core/OrchestrationState.cs @@ -161,5 +161,7 @@ public OrchestrationState ClearFieldsImmutably(bool clearInput, bool clearOutput /// Implementation for . /// public ExtensionDataObject ExtensionData { get; set; } + + internal string Etag { get; set; } } } \ No newline at end of file diff --git a/test/DurableTask.AzureStorage.Tests/AzureStorageScaleTests.cs b/test/DurableTask.AzureStorage.Tests/AzureStorageScaleTests.cs index 64b9eb22b..f80f6f95d 100644 --- a/test/DurableTask.AzureStorage.Tests/AzureStorageScaleTests.cs +++ b/test/DurableTask.AzureStorage.Tests/AzureStorageScaleTests.cs @@ -13,13 +13,6 @@ namespace DurableTask.AzureStorage.Tests { - using System; - using System.Collections.Generic; - using System.Diagnostics; - using System.Linq; - using System.Net; - using System.Threading; - using System.Threading.Tasks; using Azure.Data.Tables; using Azure.Storage.Blobs; using Azure.Storage.Blobs.Models; @@ -33,6 +26,13 @@ namespace DurableTask.AzureStorage.Tests using DurableTask.Core.Exceptions; using DurableTask.Core.History; using Microsoft.VisualStudio.TestTools.UnitTesting; + using System; + using System.Collections.Generic; + using System.Diagnostics; + using System.Linq; + using System.Net; + using System.Threading; + using System.Threading.Tasks; /// /// Validates the following requirements: @@ -634,6 +634,204 @@ await TestHelpers.WaitFor( Assert.AreEqual((int)HttpStatusCode.PreconditionFailed, dtse.HttpStatusCode); } + /// + /// Confirm that if a worker tries to complete a work item after stalling and another worker has since completed the work item, + /// a SessionAbortedException is thrown which wraps the inner DurableTaskStorageException, which has the correct status code (precondition failed). + /// The specific scenario tested is if the the worker stalled after updating the history table but before updating the instance table. + /// When it attempts to update the instance table with a stale etag, it will fail. + /// + /// + /// Since it is impossible to force stalling, we simulate the above scenario by manually updating the instance table before the worker + /// attempts to complete the work item. The history table update will go through, but the instance table update will fail since "another worker + /// has since updated" the instance table. 
+ /// + /// + [TestMethod] + public async Task WorkerAttemptingToUpdateInstanceTableAfterStalling() + { + var orchestrationInstance = new OrchestrationInstance + { + InstanceId = "instance_id", + ExecutionId = "execution_id", + }; + + ExecutionStartedEvent startedEvent = new(-1, string.Empty) + { + Name = "orchestration", + Version = string.Empty, + OrchestrationInstance = orchestrationInstance, + ScheduledStartTime = DateTime.UtcNow, + }; + + var settings = new AzureStorageOrchestrationServiceSettings + { + PartitionCount = 1, + StorageAccountClientProvider = new StorageAccountClientProvider(TestHelpers.GetTestStorageAccountConnectionString()), + TaskHubName = TestHelpers.GetTestTaskHubName(), + ExtendedSessionsEnabled = false + }; + this.SetPartitionManagerType(settings, PartitionManagerType.V2Safe); + + var service = new AzureStorageOrchestrationService(settings); + await service.CreateAsync(); + await service.StartAsync(); + + // Create the orchestration and get the first work item and start "working" on it + await service.CreateTaskOrchestrationAsync( + new TaskMessage() + { + OrchestrationInstance = orchestrationInstance, + Event = startedEvent + }); + var workItem = await service.LockNextTaskOrchestrationWorkItemAsync( + TimeSpan.FromMinutes(5), + CancellationToken.None); + var runtimeState = workItem.OrchestrationRuntimeState; + runtimeState.AddEvent(new OrchestratorStartedEvent(-1)); + runtimeState.AddEvent(startedEvent); + runtimeState.AddEvent(new ExecutionCompletedEvent(0, string.Empty, OrchestrationStatus.Completed)); + runtimeState.AddEvent(new OrchestratorCompletedEvent(-1)); + + AzureStorageClient azureStorageClient = new AzureStorageClient(settings); + + // Now manually update the instance to have status "Completed" + Table instanceTable = azureStorageClient.GetTableReference(azureStorageClient.Settings.InstanceTableName); + TableEntity entity = new (orchestrationInstance.InstanceId, "") + { + ["RuntimeStatus"] = OrchestrationStatus.Completed.ToString("G"), + }; + await instanceTable.MergeEntityAsync(entity, Azure.ETag.All); + + // Confirm an exception is thrown due to the etag mismatch for the instance table when the worker attempts to complete the work item + SessionAbortedException exception = await Assert.ThrowsExceptionAsync(async () => + await service.CompleteTaskOrchestrationWorkItemAsync(workItem, runtimeState, new List(), new List(), new List(), null, null) + ); + Assert.IsInstanceOfType(exception.InnerException, typeof(DurableTaskStorageException)); + DurableTaskStorageException dtse = (DurableTaskStorageException)exception.InnerException; + Assert.AreEqual((int)HttpStatusCode.PreconditionFailed, dtse.HttpStatusCode); + } + + /// + /// Confirm that if a worker tries to complete a work item for a suborchestration after stalling and another worker has since completed the work item, + /// a SessionAbortedException is thrown which wraps the inner DurableTaskStorageException, which has the correct status code (conflict). + /// The specific scenario tested is if the the worker stalled after updating the history table but before updating the instance table for the first work item + /// for a suborchestration. When it attempts to insert a new entity into the instance table for the suborchestration (since for a suborchestration, + /// the instance entity is only created upon completion of the first work item), it will fail. 
+ /// + /// + /// Since it is impossible to force stalling, we simulate the above scenario by manually inserting an entry into the instance table before the worker + /// attempts to complete the work item. The history table update will go through, but the instance table update will fail since "another worker + /// has since updated" the instance table. + /// + /// + [TestMethod] + public async Task WorkerAttemptingToUpdateInstanceTableAfterStallingForSubOrchestration() + { + var orchestrationInstance = new OrchestrationInstance + { + InstanceId = "instance_id", + ExecutionId = "execution_id", + }; + + ExecutionStartedEvent startedEvent = new(-1, string.Empty) + { + Name = "orchestration", + Version = string.Empty, + OrchestrationInstance = orchestrationInstance, + ScheduledStartTime = DateTime.UtcNow, + }; + + var settings = new AzureStorageOrchestrationServiceSettings + { + PartitionCount = 1, + StorageAccountClientProvider = new StorageAccountClientProvider(TestHelpers.GetTestStorageAccountConnectionString()), + TaskHubName = TestHelpers.GetTestTaskHubName(), + ExtendedSessionsEnabled = false + }; + this.SetPartitionManagerType(settings, PartitionManagerType.V2Safe); + + var service = new AzureStorageOrchestrationService(settings); + await service.CreateAsync(); + await service.StartAsync(); + + // Create the orchestration and get the first work item and start "working" on it + await service.CreateTaskOrchestrationAsync( + new TaskMessage() + { + OrchestrationInstance = orchestrationInstance, + Event = startedEvent + }); + var workItem = await service.LockNextTaskOrchestrationWorkItemAsync( + TimeSpan.FromMinutes(5), + CancellationToken.None); + var runtimeState = workItem.OrchestrationRuntimeState; + runtimeState.AddEvent(new OrchestratorStartedEvent(-1)); + runtimeState.AddEvent(startedEvent); + runtimeState.AddEvent(new SubOrchestrationInstanceCreatedEvent(0) + { + Name = "suborchestration", + InstanceId = "sub_instance_id" + }); + runtimeState.AddEvent(new OrchestratorCompletedEvent(-1)); + + // Create the task message to start the suborchestration + var subOrchestrationExecutionStartedEvent = new ExecutionStartedEvent(-1, string.Empty) + { + OrchestrationInstance = new OrchestrationInstance + { + InstanceId = "sub_instance_id", + ExecutionId = Guid.NewGuid().ToString("N") + }, + ParentInstance = new ParentInstance + { + OrchestrationInstance = runtimeState.OrchestrationInstance, + Name = runtimeState.Name, + Version = runtimeState.Version, + TaskScheduleId = 0, + }, + Name = "suborchestration" + }; + List orchestratorMessages = + new() { + new TaskMessage() + { + OrchestrationInstance = subOrchestrationExecutionStartedEvent.OrchestrationInstance, + Event = subOrchestrationExecutionStartedEvent, + } + }; + + // Complete the first work item, which will send the execution started message for the suborchestration + await service.CompleteTaskOrchestrationWorkItemAsync(workItem, runtimeState, new List(), orchestratorMessages, new List(), null, null); + + // Now get the work item for the suborchestration and "work" on it + workItem = await service.LockNextTaskOrchestrationWorkItemAsync( + TimeSpan.FromMinutes(5), + CancellationToken.None); + runtimeState = workItem.OrchestrationRuntimeState; + runtimeState.AddEvent(new OrchestratorStartedEvent(-1)); + runtimeState.AddEvent(subOrchestrationExecutionStartedEvent); + runtimeState.AddEvent(new ExecutionCompletedEvent(0, string.Empty, OrchestrationStatus.Completed)); + runtimeState.AddEvent(new OrchestratorCompletedEvent(-1)); + + 
AzureStorageClient azureStorageClient = new (settings); + Table instanceTable = azureStorageClient.GetTableReference(azureStorageClient.Settings.InstanceTableName); + // Now manually update the suborchestration to have status "Completed" + TableEntity entity = new ("sub_instance_id", "") + { + ["RuntimeStatus"] = OrchestrationStatus.Completed.ToString("G"), + }; + await instanceTable.InsertEntityAsync(entity); + + // Confirm an exception is thrown because the worker attempts to insert a new entity for the suborchestration into the instance table + // when one already exists + SessionAbortedException exception = await Assert.ThrowsExceptionAsync(async () => + await service.CompleteTaskOrchestrationWorkItemAsync(workItem, runtimeState, new List(), new List(), new List(), null, null) + ); + Assert.IsInstanceOfType(exception.InnerException, typeof(DurableTaskStorageException)); + DurableTaskStorageException dtse = (DurableTaskStorageException)exception.InnerException; + Assert.AreEqual((int)HttpStatusCode.Conflict, dtse.HttpStatusCode); + } + [TestMethod] public async Task MonitorIdleTaskHubDisconnected() { From 2fd508295f00b591a45787378b55429223cc8634 Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Mon, 8 Dec 2025 23:16:56 -0800 Subject: [PATCH 2/8] removing unnecessary update to OrchestrationState --- src/DurableTask.Core/OrchestrationState.cs | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/DurableTask.Core/OrchestrationState.cs b/src/DurableTask.Core/OrchestrationState.cs index c65dc78e3..359b9dc61 100644 --- a/src/DurableTask.Core/OrchestrationState.cs +++ b/src/DurableTask.Core/OrchestrationState.cs @@ -161,7 +161,5 @@ public OrchestrationState ClearFieldsImmutably(bool clearInput, bool clearOutput /// Implementation for . /// public ExtensionDataObject ExtensionData { get; set; } - - internal string Etag { get; set; } } } \ No newline at end of file From 27551064084e8ada0758eb34f6a9ab08a690c3e6 Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Tue, 9 Dec 2025 10:30:05 -0800 Subject: [PATCH 3/8] forgot to update the instance etag --- src/DurableTask.AzureStorage/Storage/Table.cs | 10 ++++++---- .../Tracking/AzureTableTrackingStore.cs | 13 ++++++++----- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/src/DurableTask.AzureStorage/Storage/Table.cs b/src/DurableTask.AzureStorage/Storage/Table.cs index e47b49529..7d726e1d7 100644 --- a/src/DurableTask.AzureStorage/Storage/Table.cs +++ b/src/DurableTask.AzureStorage/Storage/Table.cs @@ -86,16 +86,18 @@ public async Task DeleteEntityAsync(T tableEntity, ETag ifMatch = default, Ca this.stats.TableEntitiesWritten.Increment(); } - public async Task InsertEntityAsync(T tableEntity, CancellationToken cancellationToken = default) where T : ITableEntity + public async Task InsertEntityAsync(T tableEntity, CancellationToken cancellationToken = default) where T : ITableEntity { - await this.tableClient.AddEntityAsync(tableEntity, cancellationToken).DecorateFailure(); + Response result = await this.tableClient.AddEntityAsync(tableEntity, cancellationToken).DecorateFailure(); this.stats.TableEntitiesWritten.Increment(); + return result; } - public async Task MergeEntityAsync(T tableEntity, ETag ifMatch, CancellationToken cancellationToken = default) where T : ITableEntity + public async Task MergeEntityAsync(T tableEntity, ETag ifMatch, CancellationToken cancellationToken = default) where T : ITableEntity { - await this.tableClient.UpdateEntityAsync(tableEntity, ifMatch, TableUpdateMode.Merge, 
cancellationToken).DecorateFailure(); + Response result = await this.tableClient.UpdateEntityAsync(tableEntity, ifMatch, TableUpdateMode.Merge, cancellationToken).DecorateFailure(); this.stats.TableEntitiesWritten.Increment(); + return result; } public async Task InsertOrMergeEntityAsync(T tableEntity, CancellationToken cancellationToken = default) where T : ITableEntity diff --git a/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs b/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs index 6f04da182..1f92a7946 100644 --- a/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs +++ b/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs @@ -1028,7 +1028,7 @@ public override async Task UpdateStateAsync( cancellationToken: cancellationToken); } - await this.TryUpdateInstanceTableAsync(instanceEntity, eTags.InstanceETag, instanceId, executionId, runtimeStatus, episodeNumber); + eTags.InstanceETag = await this.TryUpdateInstanceTableAsync(instanceEntity, eTags.InstanceETag, instanceId, executionId, runtimeStatus, episodeNumber); // finally, delete orphaned blobs from the previous execution history. // We had to wait until the new history has committed to make sure the blobs are no longer necessary. @@ -1412,19 +1412,20 @@ bool ExceedsMaxTablePropertySize(string data) return false; } - async Task TryUpdateInstanceTableAsync(TableEntity instanceEntity, ETag? eTag, string instanceId, string executionId, OrchestrationStatus runtimeStatus, int episodeNumber) + async Task TryUpdateInstanceTableAsync(TableEntity instanceEntity, ETag? eTag, string instanceId, string executionId, OrchestrationStatus runtimeStatus, int episodeNumber) { Stopwatch orchestrationInstanceUpdateStopwatch = Stopwatch.StartNew(); - try { + Response result; + if (eTag == null) { - await this.InstancesTable.InsertEntityAsync(instanceEntity); + result = await this.InstancesTable.InsertEntityAsync(instanceEntity); } else { - await this.InstancesTable.MergeEntityAsync(instanceEntity, eTag.Value); + result = await this.InstancesTable.MergeEntityAsync(instanceEntity, eTag.Value); } this.settings.Logger.InstanceStatusUpdate( @@ -1435,6 +1436,8 @@ async Task TryUpdateInstanceTableAsync(TableEntity instanceEntity, ETag? eTag, s runtimeStatus, episodeNumber, orchestrationInstanceUpdateStopwatch.ElapsedMilliseconds); + + return result.Headers.ETag; } catch (DurableTaskStorageException ex) { From 0b1912bf7700849ab8f2c2e91188194b2ceebd0a Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Tue, 9 Dec 2025 10:39:04 -0800 Subject: [PATCH 4/8] added test cleanup --- .../Tracking/AzureTableTrackingStore.cs | 20 +- .../AzureStorageScaleTests.cs | 562 +++++++++--------- 2 files changed, 301 insertions(+), 281 deletions(-) diff --git a/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs b/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs index 1f92a7946..86616dc97 100644 --- a/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs +++ b/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs @@ -1353,7 +1353,7 @@ static string GetBlobName(TableEntity entity, string property) } catch (DurableTaskStorageException ex) { - // Handle the case where the the history has already been updated by another caller. + // Handle the case where the history has already been updated by another caller. // Common case: the resulting code is 'PreconditionFailed', which means "eTagValue" no longer matches the one stored, and TableTransactionActionType is "Update". 
// Edge case: the resulting code is 'Conflict'. This is the case when eTagValue is null, and the TableTransactionActionType is "Add", // in which case the exception indicates that the table entity we are trying to "add" already exists. @@ -1414,19 +1414,13 @@ bool ExceedsMaxTablePropertySize(string data) async Task TryUpdateInstanceTableAsync(TableEntity instanceEntity, ETag? eTag, string instanceId, string executionId, OrchestrationStatus runtimeStatus, int episodeNumber) { - Stopwatch orchestrationInstanceUpdateStopwatch = Stopwatch.StartNew(); + var orchestrationInstanceUpdateStopwatch = Stopwatch.StartNew(); + try { - Response result; - - if (eTag == null) - { - result = await this.InstancesTable.InsertEntityAsync(instanceEntity); - } - else - { - result = await this.InstancesTable.MergeEntityAsync(instanceEntity, eTag.Value); - } + Response result = await (eTag == null + ? this.InstancesTable.InsertEntityAsync(instanceEntity) + : this.InstancesTable.MergeEntityAsync(instanceEntity, eTag.Value)); this.settings.Logger.InstanceStatusUpdate( this.storageAccountName, @@ -1441,7 +1435,7 @@ bool ExceedsMaxTablePropertySize(string data) } catch (DurableTaskStorageException ex) { - // Handle the case where the the history has already been updated by another caller. + // Handle the case where the instance table has already been updated by another caller. // Common case: the resulting code is 'PreconditionFailed', which means we are trying to update an existing instance entity and "eTag" no longer matches the one stored. // Edge case: the resulting code is 'Conflict'. This is the case when eTag is null, and we are trying to insert a new instance entity, in which case the exception // indicates that the table entity we are trying to "add" already exists. diff --git a/test/DurableTask.AzureStorage.Tests/AzureStorageScaleTests.cs b/test/DurableTask.AzureStorage.Tests/AzureStorageScaleTests.cs index f80f6f95d..03960d426 100644 --- a/test/DurableTask.AzureStorage.Tests/AzureStorageScaleTests.cs +++ b/test/DurableTask.AzureStorage.Tests/AzureStorageScaleTests.cs @@ -505,139 +505,149 @@ await TestHelpers.WaitFor( [TestMethod] public async Task MultipleWorkersAttemptingToCompleteSameWorkItem() { - var orchestrationInstance = new OrchestrationInstance - { - InstanceId = "instance_id", - ExecutionId = "execution_id", - }; - - ExecutionStartedEvent startedEvent = new(-1, string.Empty) + AzureStorageOrchestrationService service1 = null; + AzureStorageOrchestrationService service2 = null; + try { - Name = "orchestration", - Version = string.Empty, - OrchestrationInstance = orchestrationInstance, - ScheduledStartTime = DateTime.UtcNow, - }; - - // Create worker 1, wait for it to acquire the lease. - // Make sure to set a small control queue visibility timeout so that worker 2 can reacquire the work item quickly once worker 1 loses the lease. 
- var service1 = await this.EnsureTaskHubAsync( - nameof(MultipleWorkersAttemptingToCompleteSameWorkItem), - testDeletion: false, - deleteBeforeCreate: true, - partitionCount: 1, - workerId: "1", - controlQueueVisibilityTimeout: TimeSpan.FromSeconds(1) - ); - await service1.StartAsync(); - await TestHelpers.WaitFor( - condition: () => service1.OwnedControlQueues.Any(), - timeout: TimeSpan.FromSeconds(30)); - ControlQueue controlQueue = service1.OwnedControlQueues.Single(); + var orchestrationInstance = new OrchestrationInstance + { + InstanceId = "instance_id", + ExecutionId = "execution_id", + }; - // Create the orchestration and get the first work item and start "working" on it - await service1.CreateTaskOrchestrationAsync( - new TaskMessage() + ExecutionStartedEvent startedEvent = new(-1, string.Empty) { + Name = "orchestration", + Version = string.Empty, OrchestrationInstance = orchestrationInstance, - Event = startedEvent - }); - var workItem1 = await service1.LockNextTaskOrchestrationWorkItemAsync( - TimeSpan.FromMinutes(5), - CancellationToken.None); - var runtimeState = workItem1.OrchestrationRuntimeState; - runtimeState.AddEvent(new OrchestratorStartedEvent(-1)); - runtimeState.AddEvent(startedEvent); - runtimeState.AddEvent(new TaskScheduledEvent(0, "task")); - runtimeState.AddEvent(new OrchestratorCompletedEvent(-1)); - - // Now lose the lease - BlobPartitionLease lease = await service1.ListBlobLeasesAsync().SingleAsync(); - await service1.OnOwnershipLeaseReleasedAsync(lease, CloseReason.LeaseLost); - await TestHelpers.WaitFor( - condition: () => !service1.OwnedControlQueues.Any(), - timeout: TimeSpan.FromSeconds(30)); - - // Create worker 2, wait for it to now acquire the lease - var service2 = await this.EnsureTaskHubAsync( - nameof(MultipleWorkersAttemptingToCompleteSameWorkItem), - testDeletion: false, - deleteBeforeCreate: false, - workerId: "2", - partitionCount: 1, - controlQueueVisibilityTimeout: TimeSpan.FromSeconds(1) - ); - await service2.StartAsync(); - await service2.OnOwnershipLeaseAquiredAsync(lease); - await TestHelpers.WaitFor( - condition: () => service2.OwnedControlQueues.Any(), - timeout: TimeSpan.FromSeconds(60)); - - // Have worker 2 dequeue the same work item and start "working" on it - var workItem2 = await service2.LockNextTaskOrchestrationWorkItemAsync( - TimeSpan.FromMinutes(5), - CancellationToken.None); - workItem2.OrchestrationRuntimeState = runtimeState; - - // Worker 2 completes the work item - await service2.CompleteTaskOrchestrationWorkItemAsync(workItem2, runtimeState, new List(), new List(), new List(), null, null); - // Now worker 1 will attempt to complete the same work item. Since this is the first attempt to complete a work item and add a history for the orchestration (by worker 1), - // there is no etag stored for the OrchestrationSession, and so the a "conflict" exception will be thrown as worker 2 already created a history for the orchestration. 
- SessionAbortedException exception = await Assert.ThrowsExceptionAsync(async () => - await service1.CompleteTaskOrchestrationWorkItemAsync(workItem1, runtimeState, new List(), new List(), new List(), null, null) - ); - Assert.IsInstanceOfType(exception.InnerException, typeof(DurableTaskStorageException)); - DurableTaskStorageException dtse = (DurableTaskStorageException)exception.InnerException; - Assert.AreEqual((int)HttpStatusCode.Conflict, dtse.HttpStatusCode); - await service1.ReleaseTaskOrchestrationWorkItemAsync(workItem1); - await service2.ReleaseTaskOrchestrationWorkItemAsync(workItem2); - - // Now simulate a task completing for the orchestration - var taskCompletedEvent = new TaskCompletedEvent(-1, 0, string.Empty); - await service2.SendTaskOrchestrationMessageAsync(new TaskMessage { Event = taskCompletedEvent, OrchestrationInstance = orchestrationInstance }); - // Worker 2 gets the next work item related to this task completion and starts "working" on it - workItem2 = await service2.LockNextTaskOrchestrationWorkItemAsync( - TimeSpan.FromMinutes(5), - CancellationToken.None); - runtimeState = workItem2.OrchestrationRuntimeState; - runtimeState.AddEvent(new OrchestratorStartedEvent(-1)); - runtimeState.AddEvent(taskCompletedEvent); - runtimeState.AddEvent(new ExecutionCompletedEvent(1, string.Empty, OrchestrationStatus.Completed)); - runtimeState.AddEvent(new OrchestratorCompletedEvent(-1)); - - // Now force worker 2 to lose the lease and have worker 1 acquire it - lease = await service2.ListBlobLeasesAsync().SingleAsync(); - await service2.OnOwnershipLeaseReleasedAsync(lease, CloseReason.LeaseLost); - await TestHelpers.WaitFor( - condition: () => !service2.OwnedControlQueues.Any(), - timeout: TimeSpan.FromSeconds(30)); - await service1.OnOwnershipLeaseAquiredAsync(lease); - await TestHelpers.WaitFor( - condition: () => service1.OwnedControlQueues.Any(), - timeout: TimeSpan.FromSeconds(60)); + ScheduledStartTime = DateTime.UtcNow, + }; - // Worker 1 also acquires the work item and starts "working" on it - workItem1 = await service1.LockNextTaskOrchestrationWorkItemAsync( - TimeSpan.FromMinutes(5), - CancellationToken.None); - workItem1.OrchestrationRuntimeState = runtimeState; - - // Worker 1 completes the work item - await service1.CompleteTaskOrchestrationWorkItemAsync(workItem1, runtimeState, new List(), new List(), new List(), null, null); - // Now worker 2 attempts to complete the same work item. Since this is not the first work item for the orchestration, now an etag exists for the OrchestrationSession, and the exception - // that is thrown will be "precondition failed" as the Etag is stale after worker 1 completed the work item. - exception = await Assert.ThrowsExceptionAsync(async () => - await service2.CompleteTaskOrchestrationWorkItemAsync(workItem2, runtimeState, new List(), new List(), new List(), null, null) - ); - Assert.IsInstanceOfType(exception.InnerException, typeof(DurableTaskStorageException)); - dtse = (DurableTaskStorageException)exception.InnerException; - Assert.AreEqual((int)HttpStatusCode.PreconditionFailed, dtse.HttpStatusCode); + // Create worker 1, wait for it to acquire the lease. + // Make sure to set a small control queue visibility timeout so that worker 2 can reacquire the work item quickly once worker 1 loses the lease. 
service1 = await this.EnsureTaskHubAsync( + nameof(MultipleWorkersAttemptingToCompleteSameWorkItem), + testDeletion: false, + deleteBeforeCreate: true, + partitionCount: 1, + workerId: "1", + controlQueueVisibilityTimeout: TimeSpan.FromSeconds(1) + ); + await service1.StartAsync(); + await TestHelpers.WaitFor( + condition: () => service1.OwnedControlQueues.Any(), + timeout: TimeSpan.FromSeconds(30)); + ControlQueue controlQueue = service1.OwnedControlQueues.Single(); + + // Create the orchestration and get the first work item and start "working" on it + await service1.CreateTaskOrchestrationAsync( + new TaskMessage() + { + OrchestrationInstance = orchestrationInstance, + Event = startedEvent + }); + var workItem1 = await service1.LockNextTaskOrchestrationWorkItemAsync( + TimeSpan.FromMinutes(5), + CancellationToken.None); + var runtimeState = workItem1.OrchestrationRuntimeState; + runtimeState.AddEvent(new OrchestratorStartedEvent(-1)); + runtimeState.AddEvent(startedEvent); + runtimeState.AddEvent(new TaskScheduledEvent(0, "task")); + runtimeState.AddEvent(new OrchestratorCompletedEvent(-1)); + + // Now lose the lease + BlobPartitionLease lease = await service1.ListBlobLeasesAsync().SingleAsync(); + await service1.OnOwnershipLeaseReleasedAsync(lease, CloseReason.LeaseLost); + await TestHelpers.WaitFor( + condition: () => !service1.OwnedControlQueues.Any(), + timeout: TimeSpan.FromSeconds(30)); + + // Create worker 2 and wait for it to acquire the lease + service2 = await this.EnsureTaskHubAsync( + nameof(MultipleWorkersAttemptingToCompleteSameWorkItem), + testDeletion: false, + deleteBeforeCreate: false, + workerId: "2", + partitionCount: 1, + controlQueueVisibilityTimeout: TimeSpan.FromSeconds(1) + ); + await service2.StartAsync(); + await service2.OnOwnershipLeaseAquiredAsync(lease); + await TestHelpers.WaitFor( + condition: () => service2.OwnedControlQueues.Any(), + timeout: TimeSpan.FromSeconds(60)); + + // Have worker 2 dequeue the same work item and start "working" on it + var workItem2 = await service2.LockNextTaskOrchestrationWorkItemAsync( + TimeSpan.FromMinutes(5), + CancellationToken.None); + workItem2.OrchestrationRuntimeState = runtimeState; + + // Worker 2 completes the work item + await service2.CompleteTaskOrchestrationWorkItemAsync(workItem2, runtimeState, new List<TaskMessage>(), new List<TaskMessage>(), new List<TaskMessage>(), null, null);
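+ // Editorial sketch (not invoked by this test): outside of an assertion-driven
+ // test, the losing writer typically abandons the work item on SessionAbortedException
+ // so the control-queue message is redelivered and deduplicated against the
+ // state the winning worker committed.
+ static async Task CompleteOrReleaseSketchAsync(
+     AzureStorageOrchestrationService service,
+     TaskOrchestrationWorkItem workItem,
+     OrchestrationRuntimeState state)
+ {
+     try
+     {
+         await service.CompleteTaskOrchestrationWorkItemAsync(
+             workItem, state, new List<TaskMessage>(), new List<TaskMessage>(), new List<TaskMessage>(), null, null);
+     }
+     catch (SessionAbortedException)
+     {
+         // Another worker already committed this work item; release it rather than retry.
+         await service.ReleaseTaskOrchestrationWorkItemAsync(workItem);
+     }
+ }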
+ // Now worker 1 will attempt to complete the same work item. Since this is the first attempt to complete a work item and add a history for the orchestration (by worker 1), + // there is no etag stored for the OrchestrationSession, and so a "conflict" exception will be thrown as worker 2 already created a history for the orchestration. + SessionAbortedException exception = await Assert.ThrowsExceptionAsync<SessionAbortedException>(async () => + await service1.CompleteTaskOrchestrationWorkItemAsync(workItem1, runtimeState, new List<TaskMessage>(), new List<TaskMessage>(), new List<TaskMessage>(), null, null) + ); + Assert.IsInstanceOfType(exception.InnerException, typeof(DurableTaskStorageException)); + DurableTaskStorageException dtse = (DurableTaskStorageException)exception.InnerException; + Assert.AreEqual((int)HttpStatusCode.Conflict, dtse.HttpStatusCode); + await service1.ReleaseTaskOrchestrationWorkItemAsync(workItem1); + await service2.ReleaseTaskOrchestrationWorkItemAsync(workItem2); + + // Now simulate a task completing for the orchestration + var taskCompletedEvent = new TaskCompletedEvent(-1, 0, string.Empty); + await service2.SendTaskOrchestrationMessageAsync(new TaskMessage { Event = taskCompletedEvent, OrchestrationInstance = orchestrationInstance }); + // Worker 2 gets the next work item related to this task completion and starts "working" on it + workItem2 = await service2.LockNextTaskOrchestrationWorkItemAsync( + TimeSpan.FromMinutes(5), + CancellationToken.None); + runtimeState = workItem2.OrchestrationRuntimeState; + runtimeState.AddEvent(new OrchestratorStartedEvent(-1)); + runtimeState.AddEvent(taskCompletedEvent); + runtimeState.AddEvent(new ExecutionCompletedEvent(1, string.Empty, OrchestrationStatus.Completed)); + runtimeState.AddEvent(new OrchestratorCompletedEvent(-1)); + + // Now force worker 2 to lose the lease and have worker 1 acquire it + lease = await service2.ListBlobLeasesAsync().SingleAsync(); + await service2.OnOwnershipLeaseReleasedAsync(lease, CloseReason.LeaseLost); + await TestHelpers.WaitFor( + condition: () => !service2.OwnedControlQueues.Any(), + timeout: TimeSpan.FromSeconds(30)); + await service1.OnOwnershipLeaseAquiredAsync(lease); + await TestHelpers.WaitFor( + condition: () => service1.OwnedControlQueues.Any(), + timeout: TimeSpan.FromSeconds(60)); + + // Worker 1 also acquires the work item and starts "working" on it + workItem1 = await service1.LockNextTaskOrchestrationWorkItemAsync( + TimeSpan.FromMinutes(5), + CancellationToken.None); + workItem1.OrchestrationRuntimeState = runtimeState; + + // Worker 1 completes the work item + await service1.CompleteTaskOrchestrationWorkItemAsync(workItem1, runtimeState, new List<TaskMessage>(), new List<TaskMessage>(), new List<TaskMessage>(), null, null); + // Now worker 2 attempts to complete the same work item. Since this is not the first work item for the orchestration, an etag now exists for the OrchestrationSession, and the exception + // that is thrown will be "precondition failed" as the Etag is stale after worker 1 completed the work item. + exception = await Assert.ThrowsExceptionAsync<SessionAbortedException>(async () => + await service2.CompleteTaskOrchestrationWorkItemAsync(workItem2, runtimeState, new List<TaskMessage>(), new List<TaskMessage>(), new List<TaskMessage>(), null, null) + ); + Assert.IsInstanceOfType(exception.InnerException, typeof(DurableTaskStorageException)); + dtse = (DurableTaskStorageException)exception.InnerException; + Assert.AreEqual((int)HttpStatusCode.PreconditionFailed, dtse.HttpStatusCode); + } + finally + { + if (service1 != null) + { + await service1.StopAsync(isForced: true); + } + if (service2 != null) + { + await service2.StopAsync(isForced: true); + } + } } /// /// Confirm that if a worker tries to complete a work item after stalling and another worker has since completed the work item, /// a SessionAbortedException is thrown which wraps the inner DurableTaskStorageException, which has the correct status code (precondition failed).
- /// The specific scenario tested is if the the worker stalled after updating the history table but before updating the instance table. + /// The specific scenario tested is if the worker stalled after updating the history table but before updating the instance table. /// When it attempts to update the instance table with a stale etag, it will fail. /// /// @@ -649,72 +659,80 @@ await TestHelpers.WaitFor( [TestMethod] public async Task WorkerAttemptingToUpdateInstanceTableAfterStalling() { - var orchestrationInstance = new OrchestrationInstance - { - InstanceId = "instance_id", - ExecutionId = "execution_id", - }; - - ExecutionStartedEvent startedEvent = new(-1, string.Empty) - { - Name = "orchestration", - Version = string.Empty, - OrchestrationInstance = orchestrationInstance, - ScheduledStartTime = DateTime.UtcNow, - }; - - var settings = new AzureStorageOrchestrationServiceSettings + AzureStorageOrchestrationService service = null; + try { - PartitionCount = 1, - StorageAccountClientProvider = new StorageAccountClientProvider(TestHelpers.GetTestStorageAccountConnectionString()), - TaskHubName = TestHelpers.GetTestTaskHubName(), - ExtendedSessionsEnabled = false - }; - this.SetPartitionManagerType(settings, PartitionManagerType.V2Safe); - - var service = new AzureStorageOrchestrationService(settings); - await service.CreateAsync(); - await service.StartAsync(); + var orchestrationInstance = new OrchestrationInstance + { + InstanceId = "instance_id", + ExecutionId = "execution_id", + }; - // Create the orchestration and get the first work item and start "working" on it - await service.CreateTaskOrchestrationAsync( - new TaskMessage() + ExecutionStartedEvent startedEvent = new(-1, string.Empty) { + Name = "orchestration", + Version = string.Empty, OrchestrationInstance = orchestrationInstance, - Event = startedEvent - }); - var workItem = await service.LockNextTaskOrchestrationWorkItemAsync( - TimeSpan.FromMinutes(5), - CancellationToken.None); - var runtimeState = workItem.OrchestrationRuntimeState; - runtimeState.AddEvent(new OrchestratorStartedEvent(-1)); - runtimeState.AddEvent(startedEvent); - runtimeState.AddEvent(new ExecutionCompletedEvent(0, string.Empty, OrchestrationStatus.Completed)); - runtimeState.AddEvent(new OrchestratorCompletedEvent(-1)); + ScheduledStartTime = DateTime.UtcNow, + }; + + var settings = new AzureStorageOrchestrationServiceSettings + { + PartitionCount = 1, + StorageAccountClientProvider = new StorageAccountClientProvider(TestHelpers.GetTestStorageAccountConnectionString()), + TaskHubName = TestHelpers.GetTestTaskHubName(), + ExtendedSessionsEnabled = false + }; + this.SetPartitionManagerType(settings, PartitionManagerType.V2Safe); - AzureStorageClient azureStorageClient = new AzureStorageClient(settings); + service = new AzureStorageOrchestrationService(settings); + await service.CreateAsync(); + await service.StartAsync(); - // Now manually update the instance to have status "Completed" - Table instanceTable = azureStorageClient.GetTableReference(azureStorageClient.Settings.InstanceTableName); - TableEntity entity = new (orchestrationInstance.InstanceId, "") + // Create the orchestration and get the first work item and start "working" on it + await service.CreateTaskOrchestrationAsync( + new TaskMessage() + { + OrchestrationInstance = orchestrationInstance, + Event = startedEvent + }); + var workItem = await service.LockNextTaskOrchestrationWorkItemAsync( + TimeSpan.FromMinutes(5), + CancellationToken.None); + var runtimeState = 
workItem.OrchestrationRuntimeState; + runtimeState.AddEvent(new OrchestratorStartedEvent(-1)); + runtimeState.AddEvent(startedEvent); + runtimeState.AddEvent(new ExecutionCompletedEvent(0, string.Empty, OrchestrationStatus.Completed)); + runtimeState.AddEvent(new OrchestratorCompletedEvent(-1)); + + AzureStorageClient azureStorageClient = new AzureStorageClient(settings); + + // Now manually update the instance to have status "Completed" + Table instanceTable = azureStorageClient.GetTableReference(azureStorageClient.Settings.InstanceTableName); + TableEntity entity = new(orchestrationInstance.InstanceId, "") + { + ["RuntimeStatus"] = OrchestrationStatus.Completed.ToString("G"), + }; + await instanceTable.MergeEntityAsync(entity, Azure.ETag.All); + + // Confirm an exception is thrown due to the etag mismatch for the instance table when the worker attempts to complete the work item + SessionAbortedException exception = await Assert.ThrowsExceptionAsync<SessionAbortedException>(async () => + await service.CompleteTaskOrchestrationWorkItemAsync(workItem, runtimeState, new List<TaskMessage>(), new List<TaskMessage>(), new List<TaskMessage>(), null, null) + ); + Assert.IsInstanceOfType(exception.InnerException, typeof(DurableTaskStorageException)); + DurableTaskStorageException dtse = (DurableTaskStorageException)exception.InnerException; + Assert.AreEqual((int)HttpStatusCode.PreconditionFailed, dtse.HttpStatusCode); + } + finally + { + if (service != null) + { + await service.StopAsync(isForced: true); + } + } } /// /// Confirm that if a worker tries to complete a work item for a suborchestration after stalling and another worker has since completed the work item, /// a SessionAbortedException is thrown which wraps the inner DurableTaskStorageException, which has the correct status code (conflict). - /// The specific scenario tested is if the the worker stalled after updating the history table but before updating the instance table for the first work item + /// The specific scenario tested is if the worker stalled after updating the history table but before updating the instance table for the first work item /// for a suborchestration. When it attempts to insert a new entity into the instance table for the suborchestration (since for a suborchestration, /// the instance entity is only created upon completion of the first work item), it will fail.
/// @@ -727,109 +745,117 @@ await service.CreateTaskOrchestrationAsync( [TestMethod] public async Task WorkerAttemptingToUpdateInstanceTableAfterStallingForSubOrchestration() { - var orchestrationInstance = new OrchestrationInstance + AzureStorageOrchestrationService service = null; + try { - InstanceId = "instance_id", - ExecutionId = "execution_id", - }; + var orchestrationInstance = new OrchestrationInstance + { + InstanceId = "instance_id", + ExecutionId = "execution_id", + }; - ExecutionStartedEvent startedEvent = new(-1, string.Empty) - { - Name = "orchestration", - Version = string.Empty, - OrchestrationInstance = orchestrationInstance, - ScheduledStartTime = DateTime.UtcNow, - }; + ExecutionStartedEvent startedEvent = new(-1, string.Empty) + { + Name = "orchestration", + Version = string.Empty, + OrchestrationInstance = orchestrationInstance, + ScheduledStartTime = DateTime.UtcNow, + }; - var settings = new AzureStorageOrchestrationServiceSettings - { - PartitionCount = 1, - StorageAccountClientProvider = new StorageAccountClientProvider(TestHelpers.GetTestStorageAccountConnectionString()), - TaskHubName = TestHelpers.GetTestTaskHubName(), - ExtendedSessionsEnabled = false - }; - this.SetPartitionManagerType(settings, PartitionManagerType.V2Safe); + var settings = new AzureStorageOrchestrationServiceSettings + { + PartitionCount = 1, + StorageAccountClientProvider = new StorageAccountClientProvider(TestHelpers.GetTestStorageAccountConnectionString()), + TaskHubName = TestHelpers.GetTestTaskHubName(), + ExtendedSessionsEnabled = false + }; + this.SetPartitionManagerType(settings, PartitionManagerType.V2Safe); - var service = new AzureStorageOrchestrationService(settings); - await service.CreateAsync(); - await service.StartAsync(); + service = new AzureStorageOrchestrationService(settings); + await service.CreateAsync(); + await service.StartAsync(); - // Create the orchestration and get the first work item and start "working" on it - await service.CreateTaskOrchestrationAsync( - new TaskMessage() - { - OrchestrationInstance = orchestrationInstance, - Event = startedEvent + // Create the orchestration and get the first work item and start "working" on it + await service.CreateTaskOrchestrationAsync( + new TaskMessage() + { + OrchestrationInstance = orchestrationInstance, + Event = startedEvent + }); + var workItem = await service.LockNextTaskOrchestrationWorkItemAsync( + TimeSpan.FromMinutes(5), + CancellationToken.None); + var runtimeState = workItem.OrchestrationRuntimeState; + runtimeState.AddEvent(new OrchestratorStartedEvent(-1)); + runtimeState.AddEvent(startedEvent); + runtimeState.AddEvent(new SubOrchestrationInstanceCreatedEvent(0) + { + Name = "suborchestration", + InstanceId = "sub_instance_id" }); - var workItem = await service.LockNextTaskOrchestrationWorkItemAsync( - TimeSpan.FromMinutes(5), - CancellationToken.None); - var runtimeState = workItem.OrchestrationRuntimeState; - runtimeState.AddEvent(new OrchestratorStartedEvent(-1)); - runtimeState.AddEvent(startedEvent); - runtimeState.AddEvent(new SubOrchestrationInstanceCreatedEvent(0) - { - Name = "suborchestration", - InstanceId = "sub_instance_id" - }); - runtimeState.AddEvent(new OrchestratorCompletedEvent(-1)); - - // Create the task message to start the suborchestration - var subOrchestrationExecutionStartedEvent = new ExecutionStartedEvent(-1, string.Empty) - { - OrchestrationInstance = new OrchestrationInstance - { - InstanceId = "sub_instance_id", - ExecutionId = Guid.NewGuid().ToString("N") - }, - 
ParentInstance = new ParentInstance - { - OrchestrationInstance = runtimeState.OrchestrationInstance, - Name = runtimeState.Name, - Version = runtimeState.Version, - TaskScheduleId = 0, - }, - Name = "suborchestration" - }; - List orchestratorMessages = - new() { + runtimeState.AddEvent(new OrchestratorCompletedEvent(-1)); + + // Create the task message to start the suborchestration + var subOrchestrationExecutionStartedEvent = new ExecutionStartedEvent(-1, string.Empty) + { + OrchestrationInstance = new OrchestrationInstance + { + InstanceId = "sub_instance_id", + ExecutionId = Guid.NewGuid().ToString("N") + }, + ParentInstance = new ParentInstance + { + OrchestrationInstance = runtimeState.OrchestrationInstance, + Name = runtimeState.Name, + Version = runtimeState.Version, + TaskScheduleId = 0, + }, + Name = "suborchestration" + }; + List orchestratorMessages = + new() { new TaskMessage() { OrchestrationInstance = subOrchestrationExecutionStartedEvent.OrchestrationInstance, Event = subOrchestrationExecutionStartedEvent, } - }; + }; - // Complete the first work item, which will send the execution started message for the suborchestration - await service.CompleteTaskOrchestrationWorkItemAsync(workItem, runtimeState, new List(), orchestratorMessages, new List(), null, null); - - // Now get the work item for the suborchestration and "work" on it - workItem = await service.LockNextTaskOrchestrationWorkItemAsync( - TimeSpan.FromMinutes(5), - CancellationToken.None); - runtimeState = workItem.OrchestrationRuntimeState; - runtimeState.AddEvent(new OrchestratorStartedEvent(-1)); - runtimeState.AddEvent(subOrchestrationExecutionStartedEvent); - runtimeState.AddEvent(new ExecutionCompletedEvent(0, string.Empty, OrchestrationStatus.Completed)); - runtimeState.AddEvent(new OrchestratorCompletedEvent(-1)); - - AzureStorageClient azureStorageClient = new (settings); - Table instanceTable = azureStorageClient.GetTableReference(azureStorageClient.Settings.InstanceTableName); - // Now manually update the suborchestration to have status "Completed" - TableEntity entity = new ("sub_instance_id", "") - { - ["RuntimeStatus"] = OrchestrationStatus.Completed.ToString("G"), - }; - await instanceTable.InsertEntityAsync(entity); - - // Confirm an exception is thrown because the worker attempts to insert a new entity for the suborchestration into the instance table - // when one already exists - SessionAbortedException exception = await Assert.ThrowsExceptionAsync(async () => - await service.CompleteTaskOrchestrationWorkItemAsync(workItem, runtimeState, new List(), new List(), new List(), null, null) - ); - Assert.IsInstanceOfType(exception.InnerException, typeof(DurableTaskStorageException)); - DurableTaskStorageException dtse = (DurableTaskStorageException)exception.InnerException; - Assert.AreEqual((int)HttpStatusCode.Conflict, dtse.HttpStatusCode); + // Complete the first work item, which will send the execution started message for the suborchestration + await service.CompleteTaskOrchestrationWorkItemAsync(workItem, runtimeState, new List(), orchestratorMessages, new List(), null, null); + + // Now get the work item for the suborchestration and "work" on it + workItem = await service.LockNextTaskOrchestrationWorkItemAsync( + TimeSpan.FromMinutes(5), + CancellationToken.None); + runtimeState = workItem.OrchestrationRuntimeState; + runtimeState.AddEvent(new OrchestratorStartedEvent(-1)); + runtimeState.AddEvent(subOrchestrationExecutionStartedEvent); + runtimeState.AddEvent(new ExecutionCompletedEvent(0, 
string.Empty, OrchestrationStatus.Completed)); + runtimeState.AddEvent(new OrchestratorCompletedEvent(-1)); + + AzureStorageClient azureStorageClient = new(settings); + Table instanceTable = azureStorageClient.GetTableReference(azureStorageClient.Settings.InstanceTableName); + // Now manually update the suborchestration to have status "Completed" + TableEntity entity = new("sub_instance_id", "") + { + ["RuntimeStatus"] = OrchestrationStatus.Completed.ToString("G"), + }; + await instanceTable.InsertEntityAsync(entity); + + // Confirm an exception is thrown because the worker attempts to insert a new entity for the suborchestration into the instance table + // when one already exists + SessionAbortedException exception = await Assert.ThrowsExceptionAsync(async () => + await service.CompleteTaskOrchestrationWorkItemAsync(workItem, runtimeState, new List(), new List(), new List(), null, null) + ); + Assert.IsInstanceOfType(exception.InnerException, typeof(DurableTaskStorageException)); + DurableTaskStorageException dtse = (DurableTaskStorageException)exception.InnerException; + Assert.AreEqual((int)HttpStatusCode.Conflict, dtse.HttpStatusCode); + } + finally { + if (service != null) { await service.StopAsync(isForced: true); } + } } [TestMethod] From 8b0a23493f6fa65c51cf137675ac713bbdf4c256 Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Tue, 9 Dec 2025 12:08:43 -0800 Subject: [PATCH 5/8] added instance status fetching when the etag isn't already present --- .../OrchestrationSessionManager.cs | 14 ++++++++++++++ .../AzureStorageScaleTests.cs | 1 - 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/src/DurableTask.AzureStorage/OrchestrationSessionManager.cs b/src/DurableTask.AzureStorage/OrchestrationSessionManager.cs index c583f39e0..f2b0396a3 100644 --- a/src/DurableTask.AzureStorage/OrchestrationSessionManager.cs +++ b/src/DurableTask.AzureStorage/OrchestrationSessionManager.cs @@ -527,6 +527,20 @@ async Task ScheduleOrchestrationStatePrefetch( batch.ETags.HistoryETag = history.ETag; batch.LastCheckpointTime = history.LastCheckpointTime; batch.TrackingStoreContext = history.TrackingStoreContext; + + // Try to get the instance ETag from the tracking store if it wasn't already provided + if (batch.ETags.InstanceETag == null) + { + // Do we want to introduce a new method to just get the ETag without fetching the full instance status? + // I'm not sure this is necessary seeing as this method does not fetch the input, which is the only potentially + // large field of the instance entity anyway + InstanceStatus? 
instanceStatus = await this.trackingStore.FetchInstanceStatusAsync( + batch.OrchestrationInstanceId, + cancellationToken); + // The instance could not exist in the case that these messages are for the first execution of a suborchestration, + // or an entity-started orchestration, for example + batch.ETags.InstanceETag = instanceStatus?.ETag; + } } if (this.settings.UseSeparateQueueForEntityWorkItems diff --git a/test/DurableTask.AzureStorage.Tests/AzureStorageScaleTests.cs b/test/DurableTask.AzureStorage.Tests/AzureStorageScaleTests.cs index 03960d426..dc81f33fc 100644 --- a/test/DurableTask.AzureStorage.Tests/AzureStorageScaleTests.cs +++ b/test/DurableTask.AzureStorage.Tests/AzureStorageScaleTests.cs @@ -537,7 +537,6 @@ public async Task MultipleWorkersAttemptingToCompleteSameWorkItem() await TestHelpers.WaitFor( condition: () => service1.OwnedControlQueues.Any(), timeout: TimeSpan.FromSeconds(30)); - ControlQueue controlQueue = service1.OwnedControlQueues.Single(); // Create the orchestration and get the first work item and start "working" on it await service1.CreateTaskOrchestrationAsync( From 23be843d64bbb49f3f0b76091e355a9c67b22677 Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Mon, 15 Dec 2025 14:51:07 -0800 Subject: [PATCH 6/8] added feature flag --- ...zureStorageOrchestrationServiceSettings.cs | 17 +++ .../OrchestrationSessionManager.cs | 12 +- .../Tracking/AzureTableTrackingStore.cs | 76 ++++++----- .../AzureStorageScaleTests.cs | 121 +++++++++++++----- 4 files changed, 160 insertions(+), 66 deletions(-) diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs index 48e653868..e614da92f 100644 --- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs +++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs @@ -294,5 +294,22 @@ internal LogHelper Logger /// The default is . /// public QueueClientMessageEncoding QueueClientMessageEncoding { get; set; } = QueueClientMessageEncoding.UTF8; + + /// + /// When true, an etag is used when attempting to make instance table updates upon completing an orchestration work item. + /// + /// + /// By default, etags are only used when updating the history table upon completing an orchestration work item. This can lead + /// to subtle race conditions where the instance table is incorrectly updated. Consider the following scenario: + /// 1. Worker A completes an orchestration work item, sends outgoing messages, updates the history table, then stalls. + /// 2. Worker B picks up the next and final orchestration work item for the same instance and completes it, updating the history + /// table and instance table with the new terminal status. + /// 3. Worker A resumes and overrides the terminal status in the instance table with an incorrect non-terminal status. + /// This will leave the instance status of the orchestration permanently as "Running" even though it has actually completed. + /// To prevent such scenarios, enabling this setting will ensure that instance table updates also use etags. This would prevent + /// worker A's update in step 3 from completing. Enabling this setting will also introduce a performance overhead since the instance + /// table must now be read before processing every orchestration work item to obtain the latest etag. 
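To make the trade-off concrete, opting in is a one-line settings change. A sketch, using the settings types shown throughout this series (task hub name and connection string are placeholders):

    var settings = new AzureStorageOrchestrationServiceSettings
    {
        TaskHubName = "MyTaskHub",                                                              // placeholder
        StorageAccountClientProvider = new StorageAccountClientProvider("<connection-string>"), // placeholder
        // Guard instance-table writes with ETags, at the cost of one extra
        // instance-table read per orchestration work item.
        UseInstanceTableEtag = true,
    };
    var service = new AzureStorageOrchestrationService(settings);

The default remains false, so existing task hubs keep the legacy unconditional-upsert behavior unless they explicitly opt in.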
+ /// + public bool UseInstanceTableEtag { get; set; } = false; } } diff --git a/src/DurableTask.AzureStorage/OrchestrationSessionManager.cs b/src/DurableTask.AzureStorage/OrchestrationSessionManager.cs index f2b0396a3..03f6fad48 100644 --- a/src/DurableTask.AzureStorage/OrchestrationSessionManager.cs +++ b/src/DurableTask.AzureStorage/OrchestrationSessionManager.cs @@ -284,7 +284,13 @@ async Task> DedupeExecutionStartedMessagesAsync( { // Happy path: The message matches the table status. Alternatively, if the table doesn't have an ExecutionId field (older clients, pre-v1.8.5), // then we have no way of knowing if it's a duplicate. Either way, allow it to run. - message.MessageMetadata = remoteInstance.ETag; + + // There's actually no extra cost from setting and using the instance etag in this case, I just don't for consistency since it will not be set and + // used for future work items. Is this a good reason? + if (this.settings.UseInstanceTableEtag) + { + message.MessageMetadata = remoteInstance.ETag; + } } else if (expectedGeneration == remoteInstance?.State.Generation && this.IsScheduledAfterInstanceUpdate(message, remoteInstance?.State)) { @@ -477,7 +483,7 @@ internal void AddMessageToPendingOrchestration( if (targetBatch == null) { targetBatch = new PendingMessageBatch(controlQueue, instanceId, executionId); - if (data.MessageMetadata is ETag instanceEtag) + if (this.settings.UseInstanceTableEtag && data.MessageMetadata is ETag instanceEtag) { targetBatch.ETags.InstanceETag = instanceEtag; } @@ -529,7 +535,7 @@ async Task ScheduleOrchestrationStatePrefetch( batch.TrackingStoreContext = history.TrackingStoreContext; // Try to get the instance ETag from the tracking store if it wasn't already provided - if (batch.ETags.InstanceETag == null) + if (this.settings.UseInstanceTableEtag && batch.ETags.InstanceETag == null) { // Do we want to introduce a new method to just get the ETag without fetching the full instance status? // I'm not sure this is necessary seeing as this method does not fetch the input, which is the only potentially diff --git a/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs b/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs index 86616dc97..026497aae 100644 --- a/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs +++ b/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs @@ -1416,47 +1416,57 @@ bool ExceedsMaxTablePropertySize(string data) { var orchestrationInstanceUpdateStopwatch = Stopwatch.StartNew(); - try - { - Response result = await (eTag == null - ? this.InstancesTable.InsertEntityAsync(instanceEntity) - : this.InstancesTable.MergeEntityAsync(instanceEntity, eTag.Value)); - - this.settings.Logger.InstanceStatusUpdate( - this.storageAccountName, - this.taskHubName, - instanceId, - executionId, - runtimeStatus, - episodeNumber, - orchestrationInstanceUpdateStopwatch.ElapsedMilliseconds); + ETag? newEtag = null; - return result.Headers.ETag; + if (!this.settings.UseInstanceTableEtag) + { + await this.InstancesTable.InsertOrMergeEntityAsync(instanceEntity); } - catch (DurableTaskStorageException ex) + else { - // Handle the case where the instance table has already been updated by another caller. - // Common case: the resulting code is 'PreconditionFailed', which means we are trying to update an existing instance entity and "eTag" no longer matches the one stored. - // Edge case: the resulting code is 'Conflict'. 
This is the case when eTag is null, and we are trying to insert a new instance entity, in which case the exception - // indicates that the table entity we are trying to "add" already exists. - if (ex.HttpStatusCode == (int)HttpStatusCode.Conflict || ex.HttpStatusCode == (int)HttpStatusCode.PreconditionFailed) + try { - this.settings.Logger.SplitBrainDetected( - this.storageAccountName, - this.taskHubName, - instanceId, - executionId, - newEventCount: 0, - totalEventCount: 1, // these fields don't really make sense for the instance table case. do we want to introduce a new log? or are we okay with this since "InstanceEntity" - // in the new events field will allow this to be detectable? - "InstanceEntity", - orchestrationInstanceUpdateStopwatch.ElapsedMilliseconds, - eTag is null ? string.Empty : eTag.ToString()); + Response result = await (eTag == null + ? this.InstancesTable.InsertEntityAsync(instanceEntity) + : this.InstancesTable.MergeEntityAsync(instanceEntity, eTag.Value)); + newEtag = result.Headers.ETag; } + catch (DurableTaskStorageException ex) + { + // Handle the case where the instance table has already been updated by another caller. + // Common case: the resulting code is 'PreconditionFailed', which means we are trying to update an existing instance entity and "eTag" no longer matches the one stored. + // Edge case: the resulting code is 'Conflict'. This is the case when eTag is null, and we are trying to insert a new instance entity, in which case the exception + // indicates that the table entity we are trying to "add" already exists. + if (ex.HttpStatusCode == (int)HttpStatusCode.Conflict || ex.HttpStatusCode == (int)HttpStatusCode.PreconditionFailed) + { + this.settings.Logger.SplitBrainDetected( + this.storageAccountName, + this.taskHubName, + instanceId, + executionId, + newEventCount: 0, + totalEventCount: 1, // these fields don't really make sense for the instance table case. do we want to introduce a new log? or are we okay with this since "InstanceEntity" + // in the new events field will allow this to be detectable? + "InstanceEntity", + orchestrationInstanceUpdateStopwatch.ElapsedMilliseconds, + eTag is null ? string.Empty : eTag.ToString()); + } - throw; + throw; + } } + this.settings.Logger.InstanceStatusUpdate( + this.storageAccountName, + this.taskHubName, + instanceId, + executionId, + runtimeStatus, + episodeNumber, + orchestrationInstanceUpdateStopwatch.ElapsedMilliseconds); + + return newEtag; + } class TrackingStoreContext diff --git a/test/DurableTask.AzureStorage.Tests/AzureStorageScaleTests.cs b/test/DurableTask.AzureStorage.Tests/AzureStorageScaleTests.cs index dc81f33fc..320a26d42 100644 --- a/test/DurableTask.AzureStorage.Tests/AzureStorageScaleTests.cs +++ b/test/DurableTask.AzureStorage.Tests/AzureStorageScaleTests.cs @@ -644,19 +644,26 @@ await TestHelpers.WaitFor( } /// - /// Confirm that if a worker tries to complete a work item after stalling and another worker has since completed the work item, - /// a SessionAbortedException is thrown which wraps the inner DurableTaskStorageException, which has the correct status code (precondition failed). - /// The specific scenario tested is if the worker stalled after updating the history table but before updating the instance table. - /// When it attempts to update the instance table with a stale etag, it will fail. + /// Confirm that: + /// 1. 
If is true, and a worker attempts to update the instance table with a stale + /// etag upon completing a work item, a SessionAbortedException is thrown which wraps the inner DurableTaskStorageException, which has the correct status code + /// (precondition failed). + /// The specific scenario tested is if the worker stalled after updating the history table but before updating the instance table. When it attempts to update + /// the instance table with a stale etag, it will fail. + /// 2. If is false for the above scenario, then the call to update the instance table + /// will go through, and the instance table will be updated with a "stale" status. /// /// /// Since it is impossible to force stalling, we simulate the above scenario by manually updating the instance table before the worker /// attempts to complete the work item. The history table update will go through, but the instance table update will fail since "another worker /// has since updated" the instance table. /// + /// The value to use for /// - [TestMethod] - public async Task WorkerAttemptingToUpdateInstanceTableAfterStalling() + [DataTestMethod] + [DataRow(true)] + [DataRow(false)] + public async Task WorkerAttemptingToUpdateInstanceTableAfterStalling(bool useInstanceEtag) { AzureStorageOrchestrationService service = null; try @@ -680,7 +687,8 @@ public async Task WorkerAttemptingToUpdateInstanceTableAfterStalling() PartitionCount = 1, StorageAccountClientProvider = new StorageAccountClientProvider(TestHelpers.GetTestStorageAccountConnectionString()), TaskHubName = TestHelpers.GetTestTaskHubName(), - ExtendedSessionsEnabled = false + ExtendedSessionsEnabled = false, + UseInstanceTableEtag = useInstanceEtag }; this.SetPartitionManagerType(settings, PartitionManagerType.V2Safe); @@ -701,7 +709,7 @@ await service.CreateTaskOrchestrationAsync( var runtimeState = workItem.OrchestrationRuntimeState; runtimeState.AddEvent(new OrchestratorStartedEvent(-1)); runtimeState.AddEvent(startedEvent); - runtimeState.AddEvent(new ExecutionCompletedEvent(0, string.Empty, OrchestrationStatus.Completed)); + runtimeState.AddEvent(new TaskScheduledEvent(0)); runtimeState.AddEvent(new OrchestratorCompletedEvent(-1)); AzureStorageClient azureStorageClient = new AzureStorageClient(settings); @@ -714,13 +722,35 @@ await service.CreateTaskOrchestrationAsync( }; await instanceTable.MergeEntityAsync(entity, Azure.ETag.All); - // Confirm an exception is thrown due to the etag mismatch for the instance table when the worker attempts to complete the work item - SessionAbortedException exception = await Assert.ThrowsExceptionAsync(async () => - await service.CompleteTaskOrchestrationWorkItemAsync(workItem, runtimeState, new List(), new List(), new List(), null, null) - ); - Assert.IsInstanceOfType(exception.InnerException, typeof(DurableTaskStorageException)); - DurableTaskStorageException dtse = (DurableTaskStorageException)exception.InnerException; - Assert.AreEqual((int)HttpStatusCode.PreconditionFailed, dtse.HttpStatusCode); + if (useInstanceEtag) + { + // Confirm an exception is thrown due to the etag mismatch for the instance table when the worker attempts to complete the work item + SessionAbortedException exception = await Assert.ThrowsExceptionAsync(async () => + await service.CompleteTaskOrchestrationWorkItemAsync(workItem, runtimeState, new List(), new List(), new List(), null, null) + ); + Assert.IsInstanceOfType(exception.InnerException, typeof(DurableTaskStorageException)); + DurableTaskStorageException dtse = 
(DurableTaskStorageException)exception.InnerException; + Assert.AreEqual((int)HttpStatusCode.PreconditionFailed, dtse.HttpStatusCode); + } + else + { + await service.CompleteTaskOrchestrationWorkItemAsync(workItem, runtimeState, new List(), new List(), new List(), null, null); + + var queryCondition = new OrchestrationInstanceStatusQueryCondition + { + InstanceId = "instance_id", + FetchInput = false, + }; + + ODataCondition odata = queryCondition.ToOData(); + OrchestrationInstanceStatus instanceTableEntity = await instanceTable + .ExecuteQueryAsync(odata.Filter, 1, odata.Select, CancellationToken.None) + .FirstOrDefaultAsync(); + + // Confirm the instance table was updated with a "stale" status + Assert.IsNotNull(instanceTableEntity); + Assert.AreEqual(OrchestrationStatus.Running.ToString(), instanceTableEntity.RuntimeStatus); + } } finally { @@ -729,20 +759,27 @@ await service.CreateTaskOrchestrationAsync( } /// - /// Confirm that if a worker tries to complete a work item for a suborchestration after stalling and another worker has since completed the work item, - /// a SessionAbortedException is thrown which wraps the inner DurableTaskStorageException, which has the correct status code (conflict). + /// Confirm that: + /// 1. If is true, and a worker attempts to update the instance table with a stale + /// etag upon completing a work item for a suborchestration, a SessionAbortedException is thrown which wraps the inner DurableTaskStorageException, which has + /// the correct status code (conflict). /// The specific scenario tested is if the worker stalled after updating the history table but before updating the instance table for the first work item /// for a suborchestration. When it attempts to insert a new entity into the instance table for the suborchestration (since for a suborchestration, /// the instance entity is only created upon completion of the first work item), it will fail. + /// 2. If is false for the above scenario, then the call to update the instance table + /// will go through, and the instance table will be updated with a "stale" status. /// /// - /// Since it is impossible to force stalling, we simulate the above scenario by manually inserting an entry into the instance table before the worker + /// Since it is impossible to force stalling, we simulate the above scenario by manually updating the instance table before the worker /// attempts to complete the work item. The history table update will go through, but the instance table update will fail since "another worker /// has since updated" the instance table. 
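The flag-off branches of both tests verify the overwrite by reading RuntimeStatus back from the Instances table. An equivalent standalone probe against Azure.Data.Tables — the helper name and client wiring are illustrative; the tests themselves go through the library's internal ODataCondition query helpers:

    using System.Threading.Tasks;
    using Azure;
    using Azure.Data.Tables;

    static class InstanceStatusProbe
    {
        // Returns the RuntimeStatus column for an instance, or null if no row exists.
        // The Instances table keys rows by instance ID with an empty row key, as seen
        // in the TableEntity construction above.
        public static async Task<string?> GetRuntimeStatusAsync(TableClient instancesTable, string instanceId)
        {
            NullableResponse<TableEntity> row =
                await instancesTable.GetEntityIfExistsAsync<TableEntity>(partitionKey: instanceId, rowKey: "");
            return row.HasValue ? row.Value.GetString("RuntimeStatus") : null;
        }
    }

With UseInstanceTableEtag disabled, the expected read-back after the simulated stall is "Running" — exactly the stale overwrite the new setting exists to prevent.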
/// + /// The value to use for /// - [TestMethod] - public async Task WorkerAttemptingToUpdateInstanceTableAfterStallingForSubOrchestration() + [DataTestMethod] + [DataRow(true)] + [DataRow(false)] + public async Task WorkerAttemptingToUpdateInstanceTableAfterStallingForSubOrchestration(bool useInstanceEtag) { AzureStorageOrchestrationService service = null; try @@ -766,7 +803,8 @@ public async Task WorkerAttemptingToUpdateInstanceTableAfterStallingForSubOrches PartitionCount = 1, StorageAccountClientProvider = new StorageAccountClientProvider(TestHelpers.GetTestStorageAccountConnectionString()), TaskHubName = TestHelpers.GetTestTaskHubName(), - ExtendedSessionsEnabled = false + ExtendedSessionsEnabled = false, + UseInstanceTableEtag = useInstanceEtag }; this.SetPartitionManagerType(settings, PartitionManagerType.V2Safe); @@ -830,7 +868,7 @@ await service.CreateTaskOrchestrationAsync( runtimeState = workItem.OrchestrationRuntimeState; runtimeState.AddEvent(new OrchestratorStartedEvent(-1)); runtimeState.AddEvent(subOrchestrationExecutionStartedEvent); - runtimeState.AddEvent(new ExecutionCompletedEvent(0, string.Empty, OrchestrationStatus.Completed)); + runtimeState.AddEvent(new TaskScheduledEvent(0)); runtimeState.AddEvent(new OrchestratorCompletedEvent(-1)); AzureStorageClient azureStorageClient = new(settings); @@ -842,14 +880,37 @@ await service.CreateTaskOrchestrationAsync( }; await instanceTable.InsertEntityAsync(entity); - // Confirm an exception is thrown because the worker attempts to insert a new entity for the suborchestration into the instance table - // when one already exists - SessionAbortedException exception = await Assert.ThrowsExceptionAsync(async () => - await service.CompleteTaskOrchestrationWorkItemAsync(workItem, runtimeState, new List(), new List(), new List(), null, null) - ); - Assert.IsInstanceOfType(exception.InnerException, typeof(DurableTaskStorageException)); - DurableTaskStorageException dtse = (DurableTaskStorageException)exception.InnerException; - Assert.AreEqual((int)HttpStatusCode.Conflict, dtse.HttpStatusCode); + if (useInstanceEtag) + { + // Confirm an exception is thrown because the worker attempts to insert a new entity for the suborchestration into the instance table + // when one already exists + SessionAbortedException exception = await Assert.ThrowsExceptionAsync(async () => + await service.CompleteTaskOrchestrationWorkItemAsync(workItem, runtimeState, new List(), new List(), new List(), null, null) + ); + Assert.IsInstanceOfType(exception.InnerException, typeof(DurableTaskStorageException)); + DurableTaskStorageException dtse = (DurableTaskStorageException)exception.InnerException; + Assert.AreEqual((int)HttpStatusCode.Conflict, dtse.HttpStatusCode); + } + else + { + await service.CompleteTaskOrchestrationWorkItemAsync(workItem, runtimeState, new List(), new List(), new List(), null, null); + + var queryCondition = new OrchestrationInstanceStatusQueryCondition + { + InstanceId = "sub_instance_id", + FetchInput = false, + }; + + ODataCondition odata = queryCondition.ToOData(); + OrchestrationInstanceStatus instanceTableEntity = await instanceTable + .ExecuteQueryAsync(odata.Filter, 1, odata.Select, CancellationToken.None) + .FirstOrDefaultAsync(); + + // Confirm the instance table was updated with a "stale" status + Assert.IsNotNull(instanceTableEntity); + Assert.AreEqual(OrchestrationStatus.Running.ToString(), instanceTableEntity.RuntimeStatus); + } + } finally { From 03aeb4aeb25bd5fb3e0539e76bc5c39f32e15667 Mon Sep 17 00:00:00 2001 From: 
Sophia Tevosyan Date: Mon, 15 Dec 2025 16:17:10 -0800 Subject: [PATCH 7/8] addressing PR comments --- .../OrchestrationETags.cs | 16 ++++++++++++++-- .../OrchestrationSessionManager.cs | 3 --- .../Tracking/AzureTableTrackingStore.cs | 7 +++---- .../AzureStorageScaleTests.cs | 14 +++++++------- 4 files changed, 24 insertions(+), 16 deletions(-) diff --git a/src/DurableTask.AzureStorage/OrchestrationETags.cs b/src/DurableTask.AzureStorage/OrchestrationETags.cs index 9e7f67531..6c304b393 100644 --- a/src/DurableTask.AzureStorage/OrchestrationETags.cs +++ b/src/DurableTask.AzureStorage/OrchestrationETags.cs @@ -1,8 +1,20 @@ -using Azure; - +// ---------------------------------------------------------------------------------- +// Copyright Microsoft Corporation +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ---------------------------------------------------------------------------------- #nullable enable namespace DurableTask.AzureStorage { + using Azure; + class OrchestrationETags { internal ETag? InstanceETag { get; set; } diff --git a/src/DurableTask.AzureStorage/OrchestrationSessionManager.cs b/src/DurableTask.AzureStorage/OrchestrationSessionManager.cs index 03f6fad48..4b281e1c9 100644 --- a/src/DurableTask.AzureStorage/OrchestrationSessionManager.cs +++ b/src/DurableTask.AzureStorage/OrchestrationSessionManager.cs @@ -537,9 +537,6 @@ async Task ScheduleOrchestrationStatePrefetch( // Try to get the instance ETag from the tracking store if it wasn't already provided if (this.settings.UseInstanceTableEtag && batch.ETags.InstanceETag == null) { - // Do we want to introduce a new method to just get the ETag without fetching the full instance status? - // I'm not sure this is necessary seeing as this method does not fetch the input, which is the only potentially - // large field of the instance entity anyway InstanceStatus? instanceStatus = await this.trackingStore.FetchInstanceStatusAsync( batch.OrchestrationInstanceId, cancellationToken); diff --git a/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs b/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs index 026497aae..f1e67a0bf 100644 --- a/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs +++ b/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs @@ -1028,7 +1028,7 @@ public override async Task UpdateStateAsync( cancellationToken: cancellationToken); } - eTags.InstanceETag = await this.TryUpdateInstanceTableAsync(instanceEntity, eTags.InstanceETag, instanceId, executionId, runtimeStatus, episodeNumber); + eTags.InstanceETag = await this.UpdateInstanceTableAsync(instanceEntity, eTags.InstanceETag, instanceId, executionId, runtimeStatus, episodeNumber); // finally, delete orphaned blobs from the previous execution history. // We had to wait until the new history has committed to make sure the blobs are no longer necessary. @@ -1412,7 +1412,7 @@ bool ExceedsMaxTablePropertySize(string data) return false; } - async Task TryUpdateInstanceTableAsync(TableEntity instanceEntity, ETag? 
eTag, string instanceId, string executionId, OrchestrationStatus runtimeStatus, int episodeNumber) + async Task UpdateInstanceTableAsync(TableEntity instanceEntity, ETag? eTag, string instanceId, string executionId, OrchestrationStatus runtimeStatus, int episodeNumber) { var orchestrationInstanceUpdateStopwatch = Stopwatch.StartNew(); @@ -1445,8 +1445,7 @@ bool ExceedsMaxTablePropertySize(string data) instanceId, executionId, newEventCount: 0, - totalEventCount: 1, // these fields don't really make sense for the instance table case. do we want to introduce a new log? or are we okay with this since "InstanceEntity" - // in the new events field will allow this to be detectable? + totalEventCount: 1, "InstanceEntity", orchestrationInstanceUpdateStopwatch.ElapsedMilliseconds, eTag is null ? string.Empty : eTag.ToString()); diff --git a/test/DurableTask.AzureStorage.Tests/AzureStorageScaleTests.cs b/test/DurableTask.AzureStorage.Tests/AzureStorageScaleTests.cs index 320a26d42..a2872df75 100644 --- a/test/DurableTask.AzureStorage.Tests/AzureStorageScaleTests.cs +++ b/test/DurableTask.AzureStorage.Tests/AzureStorageScaleTests.cs @@ -13,6 +13,13 @@ namespace DurableTask.AzureStorage.Tests { + using System; + using System.Collections.Generic; + using System.Diagnostics; + using System.Linq; + using System.Net; + using System.Threading; + using System.Threading.Tasks; using Azure.Data.Tables; using Azure.Storage.Blobs; using Azure.Storage.Blobs.Models; @@ -26,13 +33,6 @@ namespace DurableTask.AzureStorage.Tests using DurableTask.Core.Exceptions; using DurableTask.Core.History; using Microsoft.VisualStudio.TestTools.UnitTesting; - using System; - using System.Collections.Generic; - using System.Diagnostics; - using System.Linq; - using System.Net; - using System.Threading; - using System.Threading.Tasks; /// /// Validates the following requirements: From e3ab0b6ebf1d50a024bdb42d8a64f0ffc919d07b Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Mon, 15 Dec 2025 16:22:17 -0800 Subject: [PATCH 8/8] removed one more design question --- src/DurableTask.AzureStorage/OrchestrationSessionManager.cs | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/DurableTask.AzureStorage/OrchestrationSessionManager.cs b/src/DurableTask.AzureStorage/OrchestrationSessionManager.cs index 4b281e1c9..7ecae232d 100644 --- a/src/DurableTask.AzureStorage/OrchestrationSessionManager.cs +++ b/src/DurableTask.AzureStorage/OrchestrationSessionManager.cs @@ -284,9 +284,6 @@ async Task> DedupeExecutionStartedMessagesAsync( { // Happy path: The message matches the table status. Alternatively, if the table doesn't have an ExecutionId field (older clients, pre-v1.8.5), // then we have no way of knowing if it's a duplicate. Either way, allow it to run. - - // There's actually no extra cost from setting and using the instance etag in this case, I just don't for consistency since it will not be set and - // used for future work items. Is this a good reason? if (this.settings.UseInstanceTableEtag) { message.MessageMetadata = remoteInstance.ETag;