diff --git a/src/Client/Core/PurgeResult.cs b/src/Client/Core/PurgeResult.cs index be4c61257..2d96bd0f9 100644 --- a/src/Client/Core/PurgeResult.cs +++ b/src/Client/Core/PurgeResult.cs @@ -18,9 +18,29 @@ public PurgeResult(int count) this.PurgedInstanceCount = count; } + /// + /// Initializes a new instance of the class. + /// + /// The number of instances deleted. + /// A value indicating whether the purge operation is complete. + /// If true, the purge operation is complete. All instances were purged. + /// If false, not all instances were purged. Please purge again. + /// If null, whether or not all instances were purged is undefined. + public PurgeResult(int count, bool? isComplete) + : this(count) + { + this.IsComplete = isComplete; + } + /// /// Gets the number of purged instances. /// /// The number of purged instances. public int PurgedInstanceCount { get; } + + /// + /// Gets a value indicating whether the purge operation is complete. + /// + /// A value indicating whether the purge operation is complete. + public bool? IsComplete { get; } } diff --git a/src/Client/Grpc/GrpcDurableTaskClient.cs b/src/Client/Grpc/GrpcDurableTaskClient.cs index 32c95b64b..4d32093e9 100644 --- a/src/Client/Grpc/GrpcDurableTaskClient.cs +++ b/src/Client/Grpc/GrpcDurableTaskClient.cs @@ -446,7 +446,7 @@ async Task PurgeInstancesCoreAsync( { P.PurgeInstancesResponse response = await this.sidecarClient.PurgeInstancesAsync( request, cancellationToken: cancellation); - return new PurgeResult(response.DeletedInstanceCount); + return new PurgeResult(response.DeletedInstanceCount, response.IsComplete); } catch (RpcException e) when (e.StatusCode == StatusCode.Cancelled) { diff --git a/src/Grpc/orchestrator_service.proto b/src/Grpc/orchestrator_service.proto index 64e752818..5dd39cfe6 100644 --- a/src/Grpc/orchestrator_service.proto +++ b/src/Grpc/orchestrator_service.proto @@ -463,6 +463,7 @@ message PurgeInstanceFilter { message PurgeInstancesResponse { int32 deletedInstanceCount = 1; + google.protobuf.BoolValue isComplete = 2; } message CreateTaskHubRequest { @@ -617,6 +618,30 @@ message StartNewOrchestrationAction { google.protobuf.Timestamp scheduledTime = 5; } +message AbandonActivityTaskRequest { + string completionToken = 1; +} + +message AbandonActivityTaskResponse { + // Empty. +} + +message AbandonOrchestrationTaskRequest { + string completionToken = 1; +} + +message AbandonOrchestrationTaskResponse { + // Empty. +} + +message AbandonEntityTaskRequest { + string completionToken = 1; +} + +message AbandonEntityTaskResponse { + // Empty. +} + service TaskHubSidecarService { // Sends a hello request to the sidecar service. rpc Hello(google.protobuf.Empty) returns (google.protobuf.Empty); @@ -678,6 +703,15 @@ service TaskHubSidecarService { // clean entity storage rpc CleanEntityStorage(CleanEntityStorageRequest) returns (CleanEntityStorageResponse); + + // Abandons a single work item + rpc AbandonTaskActivityWorkItem(AbandonActivityTaskRequest) returns (AbandonActivityTaskResponse); + + // Abandon an orchestration work item + rpc AbandonTaskOrchestratorWorkItem(AbandonOrchestrationTaskRequest) returns (AbandonOrchestrationTaskResponse); + + // Abandon an entity work item + rpc AbandonTaskEntityWorkItem(AbandonEntityTaskRequest) returns (AbandonEntityTaskResponse); } message GetWorkItemsRequest { diff --git a/src/Grpc/versions.txt b/src/Grpc/versions.txt index c016c8199..733b20c07 100644 --- a/src/Grpc/versions.txt +++ b/src/Grpc/versions.txt @@ -1,2 +1,2 @@ -# The following files were downloaded from branch main at 2025-03-19 19:55:31 UTC -https://raw.githubusercontent.com/microsoft/durabletask-protobuf/4792f47019ab2b3e9ea979fb4af72427a4144c51/protos/orchestrator_service.proto +# The following files were downloaded from branch main at 2025-03-24 23:37:31 UTC +https://raw.githubusercontent.com/microsoft/durabletask-protobuf/c85ef11430ff8e10e21105abb545b0803bb86c66/protos/orchestrator_service.proto diff --git a/test/Client/OrchestrationServiceClientShim.Tests/ShimDurableTaskClientTests.cs b/test/Client/OrchestrationServiceClientShim.Tests/ShimDurableTaskClientTests.cs index cf67af96b..a9a5d7caa 100644 --- a/test/Client/OrchestrationServiceClientShim.Tests/ShimDurableTaskClientTests.cs +++ b/test/Client/OrchestrationServiceClientShim.Tests/ShimDurableTaskClientTests.cs @@ -174,6 +174,7 @@ public async Task PurgeInstanceMetadata() // assert this.orchestrationClient.VerifyAll(); result.PurgedInstanceCount.Should().Be(1); + result.IsComplete.Should().BeNull(); } [Fact] diff --git a/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs b/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs index a208671ff..50fe6e070 100644 --- a/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs +++ b/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs @@ -150,6 +150,80 @@ await server.Client.ScheduleNewOrchestrationInstanceAsync( page.ContinuationToken.Should().NotBeNull(); } + [Fact] + public async Task PurgeInstance_EndToEnd() + { + // Arrange + await using HostTestLifetime server = await this.StartAsync(); + + // Create and complete an orchestration instance + string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync( + OrchestrationName, input: false); + + // Wait for it to start and raise event to complete it + await server.Client.WaitForInstanceStartAsync(instanceId, default); + await server.Client.RaiseEventAsync(instanceId, "event", default); + await server.Client.WaitForInstanceCompletionAsync(instanceId, default); + + // Verify instance exists before purge + OrchestrationMetadata? metadata = await server.Client.GetInstanceAsync(instanceId, false); + metadata.Should().NotBeNull(); + metadata!.InstanceId.Should().Be(instanceId); + metadata.RuntimeStatus.Should().Be(OrchestrationRuntimeStatus.Completed); + + // Act + PurgeResult result = await server.Client.PurgeInstanceAsync( + instanceId, + new PurgeInstanceOptions { Recursive = true }); + + // Assert + result.Should().NotBeNull(); + result.PurgedInstanceCount.Should().Be(1); + result.IsComplete.Should().NotBeFalse(); + // Verify instance no longer exists + OrchestrationMetadata? instance = await server.Client.GetInstanceAsync(instanceId, false); + instance.Should().BeNull(); + } + + [Fact] + public async Task PurgeInstances_WithFilter_EndToEnd() + { + // Arrange + await using HostTestLifetime server = await this.StartAsync(); + List instanceIds = new List(); + + // Create multiple instances + for (int i = 0; i < 3; i++) + { + string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync( + OrchestrationName, input: false); + instanceIds.Add(instanceId); + + // Wait for it to start and raise event to complete it + await server.Client.WaitForInstanceStartAsync(instanceId, default); + await server.Client.RaiseEventAsync(instanceId, "event", default); + await server.Client.WaitForInstanceCompletionAsync(instanceId, default); + } + + // Act + PurgeResult result = await server.Client.PurgeAllInstancesAsync( + new PurgeInstancesFilter( + CreatedFrom: DateTime.UtcNow.AddMinutes(-5), + CreatedTo: DateTime.UtcNow, + Statuses: new[] { OrchestrationRuntimeStatus.Completed })); + + // Assert + result.Should().NotBeNull(); + result.PurgedInstanceCount.Should().BeGreaterThan(3); + result.IsComplete.Should().NotBeFalse(); + // Verify instances no longer exist + foreach (string instanceId in instanceIds) + { + OrchestrationMetadata? instance = await server.Client.GetInstanceAsync(instanceId, false); + instance.Should().BeNull(); + } + } + Task StartAsync() { static async Task Orchestration(TaskOrchestrationContext context, bool shouldThrow)