Skip to content

Commit 8c26e62

Browse files
authored
Don't fail orchestrations on missing blobs from previous executions (#1189)
1 parent 3bc255c commit 8c26e62

File tree

2 files changed

+30
-14
lines changed

2 files changed

+30
-14
lines changed

src/DurableTask.AzureStorage/MessageManager.cs

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -234,11 +234,23 @@ public Task<bool> DeleteBlobAsync(string blobName, CancellationToken cancellatio
234234

235235
private async Task<string> DownloadAndDecompressAsBytesAsync(Blob blob, CancellationToken cancellationToken = default)
236236
{
237-
using BlobDownloadStreamingResult result = await blob.DownloadStreamingAsync(cancellationToken);
238-
using GZipStream decompressedBlobStream = new GZipStream(result.Content, CompressionMode.Decompress);
239-
using StreamReader reader = new StreamReader(decompressedBlobStream, Encoding.UTF8);
237+
try
238+
{
239+
using BlobDownloadStreamingResult result = await blob.DownloadStreamingAsync(cancellationToken);
240+
using GZipStream decompressedBlobStream = new GZipStream(result.Content, CompressionMode.Decompress);
241+
using StreamReader reader = new StreamReader(decompressedBlobStream, Encoding.UTF8);
242+
243+
return await reader.ReadToEndAsync();
244+
}
245+
catch (Exception)
246+
{
247+
this.settings.Logger.GeneralWarning(
248+
this.azureStorageClient.BlobAccountName,
249+
this.settings.TaskHubName,
250+
$"Failed to download or decompress blob {blob.Name}.");
240251

241-
return await reader.ReadToEndAsync();
252+
throw;
253+
}
242254
}
243255

244256
public string GetBlobUrl(string blobName)

src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -155,14 +155,21 @@ public override async Task<OrchestrationHistory> GetHistoryEventsAsync(string in
155155
.GetHistoryEntitiesResponseInfoAsync(instanceId, expectedExecutionId, null, cancellationToken)
156156
.GetResultsAsync(cancellationToken: cancellationToken);
157157

158+
// The sentinel row should always be the last row
159+
TableEntity sentinel = results.Entities.LastOrDefault(e => e.RowKey == SentinelRowKey);
160+
158161
IList<HistoryEvent> historyEvents;
159162
string executionId;
160-
TableEntity sentinel = null;
161163
TrackingStoreContext trackingStoreContext = new TrackingStoreContext();
162-
if (results.Entities.Count > 0)
164+
165+
// If expectedExecutionId is provided but it does not match the sentinel executionId,
166+
// it may belong to a previous generation. In that case, treat it as an unknown executionId
167+
// and skip loading history.
168+
if (results.Entities.Count > 0 && (expectedExecutionId == null ||
169+
expectedExecutionId == sentinel?.GetString("ExecutionId")))
163170
{
164171
// The most recent generation will always be in the first history event.
165-
executionId = results.Entities[0].GetString("ExecutionId");
172+
executionId = sentinel?.GetString("ExecutionId") ?? results.Entities[0].GetString("ExecutionId");
166173

167174
// Convert the table entities into history events.
168175
var events = new List<HistoryEvent>(results.Entities.Count);
@@ -175,11 +182,9 @@ public override async Task<OrchestrationHistory> GetHistoryEventsAsync(string in
175182
break;
176183
}
177184

178-
// The sentinel row does not contain any history events, so save it for later
179-
// and continue
180-
if (entity.RowKey == SentinelRowKey)
185+
// The sentinel row does not contain any history events, so ignore and continue
186+
if (entity == sentinel)
181187
{
182-
sentinel = entity;
183188
continue;
184189
}
185190

@@ -197,11 +202,10 @@ public override async Task<OrchestrationHistory> GetHistoryEventsAsync(string in
197202
executionId = expectedExecutionId;
198203
}
199204

200-
// Read the checkpoint completion time from the sentinel row, which should always be the last row.
205+
// Read the checkpoint completion time from the sentinel row.
201206
// A sentinel won't exist only if no instance of this ID has ever existed or the instance history
202-
// was purged.The IsCheckpointCompleteProperty was newly added _after_ v1.6.4.
207+
// was purged. The IsCheckpointCompleteProperty was newly added _after_ v1.6.4.
203208
DateTime checkpointCompletionTime = DateTime.MinValue;
204-
sentinel = sentinel ?? results.Entities.LastOrDefault(e => e.RowKey == SentinelRowKey);
205209
ETag? eTagValue = sentinel?.ETag;
206210
if (sentinel != null &&
207211
sentinel.TryGetValue(CheckpointCompletedTimestampProperty, out object timestampObj) &&

0 commit comments

Comments
 (0)