From 6eeaff0f1847dea5c0c945d4dfc8d2e3a08832b7 Mon Sep 17 00:00:00 2001 From: peterstone2017 <12449837+YunchuWang@users.noreply.github.com> Date: Mon, 24 Mar 2025 16:40:11 -0700 Subject: [PATCH 1/7] Enhance PurgeResult to include completion status and update related gRPC response handling. Added new AbandonTask RPCs in orchestrator_service.proto. Updated versions.txt with latest proto file source. --- src/Client/Core/PurgeResult.cs | 10 ++++++- src/Client/Grpc/GrpcDurableTaskClient.cs | 2 +- src/Grpc/orchestrator_service.proto | 34 ++++++++++++++++++++++++ src/Grpc/versions.txt | 4 +-- 4 files changed, 46 insertions(+), 4 deletions(-) diff --git a/src/Client/Core/PurgeResult.cs b/src/Client/Core/PurgeResult.cs index be4c61257..25fb13511 100644 --- a/src/Client/Core/PurgeResult.cs +++ b/src/Client/Core/PurgeResult.cs @@ -12,10 +12,12 @@ public class PurgeResult /// Initializes a new instance of the class. /// /// The count of instances purged. - public PurgeResult(int count) + /// A value indicating whether the purge operation is complete. Default is null. + public PurgeResult(int count, bool? isComplete = null) { Check.Argument(count >= 0, nameof(count), "Count must be non-negative"); this.PurgedInstanceCount = count; + this.IsComplete = isComplete; } /// @@ -23,4 +25,10 @@ public PurgeResult(int count) /// /// The number of purged instances. public int PurgedInstanceCount { get; } + + /// + /// Gets a value indicating whether the purge operation is complete. + /// + /// A value indicating whether the purge operation is complete. + public bool? IsComplete { get; } } diff --git a/src/Client/Grpc/GrpcDurableTaskClient.cs b/src/Client/Grpc/GrpcDurableTaskClient.cs index 32c95b64b..4d32093e9 100644 --- a/src/Client/Grpc/GrpcDurableTaskClient.cs +++ b/src/Client/Grpc/GrpcDurableTaskClient.cs @@ -446,7 +446,7 @@ async Task PurgeInstancesCoreAsync( { P.PurgeInstancesResponse response = await this.sidecarClient.PurgeInstancesAsync( request, cancellationToken: cancellation); - return new PurgeResult(response.DeletedInstanceCount); + return new PurgeResult(response.DeletedInstanceCount, response.IsComplete); } catch (RpcException e) when (e.StatusCode == StatusCode.Cancelled) { diff --git a/src/Grpc/orchestrator_service.proto b/src/Grpc/orchestrator_service.proto index 64e752818..5dd39cfe6 100644 --- a/src/Grpc/orchestrator_service.proto +++ b/src/Grpc/orchestrator_service.proto @@ -463,6 +463,7 @@ message PurgeInstanceFilter { message PurgeInstancesResponse { int32 deletedInstanceCount = 1; + google.protobuf.BoolValue isComplete = 2; } message CreateTaskHubRequest { @@ -617,6 +618,30 @@ message StartNewOrchestrationAction { google.protobuf.Timestamp scheduledTime = 5; } +message AbandonActivityTaskRequest { + string completionToken = 1; +} + +message AbandonActivityTaskResponse { + // Empty. +} + +message AbandonOrchestrationTaskRequest { + string completionToken = 1; +} + +message AbandonOrchestrationTaskResponse { + // Empty. +} + +message AbandonEntityTaskRequest { + string completionToken = 1; +} + +message AbandonEntityTaskResponse { + // Empty. +} + service TaskHubSidecarService { // Sends a hello request to the sidecar service. rpc Hello(google.protobuf.Empty) returns (google.protobuf.Empty); @@ -678,6 +703,15 @@ service TaskHubSidecarService { // clean entity storage rpc CleanEntityStorage(CleanEntityStorageRequest) returns (CleanEntityStorageResponse); + + // Abandons a single work item + rpc AbandonTaskActivityWorkItem(AbandonActivityTaskRequest) returns (AbandonActivityTaskResponse); + + // Abandon an orchestration work item + rpc AbandonTaskOrchestratorWorkItem(AbandonOrchestrationTaskRequest) returns (AbandonOrchestrationTaskResponse); + + // Abandon an entity work item + rpc AbandonTaskEntityWorkItem(AbandonEntityTaskRequest) returns (AbandonEntityTaskResponse); } message GetWorkItemsRequest { diff --git a/src/Grpc/versions.txt b/src/Grpc/versions.txt index c016c8199..733b20c07 100644 --- a/src/Grpc/versions.txt +++ b/src/Grpc/versions.txt @@ -1,2 +1,2 @@ -# The following files were downloaded from branch main at 2025-03-19 19:55:31 UTC -https://raw.githubusercontent.com/microsoft/durabletask-protobuf/4792f47019ab2b3e9ea979fb4af72427a4144c51/protos/orchestrator_service.proto +# The following files were downloaded from branch main at 2025-03-24 23:37:31 UTC +https://raw.githubusercontent.com/microsoft/durabletask-protobuf/c85ef11430ff8e10e21105abb545b0803bb86c66/protos/orchestrator_service.proto From 332f35dcccde50803fde71e148fa4a31663abb21 Mon Sep 17 00:00:00 2001 From: peterstone2017 <12449837+YunchuWang@users.noreply.github.com> Date: Mon, 24 Mar 2025 17:43:26 -0700 Subject: [PATCH 2/7] Update ShimDurableTaskClientTests to assert IsComplete property is null in PurgeResult --- .../ShimDurableTaskClientTests.cs | 1 + 1 file changed, 1 insertion(+) diff --git a/test/Client/OrchestrationServiceClientShim.Tests/ShimDurableTaskClientTests.cs b/test/Client/OrchestrationServiceClientShim.Tests/ShimDurableTaskClientTests.cs index cf67af96b..a9a5d7caa 100644 --- a/test/Client/OrchestrationServiceClientShim.Tests/ShimDurableTaskClientTests.cs +++ b/test/Client/OrchestrationServiceClientShim.Tests/ShimDurableTaskClientTests.cs @@ -174,6 +174,7 @@ public async Task PurgeInstanceMetadata() // assert this.orchestrationClient.VerifyAll(); result.PurgedInstanceCount.Should().Be(1); + result.IsComplete.Should().BeNull(); } [Fact] From 951d857545a0740cdf0047169daf8170ca6b69d3 Mon Sep 17 00:00:00 2001 From: peterstone2017 <12449837+YunchuWang@users.noreply.github.com> Date: Mon, 24 Mar 2025 17:55:24 -0700 Subject: [PATCH 3/7] Add end-to-end tests for PurgeInstance and PurgeInstances_WithFilter methods in GrpcDurableTaskClientIntegrationTests. These tests verify the correct behavior of instance purging and ensure instances are removed as expected. --- .../GrpcDurableTaskClientIntegrationTests.cs | 74 +++++++++++++++++++ 1 file changed, 74 insertions(+) diff --git a/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs b/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs index a208671ff..5177d7245 100644 --- a/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs +++ b/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs @@ -150,6 +150,80 @@ await server.Client.ScheduleNewOrchestrationInstanceAsync( page.ContinuationToken.Should().NotBeNull(); } + [Fact] + public async Task PurgeInstance_EndToEnd() + { + // Arrange + await using HostTestLifetime server = await this.StartAsync(); + + // Create and complete an orchestration instance + string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync( + OrchestrationName, input: false); + + // Wait for it to start and raise event to complete it + await server.Client.WaitForInstanceStartAsync(instanceId, default); + await server.Client.RaiseEventAsync(instanceId, "event", default); + await server.Client.WaitForInstanceCompletionAsync(instanceId, default); + + // Verify instance exists before purge + OrchestrationMetadata? metadata = await server.Client.GetInstanceAsync(instanceId, false); + metadata.Should().NotBeNull(); + metadata!.InstanceId.Should().Be(instanceId); + metadata.RuntimeStatus.Should().Be(OrchestrationRuntimeStatus.Completed); + + // Act + PurgeResult result = await server.Client.PurgeInstanceAsync( + instanceId, + new PurgeInstanceOptions { Recursive = true }); + + // Assert + result.Should().NotBeNull(); + result.PurgedInstanceCount.Should().Be(1); + result.IsComplete.Should().BeNull(); + // Verify instance no longer exists + OrchestrationMetadata? instance = await server.Client.GetInstanceAsync(instanceId, false); + instance.Should().BeNull(); + } + + [Fact] + public async Task PurgeInstances_WithFilter_EndToEnd() + { + // Arrange + await using HostTestLifetime server = await this.StartAsync(); + List instanceIds = new List(); + + // Create multiple instances + for (int i = 0; i < 3; i++) + { + string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync( + OrchestrationName, input: false); + instanceIds.Add(instanceId); + + // Wait for it to start and raise event to complete it + await server.Client.WaitForInstanceStartAsync(instanceId, default); + await server.Client.RaiseEventAsync(instanceId, "event", default); + await server.Client.WaitForInstanceCompletionAsync(instanceId, default); + } + + // Act + PurgeResult result = await server.Client.PurgeAllInstancesAsync( + new PurgeInstancesFilter( + CreatedFrom: DateTime.UtcNow.AddMinutes(-5), + CreatedTo: DateTime.UtcNow, + Statuses: new[] { OrchestrationRuntimeStatus.Completed })); + + // Assert + result.Should().NotBeNull(); + result.PurgedInstanceCount.Should().BeGreaterThan(3); + result.IsComplete.Should().BeNull(); + // Verify instances no longer exist + foreach (string instanceId in instanceIds) + { + OrchestrationMetadata? instance = await server.Client.GetInstanceAsync(instanceId, false); + instance.Should().BeNull(); + } + } + Task StartAsync() { static async Task Orchestration(TaskOrchestrationContext context, bool shouldThrow) From 64503f731e251f8b78545d9931597f0b06ab795d Mon Sep 17 00:00:00 2001 From: peterstone2017 <12449837+YunchuWang@users.noreply.github.com> Date: Tue, 25 Mar 2025 09:15:24 -0700 Subject: [PATCH 4/7] Refactor PurgeResult --- src/Client/Core/PurgeResult.cs | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/src/Client/Core/PurgeResult.cs b/src/Client/Core/PurgeResult.cs index 25fb13511..6aadfd293 100644 --- a/src/Client/Core/PurgeResult.cs +++ b/src/Client/Core/PurgeResult.cs @@ -12,11 +12,23 @@ public class PurgeResult /// Initializes a new instance of the class. /// /// The count of instances purged. - /// A value indicating whether the purge operation is complete. Default is null. - public PurgeResult(int count, bool? isComplete = null) + public PurgeResult(int count) { Check.Argument(count >= 0, nameof(count), "Count must be non-negative"); this.PurgedInstanceCount = count; + } + + /// + /// Initializes a new instance of the class. + /// + /// The number of instances deleted. + /// A value indicating whether the purge operation is complete. + /// If true, the purge operation is complete. All instances were purged. + /// If false, not all instances were purged. Please purge again. + /// If null, default to legacy purge behavior. All instances were purged. + public PurgeResult(int count, bool? isComplete) + : this(count) + { this.IsComplete = isComplete; } From 87960e02aedbc533f9adfea0f99d7c09dbf9ba8e Mon Sep 17 00:00:00 2001 From: peterstone2017 <12449837+YunchuWang@users.noreply.github.com> Date: Tue, 25 Mar 2025 09:17:01 -0700 Subject: [PATCH 5/7] feedback --- .../GrpcDurableTaskClientIntegrationTests.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs b/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs index 5177d7245..50fe6e070 100644 --- a/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs +++ b/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs @@ -179,7 +179,7 @@ public async Task PurgeInstance_EndToEnd() // Assert result.Should().NotBeNull(); result.PurgedInstanceCount.Should().Be(1); - result.IsComplete.Should().BeNull(); + result.IsComplete.Should().NotBeFalse(); // Verify instance no longer exists OrchestrationMetadata? instance = await server.Client.GetInstanceAsync(instanceId, false); instance.Should().BeNull(); @@ -215,7 +215,7 @@ public async Task PurgeInstances_WithFilter_EndToEnd() // Assert result.Should().NotBeNull(); result.PurgedInstanceCount.Should().BeGreaterThan(3); - result.IsComplete.Should().BeNull(); + result.IsComplete.Should().NotBeFalse(); // Verify instances no longer exist foreach (string instanceId in instanceIds) { From 4c7a4a374f9aa1a0a2ac24c4ba53c1f3ded75540 Mon Sep 17 00:00:00 2001 From: peterstone2017 <12449837+YunchuWang@users.noreply.github.com> Date: Tue, 25 Mar 2025 15:09:53 -0700 Subject: [PATCH 6/7] fb --- src/Client/Core/PurgeResult.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Client/Core/PurgeResult.cs b/src/Client/Core/PurgeResult.cs index 6aadfd293..b0f8aa212 100644 --- a/src/Client/Core/PurgeResult.cs +++ b/src/Client/Core/PurgeResult.cs @@ -25,7 +25,7 @@ public PurgeResult(int count) /// A value indicating whether the purge operation is complete. /// If true, the purge operation is complete. All instances were purged. /// If false, not all instances were purged. Please purge again. - /// If null, default to legacy purge behavior. All instances were purged. + /// If null, whether or not all instances were purged is undefined. public PurgeResult(int count, bool? isComplete) : this(count) { From b737bdcca6ed849973150ec03d866227bec904bb Mon Sep 17 00:00:00 2001 From: peterstone2017 <12449837+YunchuWang@users.noreply.github.com> Date: Tue, 25 Mar 2025 15:10:45 -0700 Subject: [PATCH 7/7] space --- src/Client/Core/PurgeResult.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Client/Core/PurgeResult.cs b/src/Client/Core/PurgeResult.cs index b0f8aa212..2d96bd0f9 100644 --- a/src/Client/Core/PurgeResult.cs +++ b/src/Client/Core/PurgeResult.cs @@ -22,7 +22,7 @@ public PurgeResult(int count) /// Initializes a new instance of the class. /// /// The number of instances deleted. - /// A value indicating whether the purge operation is complete. + /// A value indicating whether the purge operation is complete. /// If true, the purge operation is complete. All instances were purged. /// If false, not all instances were purged. Please purge again. /// If null, whether or not all instances were purged is undefined.