Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
// ----------------------------------------------------------------------------------
// Copyright Microsoft Corporation
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ----------------------------------------------------------------------------------

namespace DurableTask.AzureStorage.Tests
{
using System;
using System.Collections.Generic;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using Azure;
using Azure.Data.Tables;
using DurableTask.AzureStorage.Storage;
using DurableTask.AzureStorage.Tracking;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Moq;

/// <summary>
/// Tests for the DecompressLargeEntityProperties method in AzureTableTrackingStore.
/// These tests verify that the method gracefully handles BlobNotFound errors.
/// See: https://github.com/Azure/azure-functions-durable-extension/issues/3264
/// </summary>
[TestClass]
public class DecompressLargeEntityPropertiesTests
{
/// <summary>
/// Verifies the method signature of DecompressLargeEntityProperties.
/// The method should return Task&lt;bool&gt; to indicate success/failure when handling blob retrieval.
/// When a blob is not found (404 error), it should return false instead of throwing an exception.
/// This can happen when a late message from a previous execution arrives after ContinueAsNew
/// deleted the blobs, or when blobs are cleaned up due to retention policies.
/// See: https://github.com/Azure/azure-functions-durable-extension/issues/3264
/// </summary>
[TestMethod]
public void DecompressLargeEntityProperties_MethodExists_AndReturnsBool()
{
// Arrange
var trackingStoreType = typeof(AzureTableTrackingStore);
var method = trackingStoreType.GetMethod(
"DecompressLargeEntityProperties",
BindingFlags.NonPublic | BindingFlags.Instance);

// Assert - Verify method exists and has correct signature
Assert.IsNotNull(method, "DecompressLargeEntityProperties method should exist");
Assert.AreEqual(typeof(Task<bool>), method.ReturnType,
"Method should return Task<bool> to indicate success (true) or blob not found (false)");

// Verify method has correct parameters
var parameters = method.GetParameters();
Assert.AreEqual(3, parameters.Length, "Method should have 3 parameters");
Assert.AreEqual(typeof(TableEntity), parameters[0].ParameterType, "First param should be TableEntity");
Assert.AreEqual(typeof(List<string>), parameters[1].ParameterType, "Second param should be List<string>");
Assert.AreEqual(typeof(CancellationToken), parameters[2].ParameterType, "Third param should be CancellationToken");
}

/// <summary>
/// Tests that DecompressLargeEntityProperties also handles DurableTaskStorageException
/// wrapping a RequestFailedException with 404 status.
/// </summary>
[TestMethod]
public void DecompressLargeEntityProperties_MethodSignature_ReturnsTaskBool()
{
// Arrange
var trackingStoreType = typeof(AzureTableTrackingStore);
var method = trackingStoreType.GetMethod(
"DecompressLargeEntityProperties",
BindingFlags.NonPublic | BindingFlags.Instance);

// Assert
Assert.IsNotNull(method, "DecompressLargeEntityProperties method should exist");
Assert.AreEqual(typeof(Task<bool>), method.ReturnType,
"Method should return Task<bool> to indicate success/failure");

var parameters = method.GetParameters();
Assert.AreEqual(3, parameters.Length, "Method should have 3 parameters");
Assert.AreEqual(typeof(TableEntity), parameters[0].ParameterType);
Assert.AreEqual(typeof(List<string>), parameters[1].ParameterType);
Assert.AreEqual(typeof(CancellationToken), parameters[2].ParameterType);
}

/// <summary>
/// Verifies that RequestFailedException with status 404 is properly catchable
/// with the pattern used in the fix.
/// </summary>
[TestMethod]
public void RequestFailedException_Status404_CanBeCaughtWithWhenClause()
{
// Arrange
var exception = new RequestFailedException(
status: 404,
message: "The specified blob does not exist.",
errorCode: "BlobNotFound",
innerException: null);

// Act & Assert
bool caught = false;
try
{
throw exception;
}
catch (RequestFailedException e) when (e.Status == 404)
{
caught = true;
}

Assert.IsTrue(caught, "RequestFailedException with status 404 should be caught");
}

/// <summary>
/// Verifies that DurableTaskStorageException wrapping RequestFailedException
/// with status 404 can be caught with the pattern used in the fix.
/// </summary>
[TestMethod]
public void DurableTaskStorageException_WrappingBlobNotFound_CanBeCaughtWithWhenClause()
{
// Arrange
var innerException = new RequestFailedException(
status: 404,
message: "The specified blob does not exist.",
errorCode: "BlobNotFound",
innerException: null);

var wrappedException = new DurableTaskStorageException(innerException);

// Act & Assert
bool caught = false;
try
{
throw wrappedException;
}
catch (DurableTaskStorageException e) when (e.InnerException is RequestFailedException { Status: 404 })
{
caught = true;
}

Assert.IsTrue(caught, "DurableTaskStorageException wrapping 404 should be caught");
}

/// <summary>
/// Verifies that other status codes are NOT caught by the 404 filter.
/// </summary>
[TestMethod]
public void RequestFailedException_OtherStatus_NotCaughtBy404Filter()
{
// Arrange - 500 Internal Server Error
var exception = new RequestFailedException(
status: 500,
message: "Internal Server Error",
errorCode: "InternalError",
innerException: null);

// Act & Assert
bool caughtBy404Filter = false;
bool caughtByGenericHandler = false;
try
{
throw exception;
}
catch (RequestFailedException e) when (e.Status == 404)
{
caughtBy404Filter = true;
}
catch (RequestFailedException)
{
caughtByGenericHandler = true;
}

Assert.IsFalse(caughtBy404Filter, "404 filter should NOT catch 500 error");
Assert.IsTrue(caughtByGenericHandler, "500 error should be caught by generic handler");
}
}
}
48 changes: 43 additions & 5 deletions src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,13 @@ public override async Task<OrchestrationHistory> GetHistoryEventsAsync(string in
}

