Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1230,7 +1230,7 @@ await this.CommitOutboundQueueMessages(
// will result in a duplicate replay of the orchestration with no side-effects.
try
{
session.ETag = await this.trackingStore.UpdateStateAsync(runtimeState, workItem.OrchestrationRuntimeState, instanceId, executionId, session.ETag, session.TrackingStoreContext);
await this.trackingStore.UpdateStateAsync(runtimeState, workItem.OrchestrationRuntimeState, instanceId, executionId, session.ETags, session.TrackingStoreContext);
// update the runtime state and execution id stored in the session
session.UpdateRuntimeState(runtimeState);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,5 +294,22 @@ internal LogHelper Logger
/// The default is <see cref="QueueClientMessageEncoding.UTF8"/>.
/// </summary>
public QueueClientMessageEncoding QueueClientMessageEncoding { get; set; } = QueueClientMessageEncoding.UTF8;

/// <summary>
/// When true, an etag is used when attempting to make instance table updates upon completing an orchestration work item.
/// </summary>
/// <remarks>
/// By default, etags are only used when updating the history table upon completing an orchestration work item. This can lead
/// to subtle race conditions where the instance table is incorrectly updated. Consider the following scenario:
/// 1. Worker A completes an orchestration work item, sends outgoing messages, updates the history table, then stalls.
/// 2. Worker B picks up the next and final orchestration work item for the same instance and completes it, updating the history
/// table and instance table with the new terminal status.
/// 3. Worker A resumes and overrides the terminal status in the instance table with an incorrect non-terminal status.
/// This will leave the instance status of the orchestration permanently as "Running" even though it has actually completed.
/// To prevent such scenarios, enabling this setting will ensure that instance table updates also use etags. This would prevent
/// worker A's update in step 3 from completing. Enabling this setting will also introduce a performance overhead since the instance
/// table must now be read before processing every orchestration work item to obtain the latest etag.
/// </remarks>
public bool UseInstanceTableEtag { get; set; } = false;
}
}
2 changes: 2 additions & 0 deletions src/DurableTask.AzureStorage/MessageData.cs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ internal void Update(UpdateReceipt receipt)
{
this.OriginalQueueMessage = this.OriginalQueueMessage.Update(receipt);
}

