Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1230,7 +1230,7 @@ await this.CommitOutboundQueueMessages(
// will result in a duplicate replay of the orchestration with no side-effects.
try
{
session.ETag = await this.trackingStore.UpdateStateAsync(runtimeState, workItem.OrchestrationRuntimeState, instanceId, executionId, session.ETag, session.TrackingStoreContext);
await this.trackingStore.UpdateStateAsync(runtimeState, workItem.OrchestrationRuntimeState, instanceId, executionId, session.ETags, session.TrackingStoreContext);
// update the runtime state and execution id stored in the session
session.UpdateRuntimeState(runtimeState);

Expand Down
2 changes: 2 additions & 0 deletions src/DurableTask.AzureStorage/MessageData.cs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ internal void Update(UpdateReceipt receipt)
{
this.OriginalQueueMessage = this.OriginalQueueMessage.Update(receipt);
}

internal object MessageMetadata { get; set; }
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public OrchestrationSession(
ControlQueue controlQueue,
List<MessageData> initialMessageBatch,
OrchestrationRuntimeState runtimeState,
ETag? eTag,
OrchestrationETags eTags,
DateTime lastCheckpointTime,
object trackingStoreContext,
TimeSpan idleTimeout,
Expand All @@ -48,7 +48,7 @@ public OrchestrationSession(
this.ControlQueue = controlQueue ?? throw new ArgumentNullException(nameof(controlQueue));
this.CurrentMessageBatch = initialMessageBatch ?? throw new ArgumentNullException(nameof(initialMessageBatch));
this.RuntimeState = runtimeState ?? throw new ArgumentNullException(nameof(runtimeState));
this.ETag = eTag;
this.ETags = eTags;
this.LastCheckpointTime = lastCheckpointTime;
this.TrackingStoreContext = trackingStoreContext;

Expand All @@ -66,7 +66,7 @@ public OrchestrationSession(

public OrchestrationRuntimeState RuntimeState { get; private set; }

public ETag? ETag { get; set; }
public OrchestrationETags ETags { get; set; }

public DateTime LastCheckpointTime { get; }

Expand Down
12 changes: 12 additions & 0 deletions src/DurableTask.AzureStorage/OrchestrationETags.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
using Azure;

#nullable enable
namespace DurableTask.AzureStorage
{
class OrchestrationETags
{
internal ETag? InstanceETag { get; set; }

internal ETag? HistoryETag { get; set; }
}
}
25 changes: 16 additions & 9 deletions src/DurableTask.AzureStorage/OrchestrationSessionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -271,21 +271,22 @@ async Task<IEnumerable<MessageData>> DedupeExecutionStartedMessagesAsync(
// Terminology:
// "Local" -> the instance ID info comes from the local copy of the message we're examining
// "Remote" -> the instance ID info comes from the Instances table that we're querying
IAsyncEnumerable<OrchestrationState> instances = this.trackingStore.GetStateAsync(instanceIds, cancellationToken);
IDictionary<string, OrchestrationState> remoteOrchestrationsById =
await instances.ToDictionaryAsync(o => o.OrchestrationInstance.InstanceId, cancellationToken);
IAsyncEnumerable<InstanceStatus> instances = this.trackingStore.FetchInstanceStatusAsync(instanceIds, cancellationToken);
IDictionary<string, InstanceStatus> remoteOrchestrationsById =
await instances.ToDictionaryAsync(o => o.State.OrchestrationInstance.InstanceId, cancellationToken);

foreach (MessageData message in executionStartedMessages)
{
OrchestrationInstance localInstance = message.TaskMessage.OrchestrationInstance;
var expectedGeneration = ((ExecutionStartedEvent)message.TaskMessage.Event).Generation;
if (remoteOrchestrationsById.TryGetValue(localInstance.InstanceId, out OrchestrationState remoteInstance) &&
(remoteInstance.OrchestrationInstance.ExecutionId == null || string.Equals(localInstance.ExecutionId, remoteInstance.OrchestrationInstance.ExecutionId, StringComparison.OrdinalIgnoreCase)))
if (remoteOrchestrationsById.TryGetValue(localInstance.InstanceId, out InstanceStatus remoteInstance) &&
(remoteInstance.State.OrchestrationInstance.ExecutionId == null || string.Equals(localInstance.ExecutionId, remoteInstance.State.OrchestrationInstance.ExecutionId, StringComparison.OrdinalIgnoreCase)))
{
// Happy path: The message matches the table status. Alternatively, if the table doesn't have an ExecutionId field (older clients, pre-v1.8.5),
// then we have no way of knowing if it's a duplicate. Either way, allow it to run.
message.MessageMetadata = remoteInstance.ETag;
}
else if (expectedGeneration == remoteInstance?.Generation && this.IsScheduledAfterInstanceUpdate(message, remoteInstance))
else if (expectedGeneration == remoteInstance?.State.Generation && this.IsScheduledAfterInstanceUpdate(message, remoteInstance?.State))
{
// The message was scheduled after the Instances table was updated with the orchestration info.
// We know almost certainly that this is a redundant message and can be safely discarded because
Expand Down Expand Up @@ -476,6 +477,10 @@ internal void AddMessageToPendingOrchestration(
if (targetBatch == null)
{
targetBatch = new PendingMessageBatch(controlQueue, instanceId, executionId);
if (data.MessageMetadata is ETag instanceEtag)
{
targetBatch.ETags.InstanceETag = instanceEtag;
}
node = this.pendingOrchestrationMessageBatches.AddLast(targetBatch);

// Before the batch of messages can be processed, we need to download the latest execution state.
Expand Down Expand Up @@ -519,7 +524,7 @@ async Task ScheduleOrchestrationStatePrefetch(
cancellationToken);

batch.OrchestrationState = new OrchestrationRuntimeState(history.Events);
batch.ETag = history.ETag;
batch.ETags.HistoryETag = history.ETag;
batch.LastCheckpointTime = history.LastCheckpointTime;
batch.TrackingStoreContext = history.TrackingStoreContext;
}
Expand Down Expand Up @@ -590,7 +595,7 @@ async Task ScheduleOrchestrationStatePrefetch(
nextBatch.ControlQueue,
nextBatch.Messages,
nextBatch.OrchestrationState,
nextBatch.ETag,
nextBatch.ETags,
nextBatch.LastCheckpointTime,
nextBatch.TrackingStoreContext,
this.settings.ExtendedSessionIdleTimeout,
Expand Down Expand Up @@ -737,8 +742,10 @@ public OrchestrationRuntimeState? OrchestrationState
}
}

public ETag? ETag { get; set; }
public OrchestrationETags ETags { get; } = new OrchestrationETags();

public DateTime LastCheckpointTime { get; set; }

public object? TrackingStoreContext { get; set; }
}
}
Expand Down
10 changes: 6 additions & 4 deletions src/DurableTask.AzureStorage/Storage/Table.cs
Original file line number Diff line number Diff line change
Expand Up @@ -86,16 +86,18 @@ public async Task DeleteEntityAsync<T>(T tableEntity, ETag ifMatch = default, Ca
this.stats.TableEntitiesWritten.Increment();
}

public async Task InsertEntityAsync<T>(T tableEntity, CancellationToken cancellationToken = default) where T : ITableEntity
public async Task<Response> InsertEntityAsync<T>(T tableEntity, CancellationToken cancellationToken = default) where T : ITableEntity
{
await this.tableClient.AddEntityAsync(tableEntity, cancellationToken).DecorateFailure();
Response result = await this.tableClient.AddEntityAsync(tableEntity, cancellationToken).DecorateFailure();
this.stats.TableEntitiesWritten.Increment();
return result;
}

public async Task MergeEntityAsync<T>(T tableEntity, ETag ifMatch, CancellationToken cancellationToken = default) where T : ITableEntity
public async Task<Response> MergeEntityAsync<T>(T tableEntity, ETag ifMatch, CancellationToken cancellationToken = default) where T : ITableEntity
{
await this.tableClient.UpdateEntityAsync(tableEntity, ifMatch, TableUpdateMode.Merge, cancellationToken).DecorateFailure();
Response result = await this.tableClient.UpdateEntityAsync(tableEntity, ifMatch, TableUpdateMode.Merge, cancellationToken).DecorateFailure();
this.stats.TableEntitiesWritten.Increment();
return result;
}

public async Task InsertOrMergeEntityAsync<T>(T tableEntity, CancellationToken cancellationToken = default) where T : ITableEntity
Expand Down
85 changes: 60 additions & 25 deletions src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -510,19 +510,19 @@ async Task<OrchestrationState> ConvertFromAsync(OrchestrationInstanceStatus orch
}

/// <inheritdoc />
public override async IAsyncEnumerable<OrchestrationState> GetStateAsync(IEnumerable<string> instanceIds, [EnumeratorCancellation] CancellationToken cancellationToken = default)
public override async IAsyncEnumerable<InstanceStatus> FetchInstanceStatusAsync(IEnumerable<string> instanceIds, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
if (instanceIds == null)
{
yield break;
}

IEnumerable<Task<OrchestrationState>> instanceQueries = instanceIds.Select(instance => this.GetStateAsync(instance, allExecutions: true, fetchInput: false, cancellationToken).SingleOrDefaultAsync().AsTask());
foreach (OrchestrationState state in await Task.WhenAll(instanceQueries))
IEnumerable<Task<InstanceStatus>> instanceQueries = instanceIds.Select(instance => this.FetchInstanceStatusAsync(instance, cancellationToken));
foreach (InstanceStatus status in await Task.WhenAll(instanceQueries))
{
if (state != null)
if (status != null)
{
yield return state;
yield return status;
}
}
}
Expand Down Expand Up @@ -839,12 +839,12 @@ public override Task StartAsync(CancellationToken cancellationToken = default)
}

/// <inheritdoc />
public override async Task<ETag?> UpdateStateAsync(
public override async Task UpdateStateAsync(
OrchestrationRuntimeState newRuntimeState,
OrchestrationRuntimeState oldRuntimeState,
string instanceId,
string executionId,
ETag? eTagValue,
OrchestrationETags eTags,
object trackingStoreContext,
CancellationToken cancellationToken = default)
{
Expand Down Expand Up @@ -991,7 +991,7 @@ public override Task StartAsync(CancellationToken cancellationToken = default)
// Table storage only supports inserts of up to 100 entities at a time or 4 MB at a time.
if (historyEventBatch.Count == 99 || estimatedBytes > 3 * 1024 * 1024 /* 3 MB */)
{
eTagValue = await this.UploadHistoryBatch(
eTags.HistoryETag = await this.UploadHistoryBatch(
instanceId,
sanitizedInstanceId,
executionId,
Expand All @@ -1000,7 +1000,7 @@ public override Task StartAsync(CancellationToken cancellationToken = default)
allEvents.Count,
episodeNumber,
estimatedBytes,
eTagValue,
eTags.HistoryETag,
isFinalBatch: isFinalEvent,
cancellationToken: cancellationToken);

Expand All @@ -1014,7 +1014,7 @@ public override Task StartAsync(CancellationToken cancellationToken = default)
// First persistence step is to commit history to the history table. Messages must come after.
if (historyEventBatch.Count > 0)
{
eTagValue = await this.UploadHistoryBatch(
eTags.HistoryETag = await this.UploadHistoryBatch(
instanceId,
sanitizedInstanceId,
executionId,
Expand All @@ -1023,22 +1023,12 @@ public override Task StartAsync(CancellationToken cancellationToken = default)
allEvents.Count,
episodeNumber,
estimatedBytes,
eTagValue,
eTags.HistoryETag,
isFinalBatch: true,
cancellationToken: cancellationToken);
}

Stopwatch orchestrationInstanceUpdateStopwatch = Stopwatch.StartNew();
await this.InstancesTable.InsertOrMergeEntityAsync(instanceEntity);

this.settings.Logger.InstanceStatusUpdate(
this.storageAccountName,
this.taskHubName,
instanceId,
executionId,
runtimeStatus,
episodeNumber,
orchestrationInstanceUpdateStopwatch.ElapsedMilliseconds);
eTags.InstanceETag = await this.TryUpdateInstanceTableAsync(instanceEntity, eTags.InstanceETag, instanceId, executionId, runtimeStatus, episodeNumber);

// finally, delete orphaned blobs from the previous execution history.
// We had to wait until the new history has committed to make sure the blobs are no longer necessary.
Expand All @@ -1051,8 +1041,6 @@ public override Task StartAsync(CancellationToken cancellationToken = default)
}
await Task.WhenAll(tasks);
}

return eTagValue;
}

public override async Task UpdateInstanceStatusForCompletedOrchestrationAsync(
Expand Down Expand Up @@ -1365,7 +1353,7 @@ static string GetBlobName(TableEntity entity, string property)
}
catch (DurableTaskStorageException ex)
{
// Handle the case where the the history has already been updated by another caller.
// Handle the case where the history has already been updated by another caller.
// Common case: the resulting code is 'PreconditionFailed', which means "eTagValue" no longer matches the one stored, and TableTransactionActionType is "Update".
// Edge case: the resulting code is 'Conflict'. This is the case when eTagValue is null, and the TableTransactionActionType is "Add",
// in which case the exception indicates that the table entity we are trying to "add" already exists.
Expand Down Expand Up @@ -1424,6 +1412,53 @@ bool ExceedsMaxTablePropertySize(string data)
return false;
}

async Task<ETag?> TryUpdateInstanceTableAsync(TableEntity instanceEntity, ETag? eTag, string instanceId, string executionId, OrchestrationStatus runtimeStatus, int episodeNumber)
{
var orchestrationInstanceUpdateStopwatch = Stopwatch.StartNew();

try
{
Response result = await (eTag == null
? this.InstancesTable.InsertEntityAsync(instanceEntity)
: this.InstancesTable.MergeEntityAsync(instanceEntity, eTag.Value));

this.settings.Logger.InstanceStatusUpdate(
this.storageAccountName,
this.taskHubName,
instanceId,
executionId,
runtimeStatus,
episodeNumber,
orchestrationInstanceUpdateStopwatch.ElapsedMilliseconds);

return result.Headers.ETag;
}
catch (DurableTaskStorageException ex)
{
// Handle the case where the instance table has already been updated by another caller.
// Common case: the resulting code is 'PreconditionFailed', which means we are trying to update an existing instance entity and "eTag" no longer matches the one stored.
// Edge case: the resulting code is 'Conflict'. This is the case when eTag is null, and we are trying to insert a new instance entity, in which case the exception
// indicates that the table entity we are trying to "add" already exists.
if (ex.HttpStatusCode == (int)HttpStatusCode.Conflict || ex.HttpStatusCode == (int)HttpStatusCode.PreconditionFailed)
{
this.settings.Logger.SplitBrainDetected(
this.storageAccountName,
this.taskHubName,
instanceId,
executionId,
newEventCount: 0,
totalEventCount: 1, // these fields don't really make sense for the instance table case. do we want to introduce a new log? or are we okay with this since "InstanceEntity"
// in the new events field will allow this to be detectable?
"InstanceEntity",
orchestrationInstanceUpdateStopwatch.ElapsedMilliseconds,
eTag is null ? string.Empty : eTag.ToString());
}

throw;
}

}

class TrackingStoreContext
{
public List<string> Blobs { get; set; } = new List<string>();
Expand Down
6 changes: 3 additions & 3 deletions src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,10 @@ interface ITrackingStore
/// <param name="oldRuntimeState">The RuntimeState for an olderExecution</param>
/// <param name="instanceId">InstanceId for the Orchestration Update</param>
/// <param name="executionId">ExecutionId for the Orchestration Update</param>
/// <param name="eTag">The ETag value to use for safe updates</param>
/// <param name="eTags">The ETag value for the instance and history tables to use for safe updates</param>
/// <param name="trackingStoreContext">Additional context for the execution that is maintained by the tracking store.</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
Task<ETag?> UpdateStateAsync(OrchestrationRuntimeState newRuntimeState, OrchestrationRuntimeState oldRuntimeState, string instanceId, string executionId, ETag? eTag, object trackingStoreContext, CancellationToken cancellationToken = default);
Task UpdateStateAsync(OrchestrationRuntimeState newRuntimeState, OrchestrationRuntimeState oldRuntimeState, string instanceId, string executionId, OrchestrationETags eTags, object trackingStoreContext, CancellationToken cancellationToken = default);

/// <summary>
/// Get The Orchestration State for the Latest or All Executions
Expand Down Expand Up @@ -127,7 +127,7 @@ interface ITrackingStore
/// </summary>
/// <param name="instanceIds">The list of instances to query for.</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
IAsyncEnumerable<OrchestrationState> GetStateAsync(IEnumerable<string> instanceIds, CancellationToken cancellationToken = default);
IAsyncEnumerable<InstanceStatus> FetchInstanceStatusAsync(IEnumerable<string> instanceIds, CancellationToken cancellationToken = default);

/// <summary>
/// Get The Orchestration State for querying orchestration instances by the condition
Expand Down
Loading
Loading