Skip to content
Open
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
26734db
enhance purge
YunchuWang Mar 13, 2026
ee9812f
Merge branch 'main' of https://github.com/Azure/durabletask into wang…
YunchuWang Mar 13, 2026
8a886b4
revert timestamp change
YunchuWang Mar 13, 2026
db2cb4d
Refactor purge functionality and add tests for large message cleanup
YunchuWang Mar 13, 2026
1603320
remove semaphore
YunchuWang Mar 14, 2026
473cb65
Add partial purge with timeout support
YunchuWang Mar 16, 2026
972357a
Enforce 30s timeout cap and use effectiveToken for in-flight deletes
YunchuWang Mar 16, 2026
1ef227d
Enforce 30s purge timeout and return IsComplete
YunchuWang Mar 19, 2026
0362969
Add opt-in Timeout on PurgeInstanceFilter for partial purge
YunchuWang Mar 19, 2026
6e6a6ce
Address PR review comments for purge enhancement
YunchuWang Mar 20, 2026
3459012
Merge branch 'main' into wangbill/enpurge
YunchuWang Mar 20, 2026
d8bd90d
Add unit tests for Timeout/IsComplete purge feature
YunchuWang Mar 20, 2026
4d8b3f6
Address PR review comments
YunchuWang Mar 20, 2026
dcc69c4
Use effectiveToken for in-flight deletes and align docs
YunchuWang Mar 20, 2026
48bdb55
Merge branch 'main' into wangbill/enpurge
YunchuWang Mar 20, 2026
c18d935
Update src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs
YunchuWang Mar 20, 2026
7de125b
Potential fix for pull request finding 'Constant condition'
YunchuWang Mar 20, 2026
b3c7ed2
Merge branch 'main' into wangbill/enpurge
YunchuWang Mar 20, 2026
6ea7372
Refactor deletion logic to use a dedicated async method for instance …
YunchuWang Mar 20, 2026
6736aad
Merge branch 'wangbill/enpurge' of https://github.com/Azure/durableta…
YunchuWang Mar 20, 2026
167d150
Add purge instance history method with optional timeout and correspon…
YunchuWang Mar 20, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 12 additions & 11 deletions src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// ----------------------------------------------------------------------------------
// ----------------------------------------------------------------------------------
// Copyright Microsoft Corporation
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -1886,9 +1886,9 @@ public async Task<IList<OrchestrationState>> GetOrchestrationStateAsync(string i
/// <summary>
/// Gets the state of all orchestration instances that match the specified parameters.
/// </summary>
/// <param name="createdTimeFrom">CreatedTime of orchestrations. Fetch status grater than this value.</param>
/// <param name="createdTimeFrom">CreatedTime of orchestrations. Fetch status greater than this value.</param>
/// <param name="createdTimeTo">CreatedTime of orchestrations. Fetch status less than this value.</param>
/// <param name="runtimeStatus">RuntimeStatus of orchestrations. You can specify several status.</param>
/// <param name="runtimeStatus">RuntimeStatus of orchestrations. You can specify several statuses.</param>
/// <param name="cancellationToken">Cancellation Token</param>
/// <returns>List of <see cref="OrchestrationState"/></returns>
public async Task<IList<OrchestrationState>> GetOrchestrationStateAsync(DateTime createdTimeFrom, DateTime? createdTimeTo, IEnumerable<OrchestrationStatus> runtimeStatus, CancellationToken cancellationToken = default(CancellationToken))
Expand All @@ -1900,9 +1900,9 @@ public async Task<IList<OrchestrationState>> GetOrchestrationStateAsync(string i
/// <summary>
/// Gets the state of all orchestration instances that match the specified parameters.
/// </summary>
/// <param name="createdTimeFrom">CreatedTime of orchestrations. Fetch status grater than this value.</param>
/// <param name="createdTimeFrom">CreatedTime of orchestrations. Fetch status greater than this value.</param>
/// <param name="createdTimeTo">CreatedTime of orchestrations. Fetch status less than this value.</param>
/// <param name="runtimeStatus">RuntimeStatus of orchestrations. You can specify several status.</param>
/// <param name="runtimeStatus">RuntimeStatus of orchestrations. You can specify several statuses.</param>
/// <param name="top">Top is number of records per one request.</param>
/// <param name="continuationToken">ContinuationToken of the pager.</param>
/// <param name="cancellationToken">Cancellation Token</param>
Expand Down Expand Up @@ -2021,9 +2021,9 @@ public Task<PurgeHistoryResult> PurgeInstanceHistoryAsync(string instanceId)
/// <summary>
/// Purge history for orchestrations that match the specified parameters.
/// </summary>
/// <param name="createdTimeFrom">CreatedTime of orchestrations. Purges history grater than this value.</param>
/// <param name="createdTimeFrom">CreatedTime of orchestrations. Purges history greater than this value.</param>
/// <param name="createdTimeTo">CreatedTime of orchestrations. Purges history less than this value.</param>
/// <param name="runtimeStatus">RuntimeStatus of orchestrations. You can specify several status.</param>
/// <param name="runtimeStatus">RuntimeStatus of orchestrations. You can specify several statuses.</param>
/// <returns>Class containing number of storage requests sent, along with instances and rows deleted/purged</returns>
public Task<PurgeHistoryResult> PurgeInstanceHistoryAsync(DateTime createdTimeFrom, DateTime? createdTimeTo, IEnumerable<OrchestrationStatus> runtimeStatus)
{
Expand All @@ -2040,10 +2040,11 @@ async Task<PurgeResult> IOrchestrationServicePurgeClient.PurgeInstanceStateAsync
/// <inheritdoc />
async Task<PurgeResult> IOrchestrationServicePurgeClient.PurgeInstanceStateAsync(PurgeInstanceFilter purgeInstanceFilter)
{
PurgeHistoryResult storagePurgeHistoryResult = await this.PurgeInstanceHistoryAsync(
purgeInstanceFilter.CreatedTimeFrom,
purgeInstanceFilter.CreatedTimeTo,
purgeInstanceFilter.RuntimeStatus);
PurgeHistoryResult storagePurgeHistoryResult = await this.trackingStore.PurgeInstanceHistoryAsync(
purgeInstanceFilter.CreatedTimeFrom,
purgeInstanceFilter.CreatedTimeTo,
purgeInstanceFilter.RuntimeStatus,
purgeInstanceFilter.Timeout);
return storagePurgeHistoryResult.ToCorePurgeHistoryResult();
}
#nullable enable
Expand Down
21 changes: 19 additions & 2 deletions src/DurableTask.AzureStorage/MessageManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -318,8 +318,9 @@ public string GetNewLargeMessageBlobName(MessageData message)

public async Task<int> DeleteLargeMessageBlobs(string sanitizedInstanceId, CancellationToken cancellationToken = default)
{
int storageOperationCount = 1;
if (await this.blobContainer.ExistsAsync(cancellationToken))
int storageOperationCount = 0;

try
{
await foreach (Page<Blob> page in this.blobContainer.ListBlobsAsync(sanitizedInstanceId, cancellationToken).AsPages())
{
Expand All @@ -329,6 +330,22 @@ public async Task<int> DeleteLargeMessageBlobs(string sanitizedInstanceId, Cance

storageOperationCount += page.Values.Count;
}

// Count the list operation even if no blobs found (the initial list request still happened)
if (storageOperationCount == 0)
{
storageOperationCount = 1;
}
}
catch (DurableTaskStorageException ex) when (ex.HttpStatusCode == 404)
{
// Container does not exist; nothing to delete.
storageOperationCount = 1;
}
catch (Azure.RequestFailedException ex) when (ex.Status == 404)
{
// Container does not exist; nothing to delete.
storageOperationCount = 1;
}

return storageOperationCount;
Expand Down
23 changes: 22 additions & 1 deletion src/DurableTask.AzureStorage/PurgeHistoryResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,19 @@ public PurgeHistoryResult(int storageRequests, int instancesDeleted, int rowsDel
this.RowsDeleted = rowsDeleted;
}

/// <summary>
/// Constructor for purge history statistics with completion status.
/// Forwards the three counters to the statistics-only constructor overload and then
/// records <paramref name="isComplete"/> in <see cref="IsComplete"/>.
/// </summary>
/// <param name="storageRequests">Requests sent to storage</param>
/// <param name="instancesDeleted">Number of instances deleted</param>
/// <param name="rowsDeleted">Number of rows deleted</param>
/// <param name="isComplete">
/// <c>true</c> if all matching instances were purged;
/// <c>false</c> if more instances remain and purge should be called again;
/// <c>null</c> if completion status is unknown.
/// </param>
public PurgeHistoryResult(int storageRequests, int instancesDeleted, int rowsDeleted, bool? isComplete)
: this(storageRequests, instancesDeleted, rowsDeleted)
{
this.IsComplete = isComplete;
}

/// <summary>
/// Number of requests sent to Storage during this execution of purge history
/// </summary>
Expand All @@ -48,12 +61,20 @@ public PurgeHistoryResult(int storageRequests, int instancesDeleted, int rowsDel
/// </summary>
public int RowsDeleted { get; }

/// <summary>
/// Gets a value indicating whether the purge operation is complete.
/// <c>true</c> if all matching instances were purged;
/// <c>false</c> if more instances remain and purge should be called again;
/// <c>null</c> if completion status is unknown.
/// </summary>
/// <remarks>
/// Read-only; the value is supplied through the constructor overload that accepts an
/// <c>isComplete</c> argument.
/// </remarks>
public bool? IsComplete { get; }

/// <summary>
/// Converts from AzureStorage.PurgeHistoryResult to Core.PurgeResult type
/// </summary>
public PurgeResult ToCorePurgeHistoryResult()
{
return new PurgeResult(this.InstancesDeleted);
return new PurgeResult(this.InstancesDeleted, this.IsComplete);
}
}
}
111 changes: 111 additions & 0 deletions src/DurableTask.AzureStorage/Storage/Table.cs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,117 @@ public async Task<TableTransactionResults> DeleteBatchAsync<T>(IEnumerable<T> en
return await this.ExecuteBatchAsync(entityBatch, item => new TableTransactionAction(TableTransactionActionType.Delete, item), cancellationToken: cancellationToken);
}

/// <summary>
/// Deletes the given entities by splitting them into transactions of at most 100 delete
/// actions and submitting all of those transactions concurrently.
/// Each transaction is atomic on its own; the set of transactions as a whole is not.
/// Concurrency is controlled by the global <see cref="Http.ThrottlingHttpPipelinePolicy"/>.
/// A transaction that fails with a 404 (for example because an entity was already deleted)
/// is retried as individual deletes, skipping entities that are already gone.
/// </summary>
/// <typeparam name="T">The table entity type.</typeparam>
/// <param name="entityBatch">The entities to delete. An empty list results in no storage calls.</param>
/// <param name="cancellationToken">Token forwarded to every delete transaction.</param>
/// <returns>
/// The combined responses and request count of all transactions. The elapsed time is the
/// wall-clock time spent waiting for the concurrent transactions, not the sum of their durations.
/// </returns>
public async Task<TableTransactionResults> DeleteBatchParallelAsync<T>(
    IReadOnlyList<T> entityBatch,
    CancellationToken cancellationToken = default) where T : ITableEntity
{
    int entityCount = entityBatch.Count;
    if (entityCount == 0)
    {
        return new TableTransactionResults(Array.Empty<Response>(), TimeSpan.Zero, 0);
    }

    const int maxActionsPerTransaction = 100;

    // Ceiling division: number of transactions needed to hold every entity.
    int transactionCount = (entityCount + maxActionsPerTransaction - 1) / maxActionsPerTransaction;
    var transactions = new List<List<TableTransactionAction>>(transactionCount);

    // Slice the input into consecutive runs of up to maxActionsPerTransaction delete actions.
    for (int start = 0; start < entityCount; start += maxActionsPerTransaction)
    {
        int end = Math.Min(start + maxActionsPerTransaction, entityCount);
        var transaction = new List<TableTransactionAction>(end - start);
        for (int i = start; i < end; i++)
        {
            transaction.Add(new TableTransactionAction(TableTransactionActionType.Delete, entityBatch[i]));
        }

        transactions.Add(transaction);
    }

    // Start every transaction before awaiting any of them so they run concurrently.
    var timer = Stopwatch.StartNew();
    var pendingDeletes = new List<Task<TableTransactionResults>>(transactions.Count);
    foreach (List<TableTransactionAction> transaction in transactions)
    {
        pendingDeletes.Add(this.ExecuteBatchWithFallbackAsync(transaction, cancellationToken));
    }

    TableTransactionResults[] perTransactionResults = await Task.WhenAll(pendingDeletes);
    timer.Stop();

    var aggregate = new TableTransactionResultsBuilder();
    foreach (TableTransactionResults transactionResult in perTransactionResults)
    {
        aggregate.Add(transactionResult);
    }

    // Keep the aggregated responses and request count, but report the wall-clock time measured here.
    TableTransactionResults combined = aggregate.ToResults();
    return new TableTransactionResults(combined.Responses, timer.Elapsed, combined.RequestCount);
}

/// <summary>
/// Submits <paramref name="batch"/> as a single transaction. When that transaction fails with
/// HTTP status 404, the same actions are re-run one at a time via
/// <c>DeleteEntitiesIndividuallyAsync</c>, which skips entities that are already gone.
/// Any other failure propagates to the caller.
/// </summary>
/// <param name="batch">The delete actions to execute.</param>
/// <param name="cancellationToken">Token forwarded to the storage calls.</param>
/// <returns>The results of the transaction, or of the individual deletes when the fallback ran.</returns>
async Task<TableTransactionResults> ExecuteBatchWithFallbackAsync(
    List<TableTransactionAction> batch,
    CancellationToken cancellationToken)
{
    try
    {
        return await this.ExecuteBatchAsync(batch, cancellationToken);
    }
    catch (Exception ex) when (
        (ex is DurableTaskStorageException storageFailure && storageFailure.HttpStatusCode == 404) ||
        (ex is RequestFailedException requestFailure && requestFailure.Status == 404))
    {
        // One or more entities in the batch were already deleted.
        // Fall back to individual deletes, skipping 404s.
        return await this.DeleteEntitiesIndividuallyAsync(batch, cancellationToken);
    }
}

/// <summary>
/// Deletes the entity of each action in <paramref name="batch"/> one request at a time,
/// unconditionally (<c>ETag.All</c>). A delete that fails with HTTP status 404 is treated as
/// "already deleted" and skipped; any other failure propagates to the caller.
/// </summary>
/// <param name="batch">The actions whose entities should be deleted.</param>
/// <param name="cancellationToken">Token forwarded to every delete request.</param>
/// <returns>
/// The responses of the deletes that succeeded, the total elapsed time, and the number of
/// delete requests attempted (including those that returned 404).
/// </returns>
async Task<TableTransactionResults> DeleteEntitiesIndividuallyAsync(
    List<TableTransactionAction> batch,
    CancellationToken cancellationToken)
{
    var succeeded = new List<Response>();
    int attempts = 0;
    var timer = Stopwatch.StartNew();

    foreach (TableTransactionAction pending in batch)
    {
        // Every attempt counts as a request, whether or not the entity still exists.
        attempts++;
        try
        {
            Response outcome = await this.tableClient.DeleteEntityAsync(
                pending.Entity.PartitionKey,
                pending.Entity.RowKey,
                ETag.All,
                cancellationToken).DecorateFailure();

            succeeded.Add(outcome);
            this.stats.TableEntitiesWritten.Increment();
        }
        catch (Exception ex) when (
            (ex is DurableTaskStorageException storageFailure && storageFailure.HttpStatusCode == 404) ||
            (ex is RequestFailedException requestFailure && requestFailure.Status == 404))
        {
            // Entity already deleted; skip.
        }
    }

    timer.Stop();
    return new TableTransactionResults(succeeded, timer.Elapsed, attempts);
}

public async Task<TableTransactionResults> InsertOrMergeBatchAsync<T>(IEnumerable<T> entityBatch, CancellationToken cancellationToken = default) where T : ITableEntity
{
TableTransactionResults results = await this.ExecuteBatchAsync(entityBatch, item => new TableTransactionAction(TableTransactionActionType.UpsertMerge, item), cancellationToken: cancellationToken);
Expand Down
Loading
Loading