Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
26734db
enhance purge
YunchuWang Mar 13, 2026
ee9812f
Merge branch 'main' of https://github.com/Azure/durabletask into wang…
YunchuWang Mar 13, 2026
8a886b4
revert timestamp change
YunchuWang Mar 13, 2026
db2cb4d
Refactor purge functionality and add tests for large message cleanup
YunchuWang Mar 13, 2026
1603320
remove semaphore
YunchuWang Mar 14, 2026
473cb65
Add partial purge with timeout support
YunchuWang Mar 16, 2026
972357a
Enforce 30s timeout cap and use effectiveToken for in-flight deletes
YunchuWang Mar 16, 2026
1ef227d
Enforce 30s purge timeout and return IsComplete
YunchuWang Mar 19, 2026
0362969
Add opt-in Timeout on PurgeInstanceFilter for partial purge
YunchuWang Mar 19, 2026
6e6a6ce
Address PR review comments for purge enhancement
YunchuWang Mar 20, 2026
3459012
Merge branch 'main' into wangbill/enpurge
YunchuWang Mar 20, 2026
d8bd90d
Add unit tests for Timeout/IsComplete purge feature
YunchuWang Mar 20, 2026
4d8b3f6
Address PR review comments
YunchuWang Mar 20, 2026
dcc69c4
Use effectiveToken for in-flight deletes and align docs
YunchuWang Mar 20, 2026
48bdb55
Merge branch 'main' into wangbill/enpurge
YunchuWang Mar 20, 2026
c18d935
Update src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs
YunchuWang Mar 20, 2026
7de125b
Potential fix for pull request finding 'Constant condition'
YunchuWang Mar 20, 2026
b3c7ed2
Merge branch 'main' into wangbill/enpurge
YunchuWang Mar 20, 2026
6ea7372
Refactor deletion logic to use a dedicated async method for instance …
YunchuWang Mar 20, 2026
6736aad
Merge branch 'wangbill/enpurge' of https://github.com/Azure/durableta…
YunchuWang Mar 20, 2026
167d150
Add purge instance history method with optional timeout and correspon…
YunchuWang Mar 20, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,303 @@
// ----------------------------------------------------------------------------------
// Copyright Microsoft Corporation
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ----------------------------------------------------------------------------------
#nullable enable
namespace DurableTask.AzureStorage.Tests.Storage
{
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Azure;
using Azure.Data.Tables;
using DurableTask.AzureStorage.Storage;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Moq;

[TestClass]
public class TableDeleteBatchParallelTests
{
const string ConnectionString = "UseDevelopmentStorage=true";
const string TableName = "TestTable";

// An empty entity list must produce an empty result: no responses and a request count of zero.
[TestMethod]
public async Task DeleteBatchParallelAsync_EmptyBatch_ReturnsEmptyResults()
{
    // Arrange
    List<TableEntity> nothingToDelete = new List<TableEntity>();
    Table sut = CreateTableWithMockedClient(out _, out _);

    // Act
    TableTransactionResults outcome = await sut.DeleteBatchParallelAsync(nothingToDelete);

    // Assert
    Assert.AreEqual(0, outcome.Responses.Count);
    Assert.AreEqual(0, outcome.RequestCount);
}

// 50 entities are expected to be submitted as one transaction containing all 50 actions.
[TestMethod]
public async Task DeleteBatchParallelAsync_SingleBatch_SubmitsOneTransaction()
{
    // Arrange
    Table sut = CreateTableWithMockedClient(out _, out Mock<TableClient> clientMock);
    List<TableEntity> toDelete = CreateTestEntities("pk", count: 50);

    // The canned response is built once, up front, and handed back for the matching transaction.
    Response<IReadOnlyList<Response>> cannedResponse = CreateMockBatchResponse(50);
    clientMock
        .Setup(c => c.SubmitTransactionAsync(
            It.Is<IEnumerable<TableTransactionAction>>(actions => actions.Count() == 50),
            It.IsAny<CancellationToken>()))
        .ReturnsAsync(cannedResponse);

    // Act
    TableTransactionResults outcome = await sut.DeleteBatchParallelAsync(toDelete);

    // Assert
    Assert.AreEqual(50, outcome.Responses.Count);
    clientMock.Verify(
        c => c.SubmitTransactionAsync(
            It.IsAny<IEnumerable<TableTransactionAction>>(),
            It.IsAny<CancellationToken>()),
        Times.Once());
}

// 250 entities are expected to be split into three transactions, with one response per entity overall.
[TestMethod]
public async Task DeleteBatchParallelAsync_MultipleBatches_SplitsIntoChunksOf100()
{
    // Arrange
    Table sut = CreateTableWithMockedClient(out _, out Mock<TableClient> clientMock);
    List<TableEntity> toDelete = CreateTestEntities("pk", count: 250);

    // Each transaction is answered with as many responses as it contained actions.
    clientMock
        .Setup(c => c.SubmitTransactionAsync(
            It.IsAny<IEnumerable<TableTransactionAction>>(),
            It.IsAny<CancellationToken>()))
        .ReturnsAsync((IEnumerable<TableTransactionAction> actions, CancellationToken _) =>
        {
            int actionCount = actions.Count();
            return CreateMockBatchResponse(actionCount);
        });

    // Act
    TableTransactionResults outcome = await sut.DeleteBatchParallelAsync(toDelete);

    // Assert
    Assert.AreEqual(250, outcome.Responses.Count);
    clientMock.Verify(
        c => c.SubmitTransactionAsync(
            It.IsAny<IEnumerable<TableTransactionAction>>(),
            It.IsAny<CancellationToken>()),
        Times.Exactly(3));
}

// Checks that batch transactions overlap in time: the mocked client records the highest number of
// calls that were in flight simultaneously, and the test requires that number to exceed one.
[TestMethod]
public async Task DeleteBatchParallelAsync_SubmitsBatchesConcurrently()
{
    // Arrange
    Table table = CreateTableWithMockedClient(out _, out Mock<TableClient> tableClient);
    List<TableEntity> entities = CreateTestEntities("pk", count: 500); // 5 batches of 100

    int concurrentCount = 0;
    int maxConcurrent = 0;

    tableClient
        .Setup(t => t.SubmitTransactionAsync(
            It.IsAny<IEnumerable<TableTransactionAction>>(),
            It.IsAny<CancellationToken>()))
        .Returns(async (IEnumerable<TableTransactionAction> actions, CancellationToken _) =>
        {
            int inFlight = Interlocked.Increment(ref concurrentCount);

            // Lock-free "raise the recorded maximum": retry only while our value is still larger
            // than the stored one and another caller won the compare-exchange race.
            while (true)
            {
                int observedMax = Volatile.Read(ref maxConcurrent);
                if (inFlight <= observedMax)
                {
                    break;
                }

                if (Interlocked.CompareExchange(ref maxConcurrent, inFlight, observedMax) == observedMax)
                {
                    break;
                }
            }

            // Hold the call open briefly so that other batches have a chance to overlap with it.
            await Task.Delay(50);
            Interlocked.Decrement(ref concurrentCount);

            return CreateMockBatchResponse(actions.Count());
        });

    // Act
    await table.DeleteBatchParallelAsync(entities);

    // Assert: only requires that at least two batches were in flight at the same time.
    bool overlapped = maxConcurrent > 1;
    Assert.IsTrue(
        overlapped,
        $"Expected concurrent execution, but max concurrent was {maxConcurrent}");
}

// When the batch transaction fails with HTTP 404, each entity is expected to be deleted individually.
[TestMethod]
public async Task DeleteBatchParallelAsync_BatchFails404_FallsBackToIndividualDeletes()
{
    // Arrange
    Table sut = CreateTableWithMockedClient(out _, out Mock<TableClient> clientMock);
    List<TableEntity> toDelete = CreateTestEntities("pk", count: 3);

    // Every batch submission fails with 404.
    var batchNotFound = new RequestFailedException(404, "Entity not found");
    clientMock
        .Setup(c => c.SubmitTransactionAsync(
            It.IsAny<IEnumerable<TableTransactionAction>>(),
            It.IsAny<CancellationToken>()))
        .ThrowsAsync(batchNotFound);

    // Every single-entity delete succeeds.
    Response deleteSucceeded = new Mock<Response>().Object;
    clientMock
        .Setup(c => c.DeleteEntityAsync(
            It.IsAny<string>(),
            It.IsAny<string>(),
            It.IsAny<ETag>(),
            It.IsAny<CancellationToken>()))
        .ReturnsAsync(deleteSucceeded);

    // Act
    TableTransactionResults outcome = await sut.DeleteBatchParallelAsync(toDelete);

    // Assert
    Assert.AreEqual(3, outcome.Responses.Count);
    clientMock.Verify(
        c => c.DeleteEntityAsync(
            It.IsAny<string>(),
            It.IsAny<string>(),
            It.IsAny<ETag>(),
            It.IsAny<CancellationToken>()),
        Times.Exactly(3));
}

// During the individual-delete fallback, a 404 on one entity is expected to be skipped:
// the request is still counted, but no response is recorded for it.
[TestMethod]
public async Task DeleteBatchParallelAsync_IndividualDeleteSkips404()
{
    // Arrange
    Table sut = CreateTableWithMockedClient(out _, out Mock<TableClient> clientMock);
    List<TableEntity> toDelete = CreateTestEntities("pk", count: 3);

    // Force the fallback path by failing the batch transaction with 404.
    clientMock
        .Setup(c => c.SubmitTransactionAsync(
            It.IsAny<IEnumerable<TableTransactionAction>>(),
            It.IsAny<CancellationToken>()))
        .ThrowsAsync(new RequestFailedException(404, "Entity not found"));

    Response deleteSucceeded = new Mock<Response>().Object;
    int deleteCalls = 0;
    clientMock
        .Setup(c => c.DeleteEntityAsync(
            It.IsAny<string>(),
            It.IsAny<string>(),
            It.IsAny<ETag>(),
            It.IsAny<CancellationToken>()))
        .Returns((string partitionKey, string rowKey, ETag etag, CancellationToken token) =>
        {
            // Only the second individual delete fails; it throws directly from the mocked call.
            bool isSecondCall = Interlocked.Increment(ref deleteCalls) == 2;
            if (!isSecondCall)
            {
                return Task.FromResult(deleteSucceeded);
            }

            throw new RequestFailedException(404, "Entity already deleted");
        });

    // Act
    TableTransactionResults outcome = await sut.DeleteBatchParallelAsync(toDelete);

    // Assert
    Assert.AreEqual(2, outcome.Responses.Count);
    Assert.AreEqual(3, outcome.RequestCount);
}

// Boundary case: exactly 100 entities are expected to go out as one transaction of 100 actions.
[TestMethod]
public async Task DeleteBatchParallelAsync_ExactlyOneBatch_NoBoundaryIssues()
{
    // Arrange
    Table sut = CreateTableWithMockedClient(out _, out Mock<TableClient> clientMock);
    List<TableEntity> toDelete = CreateTestEntities("pk", count: 100);

    // Only a transaction carrying all 100 actions matches this setup.
    Response<IReadOnlyList<Response>> cannedResponse = CreateMockBatchResponse(100);
    clientMock
        .Setup(c => c.SubmitTransactionAsync(
            It.Is<IEnumerable<TableTransactionAction>>(actions => actions.Count() == 100),
            It.IsAny<CancellationToken>()))
        .ReturnsAsync(cannedResponse);

    // Act
    TableTransactionResults outcome = await sut.DeleteBatchParallelAsync(toDelete);

    // Assert
    Assert.AreEqual(100, outcome.Responses.Count);
    clientMock.Verify(
        c => c.SubmitTransactionAsync(
            It.IsAny<IEnumerable<TableTransactionAction>>(),
            It.IsAny<CancellationToken>()),
        Times.Once());
}

// Boundary case: one entity over the batch limit is expected to produce two transactions.
[TestMethod]
public async Task DeleteBatchParallelAsync_101Entities_CreatesTwoBatches()
{
    // Arrange
    Table sut = CreateTableWithMockedClient(out _, out Mock<TableClient> clientMock);
    List<TableEntity> toDelete = CreateTestEntities("pk", count: 101);

    // Each transaction is answered with as many responses as it contained actions.
    clientMock
        .Setup(c => c.SubmitTransactionAsync(
            It.IsAny<IEnumerable<TableTransactionAction>>(),
            It.IsAny<CancellationToken>()))
        .ReturnsAsync((IEnumerable<TableTransactionAction> actions, CancellationToken _) =>
        {
            int actionCount = actions.Count();
            return CreateMockBatchResponse(actionCount);
        });

    // Act
    TableTransactionResults outcome = await sut.DeleteBatchParallelAsync(toDelete);

    // Assert
    Assert.AreEqual(101, outcome.Responses.Count);
    clientMock.Verify(
        c => c.SubmitTransactionAsync(
            It.IsAny<IEnumerable<TableTransactionAction>>(),
            It.IsAny<CancellationToken>()),
        Times.Exactly(2));
}

// Verifies that the caller's CancellationToken reaches the table client: the mocked client cancels
// the caller's token source during the first transaction and then checks the token it was handed.
// The call only observes cancellation if that token is tied to the caller's source.
[TestMethod]
public async Task DeleteBatchParallelAsync_CancellationToken_IsPropagated()
{
    // Arrange
    var entities = CreateTestEntities("pk", count: 200);
    using var cts = new CancellationTokenSource();
    Table table = CreateTableWithMockedClient(out _, out Mock<TableClient> tableClient);

    int batchesSubmitted = 0;
    tableClient
        .Setup(t => t.SubmitTransactionAsync(
            It.IsAny<IEnumerable<TableTransactionAction>>(),
            It.IsAny<CancellationToken>()))
        .Returns(async (IEnumerable<TableTransactionAction> batch, CancellationToken ct) =>
        {
            // Cancel from inside the first submission so the token is still live when the call starts.
            int count = Interlocked.Increment(ref batchesSubmitted);
            if (count == 1)
            {
                cts.Cancel();
            }
            ct.ThrowIfCancellationRequested();
            return CreateMockBatchResponse(batch.Count());
        });

    // Act
    // Assert.ThrowsExceptionAsync<T> only accepts the exact type T, so it would reject
    // TaskCanceledException even though that is an equally valid cancellation signal.
    // Catching the base type accepts any OperationCanceledException-derived exception.
    OperationCanceledException? observed = null;
    try
    {
        await table.DeleteBatchParallelAsync(entities, cts.Token);
    }
    catch (OperationCanceledException ex)
    {
        observed = ex;
    }

    // Assert
    Assert.IsNotNull(
        observed,
        "Expected DeleteBatchParallelAsync to throw an OperationCanceledException (or a derived type), but it completed without cancellation.");
}

#region Helper Methods

// Builds a Table whose TableServiceClient and TableClient are Moq mocks.
// The service-client mock is strict and only knows how to hand out the table-client mock for
// TableName; the table-client mock is loose and reports TableName as its name.
// Both mocks are returned through the out parameters so tests can add setups and verifications.
static Table CreateTableWithMockedClient(
    out Mock<TableServiceClient> tableServiceClient,
    out Mock<TableClient> tableClient)
{
    var clientMock = new Mock<TableClient>(MockBehavior.Loose, ConnectionString, TableName);
    clientMock.Setup(c => c.Name).Returns(TableName);

    var serviceMock = new Mock<TableServiceClient>(MockBehavior.Strict, ConnectionString);
    serviceMock.Setup(s => s.GetTableClient(TableName)).Returns(clientMock.Object);

    var settings = new AzureStorageOrchestrationServiceSettings();
    settings.StorageAccountClientProvider = new StorageAccountClientProvider(ConnectionString);
    var storageClient = new AzureStorageClient(settings);

    tableClient = clientMock;
    tableServiceClient = serviceMock;
    return new Table(storageClient, serviceMock.Object, TableName);
}

// Creates `count` entities in the given partition with row keys "rk_00000", "rk_00001", ...
// Each entity carries the wildcard ETag (ETag.All).
static List<TableEntity> CreateTestEntities(string partitionKey, int count)
{
    return Enumerable
        .Range(0, count)
        .Select(index => new TableEntity(partitionKey, $"rk_{index:D5}") { ETag = ETag.All })
        .ToList();
}

// Builds a fake transaction response wrapping `count` mocked per-entity responses,
// with another mocked Response standing in for the raw response.
static Response<IReadOnlyList<Response>> CreateMockBatchResponse(int count)
{
    var perEntityResponses = new List<Response>();
    while (perEntityResponses.Count < count)
    {
        perEntityResponses.Add(new Mock<Response>().Object);
    }

    Response rawResponse = new Mock<Response>().Object;
    return Response.FromValue<IReadOnlyList<Response>>(perEntityResponses, rawResponse);
}

#endregion
}
}
29 changes: 25 additions & 4 deletions src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2030,6 +2030,21 @@ public Task<PurgeHistoryResult> PurgeInstanceHistoryAsync(DateTime createdTimeFr
return this.trackingStore.PurgeInstanceHistoryAsync(createdTimeFrom, createdTimeTo, runtimeStatus);
}