// Some entity properties may be stored in blob storage.
await this.DecompressLargeEntityProperties(entity, trackingStoreContext.Blobs, cancellationToken);
// If the blob is not found (e.g., from a previous execution), skip this entity.
bool decompressSuccess = await this.DecompressLargeEntityProperties(entity, trackingStoreContext.Blobs, cancellationToken);
if (!decompressSuccess)
{
// Skip this history entry as its blob data is unavailable (likely from a stale execution)
continue;
}

events.Add((HistoryEvent)TableEntityConverter.Deserialize(entity, GetTypeForTableEntity(entity)));
}
Expand Down Expand Up @@ -1186,22 +1192,54 @@ property is string stringProperty &&
}
}

async Task DecompressLargeEntityProperties(TableEntity entity, List<string> listOfBlobs, CancellationToken cancellationToken)
async Task<bool> DecompressLargeEntityProperties(TableEntity entity, List<string> listOfBlobs, CancellationToken cancellationToken)
{
// Check for entity properties stored in blob storage
foreach (string propertyName in VariableSizeEntityProperties)
{
string blobPropertyName = GetBlobPropertyName(propertyName);
if (entity.TryGetValue(blobPropertyName, out object property) && property is string blobName)
{
string decompressedMessage = await this.messageManager.DownloadAndDecompressAsBytesAsync(blobName, cancellationToken);
entity[propertyName] = decompressedMessage;
entity.Remove(blobPropertyName);
try
{
string decompressedMessage = await this.messageManager.DownloadAndDecompressAsBytesAsync(blobName, cancellationToken);
entity[propertyName] = decompressedMessage;
entity.Remove(blobPropertyName);
}
catch (RequestFailedException e) when (e.Status == 404)
{
// The blob was not found, which can happen if:
// 1. A late message from a previous execution arrived after ContinueAsNew deleted the blobs
// 2. The blob was cleaned up due to retention policies
// 3. A race condition between blob cleanup and history retrieval for entities
// In these cases, we skip this entity as it belongs to a stale execution.
// See: https://github.com/Azure/azure-functions-durable-extension/issues/3264
this.settings.Logger.GeneralWarning(
this.storageAccountName,
this.taskHubName,
$"Blob '{blobName}' for property '{propertyName}' was not found. " +
$"This history entry may be from a previous execution and will be skipped. " +
$"InstanceId: {entity.PartitionKey}, RowKey: {entity.RowKey}");
return false;
}
catch (DurableTaskStorageException e) when (e.InnerException is RequestFailedException { Status: 404 })
{
// Same as above, but wrapped in DurableTaskStorageException
this.settings.Logger.GeneralWarning(
this.storageAccountName,
this.taskHubName,
$"Blob '{blobName}' for property '{propertyName}' was not found (wrapped exception). " +
$"This history entry may be from a previous execution and will be skipped. " +
$"InstanceId: {entity.PartitionKey}, RowKey: {entity.RowKey}");
return false;
}

// keep track of all the blobs associated with this execution
listOfBlobs.Add(blobName);
}
}

return true;
}

static string GetBlobPropertyName(string originalPropertyName)
Expand Down
Loading