Skip to content

Commit 5eb2643

Browse files
sophiatevSophia Tevosyan
andauthored
Adding ETag Usage to the Instance Table (#1280)
* first commit * removing unnecessary update to OrchestrationState * forgot to update the instance etag * added test cleanup * added instance status fetching when the etag isnt already present * added feature flag * addressing PR comments * removed one more design question --------- Co-authored-by: Sophia Tevosyan <[email protected]>
1 parent ee24d35 commit 5eb2643

File tree

12 files changed

+561
-169
lines changed

12 files changed

+561
-169
lines changed

src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1230,7 +1230,7 @@ await this.CommitOutboundQueueMessages(
12301230
// will result in a duplicate replay of the orchestration with no side-effects.
12311231
try
12321232
{
1233-
session.ETag = await this.trackingStore.UpdateStateAsync(runtimeState, workItem.OrchestrationRuntimeState, instanceId, executionId, session.ETag, session.TrackingStoreContext);
1233+
await this.trackingStore.UpdateStateAsync(runtimeState, workItem.OrchestrationRuntimeState, instanceId, executionId, session.ETags, session.TrackingStoreContext);
12341234
// update the runtime state and execution id stored in the session
12351235
session.UpdateRuntimeState(runtimeState);
12361236

src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,5 +294,22 @@ internal LogHelper Logger
294294
/// The default is <see cref="QueueClientMessageEncoding.UTF8"/>.
295295
/// </summary>
296296
public QueueClientMessageEncoding QueueClientMessageEncoding { get; set; } = QueueClientMessageEncoding.UTF8;
297+
298+
/// <summary>
299+
/// When true, an etag is used when attempting to make instance table updates upon completing an orchestration work item.
300+
/// </summary>
301+
/// <remarks>
302+
/// By default, etags are only used when updating the history table upon completing an orchestration work item. This can lead
303+
/// to subtle race conditions where the instance table is incorrectly updated. Consider the following scenario:
304+
/// 1. Worker A completes an orchestration work item, sends outgoing messages, updates the history table, then stalls.
305+
/// 2. Worker B picks up the next and final orchestration work item for the same instance and completes it, updating the history
306+
/// table and instance table with the new terminal status.
307+
/// 3. Worker A resumes and overrides the terminal status in the instance table with an incorrect non-terminal status.
308+
/// This will leave the instance status of the orchestration permanently as "Running" even though it has actually completed.
309+
/// To prevent such scenarios, enabling this setting will ensure that instance table updates also use etags. This would prevent
310+
/// worker A's update in step 3 from completing. Enabling this setting will also introduce a performance overhead since the instance
311+
/// table must now be read before processing every orchestration work item to obtain the latest etag.
312+
/// </remarks>
313+
public bool UseInstanceTableEtag { get; set; } = false;
297314
}
298315
}

src/DurableTask.AzureStorage/MessageData.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,8 @@ internal void Update(UpdateReceipt receipt)
111111
{
112112
this.OriginalQueueMessage = this.OriginalQueueMessage.Update(receipt);
113113
}
114+
115+
internal object MessageMetadata { get; set; }
114116
}
115117

116118
/// <summary>

