Skip to content

Commit e32fa7a

Browse files
authored
Partial Purge Support (#400)
* Enhance PurgeResult to include completion status and update related gRPC response handling. Added new AbandonTask RPCs in orchestrator_service.proto. Updated versions.txt with latest proto file source. * Update ShimDurableTaskClientTests to assert IsComplete property is not false in PurgeResult * Add end-to-end tests for PurgeInstance and PurgeInstances_WithFilter methods in GrpcDurableTaskClientIntegrationTests. These tests verify the correct behavior of instance purging and ensure instances are removed as expected.
1 parent dc7c09b commit e32fa7a

File tree

6 files changed

+132
-3
lines changed

6 files changed

+132
-3
lines changed

src/Client/Core/PurgeResult.cs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,29 @@ public PurgeResult(int count)
1818
this.PurgedInstanceCount = count;
1919
}
2020

21+
/// <summary>
22+
/// Initializes a new instance of the <see cref="PurgeResult" /> class.
23+
/// </summary>
24+
/// <param name="count">The number of instances deleted.</param>
25+
/// <param name="isComplete">A value indicating whether the purge operation is complete.
26+
/// If true, the purge operation is complete. All instances were purged.
27+
/// If false, not all instances were purged. Please purge again.
28+
/// If null, whether or not all instances were purged is undefined.</param>
29+
public PurgeResult(int count, bool? isComplete)
30+
: this(count)
31+
{
32+
this.IsComplete = isComplete;
33+
}
34+
2135
/// <summary>
2236
/// Gets the number of purged instances.
2337
/// </summary>
2438
/// <value>The number of purged instances.</value>
2539
public int PurgedInstanceCount { get; }
40+
41+
/// <summary>
42+
/// Gets a value indicating whether the purge operation is complete.
43+
/// </summary>
44+
/// <value>A value indicating whether the purge operation is complete.</value>
45+
public bool? IsComplete { get; }
2646
}

