Skip to content

Commit 541e30f

Browse files
YunchuWanghalspang
authored andcommitted
Partial Purge Support (#400)
* Enhance PurgeResult to include completion status and update related gRPC response handling. Added new AbandonTask RPCs in orchestrator_service.proto. Updated versions.txt with latest proto file source. * Update ShimDurableTaskClientTests to assert IsComplete property is not false in PurgeResult * Add end-to-end tests for PurgeInstance and PurgeInstances_WithFilter methods in GrpcDurableTaskClientIntegrationTests. These tests verify the correct behavior of instance purging and ensure instances are removed as expected.
1 parent dc7c09b commit 541e30f

File tree

14 files changed

+1205
-658
lines changed

14 files changed

+1205
-658
lines changed

CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@
55
- Introduce default version setting to DurableTaskClient and expose to orchestrator ([#393](https://github.com/microsoft/durabletask-dotnet/pull/393))
66
- Add support for local credential types in DTS libraries ([#396](https://github.com/microsoft/durabletask-dotnet/pull/396))
77
- Add utility for easier version comparison in orchestration context ([#394](https://github.com/microsoft/durabletask-dotnet/pull/394))
8-
- Add tags support for orchestrations ([#397])(https://github.com/microsoft/durabletask-dotnet/pull/397)
8+
- Add tags support for orchestrations ([#397](https://github.com/microsoft/durabletask-dotnet/pull/397))
9+
- Add support for versioning in the gRPC worker ([#401](https://github.com/microsoft/durabletask-dotnet/pull/401))
910

1011
## v1.8.1
1112

src/Abstractions/TaskOrchestrationContext.cs

Lines changed: 2 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// Copyright (c) Microsoft Corporation.
22
// Licensed under the MIT License.
33

4+
using Microsoft.DurableTask.Abstractions;
45
using Microsoft.DurableTask.Entities;
56
using Microsoft.Extensions.Logging;
67

@@ -421,32 +422,7 @@ public virtual ILogger CreateReplaySafeLogger<T>()
421422
/// <returns>True if the orchestration's version is greater than the provided version, false otherwise.</returns>
422423
public virtual int CompareVersionTo(string version)
423424
{
424-
// Both versions are empty, treat as equal.
425-
if (string.IsNullOrWhiteSpace(this.Version) && string.IsNullOrWhiteSpace(version))
426-
{
427-
return 0;
428-
}
429-
430-
// An empty version in the context is always less than a defined version in the parameter.
431-
if (string.IsNullOrWhiteSpace(this.Version))
432-
{
433-
return -1;
434-
}
435-
436-
// An empty version in the parameter is always less than a defined version in the context.
437-
if (string.IsNullOrWhiteSpace(version))
438-
{
439-
return 1;
440-
}
441-
442-
// If both versions use the .NET Version class, return that comparison.
443-
if (System.Version.TryParse(this.Version, out Version contextVersion) && System.Version.TryParse(version, out Version otherVersion))
444-
{
445-
return contextVersion.CompareTo(otherVersion);
446-
}
447-
448-
// If we have gotten to here, we don't know the syntax of the versions we are comparing, use a string comparison as a final check.
449-
return string.Compare(this.Version, version, StringComparison.OrdinalIgnoreCase);
425+
return TaskOrchestrationVersioningUtils.CompareVersions(this.Version, version);
450426
}
451427

452428
class ReplaySafeLogger : ILogger
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
// Copyright (c) Microsoft Corporation.
2+
// Licensed under the MIT License.
3+
4+
namespace Microsoft.DurableTask.Abstractions;
5+
6+
/// <summary>
7+
/// Utilities for handling Orchestration/Task versioning operations.
8+
/// </summary>
9+
public static class TaskOrchestrationVersioningUtils
10+
{
11+
/// <summary>
12+
/// Compare two versions to each other.
13+
/// </summary>
14+
/// <remarks>
15+
/// This method's comparison is handled in the following order:
16+
/// 1. The versions are checked if they are empty (non-versioned). Both being empty signifies equality.
17+
/// 2. If sourceVersion is empty but otherVersion is defined, this is treated as the source being less than the other.
18+
/// 3. If otherVersion is empty but sourceVersion is defined, this is treated as the source being greater than the other.
19+
/// 4. Both versions are attempted to be parsed into System.Version and compared as such.
20+
/// 5. If all else fails, a direct string comparison is done between the versions.
21+
/// </remarks>
22+
/// <param name="sourceVersion">The source version that will be compared against the other version.</param>
23+
/// <param name="otherVersion">The other version to compare against.</param>
24+
/// <returns>An int representing how sourceVersion compares to otherVersion.</returns>
25+
public static int CompareVersions(string sourceVersion, string otherVersion)
26+
{
27+
// Both versions are empty, treat as equal.
28+
if (string.IsNullOrWhiteSpace(sourceVersion) && string.IsNullOrWhiteSpace(otherVersion))
29+
{
30+
return 0;
31+
}
32+
33+
// An empty version in the context is always less than a defined version in the parameter.
34+
if (string.IsNullOrWhiteSpace(sourceVersion))
35+
{
36+
return -1;
37+
}
38+
39+
// An empty version in the parameter is always less than a defined version in the context.
40+
if (string.IsNullOrWhiteSpace(otherVersion))
41+
{
42+
return 1;
43+
}
44+
45+
// If both versions use the .NET Version class, return that comparison.
46+
if (System.Version.TryParse(sourceVersion, out Version parsedSourceVersion) && System.Version.TryParse(otherVersion, out Version parsedOtherVersion))
47+
{
48+
return parsedSourceVersion.CompareTo(parsedOtherVersion);
49+
}
50+
51+
// If we have gotten to here, we don't know the syntax of the versions we are comparing, use a string comparison as a final check.
52+
return string.Compare(sourceVersion, otherVersion, StringComparison.OrdinalIgnoreCase);
53+
}
54+
}

src/Client/Core/DurableTaskClient.cs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -399,6 +399,19 @@ public virtual Task<PurgeResult> PurgeAllInstancesAsync(
399399
throw new NotSupportedException($"{this.GetType()} does not support purging of orchestration instances.");
400400
}
401401

402+
/// <summary>
403+
/// Abandons a task orchestration work item to be re-enqueued at a later time.
404+
/// </summary>
405+
/// <param name="completionToken">The completion token that was given when the WorkItem was fetched.</param>
406+
/// <param name="cancellation">
407+
/// A <see cref="CancellationToken"/> that can be used to cancel the purge operation.
408+
/// </param>
409+
/// <returns>A task that completes when the WorkItem has been abandoned.</returns>
410+
public virtual Task AbandonOrchestrationTask(string completionToken, CancellationToken cancellation = default)
411+
{
412+
throw new NotSupportedException($"{this.GetType()} does not support abandoning task orchestration work items.");
413+
}
414+
402415
// TODO: Create task hub
403416

404417
// TODO: Delete task hub

src/Client/Core/PurgeResult.cs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,29 @@ public PurgeResult(int count)
1818
this.PurgedInstanceCount = count;
1919
}
2020

21+
/// <summary>
22+
/// Initializes a new instance of the <see cref="PurgeResult" /> class.
23+
/// </summary>
24+
/// <param name="count">The number of instances deleted.</param>
25+
/// <param name="isComplete">A value indicating whether the purge operation is complete.
26+
/// If true, the purge operation is complete. All instances were purged.
27+
/// If false, not all instances were purged. Please purge again.
28+
/// If null, whether or not all instances were purged is undefined.</param>
29+
public PurgeResult(int count, bool? isComplete)
30+
: this(count)
31+
{
32+
this.IsComplete = isComplete;
33+
}
34+
2135
/// <summary>
2236
/// Gets the number of purged instances.
2337
/// </summary>
2438
/// <value>The number of purged instances.</value>
2539
public int PurgedInstanceCount { get; }
40+
41+
/// <summary>
42+
/// Gets a value indicating whether the purge operation is complete.
43+
/// </summary>
44+
/// <value>A value indicating whether the purge operation is complete.</value>
45+
public bool? IsComplete { get; }
2646
}

src/Client/Grpc/GrpcDurableTaskClient.cs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -396,6 +396,17 @@ public override Task<PurgeResult> PurgeAllInstancesAsync(
396396
return this.PurgeInstancesCoreAsync(request, cancellation);
397397
}
398398

399+
/// <inheritdoc/>
400+
public override async Task AbandonOrchestrationTask(string completionToken, CancellationToken cancellation = default)
401+
{
402+
P.AbandonOrchestrationTaskRequest request = new()
403+
{
404+
CompletionToken = completionToken,
405+
};
406+
407+
await this.sidecarClient.AbandonTaskOrchestratorWorkItemAsync(request, cancellationToken: cancellation);
408+
}
409+
399410
static AsyncDisposable GetCallInvoker(GrpcDurableTaskClientOptions options, out CallInvoker callInvoker)
400411
{
401412
if (options.Channel is GrpcChannel c)
@@ -446,7 +457,7 @@ async Task<PurgeResult> PurgeInstancesCoreAsync(
446457
{
447458
P.PurgeInstancesResponse response = await this.sidecarClient.PurgeInstancesAsync(
448459
request, cancellationToken: cancellation);
449-
return new PurgeResult(response.DeletedInstanceCount);
460+
return new PurgeResult(response.DeletedInstanceCount, response.IsComplete);
450461
}
451462
catch (RpcException e) when (e.StatusCode == StatusCode.Cancelled)
452463
{

src/Grpc/orchestrator_service.proto

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -463,6 +463,7 @@ message PurgeInstanceFilter {
463463

464464
message PurgeInstancesResponse {
465465
int32 deletedInstanceCount = 1;
466+
google.protobuf.BoolValue isComplete = 2;
466467
}
467468

468469
message CreateTaskHubRequest {
@@ -617,6 +618,30 @@ message StartNewOrchestrationAction {
617618
google.protobuf.Timestamp scheduledTime = 5;
618619
}
619620

621+
message AbandonActivityTaskRequest {
622+
string completionToken = 1;
623+
}
624+
625+
message AbandonActivityTaskResponse {
626+
// Empty.
627+
}
628+
629+
message AbandonOrchestrationTaskRequest {
630+
string completionToken = 1;
631+
}
632+
633+
message AbandonOrchestrationTaskResponse {
634+
// Empty.
635+
}
636+
637+
message AbandonEntityTaskRequest {
638+
string completionToken = 1;
639+
}
640+
641+
message AbandonEntityTaskResponse {
642+
// Empty.
643+
}
644+
620645
service TaskHubSidecarService {
621646
// Sends a hello request to the sidecar service.
622647
rpc Hello(google.protobuf.Empty) returns (google.protobuf.Empty);
@@ -678,6 +703,15 @@ service TaskHubSidecarService {
678703

679704
// clean entity storage
680705
rpc CleanEntityStorage(CleanEntityStorageRequest) returns (CleanEntityStorageResponse);
706+
707+
// Abandons a single work item
708+
rpc AbandonTaskActivityWorkItem(AbandonActivityTaskRequest) returns (AbandonActivityTaskResponse);
709+
710+
// Abandon an orchestration work item
711+
rpc AbandonTaskOrchestratorWorkItem(AbandonOrchestrationTaskRequest) returns (AbandonOrchestrationTaskResponse);
712+
713+
// Abandon an entity work item
714+
rpc AbandonTaskEntityWorkItem(AbandonEntityTaskRequest) returns (AbandonEntityTaskResponse);
681715
}
682716

683717
message GetWorkItemsRequest {

src/Grpc/versions.txt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
# The following files were downloaded from branch main at 2025-03-19 19:55:31 UTC
2-
https://raw.githubusercontent.com/microsoft/durabletask-protobuf/4792f47019ab2b3e9ea979fb4af72427a4144c51/protos/orchestrator_service.proto
1+
# The following files were downloaded from branch main at 2025-03-24 23:37:31 UTC
2+
https://raw.githubusercontent.com/microsoft/durabletask-protobuf/c85ef11430ff8e10e21105abb545b0803bb86c66/protos/orchestrator_service.proto

src/Worker/Core/DependencyInjection/DurableTaskWorkerBuilderExtensions.cs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using Microsoft.DurableTask.Worker.Hosting;
55
using Microsoft.Extensions.DependencyInjection;
66
using Microsoft.Extensions.Options;
7+
using static Microsoft.DurableTask.Worker.DurableTaskWorkerOptions;
78

89
namespace Microsoft.DurableTask.Worker;
910

@@ -86,4 +87,25 @@ public static IDurableTaskWorkerBuilder UseBuildTarget<TTarget, TOptions>(this I
8687
});
8788
return builder;
8889
}
90+
91+
/// <summary>
92+
/// Configures the versioning options for this builder.
93+
/// </summary>
94+
/// <param name="builder">The builder to set the builder target for.</param>
95+
/// <param name="versionOptions">The collection of options specified for versioning the worker.</param>
96+
/// <returns>The original builder, for call chaining.</returns>
97+
public static IDurableTaskWorkerBuilder UseVersioning(this IDurableTaskWorkerBuilder builder, VersioningOptions versionOptions)
98+
{
99+
Check.NotNull(builder);
100+
builder.Configure(options =>
101+
{
102+
options.Versioning = new VersioningOptions
103+
{
104+
Version = versionOptions.Version,
105+
MatchStrategy = versionOptions.MatchStrategy,
106+
FailureStrategy = versionOptions.FailureStrategy,
107+
};
108+
});
109+
return builder;
110+
}
89111
}

0 commit comments

Comments
 (0)