Skip to content

Commit ee24d35

Browse files
sophiatevSophia TevosyanCopilot
authored
Adding Missing Fields to Instance Entity and Fix Blob References (#1272)
* first commit * added missing properties to other instance entities * missed a parentheses * fixed other bugs, added way more tests * Update src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs Co-authored-by: Copilot <[email protected]> * addressing PR comments * fixing the tests, adding tests for a nonexistent instance entity * updated to only extract the executioncompleted and executionstarted events for large input/outputs to extract the blob names * updated to add better exception handling --------- Co-authored-by: Sophia Tevosyan <[email protected]> Co-authored-by: Copilot <[email protected]>
1 parent d04a0ac commit ee24d35

File tree

6 files changed

+644
-59
lines changed

6 files changed

+644
-59
lines changed

src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -797,7 +797,6 @@ async Task<TaskOrchestrationWorkItem> LockNextTaskOrchestrationWorkItemAsync(boo
797797
session.RuntimeState,
798798
orchestrationWorkItem.NewMessages,
799799
settings.AllowReplayingTerminalInstances,
800-
session.TrackingStoreContext,
801800
cancellationToken);
802801
if (!string.IsNullOrEmpty(warningMessage))
803802
{
@@ -1059,7 +1058,6 @@ async Task<string> IsExecutableInstanceAsync(
10591058
OrchestrationRuntimeState runtimeState,
10601059
IList<TaskMessage> newMessages,
10611060
bool allowReplayingTerminalInstances,
1062-
object trackingStoreContext,
10631061
CancellationToken cancellationToken)
10641062
{
10651063
if (runtimeState.ExecutionStartedEvent == null && !newMessages.Any(msg => msg.Event is ExecutionStartedEvent))
@@ -1078,8 +1076,7 @@ async Task<string> IsExecutableInstanceAsync(
10781076
var executionTerminatedEvent = (ExecutionTerminatedEvent)executionTerminatedEventMessage.Event;
10791077
await this.trackingStore.UpdateStatusForTerminationAsync(
10801078
instanceId,
1081-
executionTerminatedEvent.Input,
1082-
executionTerminatedEvent.Timestamp);
1079+
executionTerminatedEvent);
10831080
return $"Instance is {OrchestrationStatus.Terminated}";
10841081
}
10851082

@@ -1103,11 +1100,11 @@ await this.trackingStore.UpdateStatusForTerminationAsync(
11031100
if (instanceStatus == null || (instanceStatus.State.OrchestrationInstance.ExecutionId == runtimeState.OrchestrationInstance.ExecutionId
11041101
&& instanceStatus.State.OrchestrationStatus != runtimeState.OrchestrationStatus))
11051102
{
1106-
await this.trackingStore.UpdateInstanceStatusAndDeleteOrphanedBlobsForCompletedOrchestrationAsync(
1103+
await this.trackingStore.UpdateInstanceStatusForCompletedOrchestrationAsync(
11071104
runtimeState.OrchestrationInstance.InstanceId,
11081105
runtimeState.OrchestrationInstance.ExecutionId,
11091106
runtimeState,
1110-
trackingStoreContext,
1107+
instanceStatus is not null,
11111108
cancellationToken);
11121109
}
11131110
if (!allowReplayingTerminalInstances)

src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs

Lines changed: 106 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -798,21 +798,25 @@ public override async Task UpdateStatusForRewindAsync(string instanceId, Cancell
798798
/// <inheritdoc />
799799
public override async Task UpdateStatusForTerminationAsync(
800800
string instanceId,
801-
string output,
802-
DateTime lastUpdatedTime,
801+
ExecutionTerminatedEvent executionTerminatedEvent,
803802
CancellationToken cancellationToken = default)
804803
{
805804
string sanitizedInstanceId = KeySanitation.EscapePartitionKey(instanceId);
806-
TableEntity entity = new TableEntity(sanitizedInstanceId, "")
805+
TableEntity instanceEntity = new TableEntity(sanitizedInstanceId, "")
807806
{
808807
["RuntimeStatus"] = OrchestrationStatus.Terminated.ToString("G"),
809-
["LastUpdatedTime"] = lastUpdatedTime,
808+
["LastUpdatedTime"] = executionTerminatedEvent.Timestamp,
810809
["CompletedTime"] = DateTime.UtcNow,
811-
[OutputProperty] = output
810+
// In the case of terminating an orchestration, the termination reason becomes the orchestration's output.
811+
[OutputProperty] = executionTerminatedEvent.Input,
812812
};
813813

814+
// Setting addBlobPropertyName to false ensures that the blob URL is saved as the "Output" of the instance entity, which is the expected behavior
815+
// for large orchestration outputs.
816+
await this.CompressLargeMessageAsync(instanceEntity, listOfBlobs: null, cancellationToken: cancellationToken, addBlobPropertyName: false);
817+
814818
Stopwatch stopwatch = Stopwatch.StartNew();
815-
await this.InstancesTable.MergeEntityAsync(entity, ETag.All, cancellationToken);
819+
await this.InstancesTable.MergeEntityAsync(instanceEntity, ETag.All, cancellationToken);
816820

817821
this.settings.Logger.InstanceStatusUpdate(
818822
this.storageAccountName,
@@ -864,6 +868,7 @@ public override Task StartAsync(CancellationToken cancellationToken = default)
864868
["CustomStatus"] = newRuntimeState.Status ?? "null",
865869
["ExecutionId"] = executionId,
866870
["LastUpdatedTime"] = newEvents.Last().Timestamp,
871+
["TaskHubName"] = this.settings.TaskHubName,
867872
};
868873

869874
// check if we are replacing a previous execution with blobs; those will be deleted from the store after the update. This could occur in a ContinueAsNew scenario
@@ -910,6 +915,8 @@ public override Task StartAsync(CancellationToken cancellationToken = default)
910915
instanceEntity["Version"] = executionStartedEvent.Version;
911916
instanceEntity["CreatedTime"] = executionStartedEvent.Timestamp;
912917
instanceEntity["RuntimeStatus"] = OrchestrationStatus.Running.ToString();
918+
instanceEntity["Tags"] = TagsSerializer.Serialize(executionStartedEvent.Tags);
919+
instanceEntity["Generation"] = executionStartedEvent.Generation;
913920
if (executionStartedEvent.ScheduledStartTime.HasValue)
914921
{
915922
instanceEntity["ScheduledStartTime"] = executionStartedEvent.ScheduledStartTime;
@@ -1048,11 +1055,11 @@ public override Task StartAsync(CancellationToken cancellationToken = default)
10481055
return eTagValue;
10491056
}
10501057

1051-
public override async Task UpdateInstanceStatusAndDeleteOrphanedBlobsForCompletedOrchestrationAsync(
1058+
public override async Task UpdateInstanceStatusForCompletedOrchestrationAsync(
10521059
string instanceId,
10531060
string executionId,
10541061
OrchestrationRuntimeState runtimeState,
1055-
object trackingStoreContext,
1062+
bool instanceEntityExists,
10561063
CancellationToken cancellationToken = default)
10571064
{
10581065
if (runtimeState.OrchestrationStatus != OrchestrationStatus.Completed &&
@@ -1063,28 +1070,90 @@ public override async Task UpdateInstanceStatusAndDeleteOrphanedBlobsForComplete
10631070
return;
10641071
}
10651072

1066-
TrackingStoreContext context = (TrackingStoreContext)trackingStoreContext;
1067-
if (context.Blobs.Count > 0)
1068-
{
1069-
var tasks = new List<Task>(context.Blobs.Count);
1070-
foreach (string blobName in context.Blobs)
1071-
{
1072-
tasks.Add(this.messageManager.DeleteBlobAsync(blobName));
1073-
}
1074-
await Task.WhenAll(tasks);
1075-
}
1076-
10771073
string sanitizedInstanceId = KeySanitation.EscapePartitionKey(instanceId);
1074+
ExecutionStartedEvent executionStartedEvent = runtimeState.ExecutionStartedEvent;
1075+
1076+
// We need to set all of the fields of the instance entity in the case that it was never created for the orchestration.
1077+
// This can be the case for a suborchestration that completed in one execution, for example.
10781078
var instanceEntity = new TableEntity(sanitizedInstanceId, string.Empty)
10791079
{
1080+
["Name"] = runtimeState.Name,
1081+
["Version"] = runtimeState.Version,
1082+
["CreatedTime"] = executionStartedEvent.Timestamp,
10801083
// TODO: Translating null to "null" is a temporary workaround. We should prioritize
10811084
// https://github.com/Azure/durabletask/issues/477 so that this is no longer necessary.
10821085
["CustomStatus"] = runtimeState.Status ?? "null",
10831086
["ExecutionId"] = executionId,
10841087
["LastUpdatedTime"] = runtimeState.Events.Last().Timestamp,
10851088
["RuntimeStatus"] = runtimeState.OrchestrationStatus.ToString(),
1086-
["CompletedTime"] = runtimeState.CompletedTime
1089+
["CompletedTime"] = runtimeState.CompletedTime,
1090+
["Tags"] = TagsSerializer.Serialize(executionStartedEvent.Tags),
1091+
["TaskHubName"] = this.settings.TaskHubName,
10871092
};
1093+
if (runtimeState.ExecutionStartedEvent.ScheduledStartTime.HasValue)
1094+
{
1095+
instanceEntity["ScheduledStartTime"] = executionStartedEvent.ScheduledStartTime;
1096+
}
1097+
1098+
static TableEntity GetSingleEntityFromHistoryTableResults(IReadOnlyList<TableEntity> entities, string dataType)
1099+
{
1100+
try
1101+
{
1102+
TableEntity singleEntity = entities.SingleOrDefault();
1103+
1104+
return singleEntity ?? throw new DurableTaskStorageException($"The history table query to determine the blob storage URL " +
1105+
$"for the large orchestration {dataType} returned no rows. Unable to extract the URL from these results.");
1106+
}
1107+
catch (InvalidOperationException)
1108+
{
1109+
throw new DurableTaskStorageException($"The history table query to determine the blob storage URL for the large orchestration " +
1110+
$"{dataType} returned more than one row, when exactly one row is expected. " +
1111+
$"Unable to extract the URL from these results.");
1112+
}
1113+
}
1114+
1115+
// Set the output.
1116+
// In the case that the output is too large and is stored in blob storage, extract the blob name from the ExecutionCompleted history entity.
1117+
if (this.ExceedsMaxTablePropertySize(runtimeState.Output))
1118+
{
1119+
string filter = $"{nameof(ITableEntity.PartitionKey)} eq '{KeySanitation.EscapePartitionKey(instanceId)}'" +
1120+
$" and {nameof(OrchestrationInstance.ExecutionId)} eq '{executionId}'" +
1121+
$" and {nameof(HistoryEvent.EventType)} eq '{nameof(EventType.ExecutionCompleted)}'";
1122+
TableEntity executionCompletedEntity = GetSingleEntityFromHistoryTableResults(await this.QueryHistoryAsync(filter, instanceId, cancellationToken), "output");
1123+
this.SetInstancesTablePropertyFromHistoryProperty(
1124+
executionCompletedEntity,
1125+
instanceEntity,
1126+
historyPropertyName: nameof(runtimeState.ExecutionCompletedEvent.Result),
1127+
instancePropertyName: OutputProperty,
1128+
data: runtimeState.Output);
1129+
}
1130+
else
1131+
{
1132+
instanceEntity[OutputProperty] = runtimeState.Output;
1133+
}
1134+
1135+
// If the input has not been set by a previous execution, set the input.
1136+
if (!instanceEntityExists)
1137+
{
1138+
// In the case that the input is too large and is stored in blob storage, extract the blob name from the ExecutionStarted history entity.
1139+
if (this.ExceedsMaxTablePropertySize(runtimeState.Input))
1140+
{
1141+
string filter = $"{nameof(ITableEntity.PartitionKey)} eq '{KeySanitation.EscapePartitionKey(instanceId)}'" +
1142+
$" and {nameof(OrchestrationInstance.ExecutionId)} eq '{executionId}'" +
1143+
$" and {nameof(HistoryEvent.EventType)} eq '{nameof(EventType.ExecutionStarted)}'";
1144+
TableEntity executionStartedEntity = GetSingleEntityFromHistoryTableResults(await this.QueryHistoryAsync(filter, instanceId, cancellationToken), "input");
1145+
this.SetInstancesTablePropertyFromHistoryProperty(
1146+
executionStartedEntity,
1147+
instanceEntity,
1148+
historyPropertyName: nameof(executionStartedEvent.Input),
1149+
instancePropertyName: InputProperty,
1150+
data: executionStartedEvent.Input);
1151+
}
1152+
else
1153+
{
1154+
instanceEntity[InputProperty] = runtimeState.Input;
1155+
}
1156+
}
10881157

10891158
Stopwatch orchestrationInstanceUpdateStopwatch = Stopwatch.StartNew();
10901159
await this.InstancesTable.InsertOrMergeEntityAsync(instanceEntity);
@@ -1161,7 +1230,7 @@ void SetInstancesTablePropertyFromHistoryProperty(
11611230
}
11621231
}
11631232

1164-
async Task CompressLargeMessageAsync(TableEntity entity, List<string> listOfBlobs, CancellationToken cancellationToken)
1233+
async Task CompressLargeMessageAsync(TableEntity entity, List<string> listOfBlobs, CancellationToken cancellationToken, bool addBlobPropertyName = true)
11651234
{
11661235
foreach (string propertyName in VariableSizeEntityProperties)
11671236
{
@@ -1176,9 +1245,16 @@ property is string stringProperty &&
11761245

11771246
// Clear out the original property value and create a new "*BlobName"-suffixed property.
11781247
// The runtime will look for the new "*BlobName"-suffixed column to know if a property is stored in a blob.
1179-
string blobPropertyName = GetBlobPropertyName(propertyName);
1180-
entity.Add(blobPropertyName, blobName);
1181-
entity[propertyName] = string.Empty;
1248+
if (addBlobPropertyName)
1249+
{
1250+
string blobPropertyName = GetBlobPropertyName(propertyName);
1251+
entity.Add(blobPropertyName, blobName);
1252+
entity[propertyName] = string.Empty;
1253+
}
1254+
else
1255+
{
1256+
entity[propertyName] = this.messageManager.GetBlobUrl(blobName);
1257+
}
11821258

11831259
// if necessary, keep track of all the blobs associated with this execution
11841260
listOfBlobs?.Add(blobName);
@@ -1226,6 +1302,12 @@ static string GetBlobName(TableEntity entity, string property)
12261302
// EventType. Use a hardcoded value to record the orchestration input.
12271303
eventType = "Input";
12281304
}
1305+
else if (property == "Output")
1306+
{
1307+
// This message is used to terminate an orchestration with no history, so it does not have a
1308+
// corresponding EventType. Use a hardcoded value to record the orchestration output.
1309+
eventType = "Output";
1310+
}
12291311
else if (property == "Tags")
12301312
{
12311313
eventType = "Tags";

src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -105,16 +105,15 @@ interface ITrackingStore
105105

106106
/// <summary>
107107
/// Updates the instance status of the specified orchestration instance to match that of <paramref name="runtimeState"/> for a completed orchestration.
108-
/// Also deletes any orphaned blobs of <paramref name="trackingStoreContext"/>.
109108
/// This method is meant to be called in the case that there is an inconsistency between the instance and history table due to a failure during a call to
110109
/// <see cref="UpdateStateAsync"/> for a completing orchestration. If the orchestration is not in a terminal state, the method will immediately return and do nothing.
111110
/// </summary>
112111
/// <param name="instanceId">The ID of the orchestration.</param>
113112
/// <param name="executionId">The execution ID of the orchestration.</param>
114113
/// <param name="runtimeState">The runtime state of the orchestration.</param>
115-
/// <param name="trackingStoreContext">Additional context for the execution that is maintained by the tracking store.</param>
114+
/// <param name="instanceEntityExists">Whether the instance entity already exists in the instance store.</param>
116115
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
117-
Task UpdateInstanceStatusAndDeleteOrphanedBlobsForCompletedOrchestrationAsync(string instanceId, string executionId, OrchestrationRuntimeState runtimeState, object trackingStoreContext, CancellationToken cancellationToken = default);
116+
Task UpdateInstanceStatusForCompletedOrchestrationAsync(string instanceId, string executionId, OrchestrationRuntimeState runtimeState, bool instanceEntityExists, CancellationToken cancellationToken = default);
118117

119118
/// <summary>
120119
/// Get The Orchestration State for querying all orchestration instances
@@ -166,13 +165,12 @@ interface ITrackingStore
166165
Task UpdateStatusForRewindAsync(string instanceId, CancellationToken cancellationToken = default);
167166

168167
/// <summary>
169-
/// Used to update the instance status to "Terminated" whend a pending orchestration is terminated.
168+
/// Used to update the instance status to "Terminated" when a pending orchestration is terminated.
170169
/// </summary>
171170
/// <param name="instanceId">The instance being terminated</param>
172-
/// <param name="output">The output of the orchestration</param>
173-
/// <param name="lastUpdatedTime">The last updated time of the orchestration (the time the termination request was created)</param>
171+
/// <param name="executionTerminatedEvent">The termination history event.</param>
174172
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
175-
Task UpdateStatusForTerminationAsync(string instanceId, string output, DateTime lastUpdatedTime, CancellationToken cancellationToken = default);
173+
Task UpdateStatusForTerminationAsync(string instanceId, ExecutionTerminatedEvent executionTerminatedEvent, CancellationToken cancellationToken = default);
176174

177175
/// <summary>
178176
/// Purge The History and state which is older than thresholdDateTimeUtc based on the timestamp type specified by timeRangeFilterType

src/DurableTask.AzureStorage/Tracking/InstanceStoreBackedTrackingStore.cs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -179,24 +179,23 @@ await instanceStore.WriteEntitiesAsync(new InstanceEntityBase[]
179179

180180
public override async Task UpdateStatusForTerminationAsync(
181181
string instanceId,
182-
string output,
183-
DateTime lastUpdatedTime,
182+
ExecutionTerminatedEvent executionTerminatedEvent,
184183
CancellationToken cancellationToken = default)
185184
{
186185
// Get the most recent execution and update its status to terminated
187186
IEnumerable<OrchestrationStateInstanceEntity> instanceEntity = await this.instanceStore.GetOrchestrationStateAsync(instanceId, allInstances: false);
188187
instanceEntity.Single().State.OrchestrationStatus = OrchestrationStatus.Terminated;
189-
instanceEntity.Single().State.LastUpdatedTime = lastUpdatedTime;
188+
instanceEntity.Single().State.LastUpdatedTime = executionTerminatedEvent.Timestamp;
190189
instanceEntity.Single().State.CompletedTime = DateTime.UtcNow;
191-
instanceEntity.Single().State.Output = output;
190+
instanceEntity.Single().State.Output = executionTerminatedEvent.Input;
192191
await this.instanceStore.WriteEntitiesAsync(instanceEntity);
193192
}
194193

195-
public override async Task UpdateInstanceStatusAndDeleteOrphanedBlobsForCompletedOrchestrationAsync(
194+
public override async Task UpdateInstanceStatusForCompletedOrchestrationAsync(
196195
string instanceId,
197196
string executionId,
198197
OrchestrationRuntimeState runtimeState,
199-
object trackingStoreContext,
198+
bool instanceEntityExists,
200199
CancellationToken cancellationToken = default)
201200
{
202201
if (runtimeState.OrchestrationStatus != OrchestrationStatus.Completed &&
@@ -207,7 +206,6 @@ public override async Task UpdateInstanceStatusAndDeleteOrphanedBlobsForComplete
207206
return;
208207
}
209208

210-
// No blobs to delete for this tracking store implementation
211209
await instanceStore.WriteEntitiesAsync(new InstanceEntityBase[]
212210
{
213211
new OrchestrationStateInstanceEntity()

0 commit comments

Comments
 (0)