/// <summary>
/// Purge history for orchestrations that match the specified parameters, with a timeout.
/// Use this overload to perform partial purges and avoid timeouts when there are many instances.
/// Check <see cref="PurgeHistoryResult.IsComplete"/> to determine if more purging is needed.
/// </summary>
/// <param name="createdTimeFrom">CreatedTime of orchestrations. Purges history greater than this value.</param>
/// <param name="createdTimeTo">CreatedTime of orchestrations. Purges history less than this value.</param>
/// <param name="runtimeStatus">RuntimeStatus of orchestrations. You can specify several statuses.</param>
/// <param name="timeout">Maximum time to spend purging. Already-started deletions will complete before the method returns.</param>
/// <returns>Class containing number of storage requests sent, along with instances and rows deleted/purged</returns>
public Task<PurgeHistoryResult> PurgeInstanceHistoryAsync(DateTime createdTimeFrom, DateTime? createdTimeTo, IEnumerable<OrchestrationStatus> runtimeStatus, TimeSpan timeout)
{
// Pure delegation: the tracking store receives the timeout unchanged.
return this.trackingStore.PurgeInstanceHistoryAsync(createdTimeFrom, createdTimeTo, runtimeStatus, timeout);
}

/// <inheritdoc />
async Task<PurgeResult> IOrchestrationServicePurgeClient.PurgeInstanceStateAsync(string instanceId)
{
Expand All @@ -2040,10 +2055,16 @@ async Task<PurgeResult> IOrchestrationServicePurgeClient.PurgeInstanceStateAsync
/// <inheritdoc />
async Task<PurgeResult> IOrchestrationServicePurgeClient.PurgeInstanceStateAsync(PurgeInstanceFilter purgeInstanceFilter)
{
PurgeHistoryResult storagePurgeHistoryResult = await this.PurgeInstanceHistoryAsync(
purgeInstanceFilter.CreatedTimeFrom,
purgeInstanceFilter.CreatedTimeTo,
purgeInstanceFilter.RuntimeStatus);
PurgeHistoryResult storagePurgeHistoryResult = purgeInstanceFilter.Timeout.HasValue
? await this.PurgeInstanceHistoryAsync(
purgeInstanceFilter.CreatedTimeFrom,
purgeInstanceFilter.CreatedTimeTo,
purgeInstanceFilter.RuntimeStatus,
purgeInstanceFilter.Timeout.Value)
: await this.PurgeInstanceHistoryAsync(
purgeInstanceFilter.CreatedTimeFrom,
purgeInstanceFilter.CreatedTimeTo,
purgeInstanceFilter.RuntimeStatus);
return storagePurgeHistoryResult.ToCorePurgeHistoryResult();
}
#nullable enable
Expand Down
21 changes: 19 additions & 2 deletions src/DurableTask.AzureStorage/MessageManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -318,8 +318,9 @@ public string GetNewLargeMessageBlobName(MessageData message)

public async Task<int> DeleteLargeMessageBlobs(string sanitizedInstanceId, CancellationToken cancellationToken = default)
{
int storageOperationCount = 1;
if (await this.blobContainer.ExistsAsync(cancellationToken))
int storageOperationCount = 0;

try
{
await foreach (Page<Blob> page in this.blobContainer.ListBlobsAsync(sanitizedInstanceId, cancellationToken).AsPages())
{
Expand All @@ -329,6 +330,22 @@ public async Task<int> DeleteLargeMessageBlobs(string sanitizedInstanceId, Cance

storageOperationCount += page.Values.Count;
}

// Count the list operation even if no blobs found (the initial list request still happened)
if (storageOperationCount == 0)
{
storageOperationCount = 1;
}
}
catch (DurableTaskStorageException ex) when (ex.HttpStatusCode == 404)
{
// Container does not exist; nothing to delete.
storageOperationCount = 1;
}
catch (Azure.RequestFailedException ex) when (ex.Status == 404)
{
// Container does not exist; nothing to delete.
storageOperationCount = 1;
}

return storageOperationCount;
Expand Down
Loading
Loading