src/DurableTask.AzureStorage/Messaging/OrchestrationSession.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public OrchestrationSession(
3737
ControlQueue controlQueue,
3838
List<MessageData> initialMessageBatch,
3939
OrchestrationRuntimeState runtimeState,
40-
ETag? eTag,
40+
OrchestrationETags eTags,
4141
DateTime lastCheckpointTime,
4242
object trackingStoreContext,
4343
TimeSpan idleTimeout,
@@ -48,7 +48,7 @@ public OrchestrationSession(
4848
this.ControlQueue = controlQueue ?? throw new ArgumentNullException(nameof(controlQueue));
4949
this.CurrentMessageBatch = initialMessageBatch ?? throw new ArgumentNullException(nameof(initialMessageBatch));
5050
this.RuntimeState = runtimeState ?? throw new ArgumentNullException(nameof(runtimeState));
51-
this.ETag = eTag;
51+
this.ETags = eTags;
5252
this.LastCheckpointTime = lastCheckpointTime;
5353
this.TrackingStoreContext = trackingStoreContext;
5454

@@ -66,7 +66,7 @@ public OrchestrationSession(
6666

6767
public OrchestrationRuntimeState RuntimeState { get; private set; }
6868

69-
public ETag? ETag { get; set; }
69+
public OrchestrationETags ETags { get; set; }
7070

7171
public DateTime LastCheckpointTime { get; }
7272

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
// ----------------------------------------------------------------------------------
2+
// Copyright Microsoft Corporation
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
// Unless required by applicable law or agreed to in writing, software
8+
// distributed under the License is distributed on an "AS IS" BASIS,
9+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
// See the License for the specific language governing permissions and
11+
// limitations under the License.
12+
// ----------------------------------------------------------------------------------
13+
#nullable enable
14+
namespace DurableTask.AzureStorage
15+
{
16+
using Azure;
17+
18+
class OrchestrationETags
19+
{
20+
internal ETag? InstanceETag { get; set; }
21+
22+
internal ETag? HistoryETag { get; set; }
23+
}
24+
}

src/DurableTask.AzureStorage/OrchestrationSessionManager.cs

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -271,21 +271,25 @@ async Task<IEnumerable<MessageData>> DedupeExecutionStartedMessagesAsync(
271271
// Terminology:
272272
// "Local" -> the instance ID info comes from the local copy of the message we're examining
273273
// "Remote" -> the instance ID info comes from the Instances table that we're querying
274-
IAsyncEnumerable<OrchestrationState> instances = this.trackingStore.GetStateAsync(instanceIds, cancellationToken);
275-
IDictionary<string, OrchestrationState> remoteOrchestrationsById =
276-
await instances.ToDictionaryAsync(o => o.OrchestrationInstance.InstanceId, cancellationToken);
274+
IAsyncEnumerable<InstanceStatus> instances = this.trackingStore.FetchInstanceStatusAsync(instanceIds, cancellationToken);
275+
IDictionary<string, InstanceStatus> remoteOrchestrationsById =
276+
await instances.ToDictionaryAsync(o => o.State.OrchestrationInstance.InstanceId, cancellationToken);
277277

278278
foreach (MessageData message in executionStartedMessages)
279279
{
280280
OrchestrationInstance localInstance = message.TaskMessage.OrchestrationInstance;
281281
var expectedGeneration = ((ExecutionStartedEvent)message.TaskMessage.Event).Generation;
282-
if (remoteOrchestrationsById.TryGetValue(localInstance.InstanceId, out OrchestrationState remoteInstance) &&
283-
(remoteInstance.OrchestrationInstance.ExecutionId == null || string.Equals(localInstance.ExecutionId, remoteInstance.OrchestrationInstance.ExecutionId, StringComparison.OrdinalIgnoreCase)))
282+
if (remoteOrchestrationsById.TryGetValue(localInstance.InstanceId, out InstanceStatus remoteInstance) &&
283+
(remoteInstance.State.OrchestrationInstance.ExecutionId == null || string.Equals(localInstance.ExecutionId, remoteInstance.State.OrchestrationInstance.ExecutionId, StringComparison.OrdinalIgnoreCase)))
284284
{
285285
// Happy path: The message matches the table status. Alternatively, if the table doesn't have an ExecutionId field (older clients, pre-v1.8.5),
286286
// then we have no way of knowing if it's a duplicate. Either way, allow it to run.
287+
if (this.settings.UseInstanceTableEtag)
288+
{
289+
message.MessageMetadata = remoteInstance.ETag;
290+
}
287291
}
288-
else if (expectedGeneration == remoteInstance?.Generation && this.IsScheduledAfterInstanceUpdate(message, remoteInstance))
292+
else if (expectedGeneration == remoteInstance?.State.Generation && this.IsScheduledAfterInstanceUpdate(message, remoteInstance?.State))
289293
{
290294
// The message was scheduled after the Instances table was updated with the orchestration info.
291295
// We know almost certainly that this is a redundant message and can be safely discarded because
@@ -476,6 +480,10 @@ internal void AddMessageToPendingOrchestration(
476480
if (targetBatch == null)
477481
{
478482
targetBatch = new PendingMessageBatch(controlQueue, instanceId, executionId);
483+
if (this.settings.UseInstanceTableEtag && data.MessageMetadata is ETag instanceEtag)
484+
{
485+
targetBatch.ETags.InstanceETag = instanceEtag;
486+
}
479487
node = this.pendingOrchestrationMessageBatches.AddLast(targetBatch);
480488

481489
// Before the batch of messages can be processed, we need to download the latest execution state.
@@ -519,9 +527,20 @@ async Task ScheduleOrchestrationStatePrefetch(
519527
cancellationToken);
520528

521529
batch.OrchestrationState = new OrchestrationRuntimeState(history.Events);
522-
batch.ETag = history.ETag;
530+
batch.ETags.HistoryETag = history.ETag;
523531
batch.LastCheckpointTime = history.LastCheckpointTime;
524532
batch.TrackingStoreContext = history.TrackingStoreContext;
533+
534+
// Try to get the instance ETag from the tracking store if it wasn't already provided
535+
if (this.settings.UseInstanceTableEtag && batch.ETags.InstanceETag == null)
536+
{
537+
InstanceStatus? instanceStatus = await this.trackingStore.FetchInstanceStatusAsync(
538+
batch.OrchestrationInstanceId,
539+
cancellationToken);
540+
// The instance could not exist in the case that these messages are for the first execution of a suborchestration,
541+
// or an entity-started orchestration, for example
542+
batch.ETags.InstanceETag = instanceStatus?.ETag;
543+
}
525544
}
526545

527546
if (this.settings.UseSeparateQueueForEntityWorkItems
@@ -590,7 +609,7 @@ async Task ScheduleOrchestrationStatePrefetch(
590609
nextBatch.ControlQueue,
591610
nextBatch.Messages,
592611
nextBatch.OrchestrationState,
593-
nextBatch.ETag,
612+
nextBatch.ETags,
594613
nextBatch.LastCheckpointTime,
595614
nextBatch.TrackingStoreContext,
596615
this.settings.ExtendedSessionIdleTimeout,
@@ -737,8 +756,10 @@ public OrchestrationRuntimeState? OrchestrationState
737756
}
738757
}
739758

740-
public ETag? ETag { get; set; }
759+
public OrchestrationETags ETags { get; } = new OrchestrationETags();
760+
741761
public DateTime LastCheckpointTime { get; set; }
762+
742763
public object? TrackingStoreContext { get; set; }
743764
}
744765
}

src/DurableTask.AzureStorage/Storage/Table.cs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -86,16 +86,18 @@ public async Task DeleteEntityAsync<T>(T tableEntity, ETag ifMatch = default, Ca
8686
this.stats.TableEntitiesWritten.Increment();
8787
}
8888

89-
public async Task InsertEntityAsync<T>(T tableEntity, CancellationToken cancellationToken = default) where T : ITableEntity
89+
public async Task<Response> InsertEntityAsync<T>(T tableEntity, CancellationToken cancellationToken = default) where T : ITableEntity
9090
{
91-
await this.tableClient.AddEntityAsync(tableEntity, cancellationToken).DecorateFailure();
91+
Response result = await this.tableClient.AddEntityAsync(tableEntity, cancellationToken).DecorateFailure();
9292
this.stats.TableEntitiesWritten.Increment();
93+
return result;
9394
}
9495

95-
public async Task MergeEntityAsync<T>(T tableEntity, ETag ifMatch, CancellationToken cancellationToken = default) where T : ITableEntity
96+
public async Task<Response> MergeEntityAsync<T>(T tableEntity, ETag ifMatch, CancellationToken cancellationToken = default) where T : ITableEntity
9697
{
97-
await this.tableClient.UpdateEntityAsync(tableEntity, ifMatch, TableUpdateMode.Merge, cancellationToken).DecorateFailure();
98+
Response result = await this.tableClient.UpdateEntityAsync(tableEntity, ifMatch, TableUpdateMode.Merge, cancellationToken).DecorateFailure();
9899
this.stats.TableEntitiesWritten.Increment();
100+
return result;
99101
}
100102

101103
public async Task InsertOrMergeEntityAsync<T>(T tableEntity, CancellationToken cancellationToken = default) where T : ITableEntity

0 commit comments

Comments
 (0)