Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
239 changes: 219 additions & 20 deletions test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ namespace Microsoft.DurableTask.Grpc.Tests;
public class DurableTaskGrpcClientIntegrationTests : IntegrationTestBase
{
const string OrchestrationName = "TestOrchestration";
const int PollingTimeoutSeconds = 5;
const int PollingIntervalMilliseconds = 100;

public DurableTaskGrpcClientIntegrationTests(ITestOutputHelper output, GrpcSidecarFixture sidecarFixture)
: base(output, sidecarFixture)
Expand Down Expand Up @@ -291,26 +293,6 @@ await restartAction.Should().ThrowAsync<ArgumentException>()
.WithMessage("*An orchestration with the instanceId non-existent-instance-id was not found*");
}

Task<HostTestLifetime> StartAsync()
{
static async Task<string> Orchestration(TaskOrchestrationContext context, bool shouldThrow)
{
context.SetCustomStatus("waiting");
await context.WaitForExternalEvent<string>("event");
if (shouldThrow)
{
throw new InvalidOperationException("Orchestration failed");
}

return $"{shouldThrow} -> output";
}

return this.StartWorkerAsync(b =>
{
b.AddTasks(tasks => tasks.AddOrchestratorFunc<bool, string>(OrchestrationName, Orchestration));
});
}

[Fact]
public async Task ScheduleNewOrchestrationInstance_WithDedupeStatuses_ThrowsWhenInstanceExists()
{
Expand Down Expand Up @@ -482,6 +464,223 @@ await createAction.Should().ThrowAsync<RpcException>()
.Where(e => e.StatusCode == StatusCode.AlreadyExists);
}

[Fact]
public async Task SuspendAndResumeInstance_EndToEnd()
{
// Arrange
await using HostTestLifetime server = await this.StartLongRunningAsync();

string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(
OrchestrationName, input: false);

// Wait for the orchestration to start
await server.Client.WaitForInstanceStartAsync(instanceId, default);

// Act - Suspend the orchestration
await server.Client.SuspendInstanceAsync(instanceId, "Test suspension", default);

// Poll for suspended status
OrchestrationMetadata? suspendedMetadata = await this.PollForStatusAsync(
server.Client, instanceId, OrchestrationRuntimeStatus.Suspended, default);

// Assert - Verify orchestration is suspended
suspendedMetadata.Should().NotBeNull();
suspendedMetadata!.RuntimeStatus.Should().Be(OrchestrationRuntimeStatus.Suspended);
suspendedMetadata.InstanceId.Should().Be(instanceId);

// Act - Resume the orchestration
await server.Client.ResumeInstanceAsync(instanceId, "Test resumption", default);

// Poll for running status
OrchestrationMetadata? resumedMetadata = await this.PollForStatusAsync(
server.Client, instanceId, OrchestrationRuntimeStatus.Running, default);

// Assert - Verify orchestration is running again
resumedMetadata.Should().NotBeNull();
resumedMetadata!.RuntimeStatus.Should().Be(OrchestrationRuntimeStatus.Running);

// Complete the orchestration
await server.Client.RaiseEventAsync(instanceId, "event", default);
await server.Client.WaitForInstanceCompletionAsync(instanceId, default);

// Verify the orchestration completed successfully
OrchestrationMetadata? completedMetadata = await server.Client.GetInstanceAsync(instanceId, false);
completedMetadata.Should().NotBeNull();
completedMetadata!.RuntimeStatus.Should().Be(OrchestrationRuntimeStatus.Completed);
}

[Fact]
public async Task SuspendInstance_WithoutReason_Succeeds()
{
// Arrange
await using HostTestLifetime server = await this.StartLongRunningAsync();

string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(
OrchestrationName, input: false);

await server.Client.WaitForInstanceStartAsync(instanceId, default);

// Act - Suspend without a reason
await server.Client.SuspendInstanceAsync(instanceId, cancellation: default);

// Poll for suspended status
OrchestrationMetadata? suspendedMetadata = await this.PollForStatusAsync(
server.Client, instanceId, OrchestrationRuntimeStatus.Suspended, default);

// Assert
suspendedMetadata.Should().NotBeNull();
suspendedMetadata!.RuntimeStatus.Should().Be(OrchestrationRuntimeStatus.Suspended);
}

[Fact]
public async Task ResumeInstance_WithoutReason_Succeeds()
{
// Arrange
await using HostTestLifetime server = await this.StartLongRunningAsync();

string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(
OrchestrationName, input: false);

await server.Client.WaitForInstanceStartAsync(instanceId, default);
await server.Client.SuspendInstanceAsync(instanceId, "Test suspension", default);

// Wait for suspension
await this.PollForStatusAsync(server.Client, instanceId, OrchestrationRuntimeStatus.Suspended, default);

// Act - Resume without a reason
await server.Client.ResumeInstanceAsync(instanceId, cancellation: default);

// Poll for running status
OrchestrationMetadata? resumedMetadata = await this.PollForStatusAsync(
server.Client, instanceId, OrchestrationRuntimeStatus.Running, default);

// Assert
resumedMetadata.Should().NotBeNull();
resumedMetadata!.RuntimeStatus.Should().Be(OrchestrationRuntimeStatus.Running);
}

