diff --git a/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs b/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs index e64865a9..18892f1a 100644 --- a/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs +++ b/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs @@ -16,6 +16,8 @@ namespace Microsoft.DurableTask.Grpc.Tests; public class DurableTaskGrpcClientIntegrationTests : IntegrationTestBase { const string OrchestrationName = "TestOrchestration"; + const int PollingTimeoutSeconds = 5; + const int PollingIntervalMilliseconds = 100; public DurableTaskGrpcClientIntegrationTests(ITestOutputHelper output, GrpcSidecarFixture sidecarFixture) : base(output, sidecarFixture) @@ -291,26 +293,6 @@ await restartAction.Should().ThrowAsync() .WithMessage("*An orchestration with the instanceId non-existent-instance-id was not found*"); } - Task StartAsync() - { - static async Task Orchestration(TaskOrchestrationContext context, bool shouldThrow) - { - context.SetCustomStatus("waiting"); - await context.WaitForExternalEvent("event"); - if (shouldThrow) - { - throw new InvalidOperationException("Orchestration failed"); - } - - return $"{shouldThrow} -> output"; - } - - return this.StartWorkerAsync(b => - { - b.AddTasks(tasks => tasks.AddOrchestratorFunc(OrchestrationName, Orchestration)); - }); - } - [Fact] public async Task ScheduleNewOrchestrationInstance_WithDedupeStatuses_ThrowsWhenInstanceExists() { @@ -482,6 +464,223 @@ await createAction.Should().ThrowAsync() .Where(e => e.StatusCode == StatusCode.AlreadyExists); } + [Fact] + public async Task SuspendAndResumeInstance_EndToEnd() + { + // Arrange + await using HostTestLifetime server = await this.StartLongRunningAsync(); + + string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync( + OrchestrationName, input: false); + + // Wait for the orchestration to start + await server.Client.WaitForInstanceStartAsync(instanceId, default); + + // Act - Suspend the orchestration + await server.Client.SuspendInstanceAsync(instanceId, "Test suspension", default); + + // Poll for suspended status + OrchestrationMetadata? suspendedMetadata = await this.PollForStatusAsync( + server.Client, instanceId, OrchestrationRuntimeStatus.Suspended, default); + + // Assert - Verify orchestration is suspended + suspendedMetadata.Should().NotBeNull(); + suspendedMetadata!.RuntimeStatus.Should().Be(OrchestrationRuntimeStatus.Suspended); + suspendedMetadata.InstanceId.Should().Be(instanceId); + + // Act - Resume the orchestration + await server.Client.ResumeInstanceAsync(instanceId, "Test resumption", default); + + // Poll for running status + OrchestrationMetadata? resumedMetadata = await this.PollForStatusAsync( + server.Client, instanceId, OrchestrationRuntimeStatus.Running, default); + + // Assert - Verify orchestration is running again + resumedMetadata.Should().NotBeNull(); + resumedMetadata!.RuntimeStatus.Should().Be(OrchestrationRuntimeStatus.Running); + + // Complete the orchestration + await server.Client.RaiseEventAsync(instanceId, "event", default); + await server.Client.WaitForInstanceCompletionAsync(instanceId, default); + + // Verify the orchestration completed successfully + OrchestrationMetadata? completedMetadata = await server.Client.GetInstanceAsync(instanceId, false); + completedMetadata.Should().NotBeNull(); + completedMetadata!.RuntimeStatus.Should().Be(OrchestrationRuntimeStatus.Completed); + } + + [Fact] + public async Task SuspendInstance_WithoutReason_Succeeds() + { + // Arrange + await using HostTestLifetime server = await this.StartLongRunningAsync(); + + string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync( + OrchestrationName, input: false); + + await server.Client.WaitForInstanceStartAsync(instanceId, default); + + // Act - Suspend without a reason + await server.Client.SuspendInstanceAsync(instanceId, cancellation: default); + + // Poll for suspended status + OrchestrationMetadata? suspendedMetadata = await this.PollForStatusAsync( + server.Client, instanceId, OrchestrationRuntimeStatus.Suspended, default); + + // Assert + suspendedMetadata.Should().NotBeNull(); + suspendedMetadata!.RuntimeStatus.Should().Be(OrchestrationRuntimeStatus.Suspended); + } + + [Fact] + public async Task ResumeInstance_WithoutReason_Succeeds() + { + // Arrange + await using HostTestLifetime server = await this.StartLongRunningAsync(); + + string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync( + OrchestrationName, input: false); + + await server.Client.WaitForInstanceStartAsync(instanceId, default); + await server.Client.SuspendInstanceAsync(instanceId, "Test suspension", default); + + // Wait for suspension + await this.PollForStatusAsync(server.Client, instanceId, OrchestrationRuntimeStatus.Suspended, default); + + // Act - Resume without a reason + await server.Client.ResumeInstanceAsync(instanceId, cancellation: default); + + // Poll for running status + OrchestrationMetadata? resumedMetadata = await this.PollForStatusAsync( + server.Client, instanceId, OrchestrationRuntimeStatus.Running, default); + + // Assert + resumedMetadata.Should().NotBeNull(); + resumedMetadata!.RuntimeStatus.Should().Be(OrchestrationRuntimeStatus.Running); + } + + [Fact] + public async Task SuspendInstance_AlreadyCompleted_NoError() + { + // Arrange + await using HostTestLifetime server = await this.StartAsync(); + + string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync( + OrchestrationName, input: false); + + await server.Client.WaitForInstanceStartAsync(instanceId, default); + await server.Client.RaiseEventAsync(instanceId, "event", default); + await server.Client.WaitForInstanceCompletionAsync(instanceId, default); + + // Verify it's completed + OrchestrationMetadata? completedMetadata = await server.Client.GetInstanceAsync(instanceId, false); + completedMetadata.Should().NotBeNull(); + completedMetadata!.RuntimeStatus.Should().Be(OrchestrationRuntimeStatus.Completed); + + // Act - Try to suspend a completed orchestration (should not throw) + await server.Client.SuspendInstanceAsync(instanceId, "Test suspension", default); + + // Assert - Status should remain completed + OrchestrationMetadata? metadata = await server.Client.GetInstanceAsync(instanceId, false); + metadata.Should().NotBeNull(); + metadata!.RuntimeStatus.Should().Be(OrchestrationRuntimeStatus.Completed); + } + + [Fact] + public async Task ResumeInstance_NotSuspended_NoError() + { + // Arrange + await using HostTestLifetime server = await this.StartLongRunningAsync(); + + string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync( + OrchestrationName, input: false); + + await server.Client.WaitForInstanceStartAsync(instanceId, default); + + // Verify it's running + OrchestrationMetadata? runningMetadata = await server.Client.GetInstanceAsync(instanceId, false); + runningMetadata.Should().NotBeNull(); + runningMetadata!.RuntimeStatus.Should().Be(OrchestrationRuntimeStatus.Running); + + // Act - Try to resume an already running orchestration (should not throw) + await server.Client.ResumeInstanceAsync(instanceId, "Test resumption", default); + + // Assert - Status should remain running + OrchestrationMetadata? metadata = await server.Client.GetInstanceAsync(instanceId, false); + metadata.Should().NotBeNull(); + metadata!.RuntimeStatus.Should().Be(OrchestrationRuntimeStatus.Running); + } + + Task StartAsync() + { + static async Task Orchestration(TaskOrchestrationContext context, bool shouldThrow) + { + context.SetCustomStatus("waiting"); + await context.WaitForExternalEvent("event"); + if (shouldThrow) + { + throw new InvalidOperationException("Orchestration failed"); + } + + return $"{shouldThrow} -> output"; + } + + return this.StartWorkerAsync(b => + { + b.AddTasks(tasks => tasks.AddOrchestratorFunc(OrchestrationName, Orchestration)); + }); + } + + Task StartLongRunningAsync() + { + static async Task LongRunningOrchestration(TaskOrchestrationContext context, bool shouldThrow) + { + context.SetCustomStatus("waiting"); + // Wait for external event or a timer (30 seconds) to allow suspend/resume operations + Task eventTask = context.WaitForExternalEvent("event"); + Task timerTask = context.CreateTimer(TimeSpan.FromSeconds(30), CancellationToken.None); + Task completedTask = await Task.WhenAny(eventTask, timerTask); + + if (completedTask == timerTask) + { + throw new TimeoutException("Timed out waiting for external event 'event'."); + } + + if (shouldThrow) + { + throw new InvalidOperationException("Orchestration failed"); + } + + return $"{shouldThrow} -> output"; + } + + return this.StartWorkerAsync(b => + { + b.AddTasks(tasks => tasks.AddOrchestratorFunc(OrchestrationName, LongRunningOrchestration)); + }); + } + + async Task PollForStatusAsync( + DurableTaskClient client, + string instanceId, + OrchestrationRuntimeStatus expectedStatus, + CancellationToken cancellation = default) + { + DateTime deadline = DateTime.UtcNow.AddSeconds(PollingTimeoutSeconds); + while (DateTime.UtcNow < deadline) + { + OrchestrationMetadata? metadata = await client.GetInstanceAsync(instanceId, false, cancellation); + if (metadata?.RuntimeStatus == expectedStatus) + { + return metadata; + } + + await Task.Delay(TimeSpan.FromMilliseconds(PollingIntervalMilliseconds), cancellation); + } + + return await client.GetInstanceAsync(instanceId, false, cancellation); + } + class DateTimeToleranceComparer : IEqualityComparer { public bool Equals(DateTimeOffset x, DateTimeOffset y) => (x - y).Duration() < TimeSpan.FromMilliseconds(100);