Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions src/DurableTask.AzureStorage/MessageManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -234,11 +234,23 @@ public Task<bool> DeleteBlobAsync(string blobName, CancellationToken cancellatio

private async Task<string> DownloadAndDecompressAsBytesAsync(Blob blob, CancellationToken cancellationToken = default)
{
using BlobDownloadStreamingResult result = await blob.DownloadStreamingAsync(cancellationToken);
using GZipStream decompressedBlobStream = new GZipStream(result.Content, CompressionMode.Decompress);
using StreamReader reader = new StreamReader(decompressedBlobStream, Encoding.UTF8);
try
{
using BlobDownloadStreamingResult result = await blob.DownloadStreamingAsync(cancellationToken);
using GZipStream decompressedBlobStream = new GZipStream(result.Content, CompressionMode.Decompress);
using StreamReader reader = new StreamReader(decompressedBlobStream, Encoding.UTF8);

return await reader.ReadToEndAsync();
}
catch (Exception)
{
this.settings.Logger.GeneralWarning(
this.azureStorageClient.BlobAccountName,
this.settings.TaskHubName,
$"Failed to download or decompress blob {blob.Name}.");

return await reader.ReadToEndAsync();
throw;
}
}

public string GetBlobUrl(string blobName)
Expand Down
79 changes: 78 additions & 1 deletion src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ namespace DurableTask.AzureStorage.Tracking
using System.Threading.Tasks;
using Azure;
using Azure.Data.Tables;
using Azure.Storage.Blobs.Models;
using DurableTask.AzureStorage.Linq;
using DurableTask.AzureStorage.Monitoring;
using DurableTask.AzureStorage.Storage;
Expand Down Expand Up @@ -184,7 +185,20 @@ public override async Task<OrchestrationHistory> GetHistoryEventsAsync(string in
}

// Some entity properties may be stored in blob storage.
await this.DecompressLargeEntityProperties(entity, trackingStoreContext.Blobs, cancellationToken);
bool success = await this.TryDecompressLargeEntityPropertiesAsync(
entity,
trackingStoreContext.Blobs,
instanceId,
executionId,
cancellationToken);
if (!success)
{
// Some properties were not retrieved because we are apparently trying to load
// outdated history. No reason to raise an exception here, as this does not
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a bit confused about how/why we'd find outdated history at this point. Shouldn't we have filtered out all the outdated history events via the check on line 173 (where we check the ExecutionId value)?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exactly, this is the source of our troubles. Line 173 doesn't perform proper filtering because executionId contains an old execution ID, which happens because we passed an old expectedExecutionId to GetHistoryEntitiesResponseInfoAsync, so results.Entities[0] also belongs to the outdated history, and line 173 filters out everything unrelated to the old execution. Eventually, if we don't encounter missing blobs, we hit this line:

message = runtimeState.Events.Count == 0 ? "No such instance" : "Invalid history (may have been overwritten by a newer instance)";

// impact orchestration execution, but also no reason to continue loading this
// execution history.
break;
}

events.Add((HistoryEvent)TableEntityConverter.Deserialize(entity, GetTypeForTableEntity(entity)));
}
Expand Down Expand Up @@ -1102,6 +1116,46 @@ property is string stringProperty &&
}
}

async Task<bool> TryDecompressLargeEntityPropertiesAsync(
TableEntity entity,
List<string> listOfBlobs,
string instanceId,
string executionId,
CancellationToken cancellationToken)
{
try
{
await this.DecompressLargeEntityProperties(entity, listOfBlobs, cancellationToken);
return true;
}
catch (DurableTaskStorageException ex) when (IsMissingBlob(ex))
{
// A blob is expected to be missing if the entity belongs to a previous execution.
// For example, after ContinueAsNew, we remove blobs from the previous generation,
// but we don't immediately remove entities from the history table.
// Let's check if we are trying to load an entity from a previous execution.
var sentinelExecutionId = await this.GetSentinelExecutionIdAsync(instanceId, cancellationToken);
if (sentinelExecutionId != executionId)
{
// The sentinel contains the execution ID of the most recent execution.
// If it doesn't match the assumed execution ID, this means that we are trying
// to load outdated history. This does not necessarily indicate a big problem,
// so no reason to raise an exception, but we should make the caller aware of this.
this.settings.Logger.GeneralWarning(
this.azureStorageClient.BlobAccountName,
this.settings.TaskHubName,
$"Missing blob when trying to load history: trying to load previous generation history? "
+ $"(entity RowKey: {entity.GetString("RowKey")}, entity ExecutionId: {executionId}, sentinel ExecutionId: {sentinelExecutionId}).",
instanceId);

return false;
}

// Otherwise, the blob is missing for a different reason, possibly indicating data corruption.
throw;
}
}

async Task DecompressLargeEntityProperties(TableEntity entity, List<string> listOfBlobs, CancellationToken cancellationToken)
{
// Check for entity properties stored in blob storage
Expand Down Expand Up @@ -1254,6 +1308,29 @@ bool ExceedsMaxTablePropertySize(string data)
return false;
}

static bool IsMissingBlob(DurableTaskStorageException ex)
{
if (ex.InnerException is RequestFailedException rfe)
{
return rfe.ErrorCode == BlobErrorCode.BlobNotFound;
}

return false;
}

async Task<string> GetSentinelExecutionIdAsync(string instanceId, CancellationToken cancellationToken)
{
string filter = $"{nameof(ITableEntity.PartitionKey)} eq '{KeySanitation.EscapePartitionKey(instanceId)}'"
+ $" and {nameof(ITableEntity.RowKey)} eq '{SentinelRowKey}'";

TableQueryResults<TableEntity> results = await this.HistoryTable
.ExecuteQueryAsync<TableEntity>(filter, select: new[] { nameof(OrchestrationInstance.ExecutionId) }, cancellationToken: cancellationToken)
.GetResultsAsync(cancellationToken: cancellationToken);

var executionId = results.Entities.FirstOrDefault()?.GetString(nameof(OrchestrationInstance.ExecutionId));
return executionId;
}

class TrackingStoreContext
{
public List<string> Blobs { get; set; } = new List<string>();
Expand Down
Loading