internal object MessageMetadata { get; set; }
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public OrchestrationSession(
ControlQueue controlQueue,
List<MessageData> initialMessageBatch,
OrchestrationRuntimeState runtimeState,
ETag? eTag,
OrchestrationETags eTags,
DateTime lastCheckpointTime,
object trackingStoreContext,
TimeSpan idleTimeout,
Expand All @@ -48,7 +48,7 @@ public OrchestrationSession(
this.ControlQueue = controlQueue ?? throw new ArgumentNullException(nameof(controlQueue));
this.CurrentMessageBatch = initialMessageBatch ?? throw new ArgumentNullException(nameof(initialMessageBatch));
this.RuntimeState = runtimeState ?? throw new ArgumentNullException(nameof(runtimeState));
this.ETag = eTag;
this.ETags = eTags;
this.LastCheckpointTime = lastCheckpointTime;
this.TrackingStoreContext = trackingStoreContext;

Expand All @@ -66,7 +66,7 @@ public OrchestrationSession(

public OrchestrationRuntimeState RuntimeState { get; private set; }

public ETag? ETag { get; set; }
public OrchestrationETags ETags { get; set; }

public DateTime LastCheckpointTime { get; }

Expand Down
24 changes: 24 additions & 0 deletions src/DurableTask.AzureStorage/OrchestrationETags.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// ----------------------------------------------------------------------------------
// Copyright Microsoft Corporation
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ----------------------------------------------------------------------------------
#nullable enable
namespace DurableTask.AzureStorage
{
using Azure;

class OrchestrationETags
{
internal ETag? InstanceETag { get; set; }

internal ETag? HistoryETag { get; set; }
}
}
39 changes: 30 additions & 9 deletions src/DurableTask.AzureStorage/OrchestrationSessionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -271,21 +271,25 @@ async Task<IEnumerable<MessageData>> DedupeExecutionStartedMessagesAsync(
// Terminology:
// "Local" -> the instance ID info comes from the local copy of the message we're examining
// "Remote" -> the instance ID info comes from the Instances table that we're querying
IAsyncEnumerable<OrchestrationState> instances = this.trackingStore.GetStateAsync(instanceIds, cancellationToken);
IDictionary<string, OrchestrationState> remoteOrchestrationsById =
await instances.ToDictionaryAsync(o => o.OrchestrationInstance.InstanceId, cancellationToken);
IAsyncEnumerable<InstanceStatus> instances = this.trackingStore.FetchInstanceStatusAsync(instanceIds, cancellationToken);
IDictionary<string, InstanceStatus> remoteOrchestrationsById =
await instances.ToDictionaryAsync(o => o.State.OrchestrationInstance.InstanceId, cancellationToken);

foreach (MessageData message in executionStartedMessages)
{
OrchestrationInstance localInstance = message.TaskMessage.OrchestrationInstance;
var expectedGeneration = ((ExecutionStartedEvent)message.TaskMessage.Event).Generation;
if (remoteOrchestrationsById.TryGetValue(localInstance.InstanceId, out OrchestrationState remoteInstance) &&
(remoteInstance.OrchestrationInstance.ExecutionId == null || string.Equals(localInstance.ExecutionId, remoteInstance.OrchestrationInstance.ExecutionId, StringComparison.OrdinalIgnoreCase)))
if (remoteOrchestrationsById.TryGetValue(localInstance.InstanceId, out InstanceStatus remoteInstance) &&
(remoteInstance.State.OrchestrationInstance.ExecutionId == null || string.Equals(localInstance.ExecutionId, remoteInstance.State.OrchestrationInstance.ExecutionId, StringComparison.OrdinalIgnoreCase)))
{
// Happy path: The message matches the table status. Alternatively, if the table doesn't have an ExecutionId field (older clients, pre-v1.8.5),
// then we have no way of knowing if it's a duplicate. Either way, allow it to run.
if (this.settings.UseInstanceTableEtag)
{
message.MessageMetadata = remoteInstance.ETag;
}
}
else if (expectedGeneration == remoteInstance?.Generation && this.IsScheduledAfterInstanceUpdate(message, remoteInstance))
else if (expectedGeneration == remoteInstance?.State.Generation && this.IsScheduledAfterInstanceUpdate(message, remoteInstance?.State))
{
// The message was scheduled after the Instances table was updated with the orchestration info.
// We know almost certainly that this is a redundant message and can be safely discarded because
Expand Down Expand Up @@ -476,6 +480,10 @@ internal void AddMessageToPendingOrchestration(
if (targetBatch == null)
{
targetBatch = new PendingMessageBatch(controlQueue, instanceId, executionId);
if (this.settings.UseInstanceTableEtag && data.MessageMetadata is ETag instanceEtag)
{
targetBatch.ETags.InstanceETag = instanceEtag;
}
node = this.pendingOrchestrationMessageBatches.AddLast(targetBatch);

// Before the batch of messages can be processed, we need to download the latest execution state.
Expand Down Expand Up @@ -519,9 +527,20 @@ async Task ScheduleOrchestrationStatePrefetch(
cancellationToken);

batch.OrchestrationState = new OrchestrationRuntimeState(history.Events);
batch.ETag = history.ETag;
batch.ETags.HistoryETag = history.ETag;
batch.LastCheckpointTime = history.LastCheckpointTime;
batch.TrackingStoreContext = history.TrackingStoreContext;

// Try to get the instance ETag from the tracking store if it wasn't already provided
if (this.settings.UseInstanceTableEtag && batch.ETags.InstanceETag == null)
{
InstanceStatus? instanceStatus = await this.trackingStore.FetchInstanceStatusAsync(
batch.OrchestrationInstanceId,
cancellationToken);
// The instance could not exist in the case that these messages are for the first execution of a suborchestration,
// or an entity-started orchestration, for example
batch.ETags.InstanceETag = instanceStatus?.ETag;
}
}

if (this.settings.UseSeparateQueueForEntityWorkItems
Expand Down Expand Up @@ -590,7 +609,7 @@ async Task ScheduleOrchestrationStatePrefetch(
nextBatch.ControlQueue,
nextBatch.Messages,
nextBatch.OrchestrationState,
nextBatch.ETag,
nextBatch.ETags,
nextBatch.LastCheckpointTime,
nextBatch.TrackingStoreContext,
this.settings.ExtendedSessionIdleTimeout,
Expand Down Expand Up @@ -737,8 +756,10 @@ public OrchestrationRuntimeState? OrchestrationState
}
}

public ETag? ETag { get; set; }
public OrchestrationETags ETags { get; } = new OrchestrationETags();

public DateTime LastCheckpointTime { get; set; }

public object? TrackingStoreContext { get; set; }
}
}
Expand Down
10 changes: 6 additions & 4 deletions src/DurableTask.AzureStorage/Storage/Table.cs
Original file line number Diff line number Diff line change
Expand Up @@ -86,16 +86,18 @@ public async Task DeleteEntityAsync<T>(T tableEntity, ETag ifMatch = default, Ca
this.stats.TableEntitiesWritten.Increment();
}

public async Task InsertEntityAsync<T>(T tableEntity, CancellationToken cancellationToken = default) where T : ITableEntity
public async Task<Response> InsertEntityAsync<T>(T tableEntity, CancellationToken cancellationToken = default) where T : ITableEntity
{
await this.tableClient.AddEntityAsync(tableEntity, cancellationToken).DecorateFailure();
Response result = await this.tableClient.AddEntityAsync(tableEntity, cancellationToken).DecorateFailure();
this.stats.TableEntitiesWritten.Increment();
return result;
}

public async Task MergeEntityAsync<T>(T tableEntity, ETag ifMatch, CancellationToken cancellationToken = default) where T : ITableEntity
public async Task<Response> MergeEntityAsync<T>(T tableEntity, ETag ifMatch, CancellationToken cancellationToken = default) where T : ITableEntity
{
await this.tableClient.UpdateEntityAsync(tableEntity, ifMatch, TableUpdateMode.Merge, cancellationToken).DecorateFailure();
Response result = await this.tableClient.UpdateEntityAsync(tableEntity, ifMatch, TableUpdateMode.Merge, cancellationToken).DecorateFailure();
this.stats.TableEntitiesWritten.Increment();
return result;
}

public async Task InsertOrMergeEntityAsync<T>(T tableEntity, CancellationToken cancellationToken = default) where T : ITableEntity
Expand Down
Loading
Loading