diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs
index 04a32e9fd..9fdee5f8f 100644
--- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs
+++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs
@@ -1215,11 +1215,16 @@ await this.CommitOutboundQueueMessages(
                     this.orchestrationSessionManager.AddMessageToPendingOrchestration(session.ControlQueue, messages, session.TraceActivityId, CancellationToken.None);
                 }
             }
-            catch (RequestFailedException rfe) when (rfe.Status == (int)HttpStatusCode.PreconditionFailed)
+            // Handle the case where the 'ETag' has changed, which implies another worker has taken over this work item while
+            // we were trying to process it. We detect this in 2 cases:
+            // Common case: the resulting code is 'PreconditionFailed', which means our ETag no longer matches the one stored.
+            // Edge case: the resulting code is 'Conflict'. This can occur if this was the first orchestration work item.
+            // The 'Conflict' represents that we attempted to insert a new orchestration history when one already exists.
+            catch (DurableTaskStorageException dtse) when (dtse.HttpStatusCode == (int)HttpStatusCode.Conflict || dtse.HttpStatusCode == (int)HttpStatusCode.PreconditionFailed)
             {
                 // Precondition failure is expected to be handled internally and logged as a warning.
                 // The orchestration dispatcher will handle this exception by abandoning the work item
-                throw new SessionAbortedException("Aborting execution due to failed precondition.", rfe);
+                throw new SessionAbortedException("Aborting execution due to conflicting completion of the work item by another worker.", dtse);
             }
             catch (Exception e)
             {
diff --git a/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs b/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs
index 05fb97c7c..17824a511 100644
--- a/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs
+++ b/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs
@@ -1203,7 +1203,11 @@ static string GetBlobName(TableEntity entity, string property)
             }
             catch (DurableTaskStorageException ex)
             {
-                if (ex.HttpStatusCode == (int)HttpStatusCode.PreconditionFailed)
+                // Handle the case where the history has already been updated by another caller.
+                // Common case: the resulting code is 'PreconditionFailed', which means "eTagValue" no longer matches the one stored, and the TableTransactionActionType is "Update".
+                // Edge case: the resulting code is 'Conflict'. This is the case when eTagValue is null and the TableTransactionActionType is "Add",
+                // in which case the exception indicates that the table entity we are trying to "add" already exists.
+                if (ex.HttpStatusCode == (int)HttpStatusCode.Conflict || ex.HttpStatusCode == (int)HttpStatusCode.PreconditionFailed)
                 {
                     this.settings.Logger.SplitBrainDetected(
                         this.storageAccountName,
@@ -1214,7 +1218,7 @@ static string GetBlobName(TableEntity entity, string property)
                         numberOfTotalEvents,
                         historyEventNamesBuffer.ToString(0, historyEventNamesBuffer.Length - 1), // remove trailing comma
                         stopwatch.ElapsedMilliseconds,
-                        eTagValue?.ToString());
+                        eTagValue is null ? string.Empty : eTagValue.ToString());
                 }

                 throw;
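Note on the status codes referenced above: they fall directly out of Azure Table Storage's optimistic concurrency model. Inserting a brand-new history row fails with 409 (Conflict) if the row already exists, while replacing a row guarded by an ETag fails with 412 (PreconditionFailed) if the row changed since it was read. The standalone sketch below reproduces both failures with the Azure.Data.Tables SDK; the "DemoHistory" table, the entity values, and the local-emulator connection string are illustrative assumptions and not part of this change, and the real tracking store issues these writes as TableTransactionAction batches rather than single-entity calls.

```csharp
using System;
using System.Net;
using System.Threading.Tasks;
using Azure;
using Azure.Data.Tables;

class ETagConflictDemo
{
    // Shows where 409 (Conflict) and 412 (PreconditionFailed) come from when two writers
    // race on the same table row. Table name, row values, and the local-emulator
    // connection string are made up for this example.
    static async Task Main()
    {
        var table = new TableClient("UseDevelopmentStorage=true", "DemoHistory");
        await table.CreateIfNotExistsAsync();

        var entity = new TableEntity("instance_id", "0000000") { ["Name"] = "orchestration" };

        // First write: adding a row that already exists fails with 409 Conflict.
        await table.AddEntityAsync(entity);
        try
        {
            await table.AddEntityAsync(entity);
        }
        catch (RequestFailedException ex) when (ex.Status == (int)HttpStatusCode.Conflict)
        {
            Console.WriteLine("Conflict: another writer already inserted this history row.");
        }

        // Later writes: a replace guarded by an ETag fails with 412 once that ETag is stale.
        TableEntity current = await table.GetEntityAsync<TableEntity>("instance_id", "0000000");
        ETag etagWeRead = current.ETag;

        // Simulate the other worker updating the row first, which changes the stored ETag.
        current["Name"] = "updated-by-other-worker";
        await table.UpdateEntityAsync(current, etagWeRead, TableUpdateMode.Replace);

        try
        {
            entity["Name"] = "updated-by-us";
            await table.UpdateEntityAsync(entity, etagWeRead, TableUpdateMode.Replace);
        }
        catch (RequestFailedException ex) when (ex.Status == (int)HttpStatusCode.PreconditionFailed)
        {
            Console.WriteLine("PreconditionFailed: the ETag we hold is stale.");
        }
    }
}
```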
diff --git a/test/DurableTask.AzureStorage.Tests/AzureStorageScaleTests.cs b/test/DurableTask.AzureStorage.Tests/AzureStorageScaleTests.cs
index 18417b96d..64b9eb22b 100644
--- a/test/DurableTask.AzureStorage.Tests/AzureStorageScaleTests.cs
+++ b/test/DurableTask.AzureStorage.Tests/AzureStorageScaleTests.cs
@@ -17,6 +17,7 @@ namespace DurableTask.AzureStorage.Tests
     using System.Collections.Generic;
     using System.Diagnostics;
     using System.Linq;
+    using System.Net;
     using System.Threading;
     using System.Threading.Tasks;
     using Azure.Data.Tables;
@@ -29,6 +30,7 @@ namespace DurableTask.AzureStorage.Tests
     using DurableTask.AzureStorage.Storage;
     using DurableTask.AzureStorage.Tracking;
     using DurableTask.Core;
+    using DurableTask.Core.Exceptions;
     using DurableTask.Core.History;
     using Microsoft.VisualStudio.TestTools.UnitTesting;

@@ -88,6 +90,8 @@ async Task<AzureStorageOrchestrationService> EnsureTaskHubAsync(
            bool testDeletion,
            bool deleteBeforeCreate = true,
            string workerId = "test",
+            int partitionCount = 4,
+            TimeSpan? controlQueueVisibilityTimeout = null,
            PartitionManagerType partitionManagerType = PartitionManagerType.V2Safe)
        {
            string storageConnectionString = TestHelpers.GetTestStorageAccountConnectionString();
@@ -99,7 +103,12 @@ async Task<AzureStorageOrchestrationService> EnsureTaskHubAsync(
                StorageAccountClientProvider = new StorageAccountClientProvider(storageConnectionString),
                TaskHubName = taskHubName,
                WorkerId = workerId,
+                PartitionCount = partitionCount,
            };
+            if (controlQueueVisibilityTimeout != null)
+            {
+                settings.ControlQueueVisibilityTimeout = controlQueueVisibilityTimeout.Value;
+            }

            this.SetPartitionManagerType(settings, partitionManagerType);

@@ -119,7 +128,7 @@ async Task<AzureStorageOrchestrationService> EnsureTaskHubAsync(
            // Control queues
            Assert.IsNotNull(service.AllControlQueues, "Control queue collection was not initialized.");
            ControlQueue[] controlQueues = service.AllControlQueues.ToArray();
-            Assert.AreEqual(4, controlQueues.Length, "Expected to see the default four control queues created.");
+            Assert.AreEqual(partitionCount, controlQueues.Length, $"Expected to see the default {partitionCount} control queues created.");
            foreach (ControlQueue queue in controlQueues)
            {
                Assert.IsTrue(await queue.InnerQueue.ExistsAsync(), $"Queue {queue.Name} was not created.");
@@ -483,6 +492,148 @@ await TestHelpers.WaitFor(
            Assert.IsTrue(queueMessages.All(msg => msg.DequeueCount == 1));
        }

+        /// <summary>
+        /// Confirm that if two workers try to complete the same work item, a SessionAbortedException is thrown which wraps the
+        /// inner DurableTaskStorageException, which has the correct status code.
+        /// We check two cases:
+        /// 1. If this is the first work item for the orchestration, the DurableTaskStorageException that is wrapped has status "Conflict",
+        /// which is due to trying to insert an orchestration history when one already exists.
+        /// 2. If this is not the first work item, the DurableTaskStorageException that is wrapped has status "PreconditionFailed",
+        /// which is due to trying to update the existing orchestration history with a stale ETag.
+        /// </summary>
+        [TestMethod]
+        public async Task MultipleWorkersAttemptingToCompleteSameWorkItem()
+        {
+            var orchestrationInstance = new OrchestrationInstance
+            {
+                InstanceId = "instance_id",
+                ExecutionId = "execution_id",
+            };
+
+            ExecutionStartedEvent startedEvent = new(-1, string.Empty)
+            {
+                Name = "orchestration",
+                Version = string.Empty,
+                OrchestrationInstance = orchestrationInstance,
+                ScheduledStartTime = DateTime.UtcNow,
+            };
+
+            // Create worker 1, wait for it to acquire the lease.
+            // Make sure to set a small control queue visibility timeout so that worker 2 can reacquire the work item quickly once worker 1 loses the lease.
+            var service1 = await this.EnsureTaskHubAsync(
+                nameof(MultipleWorkersAttemptingToCompleteSameWorkItem),
+                testDeletion: false,
+                deleteBeforeCreate: true,
+                partitionCount: 1,
+                workerId: "1",
+                controlQueueVisibilityTimeout: TimeSpan.FromSeconds(1)
+            );
+            await service1.StartAsync();
+            await TestHelpers.WaitFor(
+                condition: () => service1.OwnedControlQueues.Any(),
+                timeout: TimeSpan.FromSeconds(30));
+            ControlQueue controlQueue = service1.OwnedControlQueues.Single();
+
+            // Create the orchestration, get the first work item, and start "working" on it
+            await service1.CreateTaskOrchestrationAsync(
+                new TaskMessage()
+                {
+                    OrchestrationInstance = orchestrationInstance,
+                    Event = startedEvent
+                });
+            var workItem1 = await service1.LockNextTaskOrchestrationWorkItemAsync(
+                TimeSpan.FromMinutes(5),
+                CancellationToken.None);
+            var runtimeState = workItem1.OrchestrationRuntimeState;
+            runtimeState.AddEvent(new OrchestratorStartedEvent(-1));
+            runtimeState.AddEvent(startedEvent);
+            runtimeState.AddEvent(new TaskScheduledEvent(0, "task"));
+            runtimeState.AddEvent(new OrchestratorCompletedEvent(-1));
+
+            // Now lose the lease
+            BlobPartitionLease lease = await service1.ListBlobLeasesAsync().SingleAsync();
+            await service1.OnOwnershipLeaseReleasedAsync(lease, CloseReason.LeaseLost);
+            await TestHelpers.WaitFor(
+                condition: () => !service1.OwnedControlQueues.Any(),
+                timeout: TimeSpan.FromSeconds(30));
+
+            // Create worker 2, wait for it to now acquire the lease
+            var service2 = await this.EnsureTaskHubAsync(
+                nameof(MultipleWorkersAttemptingToCompleteSameWorkItem),
+                testDeletion: false,
+                deleteBeforeCreate: false,
+                workerId: "2",
+                partitionCount: 1,
+                controlQueueVisibilityTimeout: TimeSpan.FromSeconds(1)
+            );
+            await service2.StartAsync();
+            await service2.OnOwnershipLeaseAquiredAsync(lease);
+            await TestHelpers.WaitFor(
+                condition: () => service2.OwnedControlQueues.Any(),
+                timeout: TimeSpan.FromSeconds(60));
+
+            // Have worker 2 dequeue the same work item and start "working" on it
+            var workItem2 = await service2.LockNextTaskOrchestrationWorkItemAsync(
+                TimeSpan.FromMinutes(5),
+                CancellationToken.None);
+            workItem2.OrchestrationRuntimeState = runtimeState;
+
+            // Worker 2 completes the work item
+            await service2.CompleteTaskOrchestrationWorkItemAsync(workItem2, runtimeState, new List<TaskMessage>(), new List<TaskMessage>(), new List<TaskMessage>(), null, null);
+
+            // Now worker 1 will attempt to complete the same work item. Since this is the first attempt to complete a work item and add a history for the orchestration (by worker 1),
+            // there is no ETag stored for the OrchestrationSession, so a "Conflict" exception will be thrown because worker 2 already created a history for the orchestration.
+            SessionAbortedException exception = await Assert.ThrowsExceptionAsync<SessionAbortedException>(async () =>
+                await service1.CompleteTaskOrchestrationWorkItemAsync(workItem1, runtimeState, new List<TaskMessage>(), new List<TaskMessage>(), new List<TaskMessage>(), null, null)
+            );
+            Assert.IsInstanceOfType(exception.InnerException, typeof(DurableTaskStorageException));
+            DurableTaskStorageException dtse = (DurableTaskStorageException)exception.InnerException;
+            Assert.AreEqual((int)HttpStatusCode.Conflict, dtse.HttpStatusCode);
+            await service1.ReleaseTaskOrchestrationWorkItemAsync(workItem1);
+            await service2.ReleaseTaskOrchestrationWorkItemAsync(workItem2);
+
+            // Now simulate a task completing for the orchestration
+            var taskCompletedEvent = new TaskCompletedEvent(-1, 0, string.Empty);
+            await service2.SendTaskOrchestrationMessageAsync(new TaskMessage { Event = taskCompletedEvent, OrchestrationInstance = orchestrationInstance });
+
+            // Worker 2 gets the next work item related to this task completion and starts "working" on it
+            workItem2 = await service2.LockNextTaskOrchestrationWorkItemAsync(
+                TimeSpan.FromMinutes(5),
+                CancellationToken.None);
+            runtimeState = workItem2.OrchestrationRuntimeState;
+            runtimeState.AddEvent(new OrchestratorStartedEvent(-1));
+            runtimeState.AddEvent(taskCompletedEvent);
+            runtimeState.AddEvent(new ExecutionCompletedEvent(1, string.Empty, OrchestrationStatus.Completed));
+            runtimeState.AddEvent(new OrchestratorCompletedEvent(-1));
+
+            // Now force worker 2 to lose the lease and have worker 1 acquire it
+            lease = await service2.ListBlobLeasesAsync().SingleAsync();
+            await service2.OnOwnershipLeaseReleasedAsync(lease, CloseReason.LeaseLost);
+            await TestHelpers.WaitFor(
+                condition: () => !service2.OwnedControlQueues.Any(),
+                timeout: TimeSpan.FromSeconds(30));
+            await service1.OnOwnershipLeaseAquiredAsync(lease);
+            await TestHelpers.WaitFor(
+                condition: () => service1.OwnedControlQueues.Any(),
+                timeout: TimeSpan.FromSeconds(60));
+
+            // Worker 1 also acquires the work item and starts "working" on it
+            workItem1 = await service1.LockNextTaskOrchestrationWorkItemAsync(
+                TimeSpan.FromMinutes(5),
+                CancellationToken.None);
+            workItem1.OrchestrationRuntimeState = runtimeState;
+
+            // Worker 1 completes the work item
+            await service1.CompleteTaskOrchestrationWorkItemAsync(workItem1, runtimeState, new List<TaskMessage>(), new List<TaskMessage>(), new List<TaskMessage>(), null, null);
+
+            // Now worker 2 attempts to complete the same work item. Since this is not the first work item for the orchestration, an ETag now exists for the OrchestrationSession, and the exception
+            // that is thrown will be "PreconditionFailed" because the ETag is stale after worker 1 completed the work item.
+            exception = await Assert.ThrowsExceptionAsync<SessionAbortedException>(async () =>
+                await service2.CompleteTaskOrchestrationWorkItemAsync(workItem2, runtimeState, new List<TaskMessage>(), new List<TaskMessage>(), new List<TaskMessage>(), null, null)
+            );
+            Assert.IsInstanceOfType(exception.InnerException, typeof(DurableTaskStorageException));
+            dtse = (DurableTaskStorageException)exception.InnerException;
+            Assert.AreEqual((int)HttpStatusCode.PreconditionFailed, dtse.HttpStatusCode);
+        }
+
        [TestMethod]
        public async Task MonitorIdleTaskHubDisconnected()
        {
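To illustrate the comment in AzureStorageOrchestrationService.cs that the orchestration dispatcher handles this exception by abandoning the work item, here is a minimal consumer-side sketch built only on the IOrchestrationService members the test above already uses. ProcessOneAsync is a hypothetical helper, not the actual TaskOrchestrationDispatcher implementation in DurableTask.Core, and the abandon-then-release ordering is an assumption for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using DurableTask.Core;
using DurableTask.Core.Exceptions;

static class WorkItemLoopSketch
{
    // Hypothetical helper: processes a single orchestration work item and backs off when
    // another worker has already completed it (surfaced as SessionAbortedException by the
    // catch block added in this change).
    public static async Task ProcessOneAsync(IOrchestrationService service, CancellationToken ct)
    {
        TaskOrchestrationWorkItem workItem = await service.LockNextTaskOrchestrationWorkItemAsync(
            TimeSpan.FromMinutes(5), ct);
        if (workItem == null)
        {
            return; // no work item is currently available
        }

        try
        {
            // ... run the orchestrator here and update workItem.OrchestrationRuntimeState ...
            await service.CompleteTaskOrchestrationWorkItemAsync(
                workItem,
                workItem.OrchestrationRuntimeState,
                new List<TaskMessage>(),   // outbound (activity) messages
                new List<TaskMessage>(),   // orchestrator messages
                new List<TaskMessage>(),   // timer messages
                null,                      // continued-as-new message
                null);                     // orchestration state
        }
        catch (SessionAbortedException)
        {
            // Another worker won the race (409/412 from the History table): abandon this
            // work item so its messages become visible again and the winner's history stands.
            await service.AbandonTaskOrchestrationWorkItemAsync(workItem);
        }
        finally
        {
            await service.ReleaseTaskOrchestrationWorkItemAsync(workItem);
        }
    }
}
```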