Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions src/Client/Core/PurgeResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,29 @@ public PurgeResult(int count)
this.PurgedInstanceCount = count;
}

/// <summary>
/// Initializes a new instance of the <see cref="PurgeResult" /> class.
/// </summary>
/// <param name="count">The number of instances deleted.</param>
/// <param name="isComplete">A value indicating whether the purge operation is complete.
/// If true, the purge operation is complete. All instances were purged.
/// If false, not all instances were purged. Please purge again.
/// If null, whether or not all instances were purged is undefined.</param>
public PurgeResult(int count, bool? isComplete)
: this(count)
{
this.IsComplete = isComplete;
}

/// <summary>
/// Gets the number of purged instances.
/// </summary>
/// <value>The number of purged instances.</value>
public int PurgedInstanceCount { get; }

/// <summary>
/// Gets a value indicating whether the purge operation is complete.
/// </summary>
/// <value>A value indicating whether the purge operation is complete.</value>
public bool? IsComplete { get; }
}
2 changes: 1 addition & 1 deletion src/Client/Grpc/GrpcDurableTaskClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@

if (Activity.Current?.Id != null || Activity.Current?.TraceStateString != null)
{
if (request.ParentTraceContext == null)

Check warning on line 110 in src/Client/Grpc/GrpcDurableTaskClient.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

Dereference of a possibly null reference.

Check warning on line 110 in src/Client/Grpc/GrpcDurableTaskClient.cs

View workflow job for this annotation

GitHub Actions / build

Dereference of a possibly null reference.
{
request.ParentTraceContext = new P.TraceContext();
}
Expand All @@ -125,7 +125,7 @@

DateTimeOffset? startAt = options?.StartAt;
this.logger.SchedulingOrchestration(
request.InstanceId,

Check warning on line 128 in src/Client/Grpc/GrpcDurableTaskClient.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

Dereference of a possibly null reference.

Check warning on line 128 in src/Client/Grpc/GrpcDurableTaskClient.cs

View workflow job for this annotation

GitHub Actions / build

Dereference of a possibly null reference.
orchestratorName,
sizeInBytes: request.Input != null ? Encoding.UTF8.GetByteCount(request.Input) : 0,
startAt.GetValueOrDefault(DateTimeOffset.UtcNow));
Expand Down Expand Up @@ -446,7 +446,7 @@
{
P.PurgeInstancesResponse response = await this.sidecarClient.PurgeInstancesAsync(
request, cancellationToken: cancellation);
return new PurgeResult(response.DeletedInstanceCount);
return new PurgeResult(response.DeletedInstanceCount, response.IsComplete);
}
catch (RpcException e) when (e.StatusCode == StatusCode.Cancelled)
{
Expand Down
34 changes: 34 additions & 0 deletions src/Grpc/orchestrator_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,7 @@ message PurgeInstanceFilter {

message PurgeInstancesResponse {
int32 deletedInstanceCount = 1;
google.protobuf.BoolValue isComplete = 2;
}

message CreateTaskHubRequest {
Expand Down Expand Up @@ -617,6 +618,30 @@ message StartNewOrchestrationAction {
google.protobuf.Timestamp scheduledTime = 5;
}

message AbandonActivityTaskRequest {
string completionToken = 1;
}

message AbandonActivityTaskResponse {
// Empty.
}

message AbandonOrchestrationTaskRequest {
string completionToken = 1;
}

message AbandonOrchestrationTaskResponse {
// Empty.
}

message AbandonEntityTaskRequest {
string completionToken = 1;
}

message AbandonEntityTaskResponse {
// Empty.
}

service TaskHubSidecarService {
// Sends a hello request to the sidecar service.
rpc Hello(google.protobuf.Empty) returns (google.protobuf.Empty);
Expand Down Expand Up @@ -678,6 +703,15 @@ service TaskHubSidecarService {

// clean entity storage
rpc CleanEntityStorage(CleanEntityStorageRequest) returns (CleanEntityStorageResponse);

// Abandons a single work item
rpc AbandonTaskActivityWorkItem(AbandonActivityTaskRequest) returns (AbandonActivityTaskResponse);

// Abandon an orchestration work item
rpc AbandonTaskOrchestratorWorkItem(AbandonOrchestrationTaskRequest) returns (AbandonOrchestrationTaskResponse);

// Abandon an entity work item
rpc AbandonTaskEntityWorkItem(AbandonEntityTaskRequest) returns (AbandonEntityTaskResponse);
}

message GetWorkItemsRequest {
Expand Down
4 changes: 2 additions & 2 deletions src/Grpc/versions.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
# The following files were downloaded from branch main at 2025-03-19 19:55:31 UTC
https://raw.githubusercontent.com/microsoft/durabletask-protobuf/4792f47019ab2b3e9ea979fb4af72427a4144c51/protos/orchestrator_service.proto
# The following files were downloaded from branch main at 2025-03-24 23:37:31 UTC
https://raw.githubusercontent.com/microsoft/durabletask-protobuf/c85ef11430ff8e10e21105abb545b0803bb86c66/protos/orchestrator_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ public async Task PurgeInstanceMetadata()
// assert
this.orchestrationClient.VerifyAll();
result.PurgedInstanceCount.Should().Be(1);
result.IsComplete.Should().BeNull();
}

[Fact]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,80 @@ await server.Client.ScheduleNewOrchestrationInstanceAsync(
page.ContinuationToken.Should().NotBeNull();
}

[Fact]
public async Task PurgeInstance_EndToEnd()
{
// Arrange
await using HostTestLifetime server = await this.StartAsync();

// Create and complete an orchestration instance
string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(
OrchestrationName, input: false);

// Wait for it to start and raise event to complete it
await server.Client.WaitForInstanceStartAsync(instanceId, default);
await server.Client.RaiseEventAsync(instanceId, "event", default);
await server.Client.WaitForInstanceCompletionAsync(instanceId, default);

// Verify instance exists before purge
OrchestrationMetadata? metadata = await server.Client.GetInstanceAsync(instanceId, false);
metadata.Should().NotBeNull();
metadata!.InstanceId.Should().Be(instanceId);
metadata.RuntimeStatus.Should().Be(OrchestrationRuntimeStatus.Completed);

// Act
PurgeResult result = await server.Client.PurgeInstanceAsync(
instanceId,
new PurgeInstanceOptions { Recursive = true });

// Assert
result.Should().NotBeNull();
result.PurgedInstanceCount.Should().Be(1);
result.IsComplete.Should().NotBeFalse();
// Verify instance no longer exists
OrchestrationMetadata? instance = await server.Client.GetInstanceAsync(instanceId, false);
instance.Should().BeNull();
}

[Fact]
public async Task PurgeInstances_WithFilter_EndToEnd()
{
// Arrange
await using HostTestLifetime server = await this.StartAsync();
List<string> instanceIds = new List<string>();

// Create multiple instances
for (int i = 0; i < 3; i++)
{
string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(
OrchestrationName, input: false);
instanceIds.Add(instanceId);

// Wait for it to start and raise event to complete it
await server.Client.WaitForInstanceStartAsync(instanceId, default);
await server.Client.RaiseEventAsync(instanceId, "event", default);
await server.Client.WaitForInstanceCompletionAsync(instanceId, default);
}

// Act
PurgeResult result = await server.Client.PurgeAllInstancesAsync(
new PurgeInstancesFilter(
CreatedFrom: DateTime.UtcNow.AddMinutes(-5),
CreatedTo: DateTime.UtcNow,
Statuses: new[] { OrchestrationRuntimeStatus.Completed }));

// Assert
result.Should().NotBeNull();
result.PurgedInstanceCount.Should().BeGreaterThan(3);
result.IsComplete.Should().NotBeFalse();
// Verify instances no longer exist
foreach (string instanceId in instanceIds)
{
OrchestrationMetadata? instance = await server.Client.GetInstanceAsync(instanceId, false);
instance.Should().BeNull();
}
}

Task<HostTestLifetime> StartAsync()
{
static async Task<string> Orchestration(TaskOrchestrationContext context, bool shouldThrow)
Expand Down
Loading