[Fact]
public async Task SuspendInstance_AlreadyCompleted_NoError()
{
// Arrange
await using HostTestLifetime server = await this.StartAsync();

string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(
OrchestrationName, input: false);

await server.Client.WaitForInstanceStartAsync(instanceId, default);
await server.Client.RaiseEventAsync(instanceId, "event", default);
await server.Client.WaitForInstanceCompletionAsync(instanceId, default);

// Verify it's completed
OrchestrationMetadata? completedMetadata = await server.Client.GetInstanceAsync(instanceId, false);
completedMetadata.Should().NotBeNull();
completedMetadata!.RuntimeStatus.Should().Be(OrchestrationRuntimeStatus.Completed);

// Act - Try to suspend a completed orchestration (should not throw)
await server.Client.SuspendInstanceAsync(instanceId, "Test suspension", default);

// Assert - Status should remain completed
OrchestrationMetadata? metadata = await server.Client.GetInstanceAsync(instanceId, false);
metadata.Should().NotBeNull();
metadata!.RuntimeStatus.Should().Be(OrchestrationRuntimeStatus.Completed);
}

[Fact]
public async Task ResumeInstance_NotSuspended_NoError()
{
// Arrange
await using HostTestLifetime server = await this.StartLongRunningAsync();

string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(
OrchestrationName, input: false);

await server.Client.WaitForInstanceStartAsync(instanceId, default);

// Verify it's running
OrchestrationMetadata? runningMetadata = await server.Client.GetInstanceAsync(instanceId, false);
runningMetadata.Should().NotBeNull();
runningMetadata!.RuntimeStatus.Should().Be(OrchestrationRuntimeStatus.Running);

// Act - Try to resume an already running orchestration (should not throw)
await server.Client.ResumeInstanceAsync(instanceId, "Test resumption", default);

// Assert - Status should remain running
OrchestrationMetadata? metadata = await server.Client.GetInstanceAsync(instanceId, false);
metadata.Should().NotBeNull();
metadata!.RuntimeStatus.Should().Be(OrchestrationRuntimeStatus.Running);
}

Task<HostTestLifetime> StartAsync()
{
static async Task<string> Orchestration(TaskOrchestrationContext context, bool shouldThrow)
{
context.SetCustomStatus("waiting");
await context.WaitForExternalEvent<string>("event");
if (shouldThrow)
{
throw new InvalidOperationException("Orchestration failed");
}

return $"{shouldThrow} -> output";
}

return this.StartWorkerAsync(b =>
{
b.AddTasks(tasks => tasks.AddOrchestratorFunc<bool, string>(OrchestrationName, Orchestration));
});
}

Task<HostTestLifetime> StartLongRunningAsync()
{
static async Task<string> LongRunningOrchestration(TaskOrchestrationContext context, bool shouldThrow)
{
context.SetCustomStatus("waiting");
// Wait for external event or a timer (30 seconds) to allow suspend/resume operations
Task<string> eventTask = context.WaitForExternalEvent<string>("event");
Task timerTask = context.CreateTimer(TimeSpan.FromSeconds(30), CancellationToken.None);
Task completedTask = await Task.WhenAny(eventTask, timerTask);

if (completedTask == timerTask)
{
throw new TimeoutException("Timed out waiting for external event 'event'.");
}

if (shouldThrow)
{
throw new InvalidOperationException("Orchestration failed");
}

return $"{shouldThrow} -> output";
}

return this.StartWorkerAsync(b =>
{
b.AddTasks(tasks => tasks.AddOrchestratorFunc<bool, string>(OrchestrationName, LongRunningOrchestration));
});
}

async Task<OrchestrationMetadata?> PollForStatusAsync(
DurableTaskClient client,
string instanceId,
OrchestrationRuntimeStatus expectedStatus,
CancellationToken cancellation = default)
{
DateTime deadline = DateTime.UtcNow.AddSeconds(PollingTimeoutSeconds);
while (DateTime.UtcNow < deadline)
{
OrchestrationMetadata? metadata = await client.GetInstanceAsync(instanceId, false, cancellation);
if (metadata?.RuntimeStatus == expectedStatus)
{
return metadata;
}

await Task.Delay(TimeSpan.FromMilliseconds(PollingIntervalMilliseconds), cancellation);
}

return await client.GetInstanceAsync(instanceId, false, cancellation);
}

class DateTimeToleranceComparer : IEqualityComparer<DateTimeOffset>
{
public bool Equals(DateTimeOffset x, DateTimeOffset y) => (x - y).Duration() < TimeSpan.FromMilliseconds(100);
Expand Down
Loading