Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1215,11 +1215,16 @@ await this.CommitOutboundQueueMessages(
this.orchestrationSessionManager.AddMessageToPendingOrchestration(session.ControlQueue, messages, session.TraceActivityId, CancellationToken.None);
}
}
catch (RequestFailedException rfe) when (rfe.Status == (int)HttpStatusCode.PreconditionFailed)
// Handle the case where the 'ETag' has changed, which implies another worker has taken over this work item while
// we were trying to process it. We detect this in 2 cases:
// Common case: the resulting code is 'PreconditionFailed', which means our ETag no longer matches the one stored.
// Edge case: the resulting code is 'Conflict'. This can occur if this was the first orchestration work item.
// The 'Conflict' represents that we attempted to insert a new orchestration history when one already exists.
catch (DurableTaskStorageException dtse) when (dtse.HttpStatusCode == (int)HttpStatusCode.Conflict || dtse.HttpStatusCode == (int)HttpStatusCode.PreconditionFailed)
{
// Precondition failure is expected to be handled internally and logged as a warning.
// The orchestration dispatcher will handle this exception by abandoning the work item
throw new SessionAbortedException("Aborting execution due to failed precondition.", rfe);
throw new SessionAbortedException("Aborting execution due to conflicting completion of the work item by another worker.", dtse);
}
catch (Exception e)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1203,7 +1203,11 @@ static string GetBlobName(TableEntity entity, string property)
}
catch (DurableTaskStorageException ex)
{
if (ex.HttpStatusCode == (int)HttpStatusCode.PreconditionFailed)
// Handle the case where the history has already been updated by another caller.
// Common case: the resulting code is 'PreconditionFailed', which means "eTagValue" no longer matches the one stored, and TableTransactionActionType is "Update".
// Edge case: the resulting code is 'Conflict'. This is the case when eTagValue is null, and the TableTransactionActionType is "Add",
// in which case the exception indicates that the table entity we are trying to "add" already exists.
if (ex.HttpStatusCode == (int)HttpStatusCode.Conflict || ex.HttpStatusCode == (int)HttpStatusCode.PreconditionFailed)
{
this.settings.Logger.SplitBrainDetected(
this.storageAccountName,
Expand All @@ -1214,7 +1218,7 @@ static string GetBlobName(TableEntity entity, string property)
numberOfTotalEvents,
historyEventNamesBuffer.ToString(0, historyEventNamesBuffer.Length - 1), // remove trailing comma
stopwatch.ElapsedMilliseconds,
eTagValue?.ToString());
eTagValue is null ? string.Empty : eTagValue.ToString());
}

throw;
Expand Down
153 changes: 152 additions & 1 deletion test/DurableTask.AzureStorage.Tests/AzureStorageScaleTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ namespace DurableTask.AzureStorage.Tests
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Azure.Data.Tables;
Expand All @@ -29,6 +30,7 @@ namespace DurableTask.AzureStorage.Tests
using DurableTask.AzureStorage.Storage;
using DurableTask.AzureStorage.Tracking;
using DurableTask.Core;
using DurableTask.Core.Exceptions;
using DurableTask.Core.History;
using Microsoft.VisualStudio.TestTools.UnitTesting;

