diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs
index 22e0e80da..1c87113b7 100644
--- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs
+++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs
@@ -1230,7 +1230,7 @@ await this.CommitOutboundQueueMessages(
// will result in a duplicate replay of the orchestration with no side-effects.
try
{
- session.ETag = await this.trackingStore.UpdateStateAsync(runtimeState, workItem.OrchestrationRuntimeState, instanceId, executionId, session.ETag, session.TrackingStoreContext);
+ await this.trackingStore.UpdateStateAsync(runtimeState, workItem.OrchestrationRuntimeState, instanceId, executionId, session.ETags, session.TrackingStoreContext);
// update the runtime state and execution id stored in the session
session.UpdateRuntimeState(runtimeState);
diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs
index 48e653868..e614da92f 100644
--- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs
+++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs
@@ -294,5 +294,22 @@ internal LogHelper Logger
/// The default is <see cref="QueueClientMessageEncoding.UTF8"/>.
/// </summary>
public QueueClientMessageEncoding QueueClientMessageEncoding { get; set; } = QueueClientMessageEncoding.UTF8;
+
+ /// <summary>
+ /// When true, an ETag is used when attempting to make instance table updates upon completing an orchestration work item.
+ /// </summary>
+ /// <remarks>
+ /// By default, ETags are only used when updating the history table upon completing an orchestration work item. This can lead
+ /// to subtle race conditions in which the instance table is incorrectly updated. Consider the following scenario:
+ /// 1. Worker A completes an orchestration work item, sends outgoing messages, updates the history table, then stalls.
+ /// 2. Worker B picks up the next and final orchestration work item for the same instance and completes it, updating the history
+ /// table and instance table with the new terminal status.
+ /// 3. Worker A resumes and overwrites the terminal status in the instance table with an incorrect non-terminal status.
+ /// This leaves the instance status of the orchestration permanently as "Running" even though it has actually completed.
+ /// To prevent such scenarios, enabling this setting ensures that instance table updates also use ETags, which blocks
+ /// worker A's update in step 3. Enabling this setting also introduces a performance overhead, since the instance
+ /// table must now be read before processing every orchestration work item to obtain the latest ETag.
+ /// </remarks>
+ public bool UseInstanceTableEtag { get; set; } = false;
}
}
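
The new setting is opt-in. For reviewers, a minimal sketch (not part of the diff) of enabling it when constructing the service; the task hub name and connection-string variable are illustrative, and the construction pattern mirrors the tests added in AzureStorageScaleTests.cs below:

    // Hypothetical host setup -- only UseInstanceTableEtag is new in this PR.
    var settings = new AzureStorageOrchestrationServiceSettings
    {
        TaskHubName = "MyTaskHub",                                                                // illustrative
        StorageAccountClientProvider = new StorageAccountClientProvider(storageConnectionString), // assumed to be supplied by the host
        UseInstanceTableEtag = true,                                                              // opt in to ETag-guarded instance table updates
    };
    var service = new AzureStorageOrchestrationService(settings);

When the setting is left at its default of false, the existing InsertOrMergeEntityAsync path is used and behavior is unchanged.
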
diff --git a/src/DurableTask.AzureStorage/MessageData.cs b/src/DurableTask.AzureStorage/MessageData.cs
index d89e7c686..4715043c7 100644
--- a/src/DurableTask.AzureStorage/MessageData.cs
+++ b/src/DurableTask.AzureStorage/MessageData.cs
@@ -111,6 +111,8 @@ internal void Update(UpdateReceipt receipt)
{
this.OriginalQueueMessage = this.OriginalQueueMessage.Update(receipt);
}
+
+ internal object MessageMetadata { get; set; }
}
///
diff --git a/src/DurableTask.AzureStorage/Messaging/OrchestrationSession.cs b/src/DurableTask.AzureStorage/Messaging/OrchestrationSession.cs
index 719694e97..1b2e4a20e 100644
--- a/src/DurableTask.AzureStorage/Messaging/OrchestrationSession.cs
+++ b/src/DurableTask.AzureStorage/Messaging/OrchestrationSession.cs
@@ -37,7 +37,7 @@ public OrchestrationSession(
ControlQueue controlQueue,
List<MessageData> initialMessageBatch,
OrchestrationRuntimeState runtimeState,
- ETag? eTag,
+ OrchestrationETags eTags,
DateTime lastCheckpointTime,
object trackingStoreContext,
TimeSpan idleTimeout,
@@ -48,7 +48,7 @@ public OrchestrationSession(
this.ControlQueue = controlQueue ?? throw new ArgumentNullException(nameof(controlQueue));
this.CurrentMessageBatch = initialMessageBatch ?? throw new ArgumentNullException(nameof(initialMessageBatch));
this.RuntimeState = runtimeState ?? throw new ArgumentNullException(nameof(runtimeState));
- this.ETag = eTag;
+ this.ETags = eTags;
this.LastCheckpointTime = lastCheckpointTime;
this.TrackingStoreContext = trackingStoreContext;
@@ -66,7 +66,7 @@ public OrchestrationSession(
public OrchestrationRuntimeState RuntimeState { get; private set; }
- public ETag? ETag { get; set; }
+ public OrchestrationETags ETags { get; set; }
public DateTime LastCheckpointTime { get; }
diff --git a/src/DurableTask.AzureStorage/OrchestrationETags.cs b/src/DurableTask.AzureStorage/OrchestrationETags.cs
new file mode 100644
index 000000000..6c304b393
--- /dev/null
+++ b/src/DurableTask.AzureStorage/OrchestrationETags.cs
@@ -0,0 +1,24 @@
+// ----------------------------------------------------------------------------------
+// Copyright Microsoft Corporation
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+// http://www.apache.org/licenses/LICENSE-2.0
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ----------------------------------------------------------------------------------
+#nullable enable
+namespace DurableTask.AzureStorage
+{
+ using Azure;
+
+ class OrchestrationETags
+ {
+ internal ETag? InstanceETag { get; set; }
+
+ internal ETag? HistoryETag { get; set; }
+ }
+}
diff --git a/src/DurableTask.AzureStorage/OrchestrationSessionManager.cs b/src/DurableTask.AzureStorage/OrchestrationSessionManager.cs
index 51a6e07ba..7ecae232d 100644
--- a/src/DurableTask.AzureStorage/OrchestrationSessionManager.cs
+++ b/src/DurableTask.AzureStorage/OrchestrationSessionManager.cs
@@ -271,21 +271,25 @@ async Task> DedupeExecutionStartedMessagesAsync(
// Terminology:
// "Local" -> the instance ID info comes from the local copy of the message we're examining
// "Remote" -> the instance ID info comes from the Instances table that we're querying
- IAsyncEnumerable<OrchestrationState> instances = this.trackingStore.GetStateAsync(instanceIds, cancellationToken);
- IDictionary<string, OrchestrationState> remoteOrchestrationsById =
- await instances.ToDictionaryAsync(o => o.OrchestrationInstance.InstanceId, cancellationToken);
+ IAsyncEnumerable<InstanceStatus> instances = this.trackingStore.FetchInstanceStatusAsync(instanceIds, cancellationToken);
+ IDictionary<string, InstanceStatus> remoteOrchestrationsById =
+ await instances.ToDictionaryAsync(o => o.State.OrchestrationInstance.InstanceId, cancellationToken);
foreach (MessageData message in executionStartedMessages)
{
OrchestrationInstance localInstance = message.TaskMessage.OrchestrationInstance;
var expectedGeneration = ((ExecutionStartedEvent)message.TaskMessage.Event).Generation;
- if (remoteOrchestrationsById.TryGetValue(localInstance.InstanceId, out OrchestrationState remoteInstance) &&
- (remoteInstance.OrchestrationInstance.ExecutionId == null || string.Equals(localInstance.ExecutionId, remoteInstance.OrchestrationInstance.ExecutionId, StringComparison.OrdinalIgnoreCase)))
+ if (remoteOrchestrationsById.TryGetValue(localInstance.InstanceId, out InstanceStatus remoteInstance) &&
+ (remoteInstance.State.OrchestrationInstance.ExecutionId == null || string.Equals(localInstance.ExecutionId, remoteInstance.State.OrchestrationInstance.ExecutionId, StringComparison.OrdinalIgnoreCase)))
{
// Happy path: The message matches the table status. Alternatively, if the table doesn't have an ExecutionId field (older clients, pre-v1.8.5),
// then we have no way of knowing if it's a duplicate. Either way, allow it to run.
+ if (this.settings.UseInstanceTableEtag)
+ {
+ message.MessageMetadata = remoteInstance.ETag;
+ }
}
- else if (expectedGeneration == remoteInstance?.Generation && this.IsScheduledAfterInstanceUpdate(message, remoteInstance))
+ else if (expectedGeneration == remoteInstance?.State.Generation && this.IsScheduledAfterInstanceUpdate(message, remoteInstance?.State))
{
// The message was scheduled after the Instances table was updated with the orchestration info.
// We know almost certainly that this is a redundant message and can be safely discarded because
@@ -476,6 +480,10 @@ internal void AddMessageToPendingOrchestration(
if (targetBatch == null)
{
targetBatch = new PendingMessageBatch(controlQueue, instanceId, executionId);
+ if (this.settings.UseInstanceTableEtag && data.MessageMetadata is ETag instanceEtag)
+ {
+ targetBatch.ETags.InstanceETag = instanceEtag;
+ }
node = this.pendingOrchestrationMessageBatches.AddLast(targetBatch);
// Before the batch of messages can be processed, we need to download the latest execution state.
@@ -519,9 +527,20 @@ async Task ScheduleOrchestrationStatePrefetch(
cancellationToken);
batch.OrchestrationState = new OrchestrationRuntimeState(history.Events);
- batch.ETag = history.ETag;
+ batch.ETags.HistoryETag = history.ETag;
batch.LastCheckpointTime = history.LastCheckpointTime;
batch.TrackingStoreContext = history.TrackingStoreContext;
+
+ // Try to get the instance ETag from the tracking store if it wasn't already provided
+ if (this.settings.UseInstanceTableEtag && batch.ETags.InstanceETag == null)
+ {
+ InstanceStatus? instanceStatus = await this.trackingStore.FetchInstanceStatusAsync(
+ batch.OrchestrationInstanceId,
+ cancellationToken);
+ // The instance might not exist yet, for example when these messages are for the first execution of a sub-orchestration
+ // or an entity-started orchestration.
+ batch.ETags.InstanceETag = instanceStatus?.ETag;
+ }
}
if (this.settings.UseSeparateQueueForEntityWorkItems
@@ -590,7 +609,7 @@ async Task ScheduleOrchestrationStatePrefetch(
nextBatch.ControlQueue,
nextBatch.Messages,
nextBatch.OrchestrationState,
- nextBatch.ETag,
+ nextBatch.ETags,
nextBatch.LastCheckpointTime,
nextBatch.TrackingStoreContext,
this.settings.ExtendedSessionIdleTimeout,
@@ -737,8 +756,10 @@ public OrchestrationRuntimeState? OrchestrationState
}
}
- public ETag? ETag { get; set; }
+ public OrchestrationETags ETags { get; } = new OrchestrationETags();
+
public DateTime LastCheckpointTime { get; set; }
+
public object? TrackingStoreContext { get; set; }
}
}
diff --git a/src/DurableTask.AzureStorage/Storage/Table.cs b/src/DurableTask.AzureStorage/Storage/Table.cs
index e47b49529..7d726e1d7 100644
--- a/src/DurableTask.AzureStorage/Storage/Table.cs
+++ b/src/DurableTask.AzureStorage/Storage/Table.cs
@@ -86,16 +86,18 @@ public async Task DeleteEntityAsync(T tableEntity, ETag ifMatch = default, Ca
this.stats.TableEntitiesWritten.Increment();
}
- public async Task InsertEntityAsync<T>(T tableEntity, CancellationToken cancellationToken = default) where T : ITableEntity
+ public async Task<Response> InsertEntityAsync<T>(T tableEntity, CancellationToken cancellationToken = default) where T : ITableEntity
{
- await this.tableClient.AddEntityAsync(tableEntity, cancellationToken).DecorateFailure();
+ Response result = await this.tableClient.AddEntityAsync(tableEntity, cancellationToken).DecorateFailure();
this.stats.TableEntitiesWritten.Increment();
+ return result;
}
- public async Task MergeEntityAsync<T>(T tableEntity, ETag ifMatch, CancellationToken cancellationToken = default) where T : ITableEntity
+ public async Task<Response> MergeEntityAsync<T>(T tableEntity, ETag ifMatch, CancellationToken cancellationToken = default) where T : ITableEntity
{
- await this.tableClient.UpdateEntityAsync(tableEntity, ifMatch, TableUpdateMode.Merge, cancellationToken).DecorateFailure();
+ Response result = await this.tableClient.UpdateEntityAsync(tableEntity, ifMatch, TableUpdateMode.Merge, cancellationToken).DecorateFailure();
this.stats.TableEntitiesWritten.Increment();
+ return result;
}
public async Task InsertOrMergeEntityAsync<T>(T tableEntity, CancellationToken cancellationToken = default) where T : ITableEntity
diff --git a/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs b/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs
index c6f63838a..f1e67a0bf 100644
--- a/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs
+++ b/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs
@@ -510,19 +510,19 @@ async Task ConvertFromAsync(OrchestrationInstanceStatus orch
}
///
- public override async IAsyncEnumerable<OrchestrationState> GetStateAsync(IEnumerable<string> instanceIds, [EnumeratorCancellation] CancellationToken cancellationToken = default)
+ public override async IAsyncEnumerable<InstanceStatus> FetchInstanceStatusAsync(IEnumerable<string> instanceIds, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
if (instanceIds == null)
{
yield break;
}
- IEnumerable<Task<OrchestrationState>> instanceQueries = instanceIds.Select(instance => this.GetStateAsync(instance, allExecutions: true, fetchInput: false, cancellationToken).SingleOrDefaultAsync().AsTask());
- foreach (OrchestrationState state in await Task.WhenAll(instanceQueries))
+ IEnumerable<Task<InstanceStatus>> instanceQueries = instanceIds.Select(instance => this.FetchInstanceStatusAsync(instance, cancellationToken));
+ foreach (InstanceStatus status in await Task.WhenAll(instanceQueries))
{
- if (state != null)
+ if (status != null)
{
- yield return state;
+ yield return status;
}
}
}
@@ -839,12 +839,12 @@ public override Task StartAsync(CancellationToken cancellationToken = default)
}
///
- public override async Task<ETag?> UpdateStateAsync(
+ public override async Task UpdateStateAsync(
OrchestrationRuntimeState newRuntimeState,
OrchestrationRuntimeState oldRuntimeState,
string instanceId,
string executionId,
- ETag? eTagValue,
+ OrchestrationETags eTags,
object trackingStoreContext,
CancellationToken cancellationToken = default)
{
@@ -991,7 +991,7 @@ public override Task StartAsync(CancellationToken cancellationToken = default)
// Table storage only supports inserts of up to 100 entities at a time or 4 MB at a time.
if (historyEventBatch.Count == 99 || estimatedBytes > 3 * 1024 * 1024 /* 3 MB */)
{
- eTagValue = await this.UploadHistoryBatch(
+ eTags.HistoryETag = await this.UploadHistoryBatch(
instanceId,
sanitizedInstanceId,
executionId,
@@ -1000,7 +1000,7 @@ public override Task StartAsync(CancellationToken cancellationToken = default)
allEvents.Count,
episodeNumber,
estimatedBytes,
- eTagValue,
+ eTags.HistoryETag,
isFinalBatch: isFinalEvent,
cancellationToken: cancellationToken);
@@ -1014,7 +1014,7 @@ public override Task StartAsync(CancellationToken cancellationToken = default)
// First persistence step is to commit history to the history table. Messages must come after.
if (historyEventBatch.Count > 0)
{
- eTagValue = await this.UploadHistoryBatch(
+ eTags.HistoryETag = await this.UploadHistoryBatch(
instanceId,
sanitizedInstanceId,
executionId,
@@ -1023,22 +1023,12 @@ public override Task StartAsync(CancellationToken cancellationToken = default)
allEvents.Count,
episodeNumber,
estimatedBytes,
- eTagValue,
+ eTags.HistoryETag,
isFinalBatch: true,
cancellationToken: cancellationToken);
}
- Stopwatch orchestrationInstanceUpdateStopwatch = Stopwatch.StartNew();
- await this.InstancesTable.InsertOrMergeEntityAsync(instanceEntity);
-
- this.settings.Logger.InstanceStatusUpdate(
- this.storageAccountName,
- this.taskHubName,
- instanceId,
- executionId,
- runtimeStatus,
- episodeNumber,
- orchestrationInstanceUpdateStopwatch.ElapsedMilliseconds);
+ eTags.InstanceETag = await this.UpdateInstanceTableAsync(instanceEntity, eTags.InstanceETag, instanceId, executionId, runtimeStatus, episodeNumber);
// finally, delete orphaned blobs from the previous execution history.
// We had to wait until the new history has committed to make sure the blobs are no longer necessary.
@@ -1051,8 +1041,6 @@ public override Task StartAsync(CancellationToken cancellationToken = default)
}
await Task.WhenAll(tasks);
}
-
- return eTagValue;
}
public override async Task UpdateInstanceStatusForCompletedOrchestrationAsync(
@@ -1365,7 +1353,7 @@ static string GetBlobName(TableEntity entity, string property)
}
catch (DurableTaskStorageException ex)
{
- // Handle the case where the the history has already been updated by another caller.
+ // Handle the case where the history has already been updated by another caller.
// Common case: the resulting code is 'PreconditionFailed', which means "eTagValue" no longer matches the one stored, and TableTransactionActionType is "Update".
// Edge case: the resulting code is 'Conflict'. This is the case when eTagValue is null, and the TableTransactionActionType is "Add",
// in which case the exception indicates that the table entity we are trying to "add" already exists.
@@ -1424,6 +1412,62 @@ bool ExceedsMaxTablePropertySize(string data)
return false;
}
+ async Task<ETag?> UpdateInstanceTableAsync(TableEntity instanceEntity, ETag? eTag, string instanceId, string executionId, OrchestrationStatus runtimeStatus, int episodeNumber)
+ {
+ var orchestrationInstanceUpdateStopwatch = Stopwatch.StartNew();
+
+ ETag? newEtag = null;
+
+ if (!this.settings.UseInstanceTableEtag)
+ {
+ await this.InstancesTable.InsertOrMergeEntityAsync(instanceEntity);
+ }
+ else
+ {
+ try
+ {
+ Response result = await (eTag == null
+ ? this.InstancesTable.InsertEntityAsync(instanceEntity)
+ : this.InstancesTable.MergeEntityAsync(instanceEntity, eTag.Value));
+ newEtag = result.Headers.ETag;
+ }
+ catch (DurableTaskStorageException ex)
+ {
+ // Handle the case where the instance table has already been updated by another caller.
+ // Common case: the resulting code is 'PreconditionFailed', which means we are trying to update an existing instance entity and "eTag" no longer matches the one stored.
+ // Edge case: the resulting code is 'Conflict'. This is the case when eTag is null, and we are trying to insert a new instance entity, in which case the exception
+ // indicates that the table entity we are trying to "add" already exists.
+ if (ex.HttpStatusCode == (int)HttpStatusCode.Conflict || ex.HttpStatusCode == (int)HttpStatusCode.PreconditionFailed)
+ {
+ this.settings.Logger.SplitBrainDetected(
+ this.storageAccountName,
+ this.taskHubName,
+ instanceId,
+ executionId,
+ newEventCount: 0,
+ totalEventCount: 1,
+ "InstanceEntity",
+ orchestrationInstanceUpdateStopwatch.ElapsedMilliseconds,
+ eTag is null ? string.Empty : eTag.ToString());
+ }
+
+ throw;
+ }
+ }
+
+ this.settings.Logger.InstanceStatusUpdate(
+ this.storageAccountName,
+ this.taskHubName,
+ instanceId,
+ executionId,
+ runtimeStatus,
+ episodeNumber,
+ orchestrationInstanceUpdateStopwatch.ElapsedMilliseconds);
+
+ return newEtag;
+ }
+
class TrackingStoreContext
{
public List<string> Blobs { get; set; } = new List<string>();
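
The catch block added to UpdateInstanceTableAsync above distinguishes 'Conflict' (insert of an entity that already exists) from 'PreconditionFailed' (merge with a stale ETag). As background, and not part of the diff, this is the underlying Azure.Data.Tables behavior the Table wrapper relies on; a self-contained sketch with illustrative names:

    // Sketch of conditional writes against Azure Table Storage and the status codes they fail with.
    using System.Threading.Tasks;
    using Azure;
    using Azure.Data.Tables;

    static async Task<ETag?> TryConditionalUpsertAsync(TableClient table, TableEntity entity, ETag? knownETag)
    {
        try
        {
            Response response = knownETag is null
                ? await table.AddEntityAsync(entity)                                             // 409 Conflict if the entity already exists
                : await table.UpdateEntityAsync(entity, knownETag.Value, TableUpdateMode.Merge); // 412 PreconditionFailed if knownETag is stale
            return response.Headers.ETag;                                                        // ETag of the entity version just written
        }
        catch (RequestFailedException ex) when (ex.Status == 409 || ex.Status == 412)
        {
            return null; // another writer won the race; the caller decides whether to abort or re-read and retry
        }
    }

In the tracking store, DecorateFailure() (see Storage/Table.cs above) surfaces these failures as DurableTaskStorageException, which is what the new tests assert is wrapped inside the SessionAbortedException thrown by CompleteTaskOrchestrationWorkItemAsync.
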
diff --git a/src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs b/src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs
index 4a1b91fae..a1fc52e9f 100644
--- a/src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs
+++ b/src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs
@@ -72,10 +72,10 @@ interface ITrackingStore
/// <param name="oldRuntimeState">The RuntimeState for an olderExecution</param>
/// <param name="instanceId">InstanceId for the Orchestration Update</param>
/// <param name="executionId">ExecutionId for the Orchestration Update</param>
- /// <param name="eTag">The ETag value to use for safe updates</param>
+ /// <param name="eTags">The ETag values for the instance and history tables to use for safe updates</param>
/// <param name="trackingStoreContext">Additional context for the execution that is maintained by the tracking store.</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
- Task<ETag?> UpdateStateAsync(OrchestrationRuntimeState newRuntimeState, OrchestrationRuntimeState oldRuntimeState, string instanceId, string executionId, ETag? eTag, object trackingStoreContext, CancellationToken cancellationToken = default);
+ Task UpdateStateAsync(OrchestrationRuntimeState newRuntimeState, OrchestrationRuntimeState oldRuntimeState, string instanceId, string executionId, OrchestrationETags eTags, object trackingStoreContext, CancellationToken cancellationToken = default);
///
/// Get The Orchestration State for the Latest or All Executions
@@ -127,7 +127,7 @@ interface ITrackingStore
/// </summary>
/// <param name="instanceIds">The list of instances to query for.</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
- IAsyncEnumerable<OrchestrationState> GetStateAsync(IEnumerable<string> instanceIds, CancellationToken cancellationToken = default);
+ IAsyncEnumerable<InstanceStatus> FetchInstanceStatusAsync(IEnumerable<string> instanceIds, CancellationToken cancellationToken = default);
///
/// Get The Orchestration State for querying orchestration instances by the condition
diff --git a/src/DurableTask.AzureStorage/Tracking/InstanceStoreBackedTrackingStore.cs b/src/DurableTask.AzureStorage/Tracking/InstanceStoreBackedTrackingStore.cs
index ca3da07bd..f719c92dd 100644
--- a/src/DurableTask.AzureStorage/Tracking/InstanceStoreBackedTrackingStore.cs
+++ b/src/DurableTask.AzureStorage/Tracking/InstanceStoreBackedTrackingStore.cs
@@ -136,20 +136,20 @@ public override Task StartAsync(CancellationToken cancellationToken = default)
}
///
- public override async Task<ETag?> UpdateStateAsync(OrchestrationRuntimeState newRuntimeState, OrchestrationRuntimeState oldRuntimeState, string instanceId, string executionId, ETag? eTag, object executionData, CancellationToken cancellationToken = default)
+ public override async Task UpdateStateAsync(OrchestrationRuntimeState newRuntimeState, OrchestrationRuntimeState oldRuntimeState, string instanceId, string executionId, OrchestrationETags eTags, object executionData, CancellationToken cancellationToken = default)
{
//In case there is a runtime state for an older execution/iteration as well that needs to be committed, commit it.
//This may be the case if a ContinueAsNew was executed on the orchestration
if (newRuntimeState != oldRuntimeState)
{
- eTag = await UpdateStateAsync(oldRuntimeState, instanceId, oldRuntimeState.OrchestrationInstance.ExecutionId, eTag, cancellationToken);
+ await UpdateStateAsync(oldRuntimeState, instanceId, oldRuntimeState.OrchestrationInstance.ExecutionId, eTags, cancellationToken);
}
- return await UpdateStateAsync(newRuntimeState, instanceId, executionId, eTag, cancellationToken);
+ await UpdateStateAsync(newRuntimeState, instanceId, executionId, eTags, cancellationToken);
}
///
- private async Task<ETag?> UpdateStateAsync(OrchestrationRuntimeState runtimeState, string instanceId, string executionId, ETag? eTag, CancellationToken cancellationToken = default)
+ private async Task UpdateStateAsync(OrchestrationRuntimeState runtimeState, string instanceId, string executionId, OrchestrationETags eTags, CancellationToken cancellationToken = default)
{
int oldEventsCount = (runtimeState.Events.Count - runtimeState.NewEvents.Count);
await instanceStore.WriteEntitiesAsync(runtimeState.NewEvents.Select((x, i) =>
@@ -173,8 +173,6 @@ await instanceStore.WriteEntitiesAsync(new InstanceEntityBase[]
SequenceNumber = runtimeState.Events.Count
}
});
-
- return null;
}
public override async Task UpdateStatusForTerminationAsync(
diff --git a/src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs b/src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs
index 7879537e4..d02a729c0 100644
--- a/src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs
+++ b/src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs
@@ -60,7 +60,7 @@ public virtual IAsyncEnumerable GetStateAsync(CancellationTo
}
///
- public virtual IAsyncEnumerable<OrchestrationState> GetStateAsync(IEnumerable<string> instanceIds, CancellationToken cancellationToken = default)
+ public virtual IAsyncEnumerable<InstanceStatus> FetchInstanceStatusAsync(IEnumerable<string> instanceIds, CancellationToken cancellationToken = default)
{
throw new NotSupportedException();
}
@@ -107,7 +107,7 @@ public virtual Task UpdateStatusForRewindAsync(string instanceId, CancellationTo
public abstract Task StartAsync(CancellationToken cancellationToken = default);
///
- public abstract Task<ETag?> UpdateStateAsync(OrchestrationRuntimeState newRuntimeState, OrchestrationRuntimeState oldRuntimeState, string instanceId, string executionId, ETag? eTag, object executionData, CancellationToken cancellationToken = default);
+ public abstract Task UpdateStateAsync(OrchestrationRuntimeState newRuntimeState, OrchestrationRuntimeState oldRuntimeState, string instanceId, string executionId, OrchestrationETags eTags, object executionData, CancellationToken cancellationToken = default);
///
public abstract Task UpdateInstanceStatusForCompletedOrchestrationAsync(string instanceId, string executionId, OrchestrationRuntimeState runtimeState, bool instanceEntityExists, CancellationToken cancellationToken = default);
diff --git a/test/DurableTask.AzureStorage.Tests/AzureStorageScaleTests.cs b/test/DurableTask.AzureStorage.Tests/AzureStorageScaleTests.cs
index 64b9eb22b..a2872df75 100644
--- a/test/DurableTask.AzureStorage.Tests/AzureStorageScaleTests.cs
+++ b/test/DurableTask.AzureStorage.Tests/AzureStorageScaleTests.cs
@@ -505,133 +505,417 @@ await TestHelpers.WaitFor(
[TestMethod]
public async Task MultipleWorkersAttemptingToCompleteSameWorkItem()
{
- var orchestrationInstance = new OrchestrationInstance
+ AzureStorageOrchestrationService service1 = null;
+ AzureStorageOrchestrationService service2 = null;
+ try
{
- InstanceId = "instance_id",
- ExecutionId = "execution_id",
- };
+ var orchestrationInstance = new OrchestrationInstance
+ {
+ InstanceId = "instance_id",
+ ExecutionId = "execution_id",
+ };
+
+ ExecutionStartedEvent startedEvent = new(-1, string.Empty)
+ {
+ Name = "orchestration",
+ Version = string.Empty,
+ OrchestrationInstance = orchestrationInstance,
+ ScheduledStartTime = DateTime.UtcNow,
+ };
- ExecutionStartedEvent startedEvent = new(-1, string.Empty)
+ // Create worker 1, wait for it to acquire the lease.
+ // Make sure to set a small control queue visibility timeout so that worker 2 can reacquire the work item quickly once worker 1 loses the lease.
+ service1 = await this.EnsureTaskHubAsync(
+ nameof(MultipleWorkersAttemptingToCompleteSameWorkItem),
+ testDeletion: false,
+ deleteBeforeCreate: true,
+ partitionCount: 1,
+ workerId: "1",
+ controlQueueVisibilityTimeout: TimeSpan.FromSeconds(1)
+ );
+ await service1.StartAsync();
+ await TestHelpers.WaitFor(
+ condition: () => service1.OwnedControlQueues.Any(),
+ timeout: TimeSpan.FromSeconds(30));
+
+ // Create the orchestration and get the first work item and start "working" on it
+ await service1.CreateTaskOrchestrationAsync(
+ new TaskMessage()
+ {
+ OrchestrationInstance = orchestrationInstance,
+ Event = startedEvent
+ });
+ var workItem1 = await service1.LockNextTaskOrchestrationWorkItemAsync(
+ TimeSpan.FromMinutes(5),
+ CancellationToken.None);
+ var runtimeState = workItem1.OrchestrationRuntimeState;
+ runtimeState.AddEvent(new OrchestratorStartedEvent(-1));
+ runtimeState.AddEvent(startedEvent);
+ runtimeState.AddEvent(new TaskScheduledEvent(0, "task"));
+ runtimeState.AddEvent(new OrchestratorCompletedEvent(-1));
+
+ // Now lose the lease
+ BlobPartitionLease lease = await service1.ListBlobLeasesAsync().SingleAsync();
+ await service1.OnOwnershipLeaseReleasedAsync(lease, CloseReason.LeaseLost);
+ await TestHelpers.WaitFor(
+ condition: () => !service1.OwnedControlQueues.Any(),
+ timeout: TimeSpan.FromSeconds(30));
+
+ // Create worker 2, wait for it to now acquire the lease
+ service2 = await this.EnsureTaskHubAsync(
+ nameof(MultipleWorkersAttemptingToCompleteSameWorkItem),
+ testDeletion: false,
+ deleteBeforeCreate: false,
+ workerId: "2",
+ partitionCount: 1,
+ controlQueueVisibilityTimeout: TimeSpan.FromSeconds(1)
+ );
+ await service2.StartAsync();
+ await service2.OnOwnershipLeaseAquiredAsync(lease);
+ await TestHelpers.WaitFor(
+ condition: () => service2.OwnedControlQueues.Any(),
+ timeout: TimeSpan.FromSeconds(60));
+
+ // Have worker 2 dequeue the same work item and start "working" on it
+ var workItem2 = await service2.LockNextTaskOrchestrationWorkItemAsync(
+ TimeSpan.FromMinutes(5),
+ CancellationToken.None);
+ workItem2.OrchestrationRuntimeState = runtimeState;
+
+ // Worker 2 completes the work item
+ await service2.CompleteTaskOrchestrationWorkItemAsync(workItem2, runtimeState, new List<TaskMessage>(), new List<TaskMessage>(), new List<TaskMessage>(), null, null);
+ // Now worker 1 will attempt to complete the same work item. Since this is the first attempt to complete a work item and add a history for the orchestration (by worker 1),
+ // there is no etag stored for the OrchestrationSession, and so a "conflict" exception will be thrown as worker 2 already created a history for the orchestration.
+ SessionAbortedException exception = await Assert.ThrowsExceptionAsync<SessionAbortedException>(async () =>
+ await service1.CompleteTaskOrchestrationWorkItemAsync(workItem1, runtimeState, new List<TaskMessage>(), new List<TaskMessage>(), new List<TaskMessage>(), null, null)
+ );
+ Assert.IsInstanceOfType(exception.InnerException, typeof(DurableTaskStorageException));
+ DurableTaskStorageException dtse = (DurableTaskStorageException)exception.InnerException;
+ Assert.AreEqual((int)HttpStatusCode.Conflict, dtse.HttpStatusCode);
+ await service1.ReleaseTaskOrchestrationWorkItemAsync(workItem1);
+ await service2.ReleaseTaskOrchestrationWorkItemAsync(workItem2);
+
+ // Now simulate a task completing for the orchestration
+ var taskCompletedEvent = new TaskCompletedEvent(-1, 0, string.Empty);
+ await service2.SendTaskOrchestrationMessageAsync(new TaskMessage { Event = taskCompletedEvent, OrchestrationInstance = orchestrationInstance });
+ // Worker 2 gets the next work item related to this task completion and starts "working" on it
+ workItem2 = await service2.LockNextTaskOrchestrationWorkItemAsync(
+ TimeSpan.FromMinutes(5),
+ CancellationToken.None);
+ runtimeState = workItem2.OrchestrationRuntimeState;
+ runtimeState.AddEvent(new OrchestratorStartedEvent(-1));
+ runtimeState.AddEvent(taskCompletedEvent);
+ runtimeState.AddEvent(new ExecutionCompletedEvent(1, string.Empty, OrchestrationStatus.Completed));
+ runtimeState.AddEvent(new OrchestratorCompletedEvent(-1));
+
+ // Now force worker 2 to lose the lease and have worker 1 acquire it
+ lease = await service2.ListBlobLeasesAsync().SingleAsync();
+ await service2.OnOwnershipLeaseReleasedAsync(lease, CloseReason.LeaseLost);
+ await TestHelpers.WaitFor(
+ condition: () => !service2.OwnedControlQueues.Any(),
+ timeout: TimeSpan.FromSeconds(30));
+ await service1.OnOwnershipLeaseAquiredAsync(lease);
+ await TestHelpers.WaitFor(
+ condition: () => service1.OwnedControlQueues.Any(),
+ timeout: TimeSpan.FromSeconds(60));
+
+ // Worker 1 also acquires the work item and starts "working" on it
+ workItem1 = await service1.LockNextTaskOrchestrationWorkItemAsync(
+ TimeSpan.FromMinutes(5),
+ CancellationToken.None);
+ workItem1.OrchestrationRuntimeState = runtimeState;
+
+ // Worker 1 completes the work item
+ await service1.CompleteTaskOrchestrationWorkItemAsync(workItem1, runtimeState, new List<TaskMessage>(), new List<TaskMessage>(), new List<TaskMessage>(), null, null);
+ // Now worker 2 attempts to complete the same work item. Since this is not the first work item for the orchestration, an etag now exists for the OrchestrationSession, and the exception
+ // that is thrown will be "precondition failed" as the etag is stale after worker 1 completed the work item.
+ exception = await Assert.ThrowsExceptionAsync<SessionAbortedException>(async () =>
+ await service2.CompleteTaskOrchestrationWorkItemAsync(workItem2, runtimeState, new List<TaskMessage>(), new List<TaskMessage>(), new List<TaskMessage>(), null, null)
+ );
+ Assert.IsInstanceOfType(exception.InnerException, typeof(DurableTaskStorageException));
+ dtse = (DurableTaskStorageException)exception.InnerException;
+ Assert.AreEqual((int)HttpStatusCode.PreconditionFailed, dtse.HttpStatusCode);
+ }
+ finally
{
- Name = "orchestration",
- Version = string.Empty,
- OrchestrationInstance = orchestrationInstance,
- ScheduledStartTime = DateTime.UtcNow,
- };
+ await service1?.StopAsync(isForced: true);
+ await service2?.StopAsync(isForced: true);
+ }
+ }
- // Create worker 1, wait for it to acquire the lease.
- // Make sure to set a small control queue visibility timeout so that worker 2 can reacquire the work item quickly once worker 1 loses the lease.
- var service1 = await this.EnsureTaskHubAsync(
- nameof(MultipleWorkersAttemptingToCompleteSameWorkItem),
- testDeletion: false,
- deleteBeforeCreate: true,
- partitionCount: 1,
- workerId: "1",
- controlQueueVisibilityTimeout: TimeSpan.FromSeconds(1)
- );
- await service1.StartAsync();
- await TestHelpers.WaitFor(
- condition: () => service1.OwnedControlQueues.Any(),
- timeout: TimeSpan.FromSeconds(30));
- ControlQueue controlQueue = service1.OwnedControlQueues.Single();
+ /// <summary>
+ /// Confirm that:
+ /// 1. If <see cref="AzureStorageOrchestrationServiceSettings.UseInstanceTableEtag"/> is true, and a worker attempts to update the instance table with a stale
+ /// etag upon completing a work item, a SessionAbortedException is thrown which wraps the inner DurableTaskStorageException with the correct status code
+ /// (precondition failed).
+ /// The specific scenario tested is the worker stalling after updating the history table but before updating the instance table. When it attempts to update
+ /// the instance table with a stale etag, it will fail.
+ /// 2. If <see cref="AzureStorageOrchestrationServiceSettings.UseInstanceTableEtag"/> is false for the above scenario, then the call to update the instance table
+ /// will go through, and the instance table will be updated with a "stale" status.
+ /// </summary>
+ /// <remarks>
+ /// Since it is not possible to force a stall, we simulate the above scenario by manually updating the instance table before the worker
+ /// attempts to complete the work item. The history table update will go through, but the instance table update will fail since "another worker
+ /// has since updated" the instance table.
+ /// </remarks>
+ /// <param name="useInstanceEtag">The value to use for <see cref="AzureStorageOrchestrationServiceSettings.UseInstanceTableEtag"/>
+ /// </param>
+ [DataTestMethod]
+ [DataRow(true)]
+ [DataRow(false)]
+ public async Task WorkerAttemptingToUpdateInstanceTableAfterStalling(bool useInstanceEtag)
+ {
+ AzureStorageOrchestrationService service = null;
+ try
+ {
+ var orchestrationInstance = new OrchestrationInstance
+ {
+ InstanceId = "instance_id",
+ ExecutionId = "execution_id",
+ };
- // Create the orchestration and get the first work item and start "working" on it
- await service1.CreateTaskOrchestrationAsync(
- new TaskMessage()
+ ExecutionStartedEvent startedEvent = new(-1, string.Empty)
{
+ Name = "orchestration",
+ Version = string.Empty,
OrchestrationInstance = orchestrationInstance,
- Event = startedEvent
+ ScheduledStartTime = DateTime.UtcNow,
+ };
+
+ var settings = new AzureStorageOrchestrationServiceSettings
+ {
+ PartitionCount = 1,
+ StorageAccountClientProvider = new StorageAccountClientProvider(TestHelpers.GetTestStorageAccountConnectionString()),
+ TaskHubName = TestHelpers.GetTestTaskHubName(),
+ ExtendedSessionsEnabled = false,
+ UseInstanceTableEtag = useInstanceEtag
+ };
+ this.SetPartitionManagerType(settings, PartitionManagerType.V2Safe);
+
+ service = new AzureStorageOrchestrationService(settings);
+ await service.CreateAsync();
+ await service.StartAsync();
+
+ // Create the orchestration and get the first work item and start "working" on it
+ await service.CreateTaskOrchestrationAsync(
+ new TaskMessage()
+ {
+ OrchestrationInstance = orchestrationInstance,
+ Event = startedEvent
+ });
+ var workItem = await service.LockNextTaskOrchestrationWorkItemAsync(
+ TimeSpan.FromMinutes(5),
+ CancellationToken.None);
+ var runtimeState = workItem.OrchestrationRuntimeState;
+ runtimeState.AddEvent(new OrchestratorStartedEvent(-1));
+ runtimeState.AddEvent(startedEvent);
+ runtimeState.AddEvent(new TaskScheduledEvent(0));
+ runtimeState.AddEvent(new OrchestratorCompletedEvent(-1));
+
+ AzureStorageClient azureStorageClient = new AzureStorageClient(settings);
+
+ // Now manually update the instance to have status "Completed"
+ Table instanceTable = azureStorageClient.GetTableReference(azureStorageClient.Settings.InstanceTableName);
+ TableEntity entity = new(orchestrationInstance.InstanceId, "")
+ {
+ ["RuntimeStatus"] = OrchestrationStatus.Completed.ToString("G"),
+ };
+ await instanceTable.MergeEntityAsync(entity, Azure.ETag.All);
+
+ if (useInstanceEtag)
+ {
+ // Confirm an exception is thrown due to the etag mismatch for the instance table when the worker attempts to complete the work item
+ SessionAbortedException exception = await Assert.ThrowsExceptionAsync<SessionAbortedException>(async () =>
+ await service.CompleteTaskOrchestrationWorkItemAsync(workItem, runtimeState, new List<TaskMessage>(), new List<TaskMessage>(), new List<TaskMessage>(), null, null)
+ );
+ Assert.IsInstanceOfType(exception.InnerException, typeof(DurableTaskStorageException));
+ DurableTaskStorageException dtse = (DurableTaskStorageException)exception.InnerException;
+ Assert.AreEqual((int)HttpStatusCode.PreconditionFailed, dtse.HttpStatusCode);
+ }
+ else
+ {
+ await service.CompleteTaskOrchestrationWorkItemAsync(workItem, runtimeState, new List<TaskMessage>(), new List<TaskMessage>(), new List<TaskMessage>(), null, null);
+
+ var queryCondition = new OrchestrationInstanceStatusQueryCondition
+ {
+ InstanceId = "instance_id",
+ FetchInput = false,
+ };
+
+ ODataCondition odata = queryCondition.ToOData();
+ OrchestrationInstanceStatus instanceTableEntity = await instanceTable
+ .ExecuteQueryAsync<OrchestrationInstanceStatus>(odata.Filter, 1, odata.Select, CancellationToken.None)
+ .FirstOrDefaultAsync();
+
+ // Confirm the instance table was updated with a "stale" status
+ Assert.IsNotNull(instanceTableEntity);
+ Assert.AreEqual(OrchestrationStatus.Running.ToString(), instanceTableEntity.RuntimeStatus);
+ }
+ }
+ finally
+ {
+ await service?.StopAsync(isForced: true);
+ }
+ }
+
+ /// <summary>
+ /// Confirm that:
+ /// 1. If <see cref="AzureStorageOrchestrationServiceSettings.UseInstanceTableEtag"/> is true, and a worker attempts to update the instance table with a stale
+ /// etag upon completing a work item for a suborchestration, a SessionAbortedException is thrown which wraps the inner DurableTaskStorageException with
+ /// the correct status code (conflict).
+ /// The specific scenario tested is the worker stalling after updating the history table but before updating the instance table for the first work item
+ /// for a suborchestration. When it attempts to insert a new entity into the instance table for the suborchestration (since for a suborchestration,
+ /// the instance entity is only created upon completion of the first work item), it will fail.
+ /// 2. If <see cref="AzureStorageOrchestrationServiceSettings.UseInstanceTableEtag"/> is false for the above scenario, then the call to update the instance table
+ /// will go through, and the instance table will be updated with a "stale" status.
+ /// </summary>
+ /// <remarks>
+ /// Since it is not possible to force a stall, we simulate the above scenario by manually updating the instance table before the worker
+ /// attempts to complete the work item. The history table update will go through, but the instance table update will fail since "another worker
+ /// has since updated" the instance table.
+ /// </remarks>
+ /// <param name="useInstanceEtag">The value to use for <see cref="AzureStorageOrchestrationServiceSettings.UseInstanceTableEtag"/>
+ /// </param>
+ [DataTestMethod]
+ [DataRow(true)]
+ [DataRow(false)]
+ public async Task WorkerAttemptingToUpdateInstanceTableAfterStallingForSubOrchestration(bool useInstanceEtag)
+ {
+ AzureStorageOrchestrationService service = null;
+ try
+ {
+ var orchestrationInstance = new OrchestrationInstance
+ {
+ InstanceId = "instance_id",
+ ExecutionId = "execution_id",
+ };
+
+ ExecutionStartedEvent startedEvent = new(-1, string.Empty)
+ {
+ Name = "orchestration",
+ Version = string.Empty,
+ OrchestrationInstance = orchestrationInstance,
+ ScheduledStartTime = DateTime.UtcNow,
+ };
+
+ var settings = new AzureStorageOrchestrationServiceSettings
+ {
+ PartitionCount = 1,
+ StorageAccountClientProvider = new StorageAccountClientProvider(TestHelpers.GetTestStorageAccountConnectionString()),
+ TaskHubName = TestHelpers.GetTestTaskHubName(),
+ ExtendedSessionsEnabled = false,
+ UseInstanceTableEtag = useInstanceEtag
+ };
+ this.SetPartitionManagerType(settings, PartitionManagerType.V2Safe);
+
+ service = new AzureStorageOrchestrationService(settings);
+ await service.CreateAsync();
+ await service.StartAsync();
+
+ // Create the orchestration and get the first work item and start "working" on it
+ await service.CreateTaskOrchestrationAsync(
+ new TaskMessage()
+ {
+ OrchestrationInstance = orchestrationInstance,
+ Event = startedEvent
+ });
+ var workItem = await service.LockNextTaskOrchestrationWorkItemAsync(
+ TimeSpan.FromMinutes(5),
+ CancellationToken.None);
+ var runtimeState = workItem.OrchestrationRuntimeState;
+ runtimeState.AddEvent(new OrchestratorStartedEvent(-1));
+ runtimeState.AddEvent(startedEvent);
+ runtimeState.AddEvent(new SubOrchestrationInstanceCreatedEvent(0)
+ {
+ Name = "suborchestration",
+ InstanceId = "sub_instance_id"
});
- var workItem1 = await service1.LockNextTaskOrchestrationWorkItemAsync(
- TimeSpan.FromMinutes(5),
- CancellationToken.None);
- var runtimeState = workItem1.OrchestrationRuntimeState;
- runtimeState.AddEvent(new OrchestratorStartedEvent(-1));
- runtimeState.AddEvent(startedEvent);
- runtimeState.AddEvent(new TaskScheduledEvent(0, "task"));
- runtimeState.AddEvent(new OrchestratorCompletedEvent(-1));
-
- // Now lose the lease
- BlobPartitionLease lease = await service1.ListBlobLeasesAsync().SingleAsync();
- await service1.OnOwnershipLeaseReleasedAsync(lease, CloseReason.LeaseLost);
- await TestHelpers.WaitFor(
- condition: () => !service1.OwnedControlQueues.Any(),
- timeout: TimeSpan.FromSeconds(30));
+ runtimeState.AddEvent(new OrchestratorCompletedEvent(-1));
- // Create worker 2, wait for it to now acquire the lease
- var service2 = await this.EnsureTaskHubAsync(
- nameof(MultipleWorkersAttemptingToCompleteSameWorkItem),
- testDeletion: false,
- deleteBeforeCreate: false,
- workerId: "2",
- partitionCount: 1,
- controlQueueVisibilityTimeout: TimeSpan.FromSeconds(1)
- );
- await service2.StartAsync();
- await service2.OnOwnershipLeaseAquiredAsync(lease);
- await TestHelpers.WaitFor(
- condition: () => service2.OwnedControlQueues.Any(),
- timeout: TimeSpan.FromSeconds(60));
+ // Create the task message to start the suborchestration
+ var subOrchestrationExecutionStartedEvent = new ExecutionStartedEvent(-1, string.Empty)
+ {
+ OrchestrationInstance = new OrchestrationInstance
+ {
+ InstanceId = "sub_instance_id",
+ ExecutionId = Guid.NewGuid().ToString("N")
+ },
+ ParentInstance = new ParentInstance
+ {
+ OrchestrationInstance = runtimeState.OrchestrationInstance,
+ Name = runtimeState.Name,
+ Version = runtimeState.Version,
+ TaskScheduleId = 0,
+ },
+ Name = "suborchestration"
+ };
+ List<TaskMessage> orchestratorMessages =
+ new() {
+ new TaskMessage()
+ {
+ OrchestrationInstance = subOrchestrationExecutionStartedEvent.OrchestrationInstance,
+ Event = subOrchestrationExecutionStartedEvent,
+ }
+ };
- // Have worker 2 dequeue the same work item and start "working" on it
- var workItem2 = await service2.LockNextTaskOrchestrationWorkItemAsync(
- TimeSpan.FromMinutes(5),
- CancellationToken.None);
- workItem2.OrchestrationRuntimeState = runtimeState;
-
- // Worker 2 completes the work item
- await service2.CompleteTaskOrchestrationWorkItemAsync(workItem2, runtimeState, new List<TaskMessage>(), new List<TaskMessage>(), new List<TaskMessage>(), null, null);
- // Now worker 1 will attempt to complete the same work item. Since this is the first attempt to complete a work item and add a history for the orchestration (by worker 1),
- // there is no etag stored for the OrchestrationSession, and so the a "conflict" exception will be thrown as worker 2 already created a history for the orchestration.
- SessionAbortedException exception = await Assert.ThrowsExceptionAsync<SessionAbortedException>(async () =>
- await service1.CompleteTaskOrchestrationWorkItemAsync(workItem1, runtimeState, new List<TaskMessage>(), new List<TaskMessage>(), new List<TaskMessage>(), null, null)
- );
- Assert.IsInstanceOfType(exception.InnerException, typeof(DurableTaskStorageException));
- DurableTaskStorageException dtse = (DurableTaskStorageException)exception.InnerException;
- Assert.AreEqual((int)HttpStatusCode.Conflict, dtse.HttpStatusCode);
- await service1.ReleaseTaskOrchestrationWorkItemAsync(workItem1);
- await service2.ReleaseTaskOrchestrationWorkItemAsync(workItem2);
-
- // Now simulate a task completing for the orchestration
- var taskCompletedEvent = new TaskCompletedEvent(-1, 0, string.Empty);
- await service2.SendTaskOrchestrationMessageAsync(new TaskMessage { Event = taskCompletedEvent, OrchestrationInstance = orchestrationInstance });
- // Worker 2 gets the next work item related to this task completion and starts "working" on it
- workItem2 = await service2.LockNextTaskOrchestrationWorkItemAsync(
- TimeSpan.FromMinutes(5),
- CancellationToken.None);
- runtimeState = workItem2.OrchestrationRuntimeState;
- runtimeState.AddEvent(new OrchestratorStartedEvent(-1));
- runtimeState.AddEvent(taskCompletedEvent);
- runtimeState.AddEvent(new ExecutionCompletedEvent(1, string.Empty, OrchestrationStatus.Completed));
- runtimeState.AddEvent(new OrchestratorCompletedEvent(-1));
-
- // Now force worker 2 to lose the lease and have worker 1 acquire it
- lease = await service2.ListBlobLeasesAsync().SingleAsync();
- await service2.OnOwnershipLeaseReleasedAsync(lease, CloseReason.LeaseLost);
- await TestHelpers.WaitFor(
- condition: () => !service2.OwnedControlQueues.Any(),
- timeout: TimeSpan.FromSeconds(30));
- await service1.OnOwnershipLeaseAquiredAsync(lease);
- await TestHelpers.WaitFor(
- condition: () => service1.OwnedControlQueues.Any(),
- timeout: TimeSpan.FromSeconds(60));
+ // Complete the first work item, which will send the execution started message for the suborchestration
+ await service.CompleteTaskOrchestrationWorkItemAsync(workItem, runtimeState, new List<TaskMessage>(), orchestratorMessages, new List<TaskMessage>(), null, null);
+
+ // Now get the work item for the suborchestration and "work" on it
+ workItem = await service.LockNextTaskOrchestrationWorkItemAsync(
+ TimeSpan.FromMinutes(5),
+ CancellationToken.None);
+ runtimeState = workItem.OrchestrationRuntimeState;
+ runtimeState.AddEvent(new OrchestratorStartedEvent(-1));
+ runtimeState.AddEvent(subOrchestrationExecutionStartedEvent);
+ runtimeState.AddEvent(new TaskScheduledEvent(0));
+ runtimeState.AddEvent(new OrchestratorCompletedEvent(-1));
+
+ AzureStorageClient azureStorageClient = new(settings);
+ Table instanceTable = azureStorageClient.GetTableReference(azureStorageClient.Settings.InstanceTableName);
+ // Now manually update the suborchestration to have status "Completed"
+ TableEntity entity = new("sub_instance_id", "")
+ {
+ ["RuntimeStatus"] = OrchestrationStatus.Completed.ToString("G"),
+ };
+ await instanceTable.InsertEntityAsync(entity);
- // Worker 1 also acquires the work item and starts "working" on it
- workItem1 = await service1.LockNextTaskOrchestrationWorkItemAsync(
- TimeSpan.FromMinutes(5),
- CancellationToken.None);
- workItem1.OrchestrationRuntimeState = runtimeState;
-
- // Worker 1 completes the work item
- await service1.CompleteTaskOrchestrationWorkItemAsync(workItem1, runtimeState, new List<TaskMessage>(), new List<TaskMessage>(), new List<TaskMessage>(), null, null);
- // Now worker 2 attempts to complete the same work item. Since this is not the first work item for the orchestration, now an etag exists for the OrchestrationSession, and the exception
- // that is thrown will be "precondition failed" as the Etag is stale after worker 1 completed the work item.
- exception = await Assert.ThrowsExceptionAsync<SessionAbortedException>(async () =>
- await service2.CompleteTaskOrchestrationWorkItemAsync(workItem2, runtimeState, new List<TaskMessage>(), new List<TaskMessage>(), new List<TaskMessage>(), null, null)
- );
- Assert.IsInstanceOfType(exception.InnerException, typeof(DurableTaskStorageException));
- dtse = (DurableTaskStorageException)exception.InnerException;
- Assert.AreEqual((int)HttpStatusCode.PreconditionFailed, dtse.HttpStatusCode);
+ if (useInstanceEtag)
+ {
+ // Confirm an exception is thrown because the worker attempts to insert a new entity for the suborchestration into the instance table
+ // when one already exists
+ SessionAbortedException exception = await Assert.ThrowsExceptionAsync<SessionAbortedException>(async () =>
+ await service.CompleteTaskOrchestrationWorkItemAsync(workItem, runtimeState, new List<TaskMessage>(), new List<TaskMessage>(), new List<TaskMessage>(), null, null)
+ );
+ Assert.IsInstanceOfType(exception.InnerException, typeof(DurableTaskStorageException));
+ DurableTaskStorageException dtse = (DurableTaskStorageException)exception.InnerException;
+ Assert.AreEqual((int)HttpStatusCode.Conflict, dtse.HttpStatusCode);
+ }
+ else
+ {
+ await service.CompleteTaskOrchestrationWorkItemAsync(workItem, runtimeState, new List<TaskMessage>(), new List<TaskMessage>(), new List<TaskMessage>(), null, null);
+
+ var queryCondition = new OrchestrationInstanceStatusQueryCondition
+ {
+ InstanceId = "sub_instance_id",
+ FetchInput = false,
+ };
+
+ ODataCondition odata = queryCondition.ToOData();
+ OrchestrationInstanceStatus instanceTableEntity = await instanceTable
+ .ExecuteQueryAsync<OrchestrationInstanceStatus>(odata.Filter, 1, odata.Select, CancellationToken.None)
+ .FirstOrDefaultAsync();
+
+ // Confirm the instance table was updated with a "stale" status
+ Assert.IsNotNull(instanceTableEntity);
+ Assert.AreEqual(OrchestrationStatus.Running.ToString(), instanceTableEntity.RuntimeStatus);
+ }
+
+ }
+ finally
+ {
+ await service?.StopAsync(isForced: true);
+ }
}
[TestMethod]