src/Client/Grpc/GrpcDurableTaskClient.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -446,7 +446,7 @@ async Task<PurgeResult> PurgeInstancesCoreAsync(
446446
{
447447
P.PurgeInstancesResponse response = await this.sidecarClient.PurgeInstancesAsync(
448448
request, cancellationToken: cancellation);
449-
return new PurgeResult(response.DeletedInstanceCount);
449+
return new PurgeResult(response.DeletedInstanceCount, response.IsComplete);
450450
}
451451
catch (RpcException e) when (e.StatusCode == StatusCode.Cancelled)
452452
{

src/Grpc/orchestrator_service.proto

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -463,6 +463,7 @@ message PurgeInstanceFilter {
463463

464464
message PurgeInstancesResponse {
465465
int32 deletedInstanceCount = 1;
466+
google.protobuf.BoolValue isComplete = 2;
466467
}
467468

468469
message CreateTaskHubRequest {
@@ -617,6 +618,30 @@ message StartNewOrchestrationAction {
617618
google.protobuf.Timestamp scheduledTime = 5;
618619
}
619620

621+
message AbandonActivityTaskRequest {
622+
string completionToken = 1;
623+
}
624+
625+
message AbandonActivityTaskResponse {
626+
// Empty.
627+
}
628+
629+
message AbandonOrchestrationTaskRequest {
630+
string completionToken = 1;
631+
}
632+
633+
message AbandonOrchestrationTaskResponse {
634+
// Empty.
635+
}
636+
637+
message AbandonEntityTaskRequest {
638+
string completionToken = 1;
639+
}
640+
641+
message AbandonEntityTaskResponse {
642+
// Empty.
643+
}
644+
620645
service TaskHubSidecarService {
621646
// Sends a hello request to the sidecar service.
622647
rpc Hello(google.protobuf.Empty) returns (google.protobuf.Empty);
@@ -678,6 +703,15 @@ service TaskHubSidecarService {
678703

679704
// clean entity storage
680705
rpc CleanEntityStorage(CleanEntityStorageRequest) returns (CleanEntityStorageResponse);
706+
707+
// Abandons a single work item
708+
rpc AbandonTaskActivityWorkItem(AbandonActivityTaskRequest) returns (AbandonActivityTaskResponse);
709+
710+
// Abandon an orchestration work item
711+
rpc AbandonTaskOrchestratorWorkItem(AbandonOrchestrationTaskRequest) returns (AbandonOrchestrationTaskResponse);
712+
713+
// Abandon an entity work item
714+
rpc AbandonTaskEntityWorkItem(AbandonEntityTaskRequest) returns (AbandonEntityTaskResponse);
681715
}
682716

683717
message GetWorkItemsRequest {

src/Grpc/versions.txt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
# The following files were downloaded from branch main at 2025-03-19 19:55:31 UTC
2-
https://raw.githubusercontent.com/microsoft/durabletask-protobuf/4792f47019ab2b3e9ea979fb4af72427a4144c51/protos/orchestrator_service.proto
1+
# The following files were downloaded from branch main at 2025-03-24 23:37:31 UTC
2+
https://raw.githubusercontent.com/microsoft/durabletask-protobuf/c85ef11430ff8e10e21105abb545b0803bb86c66/protos/orchestrator_service.proto

test/Client/OrchestrationServiceClientShim.Tests/ShimDurableTaskClientTests.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ public async Task PurgeInstanceMetadata()
174174
// assert
175175
this.orchestrationClient.VerifyAll();
176176
result.PurgedInstanceCount.Should().Be(1);
177+
result.IsComplete.Should().BeNull();
177178
}
178179

179180
[Fact]

test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,80 @@ await server.Client.ScheduleNewOrchestrationInstanceAsync(
150150
page.ContinuationToken.Should().NotBeNull();
151151
}
152152

153+
[Fact]
154+
public async Task PurgeInstance_EndToEnd()
155+
{
156+
// Arrange
157+
await using HostTestLifetime server = await this.StartAsync();
158+
159+
// Create and complete an orchestration instance
160+
string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(
161+
OrchestrationName, input: false);
162+
163+
// Wait for it to start and raise event to complete it
164+
await server.Client.WaitForInstanceStartAsync(instanceId, default);
165+
await server.Client.RaiseEventAsync(instanceId, "event", default);
166+
await server.Client.WaitForInstanceCompletionAsync(instanceId, default);
167+
168+
// Verify instance exists before purge
169+
OrchestrationMetadata? metadata = await server.Client.GetInstanceAsync(instanceId, false);
170+
metadata.Should().NotBeNull();
171+
metadata!.InstanceId.Should().Be(instanceId);
172+
metadata.RuntimeStatus.Should().Be(OrchestrationRuntimeStatus.Completed);
173+
174+
// Act
175+
PurgeResult result = await server.Client.PurgeInstanceAsync(
176+
instanceId,
177+
new PurgeInstanceOptions { Recursive = true });
178+
179+
// Assert
180+
result.Should().NotBeNull();
181+
result.PurgedInstanceCount.Should().Be(1);
182+
result.IsComplete.Should().NotBeFalse();
183+
// Verify instance no longer exists
184+
OrchestrationMetadata? instance = await server.Client.GetInstanceAsync(instanceId, false);
185+
instance.Should().BeNull();
186+
}
187+
188+
[Fact]
189+
public async Task PurgeInstances_WithFilter_EndToEnd()
190+
{
191+
// Arrange
192+
await using HostTestLifetime server = await this.StartAsync();
193+
List<string> instanceIds = new List<string>();
194+
195+
// Create multiple instances
196+
for (int i = 0; i < 3; i++)
197+
{
198+
string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(
199+
OrchestrationName, input: false);
200+
instanceIds.Add(instanceId);
201+
202+
// Wait for it to start and raise event to complete it
203+
await server.Client.WaitForInstanceStartAsync(instanceId, default);
204+
await server.Client.RaiseEventAsync(instanceId, "event", default);
205+
await server.Client.WaitForInstanceCompletionAsync(instanceId, default);
206+
}
207+
208+
// Act
209+
PurgeResult result = await server.Client.PurgeAllInstancesAsync(
210+
new PurgeInstancesFilter(
211+
CreatedFrom: DateTime.UtcNow.AddMinutes(-5),
212+
CreatedTo: DateTime.UtcNow,
213+
Statuses: new[] { OrchestrationRuntimeStatus.Completed }));
214+
215+
// Assert
216+
result.Should().NotBeNull();
217+
result.PurgedInstanceCount.Should().BeGreaterThan(3);
218+
result.IsComplete.Should().NotBeFalse();
219+
// Verify instances no longer exist
220+
foreach (string instanceId in instanceIds)
221+
{
222+
OrchestrationMetadata? instance = await server.Client.GetInstanceAsync(instanceId, false);
223+
instance.Should().BeNull();
224+
}
225+
}
226+
153227
Task<HostTestLifetime> StartAsync()
154228
{
155229
static async Task<string> Orchestration(TaskOrchestrationContext context, bool shouldThrow)

0 commit comments

Comments
 (0)