From f70e8e5b5a902f6f8b4fbf1a00b0df43295fabfe Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 10 Dec 2025 17:56:40 +0000 Subject: [PATCH 01/10] Initial plan From 9bcfb1bfcbe8e048aef9b7cc8ca350b23926447a Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 10 Dec 2025 18:14:41 +0000 Subject: [PATCH 02/10] Add integration tests for Suspend/Resume functionality - Added 5 new integration tests for suspend/resume operations - Tests cover basic suspend/resume, with/without reasons, and edge cases - All tests passing successfully Co-authored-by: YunchuWang <12449837+YunchuWang@users.noreply.github.com> --- .../GrpcDurableTaskClientIntegrationTests.cs | 221 ++++++++++++++++++ 1 file changed, 221 insertions(+) diff --git a/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs b/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs index 2d5df3363..a985a88a9 100644 --- a/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs +++ b/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs @@ -288,6 +288,203 @@ await restartAction.Should().ThrowAsync() .WithMessage("*An orchestration with the instanceId non-existent-instance-id was not found*"); } + [Fact] + public async Task SuspendAndResumeInstance_EndToEnd() + { + // Arrange + await using HostTestLifetime server = await this.StartLongRunningAsync(); + + string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync( + OrchestrationName, input: false); + + // Wait for the orchestration to start + await server.Client.WaitForInstanceStartAsync(instanceId, default); + + // Act - Suspend the orchestration + await server.Client.SuspendInstanceAsync(instanceId, "Test suspension", default); + + // Poll for suspended status (up to 5 seconds) + OrchestrationMetadata? suspendedMetadata = null; + DateTime deadline = DateTime.UtcNow.AddSeconds(5); + while (DateTime.UtcNow < deadline) + { + suspendedMetadata = await server.Client.GetInstanceAsync(instanceId, false); + if (suspendedMetadata?.RuntimeStatus == OrchestrationRuntimeStatus.Suspended) + { + break; + } + + await Task.Delay(TimeSpan.FromMilliseconds(100)); + } + + // Assert - Verify orchestration is suspended + suspendedMetadata.Should().NotBeNull(); + suspendedMetadata!.RuntimeStatus.Should().Be(OrchestrationRuntimeStatus.Suspended); + suspendedMetadata.InstanceId.Should().Be(instanceId); + + // Act - Resume the orchestration + await server.Client.ResumeInstanceAsync(instanceId, "Test resumption", default); + + // Poll for running status (up to 5 seconds) + OrchestrationMetadata? resumedMetadata = null; + deadline = DateTime.UtcNow.AddSeconds(5); + while (DateTime.UtcNow < deadline) + { + resumedMetadata = await server.Client.GetInstanceAsync(instanceId, false); + if (resumedMetadata?.RuntimeStatus == OrchestrationRuntimeStatus.Running) + { + break; + } + + await Task.Delay(TimeSpan.FromMilliseconds(100)); + } + + // Assert - Verify orchestration is running again + resumedMetadata.Should().NotBeNull(); + resumedMetadata!.RuntimeStatus.Should().Be(OrchestrationRuntimeStatus.Running); + + // Complete the orchestration + await server.Client.RaiseEventAsync(instanceId, "event", default); + await server.Client.WaitForInstanceCompletionAsync(instanceId, default); + + // Verify the orchestration completed successfully + OrchestrationMetadata? completedMetadata = await server.Client.GetInstanceAsync(instanceId, false); + completedMetadata.Should().NotBeNull(); + completedMetadata!.RuntimeStatus.Should().Be(OrchestrationRuntimeStatus.Completed); + } + + [Fact] + public async Task SuspendInstance_WithoutReason_Succeeds() + { + // Arrange + await using HostTestLifetime server = await this.StartLongRunningAsync(); + + string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync( + OrchestrationName, input: false); + + await server.Client.WaitForInstanceStartAsync(instanceId, default); + + // Act - Suspend without a reason + await server.Client.SuspendInstanceAsync(instanceId, cancellation: default); + + // Poll for suspended status (up to 5 seconds) + OrchestrationMetadata? suspendedMetadata = null; + DateTime deadline = DateTime.UtcNow.AddSeconds(5); + while (DateTime.UtcNow < deadline) + { + suspendedMetadata = await server.Client.GetInstanceAsync(instanceId, false); + if (suspendedMetadata?.RuntimeStatus == OrchestrationRuntimeStatus.Suspended) + { + break; + } + + await Task.Delay(TimeSpan.FromMilliseconds(100)); + } + + // Assert + suspendedMetadata.Should().NotBeNull(); + suspendedMetadata!.RuntimeStatus.Should().Be(OrchestrationRuntimeStatus.Suspended); + } + + [Fact] + public async Task ResumeInstance_WithoutReason_Succeeds() + { + // Arrange + await using HostTestLifetime server = await this.StartLongRunningAsync(); + + string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync( + OrchestrationName, input: false); + + await server.Client.WaitForInstanceStartAsync(instanceId, default); + await server.Client.SuspendInstanceAsync(instanceId, "Test suspension", default); + + // Wait for suspension + DateTime deadline = DateTime.UtcNow.AddSeconds(5); + while (DateTime.UtcNow < deadline) + { + OrchestrationMetadata? metadata = await server.Client.GetInstanceAsync(instanceId, false); + if (metadata?.RuntimeStatus == OrchestrationRuntimeStatus.Suspended) + { + break; + } + + await Task.Delay(TimeSpan.FromMilliseconds(100)); + } + + // Act - Resume without a reason + await server.Client.ResumeInstanceAsync(instanceId, cancellation: default); + + // Poll for running status (up to 5 seconds) + OrchestrationMetadata? resumedMetadata = null; + deadline = DateTime.UtcNow.AddSeconds(5); + while (DateTime.UtcNow < deadline) + { + resumedMetadata = await server.Client.GetInstanceAsync(instanceId, false); + if (resumedMetadata?.RuntimeStatus == OrchestrationRuntimeStatus.Running) + { + break; + } + + await Task.Delay(TimeSpan.FromMilliseconds(100)); + } + + // Assert + resumedMetadata.Should().NotBeNull(); + resumedMetadata!.RuntimeStatus.Should().Be(OrchestrationRuntimeStatus.Running); + } + + [Fact] + public async Task SuspendInstance_AlreadyCompleted_NoError() + { + // Arrange + await using HostTestLifetime server = await this.StartAsync(); + + string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync( + OrchestrationName, input: false); + + await server.Client.WaitForInstanceStartAsync(instanceId, default); + await server.Client.RaiseEventAsync(instanceId, "event", default); + await server.Client.WaitForInstanceCompletionAsync(instanceId, default); + + // Verify it's completed + OrchestrationMetadata? completedMetadata = await server.Client.GetInstanceAsync(instanceId, false); + completedMetadata.Should().NotBeNull(); + completedMetadata!.RuntimeStatus.Should().Be(OrchestrationRuntimeStatus.Completed); + + // Act - Try to suspend a completed orchestration (should not throw) + await server.Client.SuspendInstanceAsync(instanceId, "Test suspension", default); + + // Assert - Status should remain completed + OrchestrationMetadata? metadata = await server.Client.GetInstanceAsync(instanceId, false); + metadata.Should().NotBeNull(); + metadata!.RuntimeStatus.Should().Be(OrchestrationRuntimeStatus.Completed); + } + + [Fact] + public async Task ResumeInstance_NotSuspended_NoError() + { + // Arrange + await using HostTestLifetime server = await this.StartLongRunningAsync(); + + string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync( + OrchestrationName, input: false); + + await server.Client.WaitForInstanceStartAsync(instanceId, default); + + // Verify it's running + OrchestrationMetadata? runningMetadata = await server.Client.GetInstanceAsync(instanceId, false); + runningMetadata.Should().NotBeNull(); + runningMetadata!.RuntimeStatus.Should().Be(OrchestrationRuntimeStatus.Running); + + // Act - Try to resume an already running orchestration (should not throw) + await server.Client.ResumeInstanceAsync(instanceId, "Test resumption", default); + + // Assert - Status should remain running + OrchestrationMetadata? metadata = await server.Client.GetInstanceAsync(instanceId, false); + metadata.Should().NotBeNull(); + metadata!.RuntimeStatus.Should().Be(OrchestrationRuntimeStatus.Running); + } + Task StartAsync() { static async Task Orchestration(TaskOrchestrationContext context, bool shouldThrow) @@ -308,6 +505,30 @@ static async Task Orchestration(TaskOrchestrationContext context, bool s }); } + Task StartLongRunningAsync() + { + static async Task LongRunningOrchestration(TaskOrchestrationContext context, bool shouldThrow) + { + context.SetCustomStatus("waiting"); + // Wait for external event or a long timer (5 minutes) to allow suspend/resume operations + Task eventTask = context.WaitForExternalEvent("event"); + Task timerTask = context.CreateTimer(TimeSpan.FromMinutes(5), CancellationToken.None); + await Task.WhenAny(eventTask, timerTask); + + if (shouldThrow) + { + throw new InvalidOperationException("Orchestration failed"); + } + + return $"{shouldThrow} -> output"; + } + + return this.StartWorkerAsync(b => + { + b.AddTasks(tasks => tasks.AddOrchestratorFunc(OrchestrationName, LongRunningOrchestration)); + }); + } + class DateTimeToleranceComparer : IEqualityComparer { public bool Equals(DateTimeOffset x, DateTimeOffset y) => (x - y).Duration() < TimeSpan.FromMilliseconds(100); From b62a50a8a568231c3a014c5b749b29880701bbad Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 10 Dec 2025 18:23:42 +0000 Subject: [PATCH 03/10] Refactor suspend/resume tests to reduce code duplication - Extracted PollForStatusAsync helper method for polling orchestration status - Added constants for polling timeout and interval - Reduced code duplication across test methods - Addressed code review feedback Co-authored-by: YunchuWang <12449837+YunchuWang@users.noreply.github.com> --- .../GrpcDurableTaskClientIntegrationTests.cs | 99 +++++++------------ 1 file changed, 36 insertions(+), 63 deletions(-) diff --git a/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs b/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs index a985a88a9..2b3107c31 100644 --- a/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs +++ b/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs @@ -13,6 +13,8 @@ namespace Microsoft.DurableTask.Grpc.Tests; public class DurableTaskGrpcClientIntegrationTests : IntegrationTestBase { const string OrchestrationName = "TestOrchestration"; + const int PollingTimeoutSeconds = 5; + const int PollingIntervalMilliseconds = 100; public DurableTaskGrpcClientIntegrationTests(ITestOutputHelper output, GrpcSidecarFixture sidecarFixture) : base(output, sidecarFixture) @@ -303,19 +305,9 @@ public async Task SuspendAndResumeInstance_EndToEnd() // Act - Suspend the orchestration await server.Client.SuspendInstanceAsync(instanceId, "Test suspension", default); - // Poll for suspended status (up to 5 seconds) - OrchestrationMetadata? suspendedMetadata = null; - DateTime deadline = DateTime.UtcNow.AddSeconds(5); - while (DateTime.UtcNow < deadline) - { - suspendedMetadata = await server.Client.GetInstanceAsync(instanceId, false); - if (suspendedMetadata?.RuntimeStatus == OrchestrationRuntimeStatus.Suspended) - { - break; - } - - await Task.Delay(TimeSpan.FromMilliseconds(100)); - } + // Poll for suspended status + OrchestrationMetadata? suspendedMetadata = await this.PollForStatusAsync( + server.Client, instanceId, OrchestrationRuntimeStatus.Suspended, default); // Assert - Verify orchestration is suspended suspendedMetadata.Should().NotBeNull(); @@ -325,19 +317,9 @@ public async Task SuspendAndResumeInstance_EndToEnd() // Act - Resume the orchestration await server.Client.ResumeInstanceAsync(instanceId, "Test resumption", default); - // Poll for running status (up to 5 seconds) - OrchestrationMetadata? resumedMetadata = null; - deadline = DateTime.UtcNow.AddSeconds(5); - while (DateTime.UtcNow < deadline) - { - resumedMetadata = await server.Client.GetInstanceAsync(instanceId, false); - if (resumedMetadata?.RuntimeStatus == OrchestrationRuntimeStatus.Running) - { - break; - } - - await Task.Delay(TimeSpan.FromMilliseconds(100)); - } + // Poll for running status + OrchestrationMetadata? resumedMetadata = await this.PollForStatusAsync( + server.Client, instanceId, OrchestrationRuntimeStatus.Running, default); // Assert - Verify orchestration is running again resumedMetadata.Should().NotBeNull(); @@ -367,19 +349,9 @@ public async Task SuspendInstance_WithoutReason_Succeeds() // Act - Suspend without a reason await server.Client.SuspendInstanceAsync(instanceId, cancellation: default); - // Poll for suspended status (up to 5 seconds) - OrchestrationMetadata? suspendedMetadata = null; - DateTime deadline = DateTime.UtcNow.AddSeconds(5); - while (DateTime.UtcNow < deadline) - { - suspendedMetadata = await server.Client.GetInstanceAsync(instanceId, false); - if (suspendedMetadata?.RuntimeStatus == OrchestrationRuntimeStatus.Suspended) - { - break; - } - - await Task.Delay(TimeSpan.FromMilliseconds(100)); - } + // Poll for suspended status + OrchestrationMetadata? suspendedMetadata = await this.PollForStatusAsync( + server.Client, instanceId, OrchestrationRuntimeStatus.Suspended, default); // Assert suspendedMetadata.Should().NotBeNull(); @@ -399,34 +371,14 @@ public async Task ResumeInstance_WithoutReason_Succeeds() await server.Client.SuspendInstanceAsync(instanceId, "Test suspension", default); // Wait for suspension - DateTime deadline = DateTime.UtcNow.AddSeconds(5); - while (DateTime.UtcNow < deadline) - { - OrchestrationMetadata? metadata = await server.Client.GetInstanceAsync(instanceId, false); - if (metadata?.RuntimeStatus == OrchestrationRuntimeStatus.Suspended) - { - break; - } - - await Task.Delay(TimeSpan.FromMilliseconds(100)); - } + await this.PollForStatusAsync(server.Client, instanceId, OrchestrationRuntimeStatus.Suspended, default); // Act - Resume without a reason await server.Client.ResumeInstanceAsync(instanceId, cancellation: default); - // Poll for running status (up to 5 seconds) - OrchestrationMetadata? resumedMetadata = null; - deadline = DateTime.UtcNow.AddSeconds(5); - while (DateTime.UtcNow < deadline) - { - resumedMetadata = await server.Client.GetInstanceAsync(instanceId, false); - if (resumedMetadata?.RuntimeStatus == OrchestrationRuntimeStatus.Running) - { - break; - } - - await Task.Delay(TimeSpan.FromMilliseconds(100)); - } + // Poll for running status + OrchestrationMetadata? resumedMetadata = await this.PollForStatusAsync( + server.Client, instanceId, OrchestrationRuntimeStatus.Running, default); // Assert resumedMetadata.Should().NotBeNull(); @@ -529,6 +481,27 @@ static async Task LongRunningOrchestration(TaskOrchestrationContext cont }); } + async Task PollForStatusAsync( + DurableTaskClient client, + string instanceId, + OrchestrationRuntimeStatus expectedStatus, + CancellationToken cancellation = default) + { + DateTime deadline = DateTime.UtcNow.AddSeconds(PollingTimeoutSeconds); + while (DateTime.UtcNow < deadline) + { + OrchestrationMetadata? metadata = await client.GetInstanceAsync(instanceId, false, cancellation); + if (metadata?.RuntimeStatus == expectedStatus) + { + return metadata; + } + + await Task.Delay(TimeSpan.FromMilliseconds(PollingIntervalMilliseconds), cancellation); + } + + return await client.GetInstanceAsync(instanceId, false, cancellation); + } + class DateTimeToleranceComparer : IEqualityComparer { public bool Equals(DateTimeOffset x, DateTimeOffset y) => (x - y).Duration() < TimeSpan.FromMilliseconds(100); From 5d1414bec5fe06c8610c0d44fa33f486fb8e6d31 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 10 Dec 2025 18:26:02 +0000 Subject: [PATCH 04/10] Reduce long-running orchestration timer from 5 minutes to 30 seconds - More efficient for integration tests - Still sufficient for suspend/resume testing - Reduces resource usage and test execution time - Addressed code review feedback Co-authored-by: YunchuWang <12449837+YunchuWang@users.noreply.github.com> --- .../GrpcDurableTaskClientIntegrationTests.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs b/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs index 2b3107c31..2522c8460 100644 --- a/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs +++ b/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs @@ -462,9 +462,9 @@ Task StartLongRunningAsync() static async Task LongRunningOrchestration(TaskOrchestrationContext context, bool shouldThrow) { context.SetCustomStatus("waiting"); - // Wait for external event or a long timer (5 minutes) to allow suspend/resume operations + // Wait for external event or a timer (30 seconds) to allow suspend/resume operations Task eventTask = context.WaitForExternalEvent("event"); - Task timerTask = context.CreateTimer(TimeSpan.FromMinutes(5), CancellationToken.None); + Task timerTask = context.CreateTimer(TimeSpan.FromSeconds(30), CancellationToken.None); await Task.WhenAny(eventTask, timerTask); if (shouldThrow) From a784c1fbd6bc3c7f13c8badc29281847c8af394d Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sat, 13 Dec 2025 22:42:02 +0000 Subject: [PATCH 05/10] Resolve merge conflict with main branch - Rebased on latest main (f3a7fca) - Added dedupe status tests from main alongside suspend/resume tests - All 15 tests now present (5 dedupe + 5 suspend/resume + 5 original) - Resolved import conflicts (added Grpc.Core using statements) - All builds passing Co-authored-by: YunchuWang <12449837+YunchuWang@users.noreply.github.com> --- src/Abstractions/TaskOptions.cs | 8 + src/Analyzers/AnalyzerReleases.Shipped.md | 9 + src/Analyzers/AnalyzerReleases.Unshipped.md | 4 +- src/Analyzers/Analyzers.csproj | 2 +- .../StartOrchestrationOptionsExtensions.cs | 36 ++ src/Client/Grpc/GrpcDurableTaskClient.cs | 37 +- src/Client/Grpc/ProtoUtils.cs | 80 ++- .../ShimDurableTaskClient.cs | 21 +- .../PayloadStore/BlobPayloadStore.cs | 34 +- .../Sidecar/Grpc/TaskHubGrpcServer.cs | 26 +- .../Abstractions.Tests.csproj | 1 + test/Abstractions.Tests/TaskOptionsTests.cs | 144 +++++- .../Grpc.Tests/GrpcDurableTaskClientTests.cs | 131 +++++ test/Client/Grpc.Tests/ProtoUtilsTests.cs | 463 ++++++++++++++++++ .../ShimDurableTaskClientTests.cs | 205 +++++++- .../GrpcDurableTaskClientIntegrationTests.cs | 174 +++++++ 16 files changed, 1328 insertions(+), 47 deletions(-) create mode 100644 src/Client/Core/StartOrchestrationOptionsExtensions.cs create mode 100644 test/Client/Grpc.Tests/GrpcDurableTaskClientTests.cs create mode 100644 test/Client/Grpc.Tests/ProtoUtilsTests.cs diff --git a/src/Abstractions/TaskOptions.cs b/src/Abstractions/TaskOptions.cs index f8e527fe1..c38144b61 100644 --- a/src/Abstractions/TaskOptions.cs +++ b/src/Abstractions/TaskOptions.cs @@ -134,4 +134,12 @@ public record StartOrchestrationOptions(string? InstanceId = null, DateTimeOffse /// Gets the version to associate with the orchestration instance. /// public TaskVersion? Version { get; init; } + + /// + /// Gets the orchestration runtime statuses that should be considered for deduplication. + /// + /// + /// For type-safe usage, use the WithDedupeStatuses extension method. + /// + public IReadOnlyList? DedupeStatuses { get; init; } } diff --git a/src/Analyzers/AnalyzerReleases.Shipped.md b/src/Analyzers/AnalyzerReleases.Shipped.md index c25049810..b3ea2041f 100644 --- a/src/Analyzers/AnalyzerReleases.Shipped.md +++ b/src/Analyzers/AnalyzerReleases.Shipped.md @@ -1,6 +1,15 @@ ; Shipped analyzer releases ; https://github.com/dotnet/roslyn/blob/main/src/RoslynAnalyzers/Microsoft.CodeAnalysis.Analyzers/ReleaseTrackingAnalyzers.Help.md +## Release 0.2.0 + +### New Rules + +Rule ID | Category | Severity | Notes +--------|----------|----------|------- +DURABLE2003 | Activity | Warning | **FunctionNotFoundAnalyzer**: Warns when an activity function call references a name that does not match any defined activity in the compilation. +DURABLE2004 | Orchestration | Warning | **FunctionNotFoundAnalyzer**: Warns when a sub-orchestration call references a name that does not match any defined orchestrator in the compilation. + ## Release 0.1.0 ### New Rules diff --git a/src/Analyzers/AnalyzerReleases.Unshipped.md b/src/Analyzers/AnalyzerReleases.Unshipped.md index 150fb7356..d9cf2bd76 100644 --- a/src/Analyzers/AnalyzerReleases.Unshipped.md +++ b/src/Analyzers/AnalyzerReleases.Unshipped.md @@ -4,6 +4,4 @@ ### New Rules Rule ID | Category | Severity | Notes ---------|----------|----------|------- -DURABLE2003 | Activity | Warning | **FunctionNotFoundAnalyzer**: Warns when an activity function call references a name that does not match any defined activity in the compilation. -DURABLE2004 | Orchestration | Warning | **FunctionNotFoundAnalyzer**: Warns when a sub-orchestration call references a name that does not match any defined orchestrator in the compilation. \ No newline at end of file +--------|----------|----------|------- \ No newline at end of file diff --git a/src/Analyzers/Analyzers.csproj b/src/Analyzers/Analyzers.csproj index dad1b98a3..be79830c6 100644 --- a/src/Analyzers/Analyzers.csproj +++ b/src/Analyzers/Analyzers.csproj @@ -11,7 +11,7 @@ - 0.1.0 + 0.2.0 .NET Analyzers for the Durable Task SDK. en diff --git a/src/Client/Core/StartOrchestrationOptionsExtensions.cs b/src/Client/Core/StartOrchestrationOptionsExtensions.cs new file mode 100644 index 000000000..4bfac52fd --- /dev/null +++ b/src/Client/Core/StartOrchestrationOptionsExtensions.cs @@ -0,0 +1,36 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using System.Linq; + +namespace Microsoft.DurableTask.Client; + +/// +/// Extension methods for to provide type-safe deduplication status configuration. +/// +public static class StartOrchestrationOptionsExtensions +{ + public static readonly OrchestrationRuntimeStatus[] ValidDedupeStatuses = new[] + { + OrchestrationRuntimeStatus.Completed, + OrchestrationRuntimeStatus.Failed, + OrchestrationRuntimeStatus.Terminated, + OrchestrationRuntimeStatus.Canceled, + }; + + /// + /// Creates a new with the specified deduplication statuses. + /// + /// The base options to extend. + /// The orchestration runtime statuses that should be considered for deduplication. + /// A new instance with the deduplication statuses set. + public static StartOrchestrationOptions WithDedupeStatuses( + this StartOrchestrationOptions options, + params OrchestrationRuntimeStatus[] dedupeStatuses) + { + return options with + { + DedupeStatuses = dedupeStatuses.Select(s => s.ToString()).ToList(), + }; + } +} diff --git a/src/Client/Grpc/GrpcDurableTaskClient.cs b/src/Client/Grpc/GrpcDurableTaskClient.cs index 766868926..b6a6c72d8 100644 --- a/src/Client/Grpc/GrpcDurableTaskClient.cs +++ b/src/Client/Grpc/GrpcDurableTaskClient.cs @@ -1,6 +1,7 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. +using System.Collections.Immutable; using System.Diagnostics; using System.Text; using DurableTask.Core.History; @@ -91,7 +92,7 @@ public override async Task ScheduleNewOrchestrationInstanceAsync( version = this.options.DefaultVersion; } - var request = new P.CreateInstanceRequest + P.CreateInstanceRequest request = new() { Name = orchestratorName.Name, Version = version, @@ -122,6 +123,34 @@ public override async Task ScheduleNewOrchestrationInstanceAsync( request.ScheduledStartTimestamp = Timestamp.FromDateTimeOffset(startAt.Value.ToUniversalTime()); } + // Set orchestration ID reuse policy for deduplication support + // Note: This requires the protobuf to support OrchestrationIdReusePolicy field + // If the protobuf doesn't support it yet, this will need to be updated when the protobuf is updated + if (options?.DedupeStatuses != null && options.DedupeStatuses.Count > 0) + { + // Parse and validate all status strings to enum first + ImmutableHashSet dedupeStatuses = options.DedupeStatuses + .Select(s => + { + if (!System.Enum.TryParse(s, ignoreCase: true, out OrchestrationRuntimeStatus status)) + { + throw new ArgumentException( + $"Invalid orchestration runtime status: '{s}' for deduplication."); + } + + return status; + }).ToImmutableHashSet(); + + // Convert dedupe statuses to protobuf statuses and create reuse policy + IEnumerable dedupeStatusesProto = dedupeStatuses.Select(s => s.ToGrpcStatus()); + P.OrchestrationIdReusePolicy? policy = ProtoUtils.ConvertDedupeStatusesToReusePolicy(dedupeStatusesProto); + + if (policy != null) + { + request.OrchestrationIdReusePolicy = policy; + } + } + using Activity? newActivity = TraceHelper.StartActivityForNewOrchestration(request); P.CreateInstanceResponse? result = await this.sidecarClient.StartInstanceAsync( @@ -405,7 +434,7 @@ public override async Task RestartAsync( Check.NotNullOrEmpty(instanceId); Check.NotEntity(this.options.EnableEntitySupport, instanceId); - var request = new P.RestartInstanceRequest + P.RestartInstanceRequest request = new P.RestartInstanceRequest { InstanceId = instanceId, RestartWithNewInstanceId = restartWithNewInstanceId, @@ -441,7 +470,7 @@ public override async Task RewindInstanceAsync( Check.NotNullOrEmpty(instanceId); Check.NotEntity(this.options.EnableEntitySupport, instanceId); - var request = new P.RewindInstanceRequest + P.RewindInstanceRequest request = new P.RewindInstanceRequest { InstanceId = instanceId, Reason = reason, @@ -573,7 +602,7 @@ async Task PurgeInstancesCoreAsync( OrchestrationMetadata CreateMetadata(P.OrchestrationState state, bool includeInputsAndOutputs) { - var metadata = new OrchestrationMetadata(state.Name, state.InstanceId) + OrchestrationMetadata metadata = new OrchestrationMetadata(state.Name, state.InstanceId) { CreatedAt = state.CreatedTimestamp.ToDateTimeOffset(), LastUpdatedAt = state.LastUpdatedTimestamp.ToDateTimeOffset(), diff --git a/src/Client/Grpc/ProtoUtils.cs b/src/Client/Grpc/ProtoUtils.cs index f5bc750d7..f307f43fc 100644 --- a/src/Client/Grpc/ProtoUtils.cs +++ b/src/Client/Grpc/ProtoUtils.cs @@ -1,6 +1,8 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. +using System.Collections.Immutable; +using System.Linq; using P = Microsoft.DurableTask.Protobuf; namespace Microsoft.DurableTask.Client.Grpc; @@ -8,8 +10,84 @@ namespace Microsoft.DurableTask.Client.Grpc; /// /// Protobuf helpers and utilities. /// -static class ProtoUtils +public static class ProtoUtils { + /// + /// Gets the terminal orchestration statuses that are commonly used for deduplication. + /// These are the statuses that can be used in OrchestrationIdReusePolicy. + /// + /// An immutable array of terminal orchestration statuses. + public static ImmutableArray GetTerminalStatuses() + { +#pragma warning disable CS0618 // Type or member is obsolete - Canceled is intentionally included for compatibility + return ImmutableArray.Create( + P.OrchestrationStatus.Completed, + P.OrchestrationStatus.Failed, + P.OrchestrationStatus.Terminated, + P.OrchestrationStatus.Canceled); +#pragma warning restore CS0618 + } + + /// + /// Converts dedupe statuses (statuses that should NOT be replaced) to an OrchestrationIdReusePolicy + /// with replaceable statuses (statuses that CAN be replaced). + /// + /// The orchestration statuses that should NOT be replaced. These are statuses for which an exception should be thrown if an orchestration already exists. + /// An OrchestrationIdReusePolicy with replaceable statuses set, or null if all terminal statuses are dedupe statuses. + /// + /// The policy uses "replaceableStatus" - these are statuses that CAN be replaced. + /// dedupeStatuses are statuses that should NOT be replaced. + /// So replaceableStatus = all terminal statuses MINUS dedupeStatuses. + /// + public static P.OrchestrationIdReusePolicy? ConvertDedupeStatusesToReusePolicy( + IEnumerable? dedupeStatuses) + { + ImmutableArray terminalStatuses = GetTerminalStatuses(); + ImmutableHashSet dedupeStatusSet = dedupeStatuses?.ToImmutableHashSet() ?? ImmutableHashSet.Empty; + + P.OrchestrationIdReusePolicy policy = new(); + + // Add terminal statuses that are NOT in dedupeStatuses as replaceable + foreach (P.OrchestrationStatus terminalStatus in terminalStatuses.Where(status => !dedupeStatusSet.Contains(status))) + { + policy.ReplaceableStatus.Add(terminalStatus); + } + + // Only return policy if we have replaceable statuses + return policy.ReplaceableStatus.Count > 0 ? policy : null; + } + + /// + /// Converts an OrchestrationIdReusePolicy with replaceable statuses to dedupe statuses + /// (statuses that should NOT be replaced). + /// + /// The OrchestrationIdReusePolicy containing replaceable statuses. + /// An array of orchestration statuses that should NOT be replaced, or null if all terminal statuses are replaceable. + /// + /// The policy uses "replaceableStatus" - these are statuses that CAN be replaced. + /// dedupeStatuses are statuses that should NOT be replaced (should throw exception). + /// So dedupeStatuses = all terminal statuses MINUS replaceableStatus. + /// + public static P.OrchestrationStatus[]? ConvertReusePolicyToDedupeStatuses( + P.OrchestrationIdReusePolicy? policy) + { + if (policy == null || policy.ReplaceableStatus.Count == 0) + { + return null; + } + + ImmutableArray terminalStatuses = GetTerminalStatuses(); + ImmutableHashSet replaceableStatusSet = policy.ReplaceableStatus.ToImmutableHashSet(); + + // Calculate dedupe statuses = terminal statuses - replaceable statuses + P.OrchestrationStatus[] dedupeStatuses = terminalStatuses + .Where(terminalStatus => !replaceableStatusSet.Contains(terminalStatus)) + .ToArray(); + + // Only return if there are dedupe statuses + return dedupeStatuses.Length > 0 ? dedupeStatuses : null; + } + #pragma warning disable 0618 // Referencing Obsolete member. This is intention as we are only converting it. /// /// Converts to . diff --git a/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs b/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs index 4fbf828d0..f6b6140f1 100644 --- a/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs +++ b/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs @@ -7,6 +7,7 @@ using DurableTask.Core; using DurableTask.Core.History; using DurableTask.Core.Query; +using Microsoft.DurableTask.Client; using Microsoft.DurableTask.Client.Entities; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Options; @@ -192,7 +193,23 @@ public override async Task ScheduleNewOrchestrationInstanceAsync( }, }; - await this.Client.CreateTaskOrchestrationAsync(message); + Core.OrchestrationStatus[]? dedupeStatuses = null; + if (options?.DedupeStatuses != null && options.DedupeStatuses.Count > 0) + { + dedupeStatuses = options.DedupeStatuses + .Select(s => + { + if (!Enum.TryParse(s, ignoreCase: true, out var status)) + { + throw new ArgumentException( + $"Invalid orchestration runtime status: '{s}' for deduplication."); + } + return status.ConvertToCore(); + }) + .ToArray(); + } + + await this.Client.CreateTaskOrchestrationAsync(message, dedupeStatuses); return instanceId; } @@ -303,7 +320,7 @@ public override async Task RestartAsync( }, }; - await this.Client.CreateTaskOrchestrationAsync(message); + await this.Client.CreateTaskOrchestrationAsync(message, dedupeStatuses: null); return newInstanceId; } diff --git a/src/Extensions/AzureBlobPayloads/PayloadStore/BlobPayloadStore.cs b/src/Extensions/AzureBlobPayloads/PayloadStore/BlobPayloadStore.cs index 9edced26d..ec66a44bd 100644 --- a/src/Extensions/AzureBlobPayloads/PayloadStore/BlobPayloadStore.cs +++ b/src/Extensions/AzureBlobPayloads/PayloadStore/BlobPayloadStore.cs @@ -2,7 +2,9 @@ // Licensed under the MIT License. using System.IO.Compression; +using System.Net; using System.Text; +using Azure; using Azure.Core; using Azure.Storage.Blobs; using Azure.Storage.Blobs.Models; @@ -117,20 +119,30 @@ public override async Task DownloadAsync(string token, CancellationToken BlobClient blob = this.containerClient.GetBlobClient(name); - using BlobDownloadStreamingResult result = await blob.DownloadStreamingAsync(cancellationToken: cancellationToken); - Stream contentStream = result.Content; - bool isGzip = string.Equals( - result.Details.ContentEncoding, ContentEncodingGzip, StringComparison.OrdinalIgnoreCase); + try + { + using BlobDownloadStreamingResult result = await blob.DownloadStreamingAsync(cancellationToken: cancellationToken); + Stream contentStream = result.Content; + bool isGzip = string.Equals( + result.Details.ContentEncoding, ContentEncodingGzip, StringComparison.OrdinalIgnoreCase); + + if (isGzip) + { + using GZipStream decompressed = new(contentStream, CompressionMode.Decompress); + using StreamReader reader = new(decompressed, Encoding.UTF8); + return await ReadToEndAsync(reader, cancellationToken); + } - if (isGzip) + using StreamReader uncompressedReader = new(contentStream, Encoding.UTF8); + return await ReadToEndAsync(uncompressedReader, cancellationToken); + } + catch (RequestFailedException ex) when (ex.Status == (int)HttpStatusCode.NotFound) { - using GZipStream decompressed = new(contentStream, CompressionMode.Decompress); - using StreamReader reader = new(decompressed, Encoding.UTF8); - return await ReadToEndAsync(reader, cancellationToken); + throw new InvalidOperationException( + $"The blob '{name}' was not found in container '{container}'. " + + "The payload may have been deleted or the container was never created.", + ex); } - - using StreamReader uncompressedReader = new(contentStream, Encoding.UTF8); - return await ReadToEndAsync(uncompressedReader, cancellationToken); } /// diff --git a/src/InProcessTestHost/Sidecar/Grpc/TaskHubGrpcServer.cs b/src/InProcessTestHost/Sidecar/Grpc/TaskHubGrpcServer.cs index 15f65dd84..bbb162c3c 100644 --- a/src/InProcessTestHost/Sidecar/Grpc/TaskHubGrpcServer.cs +++ b/src/InProcessTestHost/Sidecar/Grpc/TaskHubGrpcServer.cs @@ -1,14 +1,17 @@ -// Copyright (c) Microsoft Corporation. +// Copyright (c) Microsoft Corporation. // Licensed under the MIT License. using System.Collections.Concurrent; using System.Diagnostics; using System.Globalization; +using System.Linq; using DurableTask.Core; +using DurableTask.Core.Exceptions; using DurableTask.Core.History; using DurableTask.Core.Query; using Google.Protobuf.WellKnownTypes; using Grpc.Core; +using Microsoft.DurableTask.Client.Grpc; using Microsoft.DurableTask.Testing.Sidecar.Dispatcher; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; @@ -202,6 +205,18 @@ async Task WaitForWorkItemClientConnection() try { + // Convert OrchestrationIdReusePolicy to dedupeStatuses + // The policy uses "replaceableStatus" - these are statuses that CAN be replaced + // dedupeStatuses are statuses that should NOT be replaced (should throw exception) + // So dedupeStatuses = all terminal statuses MINUS replaceableStatus + OrchestrationStatus[]? dedupeStatuses = null; + P.OrchestrationStatus[]? dedupeStatusesProto = ProtoUtils.ConvertReusePolicyToDedupeStatuses(request.OrchestrationIdReusePolicy); + if (dedupeStatusesProto != null) + { + // Convert protobuf statuses to Core.OrchestrationStatus + dedupeStatuses = dedupeStatusesProto.Select(s => (OrchestrationStatus)s).ToArray(); + } + await this.client.CreateTaskOrchestrationAsync( new TaskMessage { @@ -216,7 +231,14 @@ await this.client.CreateTaskOrchestrationAsync( : null }, OrchestrationInstance = instance, - }); + }, + dedupeStatuses); + } + catch (OrchestrationAlreadyExistsException e) + { + // Convert to gRPC exception + this.log.LogWarning(e, "Orchestration with ID {InstanceId} already exists", instance.InstanceId); + throw new RpcException(new Status(StatusCode.AlreadyExists, e.Message)); } catch (Exception e) { diff --git a/test/Abstractions.Tests/Abstractions.Tests.csproj b/test/Abstractions.Tests/Abstractions.Tests.csproj index 9e3b1ad51..665f53f64 100644 --- a/test/Abstractions.Tests/Abstractions.Tests.csproj +++ b/test/Abstractions.Tests/Abstractions.Tests.csproj @@ -6,6 +6,7 @@ + diff --git a/test/Abstractions.Tests/TaskOptionsTests.cs b/test/Abstractions.Tests/TaskOptionsTests.cs index fe4a101e0..3e805a9ef 100644 --- a/test/Abstractions.Tests/TaskOptionsTests.cs +++ b/test/Abstractions.Tests/TaskOptionsTests.cs @@ -1,13 +1,15 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT License. - -namespace Microsoft.DurableTask.Tests; - -public class TaskOptionsTests -{ - [Fact] - public void Empty_Ctors_Okay() - { +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using Microsoft.DurableTask.Client; + +namespace Microsoft.DurableTask.Tests; + +public class TaskOptionsTests +{ + [Fact] + public void Empty_Ctors_Okay() + { TaskOptions options = new(); options.Retry.Should().BeNull(); options.Tags.Should().BeNull(); @@ -21,21 +23,129 @@ public void Empty_Ctors_Okay() startOptions.Version.Should().BeNull(); startOptions.InstanceId.Should().BeNull(); startOptions.StartAt.Should().BeNull(); - startOptions.Tags.Should().BeEmpty(); + startOptions.Tags.Should().BeEmpty(); } - [Fact] - public void SubOrchestrationOptions_InstanceId_Correct() + [Fact] + public void SubOrchestrationOptions_InstanceId_Correct() { string instanceId = Guid.NewGuid().ToString(); SubOrchestrationOptions subOptions = new(new TaskOptions(), instanceId); instanceId.Equals(subOptions.InstanceId).Should().BeTrue(); string subInstanceId = Guid.NewGuid().ToString(); - subOptions = new(new SubOrchestrationOptions(instanceId: subInstanceId)); + subOptions = new(new SubOrchestrationOptions(instanceId: subInstanceId)); subInstanceId.Equals(subOptions.InstanceId).Should().BeTrue(); - subOptions = new(new SubOrchestrationOptions(instanceId: subInstanceId), instanceId); + subOptions = new(new SubOrchestrationOptions(instanceId: subInstanceId), instanceId); instanceId.Equals(subOptions.InstanceId).Should().BeTrue(); - } -} + } + + [Fact] + public void WithDedupeStatuses_SetsCorrectStringValues() + { + // Arrange + StartOrchestrationOptions options = new(); + OrchestrationRuntimeStatus[] statuses = new[] + { + OrchestrationRuntimeStatus.Completed, + OrchestrationRuntimeStatus.Failed, + OrchestrationRuntimeStatus.Terminated, + }; + + // Act + StartOrchestrationOptions result = options.WithDedupeStatuses(statuses); + + // Assert + result.DedupeStatuses.Should().NotBeNull(); + result.DedupeStatuses.Should().HaveCount(3); + result.DedupeStatuses.Should().Contain("Completed"); + result.DedupeStatuses.Should().Contain("Failed"); + result.DedupeStatuses.Should().Contain("Terminated"); + } + + [Fact] + public void WithDedupeStatuses_HandlesEmptyArray() + { + // Arrange + StartOrchestrationOptions options = new(); + + // Act + StartOrchestrationOptions result = options.WithDedupeStatuses(); + + // Assert + result.DedupeStatuses.Should().NotBeNull(); + result.DedupeStatuses.Should().BeEmpty(); + } + + [Fact] + public void WithDedupeStatuses_HandlesEmptyArrayExplicit() + { + // Arrange + StartOrchestrationOptions options = new(); + OrchestrationRuntimeStatus[] statuses = Array.Empty(); + + // Act + StartOrchestrationOptions result = options.WithDedupeStatuses(statuses); + + // Assert + result.DedupeStatuses.Should().NotBeNull(); + result.DedupeStatuses.Should().BeEmpty(); + } + + [Fact] + public void WithDedupeStatuses_PreservesOtherProperties() + { + // Arrange + string instanceId = Guid.NewGuid().ToString(); + DateTimeOffset startAt = DateTimeOffset.UtcNow.AddHours(1); + StartOrchestrationOptions options = new(instanceId, startAt); + + // Act + StartOrchestrationOptions result = options.WithDedupeStatuses( + OrchestrationRuntimeStatus.Completed, + OrchestrationRuntimeStatus.Failed); + + // Assert + result.InstanceId.Should().Be(instanceId); + result.StartAt.Should().Be(startAt); + result.DedupeStatuses.Should().NotBeNull(); + result.DedupeStatuses.Should().HaveCount(2); + } + + [Fact] + public void ValidDedupeStatuses_ContainsExpectedTerminalStatuses() + { + // Act +#pragma warning disable CS0618 // Type or member is obsolete - Canceled is intentionally included for compatibility + OrchestrationRuntimeStatus[] validStatuses = StartOrchestrationOptionsExtensions.ValidDedupeStatuses; + + // Assert + validStatuses.Should().NotBeNull(); + validStatuses.Should().HaveCount(4); + validStatuses.Should().Contain(OrchestrationRuntimeStatus.Completed); + validStatuses.Should().Contain(OrchestrationRuntimeStatus.Failed); + validStatuses.Should().Contain(OrchestrationRuntimeStatus.Terminated); + validStatuses.Should().Contain(OrchestrationRuntimeStatus.Canceled); +#pragma warning restore CS0618 + } + + [Fact] + public void WithDedupeStatuses_ConvertsAllEnumValuesToStrings() + { + // Arrange + StartOrchestrationOptions options = new(); + OrchestrationRuntimeStatus[] allStatuses = Enum.GetValues(); + + // Act + StartOrchestrationOptions result = options.WithDedupeStatuses(allStatuses); + + // Assert + result.DedupeStatuses.Should().NotBeNull(); + result.DedupeStatuses.Should().HaveCount(allStatuses.Length); + foreach (OrchestrationRuntimeStatus status in allStatuses) + { + result.DedupeStatuses.Should().Contain(status.ToString()); + } + } +} diff --git a/test/Client/Grpc.Tests/GrpcDurableTaskClientTests.cs b/test/Client/Grpc.Tests/GrpcDurableTaskClientTests.cs new file mode 100644 index 000000000..8d098106c --- /dev/null +++ b/test/Client/Grpc.Tests/GrpcDurableTaskClientTests.cs @@ -0,0 +1,131 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using Grpc.Core; +using Microsoft.DurableTask.Client; +using Microsoft.Extensions.Logging; + +namespace Microsoft.DurableTask.Client.Grpc.Tests; + +public class GrpcDurableTaskClientTests +{ + readonly Mock loggerMock = new(); + + GrpcDurableTaskClient CreateClient() + { + var callInvoker = Mock.Of(); + var options = new GrpcDurableTaskClientOptions + { + CallInvoker = callInvoker, + }; + + return new GrpcDurableTaskClient("test", options, this.loggerMock.Object); + } + + [Fact] + public async Task ScheduleNewOrchestrationInstanceAsync_InvalidDedupeStatus_ThrowsArgumentException() + { + // Arrange + var client = this.CreateClient(); + var startOptions = new StartOrchestrationOptions + { + DedupeStatuses = new[] { "InvalidStatus", "AnotherInvalidStatus" }, + }; + + // Act & Assert + Func act = async () => await client.ScheduleNewOrchestrationInstanceAsync( + new TaskName("TestOrchestration"), + input: null, + startOptions); + + var exception = await act.Should().ThrowAsync(); + exception.Which.Message.Should().Contain("Invalid orchestration runtime status: 'InvalidStatus' for deduplication."); + } + + [Fact] + public async Task ScheduleNewOrchestrationInstanceAsync_InvalidDedupeStatus_ContainsInvalidStatusInMessage() + { + // Arrange + var client = this.CreateClient(); + var startOptions = new StartOrchestrationOptions + { + DedupeStatuses = new[] { "NonExistentStatus" }, + }; + + // Act & Assert + Func act = async () => await client.ScheduleNewOrchestrationInstanceAsync( + new TaskName("TestOrchestration"), + input: null, + startOptions); + + var exception = await act.Should().ThrowAsync(); + exception.Which.Message.Should().Contain("'NonExistentStatus'"); + exception.Which.Message.Should().Contain("for deduplication"); + } + + [Fact] + public async Task ScheduleNewOrchestrationInstanceAsync_MixedValidAndInvalidStatus_ThrowsArgumentException() + { + // Arrange + var client = this.CreateClient(); + var startOptions = new StartOrchestrationOptions + { + DedupeStatuses = new[] { "Completed", "InvalidStatus", "Failed" }, + }; + + // Act & Assert + Func act = async () => await client.ScheduleNewOrchestrationInstanceAsync( + new TaskName("TestOrchestration"), + input: null, + startOptions); + + var exception = await act.Should().ThrowAsync(); + exception.Which.Message.Should().Contain("Invalid orchestration runtime status: 'InvalidStatus' for deduplication."); + } + + [Fact] + public async Task ScheduleNewOrchestrationInstanceAsync_CaseInsensitiveValidStatus_DoesNotThrowArgumentException() + { + // Arrange + var client = this.CreateClient(); + var startOptions = new StartOrchestrationOptions + { + DedupeStatuses = new[] { "completed", "FAILED", "Terminated" }, + }; + + // Act & Assert - Case-insensitive parsing should work, so no ArgumentException should be thrown + // The call will fail at the gRPC level, but validation should pass + Func act = async () => await client.ScheduleNewOrchestrationInstanceAsync( + new TaskName("TestOrchestration"), + input: null, + startOptions); + + // Should not throw ArgumentException for invalid status (case-insensitive parsing works) + // It may throw other exceptions due to gRPC call failure, but not ArgumentException + var exception = await act.Should().ThrowAsync(); + exception.Which.Should().NotBeOfType(); + } + + [Fact] + public async Task ScheduleNewOrchestrationInstanceAsync_ValidDedupeStatus_DoesNotThrowArgumentException() + { + // Arrange + var client = this.CreateClient(); + var startOptions = new StartOrchestrationOptions + { + DedupeStatuses = new[] { "Completed", "Failed" }, + }; + + // Act & Assert - Valid statuses should pass validation + // The call will fail at the gRPC level, but validation should pass + Func act = async () => await client.ScheduleNewOrchestrationInstanceAsync( + new TaskName("TestOrchestration"), + input: null, + startOptions); + + // Should not throw ArgumentException for invalid status since "Completed" and "Failed" are valid + var exception = await act.Should().ThrowAsync(); + exception.Which.Should().NotBeOfType(); + } +} + diff --git a/test/Client/Grpc.Tests/ProtoUtilsTests.cs b/test/Client/Grpc.Tests/ProtoUtilsTests.cs new file mode 100644 index 000000000..4db7a8845 --- /dev/null +++ b/test/Client/Grpc.Tests/ProtoUtilsTests.cs @@ -0,0 +1,463 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using System.Collections.Immutable; +using P = Microsoft.DurableTask.Protobuf; + +namespace Microsoft.DurableTask.Client.Grpc.Tests; + +public class ProtoUtilsTests +{ + [Fact] + public void GetTerminalStatuses_ReturnsExpectedStatuses() + { + // Act + ImmutableArray terminalStatuses = ProtoUtils.GetTerminalStatuses(); + + // Assert + terminalStatuses.Should().HaveCount(4); + terminalStatuses.Should().Contain(P.OrchestrationStatus.Completed); + terminalStatuses.Should().Contain(P.OrchestrationStatus.Failed); + terminalStatuses.Should().Contain(P.OrchestrationStatus.Terminated); +#pragma warning disable CS0618 // Type or member is obsolete + terminalStatuses.Should().Contain(P.OrchestrationStatus.Canceled); +#pragma warning restore CS0618 + } + + [Fact] + public void GetTerminalStatuses_ReturnsImmutableArray() + { + // Act + ImmutableArray terminalStatuses = ProtoUtils.GetTerminalStatuses(); + + // Assert + terminalStatuses.IsDefault.Should().BeFalse(); + terminalStatuses.IsEmpty.Should().BeFalse(); + } + + [Fact] + public void ConvertDedupeStatusesToReusePolicy_EmptyArray_ReturnsPolicyWithAllTerminalStatuses() + { + // Arrange + var dedupeStatuses = Array.Empty(); + + // Act + P.OrchestrationIdReusePolicy? result = ProtoUtils.ConvertDedupeStatusesToReusePolicy(dedupeStatuses); + + // Assert + // Empty array means no dedupe statuses, so all terminal statuses are replaceable + result.Should().NotBeNull(); + result!.ReplaceableStatus.Should().HaveCount(4); + } + + [Fact] + public void ConvertDedupeStatusesToReusePolicy_AllTerminalStatuses_ReturnsNull() + { + // Arrange + ImmutableArray allTerminalStatuses = ProtoUtils.GetTerminalStatuses(); + var dedupeStatuses = allTerminalStatuses.ToArray(); + + // Act + P.OrchestrationIdReusePolicy? result = ProtoUtils.ConvertDedupeStatusesToReusePolicy(dedupeStatuses); + + // Assert + result.Should().BeNull(); + } + + [Fact] + public void ConvertDedupeStatusesToReusePolicy_NoDedupeStatuses_ReturnsPolicyWithAllTerminalStatuses() + { + // Arrange + var dedupeStatuses = Array.Empty(); + + // Act + P.OrchestrationIdReusePolicy? result = ProtoUtils.ConvertDedupeStatusesToReusePolicy(dedupeStatuses); + + // Assert + // When no dedupe statuses, all terminal statuses should be replaceable + result.Should().NotBeNull(); + result!.ReplaceableStatus.Should().HaveCount(4); + result.ReplaceableStatus.Should().Contain(P.OrchestrationStatus.Completed); + result.ReplaceableStatus.Should().Contain(P.OrchestrationStatus.Failed); + result.ReplaceableStatus.Should().Contain(P.OrchestrationStatus.Terminated); +#pragma warning disable CS0618 // Type or member is obsolete + result.ReplaceableStatus.Should().Contain(P.OrchestrationStatus.Canceled); +#pragma warning restore CS0618 + } + + [Fact] + public void ConvertDedupeStatusesToReusePolicy_SingleDedupeStatus_ReturnsPolicyWithRemainingStatuses() + { + // Arrange + var dedupeStatuses = new[] { P.OrchestrationStatus.Completed }; + + // Act + P.OrchestrationIdReusePolicy? result = ProtoUtils.ConvertDedupeStatusesToReusePolicy(dedupeStatuses); + + // Assert + result.Should().NotBeNull(); + result!.ReplaceableStatus.Should().HaveCount(3); + result.ReplaceableStatus.Should().Contain(P.OrchestrationStatus.Failed); + result.ReplaceableStatus.Should().Contain(P.OrchestrationStatus.Terminated); +#pragma warning disable CS0618 // Type or member is obsolete + result.ReplaceableStatus.Should().Contain(P.OrchestrationStatus.Canceled); +#pragma warning restore CS0618 + result.ReplaceableStatus.Should().NotContain(P.OrchestrationStatus.Completed); + } + + [Fact] + public void ConvertDedupeStatusesToReusePolicy_MultipleDedupeStatuses_ReturnsPolicyWithRemainingStatuses() + { + // Arrange + var dedupeStatuses = new[] + { + P.OrchestrationStatus.Completed, + P.OrchestrationStatus.Failed + }; + + // Act + P.OrchestrationIdReusePolicy? result = ProtoUtils.ConvertDedupeStatusesToReusePolicy(dedupeStatuses); + + // Assert + result.Should().NotBeNull(); + result!.ReplaceableStatus.Should().HaveCount(2); + result.ReplaceableStatus.Should().Contain(P.OrchestrationStatus.Terminated); +#pragma warning disable CS0618 // Type or member is obsolete + result.ReplaceableStatus.Should().Contain(P.OrchestrationStatus.Canceled); +#pragma warning restore CS0618 + result.ReplaceableStatus.Should().NotContain(P.OrchestrationStatus.Completed); + result.ReplaceableStatus.Should().NotContain(P.OrchestrationStatus.Failed); + } + + [Fact] + public void ConvertDedupeStatusesToReusePolicy_DuplicateDedupeStatuses_HandlesDuplicates() + { + // Arrange + var dedupeStatuses = new[] + { + P.OrchestrationStatus.Completed, + P.OrchestrationStatus.Completed, + P.OrchestrationStatus.Failed + }; + + // Act + P.OrchestrationIdReusePolicy? result = ProtoUtils.ConvertDedupeStatusesToReusePolicy(dedupeStatuses); + + // Assert + result.Should().NotBeNull(); + result!.ReplaceableStatus.Should().HaveCount(2); + result.ReplaceableStatus.Should().Contain(P.OrchestrationStatus.Terminated); +#pragma warning disable CS0618 // Type or member is obsolete + result.ReplaceableStatus.Should().Contain(P.OrchestrationStatus.Canceled); +#pragma warning restore CS0618 + } + + [Fact] + public void ConvertDedupeStatusesToReusePolicy_NonTerminalStatus_IgnoresNonTerminalStatus() + { + // Arrange + var dedupeStatuses = new[] + { + P.OrchestrationStatus.Completed, + P.OrchestrationStatus.Running, // Non-terminal status + P.OrchestrationStatus.Pending // Non-terminal status + }; + + // Act + P.OrchestrationIdReusePolicy? result = ProtoUtils.ConvertDedupeStatusesToReusePolicy(dedupeStatuses); + + // Assert + result.Should().NotBeNull(); + result!.ReplaceableStatus.Should().HaveCount(3); + result.ReplaceableStatus.Should().Contain(P.OrchestrationStatus.Failed); + result.ReplaceableStatus.Should().Contain(P.OrchestrationStatus.Terminated); +#pragma warning disable CS0618 // Type or member is obsolete + result.ReplaceableStatus.Should().Contain(P.OrchestrationStatus.Canceled); +#pragma warning restore CS0618 + result.ReplaceableStatus.Should().NotContain(P.OrchestrationStatus.Completed); + } + + [Fact] + public void ConvertReusePolicyToDedupeStatuses_NullPolicy_ReturnsNull() + { + // Arrange + P.OrchestrationIdReusePolicy? policy = null; + + // Act + P.OrchestrationStatus[]? result = ProtoUtils.ConvertReusePolicyToDedupeStatuses(policy); + + // Assert + result.Should().BeNull(); + } + + [Fact] + public void ConvertReusePolicyToDedupeStatuses_EmptyPolicy_ReturnsNull() + { + // Arrange + var policy = new P.OrchestrationIdReusePolicy(); + + // Act + P.OrchestrationStatus[]? result = ProtoUtils.ConvertReusePolicyToDedupeStatuses(policy); + + // Assert + result.Should().BeNull(); + } + + [Fact] + public void ConvertReusePolicyToDedupeStatuses_AllTerminalStatusesReplaceable_ReturnsNull() + { + // Arrange + var policy = new P.OrchestrationIdReusePolicy(); + ImmutableArray terminalStatuses = ProtoUtils.GetTerminalStatuses(); + foreach (var status in terminalStatuses) + { + policy.ReplaceableStatus.Add(status); + } + + // Act + P.OrchestrationStatus[]? result = ProtoUtils.ConvertReusePolicyToDedupeStatuses(policy); + + // Assert + result.Should().BeNull(); + } + + [Fact] + public void ConvertReusePolicyToDedupeStatuses_SingleReplaceableStatus_ReturnsRemainingStatuses() + { + // Arrange + var policy = new P.OrchestrationIdReusePolicy(); + policy.ReplaceableStatus.Add(P.OrchestrationStatus.Completed); + + // Act + P.OrchestrationStatus[]? result = ProtoUtils.ConvertReusePolicyToDedupeStatuses(policy); + + // Assert + result.Should().NotBeNull(); + result!.Should().HaveCount(3); + result.Should().Contain(P.OrchestrationStatus.Failed); + result.Should().Contain(P.OrchestrationStatus.Terminated); +#pragma warning disable CS0618 // Type or member is obsolete + result.Should().Contain(P.OrchestrationStatus.Canceled); +#pragma warning restore CS0618 + result.Should().NotContain(P.OrchestrationStatus.Completed); + } + + [Fact] + public void ConvertReusePolicyToDedupeStatuses_MultipleReplaceableStatuses_ReturnsRemainingStatuses() + { + // Arrange + var policy = new P.OrchestrationIdReusePolicy(); + policy.ReplaceableStatus.Add(P.OrchestrationStatus.Completed); + policy.ReplaceableStatus.Add(P.OrchestrationStatus.Failed); + + // Act + P.OrchestrationStatus[]? result = ProtoUtils.ConvertReusePolicyToDedupeStatuses(policy); + + // Assert + result.Should().NotBeNull(); + result!.Should().HaveCount(2); + result.Should().Contain(P.OrchestrationStatus.Terminated); +#pragma warning disable CS0618 // Type or member is obsolete + result.Should().Contain(P.OrchestrationStatus.Canceled); +#pragma warning restore CS0618 + result.Should().NotContain(P.OrchestrationStatus.Completed); + result.Should().NotContain(P.OrchestrationStatus.Failed); + } + + [Fact] + public void ConvertReusePolicyToDedupeStatuses_NonTerminalStatusInPolicy_IgnoresNonTerminalStatus() + { + // Arrange + var policy = new P.OrchestrationIdReusePolicy(); + policy.ReplaceableStatus.Add(P.OrchestrationStatus.Completed); + policy.ReplaceableStatus.Add(P.OrchestrationStatus.Running); // Non-terminal status + policy.ReplaceableStatus.Add(P.OrchestrationStatus.Pending); // Non-terminal status + + // Act + P.OrchestrationStatus[]? result = ProtoUtils.ConvertReusePolicyToDedupeStatuses(policy); + + // Assert + result.Should().NotBeNull(); + result!.Should().HaveCount(3); + result.Should().Contain(P.OrchestrationStatus.Failed); + result.Should().Contain(P.OrchestrationStatus.Terminated); +#pragma warning disable CS0618 // Type or member is obsolete + result.Should().Contain(P.OrchestrationStatus.Canceled); +#pragma warning restore CS0618 + result.Should().NotContain(P.OrchestrationStatus.Completed); + } + + [Fact] + public void ConvertReusePolicyToDedupeStatuses_DuplicateReplaceableStatuses_HandlesDuplicates() + { + // Arrange + var policy = new P.OrchestrationIdReusePolicy(); + policy.ReplaceableStatus.Add(P.OrchestrationStatus.Completed); + policy.ReplaceableStatus.Add(P.OrchestrationStatus.Completed); // Duplicate + policy.ReplaceableStatus.Add(P.OrchestrationStatus.Failed); + + // Act + P.OrchestrationStatus[]? result = ProtoUtils.ConvertReusePolicyToDedupeStatuses(policy); + + // Assert + result.Should().NotBeNull(); + result!.Should().HaveCount(2); + result.Should().Contain(P.OrchestrationStatus.Terminated); +#pragma warning disable CS0618 // Type or member is obsolete + result.Should().Contain(P.OrchestrationStatus.Canceled); +#pragma warning restore CS0618 + } + + [Fact] + public void ConvertDedupeStatusesToReusePolicy_ThenConvertBack_ReturnsOriginalDedupeStatuses() + { + // Arrange + var originalDedupeStatuses = new[] + { + P.OrchestrationStatus.Completed, + P.OrchestrationStatus.Failed + }; + + // Act + P.OrchestrationIdReusePolicy? policy = ProtoUtils.ConvertDedupeStatusesToReusePolicy(originalDedupeStatuses); + P.OrchestrationStatus[]? convertedBack = ProtoUtils.ConvertReusePolicyToDedupeStatuses(policy); + + // Assert + convertedBack.Should().NotBeNull(); + convertedBack!.Should().BeEquivalentTo(originalDedupeStatuses); + } + + [Fact] + public void ConvertReusePolicyToDedupeStatuses_ThenConvertBack_ReturnsOriginalPolicy() + { + // Arrange + var policy = new P.OrchestrationIdReusePolicy(); + policy.ReplaceableStatus.Add(P.OrchestrationStatus.Completed); + policy.ReplaceableStatus.Add(P.OrchestrationStatus.Failed); + + // Act + P.OrchestrationStatus[]? dedupeStatuses = ProtoUtils.ConvertReusePolicyToDedupeStatuses(policy); + P.OrchestrationIdReusePolicy? convertedBack = ProtoUtils.ConvertDedupeStatusesToReusePolicy(dedupeStatuses); + + // Assert + convertedBack.Should().NotBeNull(); + convertedBack!.ReplaceableStatus.Should().BeEquivalentTo(policy.ReplaceableStatus); + } + + [Fact] + public void ConvertDedupeStatusesToReusePolicy_AllStatuses_ThenConvertBack_ReturnsNull() + { + // Arrange + ImmutableArray allTerminalStatuses = ProtoUtils.GetTerminalStatuses(); + var dedupeStatuses = allTerminalStatuses.ToArray(); + + // Act + P.OrchestrationIdReusePolicy? policy = ProtoUtils.ConvertDedupeStatusesToReusePolicy(dedupeStatuses); + P.OrchestrationStatus[]? convertedBack = ProtoUtils.ConvertReusePolicyToDedupeStatuses(policy); + + // Assert + policy.Should().BeNull(); + convertedBack.Should().BeNull(); + } + + [Fact] + public void ConvertReusePolicyToDedupeStatuses_AllStatuses_ThenConvertBack_ReturnsPolicyWithAllStatuses() + { + // Arrange + var policy = new P.OrchestrationIdReusePolicy(); + ImmutableArray terminalStatuses = ProtoUtils.GetTerminalStatuses(); + foreach (var status in terminalStatuses) + { + policy.ReplaceableStatus.Add(status); + } + + // Act + P.OrchestrationStatus[]? dedupeStatuses = ProtoUtils.ConvertReusePolicyToDedupeStatuses(policy); + P.OrchestrationIdReusePolicy? convertedBack = ProtoUtils.ConvertDedupeStatusesToReusePolicy(dedupeStatuses); + + // Assert + // Policy with all statuses -> no dedupe statuses -> null + // null dedupe statuses -> all are replaceable -> policy with all statuses + dedupeStatuses.Should().BeNull(); + convertedBack.Should().NotBeNull(); + convertedBack!.ReplaceableStatus.Should().HaveCount(4); + convertedBack.ReplaceableStatus.Should().BeEquivalentTo(policy.ReplaceableStatus); + } + + [Fact] + public void ConvertDedupeStatusesToReusePolicy_EmptyArray_ThenConvertBack_ReturnsNull() + { + // Arrange + var dedupeStatuses = Array.Empty(); + + // Act + P.OrchestrationIdReusePolicy? policy = ProtoUtils.ConvertDedupeStatusesToReusePolicy(dedupeStatuses); + P.OrchestrationStatus[]? convertedBack = ProtoUtils.ConvertReusePolicyToDedupeStatuses(policy); + + // Assert + // Empty dedupe statuses -> all terminal statuses are replaceable -> policy with all statuses + // Policy with all statuses -> no dedupe statuses -> null + policy.Should().NotBeNull(); + convertedBack.Should().BeNull(); + } + + [Fact] + public void ConvertReusePolicyToDedupeStatuses_EmptyPolicy_ThenConvertBack_ReturnsPolicyWithAllStatuses() + { + // Arrange + var policy = new P.OrchestrationIdReusePolicy(); + + // Act + P.OrchestrationStatus[]? dedupeStatuses = ProtoUtils.ConvertReusePolicyToDedupeStatuses(policy); + P.OrchestrationIdReusePolicy? convertedBack = ProtoUtils.ConvertDedupeStatusesToReusePolicy(dedupeStatuses); + + // Assert + // Empty policy (no replaceable statuses) -> ConvertReusePolicyToDedupeStatuses returns null + // null dedupe statuses -> all terminal statuses are replaceable -> policy with all statuses + dedupeStatuses.Should().BeNull(); + convertedBack.Should().NotBeNull(); + convertedBack!.ReplaceableStatus.Should().HaveCount(4); + } + + [Theory] + [InlineData(P.OrchestrationStatus.Completed)] + [InlineData(P.OrchestrationStatus.Failed)] + [InlineData(P.OrchestrationStatus.Terminated)] + public void ConvertDedupeStatusesToReusePolicy_SingleStatus_ThenConvertBack_ReturnsOriginal( + P.OrchestrationStatus dedupeStatus) + { + // Arrange + var dedupeStatuses = new[] { dedupeStatus }; + + // Act + P.OrchestrationIdReusePolicy? policy = ProtoUtils.ConvertDedupeStatusesToReusePolicy(dedupeStatuses); + P.OrchestrationStatus[]? convertedBack = ProtoUtils.ConvertReusePolicyToDedupeStatuses(policy); + + // Assert + convertedBack.Should().NotBeNull(); + convertedBack!.Should().ContainSingle(); + convertedBack.Should().Contain(dedupeStatus); + } + + [Fact] + public void ConvertDedupeStatusesToReusePolicy_ThreeOutOfFourStatuses_ThenConvertBack_ReturnsOriginal() + { + // Arrange + var dedupeStatuses = new[] + { + P.OrchestrationStatus.Completed, + P.OrchestrationStatus.Failed, + P.OrchestrationStatus.Terminated + }; + + // Act + P.OrchestrationIdReusePolicy? policy = ProtoUtils.ConvertDedupeStatusesToReusePolicy(dedupeStatuses); + P.OrchestrationStatus[]? convertedBack = ProtoUtils.ConvertReusePolicyToDedupeStatuses(policy); + + // Assert + convertedBack.Should().NotBeNull(); + convertedBack!.Should().HaveCount(3); + convertedBack.Should().BeEquivalentTo(dedupeStatuses); + } +} + diff --git a/test/Client/OrchestrationServiceClientShim.Tests/ShimDurableTaskClientTests.cs b/test/Client/OrchestrationServiceClientShim.Tests/ShimDurableTaskClientTests.cs index fa65cc42a..9509d1e0b 100644 --- a/test/Client/OrchestrationServiceClientShim.Tests/ShimDurableTaskClientTests.cs +++ b/test/Client/OrchestrationServiceClientShim.Tests/ShimDurableTaskClientTests.cs @@ -326,6 +326,195 @@ public async Task ScheduleNewOrchestrationInstance_IdProvided_TagsProvided() await this.RunScheduleNewOrchestrationInstanceAsync("test", "input", options); } + [Fact] + public async Task ScheduleNewOrchestrationInstance_InvalidDedupeStatus_ThrowsArgumentException() + { + // Arrange + StartOrchestrationOptions options = new() + { + DedupeStatuses = new[] { "InvalidStatus" }, + }; + + // Act & Assert + Func act = async () => await this.client.ScheduleNewOrchestrationInstanceAsync( + new TaskName("TestOrchestration"), + input: null, + options, + default); + + var exception = await act.Should().ThrowAsync(); + exception.Which.Message.Should().Contain("Invalid orchestration runtime status: 'InvalidStatus' for deduplication."); + } + + [Fact] + public async Task ScheduleNewOrchestrationInstance_InvalidDedupeStatus_ContainsInvalidStatusInMessage() + { + // Arrange + StartOrchestrationOptions options = new() + { + DedupeStatuses = new[] { "NonExistentStatus" }, + }; + + // Act & Assert + Func act = async () => await this.client.ScheduleNewOrchestrationInstanceAsync( + new TaskName("TestOrchestration"), + input: null, + options, + default); + + var exception = await act.Should().ThrowAsync(); + exception.Which.Message.Should().Contain("'NonExistentStatus'"); + exception.Which.Message.Should().Contain("for deduplication"); + } + + [Fact] + public async Task ScheduleNewOrchestrationInstance_MultipleInvalidDedupeStatuses_ThrowsOnFirstInvalid() + { + // Arrange + StartOrchestrationOptions options = new() + { + DedupeStatuses = new[] { "InvalidStatus1", "InvalidStatus2", "InvalidStatus3" }, + }; + + // Act & Assert + Func act = async () => await this.client.ScheduleNewOrchestrationInstanceAsync( + new TaskName("TestOrchestration"), + input: null, + options, + default); + + var exception = await act.Should().ThrowAsync(); + exception.Which.Message.Should().Contain("'InvalidStatus1'"); + } + + [Fact] + public async Task ScheduleNewOrchestrationInstance_MixedValidAndInvalidStatus_ThrowsArgumentException() + { + // Arrange + StartOrchestrationOptions options = new() + { + DedupeStatuses = new[] { "Completed", "InvalidStatus", "Failed" }, + }; + + // Act & Assert + Func act = async () => await this.client.ScheduleNewOrchestrationInstanceAsync( + new TaskName("TestOrchestration"), + input: null, + options, + default); + + var exception = await act.Should().ThrowAsync(); + exception.Which.Message.Should().Contain("Invalid orchestration runtime status: 'InvalidStatus' for deduplication."); + } + + [Fact] + public async Task ScheduleNewOrchestrationInstance_ValidDedupeStatuses_DoesNotThrow() + { + // Arrange + StartOrchestrationOptions options = new() + { + DedupeStatuses = new[] { "Completed", "Failed" }, + }; + + // Setup the mock to handle the call + this.orchestrationClient.Setup( + m => m.CreateTaskOrchestrationAsync( + It.IsAny(), + It.IsAny())) + .Returns(Task.CompletedTask); + + // Act & Assert - Should not throw ArgumentException for invalid status + Func act = async () => await this.client.ScheduleNewOrchestrationInstanceAsync( + new TaskName("TestOrchestration"), + input: null, + options, + default); + + await act.Should().NotThrowAsync(); + this.orchestrationClient.VerifyAll(); + } + + [Fact] + public async Task ScheduleNewOrchestrationInstance_CaseInsensitiveValidStatus_DoesNotThrow() + { + // Arrange + StartOrchestrationOptions options = new() + { + DedupeStatuses = new[] { "completed", "FAILED", "Terminated" }, + }; + + // Setup the mock to handle the call + this.orchestrationClient.Setup( + m => m.CreateTaskOrchestrationAsync( + It.IsAny(), + It.IsAny())) + .Returns(Task.CompletedTask); + + // Act & Assert - Case-insensitive parsing should work + Func act = async () => await this.client.ScheduleNewOrchestrationInstanceAsync( + new TaskName("TestOrchestration"), + input: null, + options, + default); + + await act.Should().NotThrowAsync(); + this.orchestrationClient.VerifyAll(); + } + + [Fact] + public async Task ScheduleNewOrchestrationInstance_EmptyDedupeStatuses_DoesNotThrow() + { + // Arrange + StartOrchestrationOptions options = new() + { + DedupeStatuses = new List(), + }; + + // Setup the mock to handle the call + this.orchestrationClient.Setup( + m => m.CreateTaskOrchestrationAsync( + It.IsAny(), + It.IsAny())) + .Returns(Task.CompletedTask); + + // Act & Assert + Func act = async () => await this.client.ScheduleNewOrchestrationInstanceAsync( + new TaskName("TestOrchestration"), + input: null, + options, + default); + + await act.Should().NotThrowAsync(); + this.orchestrationClient.VerifyAll(); + } + + [Fact] + public async Task ScheduleNewOrchestrationInstance_NullDedupeStatuses_DoesNotThrow() + { + // Arrange + StartOrchestrationOptions options = new() + { + DedupeStatuses = null, + }; + + // Setup the mock to handle the call + this.orchestrationClient.Setup( + m => m.CreateTaskOrchestrationAsync( + It.IsAny(), + It.IsAny())) + .Returns(Task.CompletedTask); + + // Act & Assert + Func act = async () => await this.client.ScheduleNewOrchestrationInstanceAsync( + new TaskName("TestOrchestration"), + input: null, + options, + default); + + await act.Should().NotThrowAsync(); + this.orchestrationClient.VerifyAll(); + } + [Theory] [InlineData(false)] [InlineData(true)] @@ -347,11 +536,11 @@ public async Task RestartAsync_EndToEnd(bool restartWithNewInstanceId) .Setup(x => x.GetOrchestrationStateAsync(originalInstanceId, false)) .ReturnsAsync(new List { originalState }); - // Capture the TaskMessage for verification becasue we will create this message at RestartAsync. + // Capture the TaskMessage for verification because we will create this message at RestartAsync. TaskMessage? capturedMessage = null; this.orchestrationClient - .Setup(x => x.CreateTaskOrchestrationAsync(It.IsAny())) - .Callback(msg => capturedMessage = msg) + .Setup(x => x.CreateTaskOrchestrationAsync(It.IsAny(), It.IsAny())) + .Callback((msg, _) => capturedMessage = msg) .Returns(Task.CompletedTask); string restartedInstanceId = await this.client.RestartAsync(originalInstanceId, restartWithNewInstanceId); @@ -367,7 +556,7 @@ public async Task RestartAsync_EndToEnd(bool restartWithNewInstanceId) // Verify that CreateTaskOrchestrationAsync was called this.orchestrationClient.Verify( - x => x.CreateTaskOrchestrationAsync(It.IsAny()), + x => x.CreateTaskOrchestrationAsync(It.IsAny(), It.IsAny()), Times.Once); // Verify the captured message details @@ -534,7 +723,9 @@ async Task RunScheduleNewOrchestrationInstanceAsync( { // arrange this.orchestrationClient.Setup( - m => m.CreateTaskOrchestrationAsync(MatchStartExecutionMessage(name, input, options))) + m => m.CreateTaskOrchestrationAsync( + MatchStartExecutionMessage(name, input, options), + It.IsAny())) .Returns(Task.CompletedTask); // act @@ -542,7 +733,9 @@ async Task RunScheduleNewOrchestrationInstanceAsync( // assert this.orchestrationClient.Verify( - m => m.CreateTaskOrchestrationAsync(MatchStartExecutionMessage(name, input, options)), + m => m.CreateTaskOrchestrationAsync( + MatchStartExecutionMessage(name, input, options), + It.IsAny()), Times.Once()); if (options?.InstanceId is string str) diff --git a/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs b/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs index 2522c8460..c53ced3c3 100644 --- a/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs +++ b/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs @@ -4,9 +4,12 @@ using System.Diagnostics.CodeAnalysis; using FluentAssertions; using FluentAssertions.Execution; +using Grpc.Core; using Microsoft.DurableTask.Client; using Microsoft.DurableTask.Worker; using Xunit.Abstractions; +using RpcException = Grpc.Core.RpcException; +using StatusCode = Grpc.Core.StatusCode; namespace Microsoft.DurableTask.Grpc.Tests; @@ -290,6 +293,177 @@ await restartAction.Should().ThrowAsync() .WithMessage("*An orchestration with the instanceId non-existent-instance-id was not found*"); } + [Fact] + public async Task ScheduleNewOrchestrationInstance_WithDedupeStatuses_ThrowsWhenInstanceExists() + { + await using HostTestLifetime server = await this.StartAsync(); + + string instanceId = "dedup-test-instance"; + + // Schedule and complete first orchestration instance + string firstInstanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync( + OrchestrationName, + input: false, + new StartOrchestrationOptions(instanceId)); + + // Wait for it to complete + await server.Client.WaitForInstanceStartAsync(firstInstanceId, default); + await server.Client.RaiseEventAsync(firstInstanceId, "event", default); + await server.Client.WaitForInstanceCompletionAsync(firstInstanceId, default); + + // Verify it's completed + OrchestrationMetadata? metadata = await server.Client.GetInstanceAsync(instanceId, false); + metadata.Should().NotBeNull(); + metadata!.RuntimeStatus.Should().Be(OrchestrationRuntimeStatus.Completed); + + // Try to create another instance with the same ID and dedupe statuses including Completed + // This should throw an exception + Func createAction = () => server.Client.ScheduleNewOrchestrationInstanceAsync( + OrchestrationName, + input: false, + new StartOrchestrationOptions(instanceId).WithDedupeStatuses(OrchestrationRuntimeStatus.Completed)); + + await createAction.Should().ThrowAsync() + .Where(e => e.StatusCode == StatusCode.AlreadyExists); + } + + [Fact] + public async Task ScheduleNewOrchestrationInstance_WithDedupeStatuses_AllowsReplacementWhenStatusNotInDedupeList() + { + await using HostTestLifetime server = await this.StartAsync(); + + string instanceId = "dedup-test-instance-replace"; + + // Create first orchestration instance + string firstInstanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync( + OrchestrationName, + input: false, + new StartOrchestrationOptions(instanceId)); + + // Wait for it to complete + await server.Client.WaitForInstanceStartAsync(firstInstanceId, default); + await server.Client.RaiseEventAsync(firstInstanceId, "event", default); + await server.Client.WaitForInstanceCompletionAsync(firstInstanceId, default); + + // Verify it's completed + OrchestrationMetadata? metadata = await server.Client.GetInstanceAsync(instanceId, false); + metadata.Should().NotBeNull(); + metadata!.RuntimeStatus.Should().Be(OrchestrationRuntimeStatus.Completed); + + // Try to create another instance with the same ID but dedupe statuses does NOT include Completed + // This should succeed (replace the existing instance) + string secondInstanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync( + OrchestrationName, + input: false, + new StartOrchestrationOptions(instanceId).WithDedupeStatuses( + OrchestrationRuntimeStatus.Failed, + OrchestrationRuntimeStatus.Terminated)); + + secondInstanceId.Should().Be(instanceId); + + // Wait for the new instance to start running + await server.Client.WaitForInstanceStartAsync(instanceId, default); + + // Verify the new instance is running + OrchestrationMetadata? newMetadata = await server.Client.GetInstanceAsync(instanceId, false); + newMetadata.Should().NotBeNull(); + newMetadata!.RuntimeStatus.Should().Be(OrchestrationRuntimeStatus.Running); + } + + [Fact] + public async Task ScheduleNewOrchestrationInstance_WithDedupeStatuses_ThrowsWhenInstanceIsFailed() + { + await using HostTestLifetime server = await this.StartAsync(); + + string instanceId = "dedup-test-instance-failed"; + + // Create first orchestration instance that will fail + string firstInstanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync( + OrchestrationName, + input: true, // true means it will throw + new StartOrchestrationOptions(instanceId)); + + // Wait for it to fail + await server.Client.WaitForInstanceStartAsync(firstInstanceId, default); + await server.Client.RaiseEventAsync(firstInstanceId, "event", default); + await server.Client.WaitForInstanceCompletionAsync(firstInstanceId, default); + + // Verify it's failed + OrchestrationMetadata? metadata = await server.Client.GetInstanceAsync(instanceId, false); + metadata.Should().NotBeNull(); + metadata!.RuntimeStatus.Should().Be(OrchestrationRuntimeStatus.Failed); + + // Try to create another instance with the same ID and dedupe statuses including Failed + // This should throw an exception + Func createAction = () => server.Client.ScheduleNewOrchestrationInstanceAsync( + OrchestrationName, + input: false, + new StartOrchestrationOptions(instanceId).WithDedupeStatuses(OrchestrationRuntimeStatus.Failed)); + + await createAction.Should().ThrowAsync() + .Where(e => e.StatusCode == StatusCode.AlreadyExists); + } + + [Fact] + public async Task ScheduleNewOrchestrationInstance_WithDedupeStatuses_AllowsCreationWhenInstanceDoesNotExist() + { + await using HostTestLifetime server = await this.StartAsync(); + + string instanceId = "dedup-test-instance-new"; + + // Create instance with dedupe statuses - should succeed since instance doesn't exist + string createdInstanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync( + OrchestrationName, + input: false, + new StartOrchestrationOptions(instanceId).WithDedupeStatuses( + OrchestrationRuntimeStatus.Completed, + OrchestrationRuntimeStatus.Failed)); + + createdInstanceId.Should().Be(instanceId); + + // Verify the instance was created + OrchestrationMetadata? metadata = await server.Client.GetInstanceAsync(instanceId, false); + metadata.Should().NotBeNull(); + metadata!.RuntimeStatus.Should().Be(OrchestrationRuntimeStatus.Running); + } + + [Fact] + public async Task ScheduleNewOrchestrationInstance_WithMultipleDedupeStatuses_ThrowsWhenAnyStatusMatches() + { + await using HostTestLifetime server = await this.StartAsync(); + + string instanceId = "dedup-test-instance-multiple"; + + // Create first orchestration instance + string firstInstanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync( + OrchestrationName, + input: false, + new StartOrchestrationOptions(instanceId)); + + // Wait for it to complete + await server.Client.WaitForInstanceStartAsync(firstInstanceId, default); + await server.Client.RaiseEventAsync(firstInstanceId, "event", default); + await server.Client.WaitForInstanceCompletionAsync(firstInstanceId, default); + + // Verify it's completed + OrchestrationMetadata? metadata = await server.Client.GetInstanceAsync(instanceId, false); + metadata.Should().NotBeNull(); + metadata!.RuntimeStatus.Should().Be(OrchestrationRuntimeStatus.Completed); + + // Try to create another instance with multiple dedupe statuses including Completed + // This should throw an exception + Func createAction = () => server.Client.ScheduleNewOrchestrationInstanceAsync( + OrchestrationName, + input: false, + new StartOrchestrationOptions(instanceId).WithDedupeStatuses( + OrchestrationRuntimeStatus.Completed, + OrchestrationRuntimeStatus.Failed, + OrchestrationRuntimeStatus.Terminated)); + + await createAction.Should().ThrowAsync() + .Where(e => e.StatusCode == StatusCode.AlreadyExists); + } + [Fact] public async Task SuspendAndResumeInstance_EndToEnd() { From 0e8e568ef2024048f76556955d90b8ffc9b376ff Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 16 Dec 2025 17:37:31 +0000 Subject: [PATCH 06/10] Update PR description to follow template From 6eca9e59af891ab1eaf9dcbc1e9511b15489b4b6 Mon Sep 17 00:00:00 2001 From: wangbill Date: Tue, 16 Dec 2025 09:43:23 -0800 Subject: [PATCH 07/10] Update test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../GrpcDurableTaskClientIntegrationTests.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs b/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs index c53ced3c3..ad042e782 100644 --- a/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs +++ b/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs @@ -638,7 +638,7 @@ static async Task LongRunningOrchestration(TaskOrchestrationContext cont context.SetCustomStatus("waiting"); // Wait for external event or a timer (30 seconds) to allow suspend/resume operations Task eventTask = context.WaitForExternalEvent("event"); - Task timerTask = context.CreateTimer(TimeSpan.FromSeconds(30), CancellationToken.None); + Task timerTask = context.CreateTimer(TimeSpan.FromSeconds(30), context.CancellationToken); await Task.WhenAny(eventTask, timerTask); if (shouldThrow) From b219eda0b8cf37fe900bbfd673b5458e9a6ecb69 Mon Sep 17 00:00:00 2001 From: wangbill Date: Tue, 16 Dec 2025 09:43:53 -0800 Subject: [PATCH 08/10] Update test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../GrpcDurableTaskClientIntegrationTests.cs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs b/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs index ad042e782..b8a5959bc 100644 --- a/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs +++ b/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs @@ -639,8 +639,12 @@ static async Task LongRunningOrchestration(TaskOrchestrationContext cont // Wait for external event or a timer (30 seconds) to allow suspend/resume operations Task eventTask = context.WaitForExternalEvent("event"); Task timerTask = context.CreateTimer(TimeSpan.FromSeconds(30), context.CancellationToken); - await Task.WhenAny(eventTask, timerTask); - + Task completedTask = await Task.WhenAny(eventTask, timerTask); + + if (completedTask == timerTask) + { + throw new TimeoutException("Timed out waiting for external event 'event'."); + } if (shouldThrow) { throw new InvalidOperationException("Orchestration failed"); From be8bfa1ec01010bfc25aa65c5b2f6dcd8f4cb6f8 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 16 Dec 2025 17:56:52 +0000 Subject: [PATCH 09/10] Fix compilation error: revert to CancellationToken.None TaskOrchestrationContext doesn't have a CancellationToken property. Reverted to using CancellationToken.None which was the original implementation. Co-authored-by: YunchuWang <12449837+YunchuWang@users.noreply.github.com> --- .../GrpcDurableTaskClientIntegrationTests.cs | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs b/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs index b8a5959bc..c53ced3c3 100644 --- a/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs +++ b/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs @@ -638,13 +638,9 @@ static async Task LongRunningOrchestration(TaskOrchestrationContext cont context.SetCustomStatus("waiting"); // Wait for external event or a timer (30 seconds) to allow suspend/resume operations Task eventTask = context.WaitForExternalEvent("event"); - Task timerTask = context.CreateTimer(TimeSpan.FromSeconds(30), context.CancellationToken); - Task completedTask = await Task.WhenAny(eventTask, timerTask); - - if (completedTask == timerTask) - { - throw new TimeoutException("Timed out waiting for external event 'event'."); - } + Task timerTask = context.CreateTimer(TimeSpan.FromSeconds(30), CancellationToken.None); + await Task.WhenAny(eventTask, timerTask); + if (shouldThrow) { throw new InvalidOperationException("Orchestration failed"); From d82ba37ae120faa3246ae726dab939e2a278072c Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 16 Dec 2025 18:34:03 +0000 Subject: [PATCH 10/10] Add timeout checking logic back to LongRunningOrchestration Added back the logic to check which task completed and throw TimeoutException if the timer completes first, as requested by code review. Co-authored-by: YunchuWang <12449837+YunchuWang@users.noreply.github.com> --- .../GrpcDurableTaskClientIntegrationTests.cs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs b/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs index c53ced3c3..18892f1a0 100644 --- a/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs +++ b/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs @@ -639,7 +639,12 @@ static async Task LongRunningOrchestration(TaskOrchestrationContext cont // Wait for external event or a timer (30 seconds) to allow suspend/resume operations Task eventTask = context.WaitForExternalEvent("event"); Task timerTask = context.CreateTimer(TimeSpan.FromSeconds(30), CancellationToken.None); - await Task.WhenAny(eventTask, timerTask); + Task completedTask = await Task.WhenAny(eventTask, timerTask); + + if (completedTask == timerTask) + { + throw new TimeoutException("Timed out waiting for external event 'event'."); + } if (shouldThrow) {