Expand Down Expand Up @@ -88,6 +90,8 @@ async Task<AzureStorageOrchestrationService> EnsureTaskHubAsync(
bool testDeletion,
bool deleteBeforeCreate = true,
string workerId = "test",
int partitionCount = 4,
TimeSpan? controlQueueVisibilityTimeout = null,
PartitionManagerType partitionManagerType = PartitionManagerType.V2Safe)
{
string storageConnectionString = TestHelpers.GetTestStorageAccountConnectionString();
Expand All @@ -99,7 +103,12 @@ async Task<AzureStorageOrchestrationService> EnsureTaskHubAsync(
StorageAccountClientProvider = new StorageAccountClientProvider(storageConnectionString),
TaskHubName = taskHubName,
WorkerId = workerId,
PartitionCount = partitionCount,
};
if (controlQueueVisibilityTimeout != null)
{
settings.ControlQueueVisibilityTimeout = controlQueueVisibilityTimeout.Value;
}
this.SetPartitionManagerType(settings, partitionManagerType);


Expand All @@ -119,7 +128,7 @@ async Task<AzureStorageOrchestrationService> EnsureTaskHubAsync(
// Control queues
Assert.IsNotNull(service.AllControlQueues, "Control queue collection was not initialized.");
ControlQueue[] controlQueues = service.AllControlQueues.ToArray();
Assert.AreEqual(4, controlQueues.Length, "Expected to see the default four control queues created.");
Assert.AreEqual(partitionCount, controlQueues.Length, $"Expected to see the default {partitionCount} control queues created.");
foreach (ControlQueue queue in controlQueues)
{
Assert.IsTrue(await queue.InnerQueue.ExistsAsync(), $"Queue {queue.Name} was not created.");
Expand Down Expand Up @@ -483,6 +492,148 @@ await TestHelpers.WaitFor(
Assert.IsTrue(queueMessages.All(msg => msg.DequeueCount == 1));
}

/// <summary>
/// Confirms that when two workers try to complete the same work item, the second completion attempt throws a
/// SessionAbortedException that wraps a DurableTaskStorageException carrying the expected HTTP status code.
/// Two cases are checked:
/// 1. If this is the first work item for the orchestration, the wrapped DurableTaskStorageException has status "Conflict",
/// which is due to trying to insert an orchestration history when one already exists.
/// 2. If this is not the first work item, the wrapped DurableTaskStorageException has status "PreconditionFailed",
/// which is due to trying to update the existing orchestration history with a stale ETag.
/// </summary>
/// <returns>A task that completes once both cases have been exercised and asserted.</returns>
// NOTE(review): neither service1 nor service2 is stopped inside this test after StartAsync - confirm whether
// cleanup happens elsewhere or whether the services should be stopped here.
[TestMethod]
public async Task MultipleWorkersAttemptingToCompleteSameWorkItem()
{
// Fixed instance/execution IDs identify the single orchestration used by both cases below.
var orchestrationInstance = new OrchestrationInstance
{
InstanceId = "instance_id",
ExecutionId = "execution_id",
};

// The start event is used twice: as the message that creates the orchestration and as a history event
// appended to the runtime state when worker 1 "processes" the first work item.
ExecutionStartedEvent startedEvent = new(-1, string.Empty)
{
Name = "orchestration",
Version = string.Empty,
OrchestrationInstance = orchestrationInstance,
ScheduledStartTime = DateTime.UtcNow,
};

// ---- Case 1: first work item for the orchestration (expected status: Conflict) ----

// Create worker 1 and wait for it to acquire the lease.
// A single partition is used so that there is exactly one control queue and one lease to hand between workers
// (the Single()/SingleAsync() calls below rely on this).
// Use a small control queue visibility timeout so that worker 2 can reacquire the work item quickly once worker 1 loses the lease.
var service1 = await this.EnsureTaskHubAsync(
nameof(MultipleWorkersAttemptingToCompleteSameWorkItem),
testDeletion: false,
deleteBeforeCreate: true,
partitionCount: 1,
workerId: "1",
controlQueueVisibilityTimeout: TimeSpan.FromSeconds(1)
);
await service1.StartAsync();
await TestHelpers.WaitFor(
condition: () => service1.OwnedControlQueues.Any(),
timeout: TimeSpan.FromSeconds(30));
// Single() throws unless worker 1 owns exactly one control queue.
// NOTE(review): the controlQueue local is not read again in this method - confirm it is only kept for that check.
ControlQueue controlQueue = service1.OwnedControlQueues.Single();

// Create the orchestration, then have worker 1 lock the first work item and start "working" on it
// by appending the history events a real orchestrator episode would produce.
await service1.CreateTaskOrchestrationAsync(
new TaskMessage()
{
OrchestrationInstance = orchestrationInstance,
Event = startedEvent
});
var workItem1 = await service1.LockNextTaskOrchestrationWorkItemAsync(
TimeSpan.FromMinutes(5),
CancellationToken.None);
var runtimeState = workItem1.OrchestrationRuntimeState;
runtimeState.AddEvent(new OrchestratorStartedEvent(-1));
runtimeState.AddEvent(startedEvent);
runtimeState.AddEvent(new TaskScheduledEvent(0, "task"));
runtimeState.AddEvent(new OrchestratorCompletedEvent(-1));

// Now make worker 1 lose the lease before it has completed the work item.
BlobPartitionLease lease = await service1.ListBlobLeasesAsync().SingleAsync();
await service1.OnOwnershipLeaseReleasedAsync(lease, CloseReason.LeaseLost);
await TestHelpers.WaitFor(
condition: () => !service1.OwnedControlQueues.Any(),
timeout: TimeSpan.FromSeconds(30));

// Create worker 2 against the same task hub (deleteBeforeCreate: false keeps the existing state)
// and wait for it to acquire the lease.
var service2 = await this.EnsureTaskHubAsync(
nameof(MultipleWorkersAttemptingToCompleteSameWorkItem),
testDeletion: false,
deleteBeforeCreate: false,
workerId: "2",
partitionCount: 1,
controlQueueVisibilityTimeout: TimeSpan.FromSeconds(1)
);
await service2.StartAsync();
await service2.OnOwnershipLeaseAquiredAsync(lease);
await TestHelpers.WaitFor(
condition: () => service2.OwnedControlQueues.Any(),
timeout: TimeSpan.FromSeconds(60));

// Have worker 2 dequeue the same work item and start "working" on it.
// Both work items now share the same runtime state object that worker 1 populated above.
var workItem2 = await service2.LockNextTaskOrchestrationWorkItemAsync(
TimeSpan.FromMinutes(5),
CancellationToken.None);
workItem2.OrchestrationRuntimeState = runtimeState;

// Worker 2 completes the work item first.
await service2.CompleteTaskOrchestrationWorkItemAsync(workItem2, runtimeState, new List<TaskMessage>(), new List<TaskMessage>(), new List<TaskMessage>(), null, null);
// Now worker 1 attempts to complete the same work item. Since this is worker 1's first attempt to complete a work item and add a history for the orchestration,
// there is no ETag stored for the OrchestrationSession, and so a "Conflict" exception will be thrown because worker 2 already created a history for the orchestration.
SessionAbortedException exception = await Assert.ThrowsExceptionAsync<SessionAbortedException>(async () =>
await service1.CompleteTaskOrchestrationWorkItemAsync(workItem1, runtimeState, new List<TaskMessage>(), new List<TaskMessage>(), new List<TaskMessage>(), null, null)
);
Assert.IsInstanceOfType(exception.InnerException, typeof(DurableTaskStorageException));
DurableTaskStorageException dtse = (DurableTaskStorageException)exception.InnerException;
Assert.AreEqual((int)HttpStatusCode.Conflict, dtse.HttpStatusCode);
// Release both work items before moving on to the second case.
await service1.ReleaseTaskOrchestrationWorkItemAsync(workItem1);
await service2.ReleaseTaskOrchestrationWorkItemAsync(workItem2);

// ---- Case 2: subsequent work item for the orchestration (expected status: PreconditionFailed) ----

// Now simulate a task completing for the orchestration.
var taskCompletedEvent = new TaskCompletedEvent(-1, 0, string.Empty);
await service2.SendTaskOrchestrationMessageAsync(new TaskMessage { Event = taskCompletedEvent, OrchestrationInstance = orchestrationInstance });
// Worker 2 gets the next work item related to this task completion and starts "working" on it.
workItem2 = await service2.LockNextTaskOrchestrationWorkItemAsync(
TimeSpan.FromMinutes(5),
CancellationToken.None);
runtimeState = workItem2.OrchestrationRuntimeState;
runtimeState.AddEvent(new OrchestratorStartedEvent(-1));
runtimeState.AddEvent(taskCompletedEvent);
runtimeState.AddEvent(new ExecutionCompletedEvent(1, string.Empty, OrchestrationStatus.Completed));
runtimeState.AddEvent(new OrchestratorCompletedEvent(-1));

// Now force worker 2 to lose the lease and have worker 1 acquire it (the roles are swapped relative to case 1).
lease = await service2.ListBlobLeasesAsync().SingleAsync();
await service2.OnOwnershipLeaseReleasedAsync(lease, CloseReason.LeaseLost);
await TestHelpers.WaitFor(
condition: () => !service2.OwnedControlQueues.Any(),
timeout: TimeSpan.FromSeconds(30));
await service1.OnOwnershipLeaseAquiredAsync(lease);
await TestHelpers.WaitFor(
condition: () => service1.OwnedControlQueues.Any(),
timeout: TimeSpan.FromSeconds(60));

// Worker 1 also acquires the work item and starts "working" on it, reusing the runtime state built by worker 2.
workItem1 = await service1.LockNextTaskOrchestrationWorkItemAsync(
TimeSpan.FromMinutes(5),
CancellationToken.None);
workItem1.OrchestrationRuntimeState = runtimeState;

// Worker 1 completes the work item first.
await service1.CompleteTaskOrchestrationWorkItemAsync(workItem1, runtimeState, new List<TaskMessage>(), new List<TaskMessage>(), new List<TaskMessage>(), null, null);
// Now worker 2 attempts to complete the same work item. Since this is not the first work item for the orchestration, an ETag now exists for the OrchestrationSession, and the exception
// that is thrown will be "PreconditionFailed" because the ETag is stale after worker 1 completed the work item.
exception = await Assert.ThrowsExceptionAsync<SessionAbortedException>(async () =>
await service2.CompleteTaskOrchestrationWorkItemAsync(workItem2, runtimeState, new List<TaskMessage>(), new List<TaskMessage>(), new List<TaskMessage>(), null, null)
);
Assert.IsInstanceOfType(exception.InnerException, typeof(DurableTaskStorageException));
dtse = (DurableTaskStorageException)exception.InnerException;
Assert.AreEqual((int)HttpStatusCode.PreconditionFailed, dtse.HttpStatusCode);
}

[TestMethod]
public async Task MonitorIdleTaskHubDisconnected()
{
Expand Down
Loading