diff --git a/CHANGELOG.md b/CHANGELOG.md index 02449a9e2..931050c72 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,7 +5,8 @@ - Introduce default version setting to DurableTaskClient and expose to orchestrator ([#393](https://github.com/microsoft/durabletask-dotnet/pull/393)) - Add support for local credential types in DTS libraries ([#396](https://github.com/microsoft/durabletask-dotnet/pull/396)) - Add utility for easier version comparison in orchestration context ([#394](https://github.com/microsoft/durabletask-dotnet/pull/394)) -- Add tags support for orchestrations ([#397])(https://github.com/microsoft/durabletask-dotnet/pull/397) +- Add tags support for orchestrations ([#397](https://github.com/microsoft/durabletask-dotnet/pull/397)) +- Add support for versioning in the gRPC worker ([#401](https://github.com/microsoft/durabletask-dotnet/pull/401)) ## v1.8.1 diff --git a/src/Abstractions/TaskOrchestrationContext.cs b/src/Abstractions/TaskOrchestrationContext.cs index ecba913be..6bdbe1588 100644 --- a/src/Abstractions/TaskOrchestrationContext.cs +++ b/src/Abstractions/TaskOrchestrationContext.cs @@ -1,6 +1,7 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. +using Microsoft.DurableTask.Abstractions; using Microsoft.DurableTask.Entities; using Microsoft.Extensions.Logging; @@ -421,32 +422,7 @@ public virtual ILogger CreateReplaySafeLogger() /// True if the orchestration's version is greater than the provided version, false otherwise. public virtual int CompareVersionTo(string version) { - // Both versions are empty, treat as equal. - if (string.IsNullOrWhiteSpace(this.Version) && string.IsNullOrWhiteSpace(version)) - { - return 0; - } - - // An empty version in the context is always less than a defined version in the parameter. - if (string.IsNullOrWhiteSpace(this.Version)) - { - return -1; - } - - // An empty version in the parameter is always less than a defined version in the context. - if (string.IsNullOrWhiteSpace(version)) - { - return 1; - } - - // If both versions use the .NET Version class, return that comparison. - if (System.Version.TryParse(this.Version, out Version contextVersion) && System.Version.TryParse(version, out Version otherVersion)) - { - return contextVersion.CompareTo(otherVersion); - } - - // If we have gotten to here, we don't know the syntax of the versions we are comparing, use a string comparison as a final check. - return string.Compare(this.Version, version, StringComparison.OrdinalIgnoreCase); + return TaskOrchestrationVersioningUtils.CompareVersions(this.Version, version); } class ReplaySafeLogger : ILogger diff --git a/src/Abstractions/TaskOrchestrationVersioningUtils.cs b/src/Abstractions/TaskOrchestrationVersioningUtils.cs new file mode 100644 index 000000000..024b96688 --- /dev/null +++ b/src/Abstractions/TaskOrchestrationVersioningUtils.cs @@ -0,0 +1,54 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace Microsoft.DurableTask.Abstractions; + +/// +/// Utilities for handling Orchestration/Task versioning operations. +/// +public static class TaskOrchestrationVersioningUtils +{ + /// + /// Compare two versions to each other. + /// + /// + /// This method's comparison is handled in the following order: + /// 1. The versions are checked if they are empty (non-versioned). Both being empty signifies equality. + /// 2. If sourceVersion is empty but otherVersion is defined, this is treated as the source being less than the other. + /// 3. If otherVersion is empty but sourceVersion is defined, this is treated as the source being greater than the other. + /// 4. Both versions are attempted to be parsed into System.Version and compared as such. + /// 5. If all else fails, a direct string comparison is done between the versions. + /// + /// The source version that will be compared against the other version. + /// The other version to compare against. + /// An int representing how sourceVersion compares to otherVersion. + public static int CompareVersions(string sourceVersion, string otherVersion) + { + // Both versions are empty, treat as equal. + if (string.IsNullOrWhiteSpace(sourceVersion) && string.IsNullOrWhiteSpace(otherVersion)) + { + return 0; + } + + // An empty version in the context is always less than a defined version in the parameter. + if (string.IsNullOrWhiteSpace(sourceVersion)) + { + return -1; + } + + // An empty version in the parameter is always less than a defined version in the context. + if (string.IsNullOrWhiteSpace(otherVersion)) + { + return 1; + } + + // If both versions use the .NET Version class, return that comparison. + if (System.Version.TryParse(sourceVersion, out Version parsedSourceVersion) && System.Version.TryParse(otherVersion, out Version parsedOtherVersion)) + { + return parsedSourceVersion.CompareTo(parsedOtherVersion); + } + + // If we have gotten to here, we don't know the syntax of the versions we are comparing, use a string comparison as a final check. + return string.Compare(sourceVersion, otherVersion, StringComparison.OrdinalIgnoreCase); + } +} diff --git a/src/Worker/Core/DependencyInjection/DurableTaskWorkerBuilderExtensions.cs b/src/Worker/Core/DependencyInjection/DurableTaskWorkerBuilderExtensions.cs index b2911842d..322e7d403 100644 --- a/src/Worker/Core/DependencyInjection/DurableTaskWorkerBuilderExtensions.cs +++ b/src/Worker/Core/DependencyInjection/DurableTaskWorkerBuilderExtensions.cs @@ -4,6 +4,7 @@ using Microsoft.DurableTask.Worker.Hosting; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Options; +using static Microsoft.DurableTask.Worker.DurableTaskWorkerOptions; namespace Microsoft.DurableTask.Worker; @@ -86,4 +87,25 @@ public static IDurableTaskWorkerBuilder UseBuildTarget(this I }); return builder; } + + /// + /// Configures the versioning options for this builder. + /// + /// The builder to set the builder target for. + /// The collection of options specified for versioning the worker. + /// The original builder, for call chaining. + public static IDurableTaskWorkerBuilder UseVersioning(this IDurableTaskWorkerBuilder builder, VersioningOptions versionOptions) + { + Check.NotNull(builder); + builder.Configure(options => + { + options.Versioning = new VersioningOptions + { + Version = versionOptions.Version, + MatchStrategy = versionOptions.MatchStrategy, + FailureStrategy = versionOptions.FailureStrategy, + }; + }); + return builder; + } } diff --git a/src/Worker/Core/DurableTaskWorkerOptions.cs b/src/Worker/Core/DurableTaskWorkerOptions.cs index 7233d6045..395666c9a 100644 --- a/src/Worker/Core/DurableTaskWorkerOptions.cs +++ b/src/Worker/Core/DurableTaskWorkerOptions.cs @@ -12,6 +12,43 @@ public class DurableTaskWorkerOptions { DataConverter dataConverter = JsonDataConverter.Default; + /// + /// Defines the version matching strategy for the Durable Task worker. + /// + public enum VersionMatchStrategy + { + /// + /// Ignore Orchestration version, all work received is processed. + /// + None = 0, + + /// + /// Worker will only process Tasks from Orchestrations with the same version as the worker. + /// + Strict = 1, + + /// + /// Worker will process Tasks from Orchestrations whose version is less than or equal to the worker. + /// + CurrentOrOlder = 2, + } + + /// + /// Defines the versioning failure strategy for the Durable Task worker. + /// + public enum VersionFailureStrategy + { + /// + /// Do not change the orchestration state if the version does not adhere to the matching strategy. + /// + Reject = 0, + + /// + /// Fail the orchestration if the version does not adhere to the matching strategy. + /// + Fail = 1, + } + /// /// Gets or sets the data converter. Default value is . /// @@ -93,6 +130,21 @@ public DataConverter DataConverter /// public ConcurrencyOptions Concurrency { get; } = new(); + /// + /// Gets or sets the versioning options for the Durable Task worker. + /// + /// + /// Worker versioning controls how a worker will handle orchestrations of different versions. Defining both the + /// version of the worker, the versions that can be worked on, and what to do in case a version does not comply + /// with the given options. + /// + public VersioningOptions? Versioning { get; set; } + + /// + /// Gets a value indicating whether versioning is explicitly set or not. + /// + public bool IsVersioningSet { get; internal set; } + /// /// Gets a value indicating whether was explicitly set or not. /// @@ -116,6 +168,7 @@ internal void ApplyTo(DurableTaskWorkerOptions other) other.DataConverter = this.DataConverter; other.MaximumTimerInterval = this.MaximumTimerInterval; other.EnableEntitySupport = this.EnableEntitySupport; + other.Versioning = this.Versioning; } } @@ -139,4 +192,28 @@ public class ConcurrencyOptions /// public int MaximumConcurrentEntityWorkItems { get; set; } = 100 * Environment.ProcessorCount; } + + /// + /// Options for the Durable Task worker versioning. + /// + public class VersioningOptions + { + /// + /// Gets or sets the version of orchestrations that the worker can work on. + /// + public string Version { get; set; } = string.Empty; + + /// + /// Gets or sets the versioning strategy for the Durable Task worker. + /// + public VersionMatchStrategy MatchStrategy { get; set; } = VersionMatchStrategy.None; + + /// + /// Gets or sets the versioning failure strategy for the Durable Task worker. + /// + /// + /// If the version matching strategy is set to , this value has no effect. + /// + public VersionFailureStrategy FailureStrategy { get; set; } = VersionFailureStrategy.Reject; + } } diff --git a/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs b/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs index 193cbd919..dbc18c81b 100644 --- a/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs +++ b/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs @@ -6,6 +6,7 @@ using DurableTask.Core.Entities; using DurableTask.Core.Entities.OperationFormat; using DurableTask.Core.History; +using Microsoft.DurableTask.Abstractions; using Microsoft.DurableTask.Entities; using Microsoft.DurableTask.Worker.Shims; using Microsoft.Extensions.DependencyInjection; @@ -107,6 +108,62 @@ static string GetActionsListForLogging(IReadOnlyList actio } } + static P.TaskFailureDetails? EvaluateOrchestrationVersioning(DurableTaskWorkerOptions.VersioningOptions? versioning, string orchestrationVersion, out bool versionCheckFailed) + { + P.TaskFailureDetails? failureDetails = null; + versionCheckFailed = false; + if (versioning != null) + { + int versionComparison = TaskOrchestrationVersioningUtils.CompareVersions(orchestrationVersion, versioning.Version); + + switch (versioning.MatchStrategy) + { + case DurableTaskWorkerOptions.VersionMatchStrategy.None: + // No versioning, breakout. + break; + case DurableTaskWorkerOptions.VersionMatchStrategy.Strict: + // Comparison of 0 indicates equality. + if (versionComparison != 0) + { + failureDetails = new P.TaskFailureDetails + { + ErrorType = "VersionMismatch", + ErrorMessage = $"The orchestration version '{orchestrationVersion}' does not match the worker version '{versioning.Version}'.", + IsNonRetriable = true, + }; + } + + break; + case DurableTaskWorkerOptions.VersionMatchStrategy.CurrentOrOlder: + // Comparison > 0 indicates the orchestration version is greater than the worker version. + if (versionComparison > 0) + { + failureDetails = new P.TaskFailureDetails + { + ErrorType = "VersionMismatch", + ErrorMessage = $"The orchestration version '{orchestrationVersion}' is greater than the worker version '{versioning.Version}'.", + IsNonRetriable = true, + }; + } + + break; + default: + // If there is a type of versioning we don't understand, it is better to treat it as a versioning failure. + failureDetails = new P.TaskFailureDetails + { + ErrorType = "VersionError", + ErrorMessage = $"The version match strategy '{orchestrationVersion}' is unknown.", + IsNonRetriable = true, + }; + break; + } + + versionCheckFailed = failureDetails != null; + } + + return failureDetails; + } + async ValueTask BuildRuntimeStateAsync( P.OrchestratorRequest orchestratorRequest, ProtoUtils.EntityConversionState? entityConversionState, @@ -298,6 +355,8 @@ async Task OnRunOrchestratorAsync( ? new(this.internalOptions.InsertEntityUnlocksOnCompletion) : null; + DurableTaskWorkerOptions.VersioningOptions? versioning = this.worker.workerOptions.Versioning; + bool versionFailure = false; try { OrchestrationRuntimeState runtimeState = await this.BuildRuntimeStateAsync( @@ -305,43 +364,50 @@ async Task OnRunOrchestratorAsync( entityConversionState, cancellationToken); - name = new TaskName(runtimeState.Name); + // If versioning has been explicitly set, we attempt to follow that pattern. If it is not set, we don't compare versions here. + failureDetails = EvaluateOrchestrationVersioning(versioning, runtimeState.Version, out versionFailure); - this.Logger.ReceivedOrchestratorRequest( - name, - request.InstanceId, - runtimeState.PastEvents.Count, - runtimeState.NewEvents.Count); - - await using AsyncServiceScope scope = this.worker.services.CreateAsyncScope(); - if (this.worker.Factory.TryCreateOrchestrator( - name, scope.ServiceProvider, out ITaskOrchestrator? orchestrator)) + // Only continue with the work if the versioning check passed. + if (failureDetails == null) { - // Both the factory invocation and the ExecuteAsync could involve user code and need to be handled - // as part of try/catch. - ParentOrchestrationInstance? parent = runtimeState.ParentInstance switch - { - ParentInstance p => new(new(p.Name), p.OrchestrationInstance.InstanceId), - _ => null, - }; + name = new TaskName(runtimeState.Name); - TaskOrchestration shim = this.shimFactory.CreateOrchestration(name, orchestrator, parent); - TaskOrchestrationExecutor executor = new( - runtimeState, - shim, - BehaviorOnContinueAsNew.Carryover, - request.EntityParameters.ToCore(), - ErrorPropagationMode.UseFailureDetails); - result = executor.Execute(); - } - else - { - failureDetails = new P.TaskFailureDetails + this.Logger.ReceivedOrchestratorRequest( + name, + request.InstanceId, + runtimeState.PastEvents.Count, + runtimeState.NewEvents.Count); + + await using AsyncServiceScope scope = this.worker.services.CreateAsyncScope(); + if (this.worker.Factory.TryCreateOrchestrator( + name, scope.ServiceProvider, out ITaskOrchestrator? orchestrator)) { - ErrorType = "OrchestratorTaskNotFound", - ErrorMessage = $"No orchestrator task named '{name}' was found.", - IsNonRetriable = true, - }; + // Both the factory invocation and the ExecuteAsync could involve user code and need to be handled + // as part of try/catch. + ParentOrchestrationInstance? parent = runtimeState.ParentInstance switch + { + ParentInstance p => new(new(p.Name), p.OrchestrationInstance.InstanceId), + _ => null, + }; + + TaskOrchestration shim = this.shimFactory.CreateOrchestration(name, orchestrator, parent); + TaskOrchestrationExecutor executor = new( + runtimeState, + shim, + BehaviorOnContinueAsNew.Carryover, + request.EntityParameters.ToCore(), + ErrorPropagationMode.UseFailureDetails); + result = executor.Execute(); + } + else + { + failureDetails = new P.TaskFailureDetails + { + ErrorType = "OrchestratorTaskNotFound", + ErrorMessage = $"No orchestrator task named '{name}' was found.", + IsNonRetriable = true, + }; + } } } catch (Exception unexpected) @@ -361,6 +427,41 @@ async Task OnRunOrchestratorAsync( completionToken, entityConversionState); } + else if (versioning != null && failureDetails != null && versionFailure) + { + this.Logger.OrchestrationVersionFailure(versioning.FailureStrategy.ToString(), failureDetails.ErrorMessage); + if (versioning.FailureStrategy == DurableTaskWorkerOptions.VersionFailureStrategy.Fail) + { + response = new P.OrchestratorResponse + { + InstanceId = request.InstanceId, + CompletionToken = completionToken, + Actions = + { + new P.OrchestratorAction + { + CompleteOrchestration = new P.CompleteOrchestrationAction + { + OrchestrationStatus = P.OrchestrationStatus.Failed, + FailureDetails = failureDetails, + }, + }, + }, + }; + } + else + { + this.Logger.AbandoningOrchestrationDueToVersioning(request.InstanceId, completionToken); + await this.client.AbandonTaskOrchestratorWorkItemAsync( + new P.AbandonOrchestrationTaskRequest + { + CompletionToken = completionToken, + }, + cancellationToken: cancellationToken); + + return; + } + } else { // This is the case for failures that happened *outside* the orchestrator executor diff --git a/src/Worker/Grpc/Logs.cs b/src/Worker/Grpc/Logs.cs index 0cdea3ebd..4f0f46df8 100644 --- a/src/Worker/Grpc/Logs.cs +++ b/src/Worker/Grpc/Logs.cs @@ -51,5 +51,11 @@ static partial class Logs [LoggerMessage(EventId = 56, Level = LogLevel.Warning, Message = "Channel to backend has stopped receiving traffic, will attempt to reconnect.")] public static partial void ConnectionTimeout(this ILogger logger); + + [LoggerMessage(EventId = 57, Level = LogLevel.Warning, Message = "Orchestration version did not meet worker versioning requirements. Error action = '{errorAction}'. Version error = '{versionError}'")] + public static partial void OrchestrationVersionFailure(this ILogger logger, string errorAction, string versionError); + + [LoggerMessage(EventId = 58, Level = LogLevel.Information, Message = "Abandoning orchestration. InstanceId = '{instanceId}'. Completion token = '{completionToken}'")] + public static partial void AbandoningOrchestrationDueToVersioning(this ILogger logger, string instanceId, string completionToken); } } diff --git a/test/Grpc.IntegrationTests/OrchestrationPatterns.cs b/test/Grpc.IntegrationTests/OrchestrationPatterns.cs index 5c33f5ba4..030adfbce 100644 --- a/test/Grpc.IntegrationTests/OrchestrationPatterns.cs +++ b/test/Grpc.IntegrationTests/OrchestrationPatterns.cs @@ -1,42 +1,41 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT License. - -using System.Text.Json; -using System.Text.Json.Nodes; -using Microsoft.DurableTask.Client; -using Microsoft.DurableTask.Tests.Logging; -using Microsoft.DurableTask.Worker; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Options; -using Xunit.Abstractions; - -namespace Microsoft.DurableTask.Grpc.Tests; - -public class OrchestrationPatterns : IntegrationTestBase -{ - public OrchestrationPatterns(ITestOutputHelper output, GrpcSidecarFixture sidecarFixture) - : base(output, sidecarFixture) - { } - - [Fact] - public async Task EmptyOrchestration() - { - TaskName orchestratorName = nameof(EmptyOrchestration); - await using HostTestLifetime server = await this.StartWorkerAsync(b => - { - b.AddTasks(tasks => tasks.AddOrchestratorFunc(orchestratorName, ctx => Task.FromResult(null))); - }); - - string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); - OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync( - instanceId, this.TimeoutToken); - - Assert.NotNull(metadata); - Assert.Equal(instanceId, metadata.InstanceId); - Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); - } - - [Fact] +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using System.Text.Json; +using System.Text.Json.Nodes; +using Microsoft.DurableTask.Client; +using Microsoft.DurableTask.Tests.Logging; +using Microsoft.DurableTask.Worker; +using Microsoft.Extensions.DependencyInjection; +using Xunit.Abstractions; + +namespace Microsoft.DurableTask.Grpc.Tests; + +public class OrchestrationPatterns : IntegrationTestBase +{ + public OrchestrationPatterns(ITestOutputHelper output, GrpcSidecarFixture sidecarFixture) + : base(output, sidecarFixture) + { } + + [Fact] + public async Task EmptyOrchestration() + { + TaskName orchestratorName = nameof(EmptyOrchestration); + await using HostTestLifetime server = await this.StartWorkerAsync(b => + { + b.AddTasks(tasks => tasks.AddOrchestratorFunc(orchestratorName, ctx => Task.FromResult(null))); + }); + + string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); + OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync( + instanceId, this.TimeoutToken); + + Assert.NotNull(metadata); + Assert.Equal(instanceId, metadata.InstanceId); + Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); + } + + [Fact] public async Task ScheduleOrchesrationWithTags() { TaskName orchestratorName = nameof(EmptyOrchestration); @@ -69,562 +68,740 @@ public async Task ScheduleOrchesrationWithTags() } [Fact] - public async Task SingleTimer() - { - TaskName orchestratorName = nameof(SingleTimer); - TimeSpan delay = TimeSpan.FromSeconds(3); - - await using HostTestLifetime server = await this.StartWorkerAsync(b => - { - b.AddTasks(tasks => tasks.AddOrchestratorFunc( - orchestratorName, ctx => ctx.CreateTimer(delay, CancellationToken.None))); - }); - - string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); - OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync( - instanceId, this.TimeoutToken); - - Assert.NotNull(metadata); - Assert.Equal(instanceId, metadata.InstanceId); - Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); - - // Verify that the delay actually happened with a 1 second variation - Assert.True(metadata.CreatedAt.Add(delay) <= metadata.LastUpdatedAt.AddSeconds(1)); - } - - [Fact] - public async Task LongTimer() - { - TaskName orchestratorName = nameof(SingleTimer); - TimeSpan delay = TimeSpan.FromSeconds(7); - TimeSpan timerInterval = TimeSpan.FromSeconds(3); - const int ExpectedTimers = 3; // two for 3 seconds and one for 1 second - - await using HostTestLifetime server = await this.StartWorkerAsync(b => - { - b.Configure(opt => opt.MaximumTimerInterval = timerInterval); - b.AddTasks(tasks => tasks.AddOrchestratorFunc( - orchestratorName, ctx => ctx.CreateTimer(delay, CancellationToken.None))); - }); - - string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); - OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync(instanceId, this.TimeoutToken); - - Assert.NotNull(metadata); - Assert.Equal(instanceId, metadata.InstanceId); - Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); - - // Verify that the delay actually happened - Assert.True(metadata.CreatedAt.Add(delay) <= metadata.LastUpdatedAt.AddSeconds(1)); - - // Verify that the correct number of timers were created - IReadOnlyCollection logs = this.GetLogs(); - int timersCreated = logs.Count(log => log.Message.Contains("CreateTimer")); - Assert.Equal(ExpectedTimers, timersCreated); - } - - [Fact] - public async Task IsReplaying() - { - TaskName orchestratorName = nameof(IsReplaying); - - await using HostTestLifetime server = await this.StartWorkerAsync(b => - { - b.AddTasks(tasks => tasks.AddOrchestratorFunc(orchestratorName, async ctx => - { - var list = new List { ctx.IsReplaying }; - await ctx.CreateTimer(TimeSpan.Zero, CancellationToken.None); - list.Add(ctx.IsReplaying); - await ctx.CreateTimer(TimeSpan.Zero, CancellationToken.None); - list.Add(ctx.IsReplaying); - return list; - })); - }); - - string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); - OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync( - instanceId, getInputsAndOutputs: true, this.TimeoutToken); - Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); - List? results = metadata.ReadOutputAs>(); - Assert.NotNull(results); - Assert.Equal(3, results!.Count); - Assert.True(results[0]); - Assert.True(results[1]); - Assert.False(results[2]); - } - - [Fact] - public async Task CurrentDateTimeUtc() - { - TaskName orchestratorName = nameof(CurrentDateTimeUtc); - TaskName echoActivityName = "Echo"; - - await using HostTestLifetime server = await this.StartWorkerAsync(b => - { - b.AddTasks(tasks => tasks - .AddOrchestratorFunc(orchestratorName, async ctx => - { - DateTime currentDate1 = ctx.CurrentUtcDateTime; - DateTime originalDate1 = await ctx.CallActivityAsync(echoActivityName, currentDate1); - if (currentDate1 != originalDate1) - { - return false; - } - - DateTime currentDate2 = ctx.CurrentUtcDateTime; - DateTime originalDate2 = await ctx.CallActivityAsync(echoActivityName, currentDate2); - if (currentDate2 != originalDate2) - { - return false; - } - - return currentDate1 != currentDate2; - }) - .AddActivityFunc(echoActivityName, (ctx, input) => input)); - }); - - string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); - OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync( - instanceId, getInputsAndOutputs: true, this.TimeoutToken); - Assert.NotNull(metadata); - Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); - Assert.True(metadata.ReadOutputAs()); - } - - [Fact] - public async Task SingleActivity() - { - TaskName orchestratorName = nameof(SingleActivity); - TaskName sayHelloActivityName = "SayHello"; - - await using HostTestLifetime server = await this.StartWorkerAsync(b => - { - b.AddTasks(tasks => tasks - .AddOrchestratorFunc( - orchestratorName, (ctx, input) => ctx.CallActivityAsync(sayHelloActivityName, input)) - .AddActivityFunc(sayHelloActivityName, (ctx, name) => $"Hello, {name}!")); - }); - - string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName, input: "World"); - OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync( - instanceId, getInputsAndOutputs: true, this.TimeoutToken); - Assert.NotNull(metadata); - Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); - Assert.Equal("Hello, World!", metadata.ReadOutputAs()); - } - - [Fact] - public async Task SingleActivity_Async() - { - TaskName orchestratorName = nameof(SingleActivity); - TaskName sayHelloActivityName = "SayHello"; - - await using HostTestLifetime server = await this.StartWorkerAsync(b => - { - b.AddTasks(tasks => tasks - .AddOrchestratorFunc( - orchestratorName, (ctx, input) => ctx.CallActivityAsync(sayHelloActivityName, input)) - .AddActivityFunc( - sayHelloActivityName, async (ctx, name) => await Task.FromResult($"Hello, {name}!"))); - }); - - string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName, input: "World"); - OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync( - instanceId, getInputsAndOutputs: true, this.TimeoutToken); - Assert.NotNull(metadata); - Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); - Assert.Equal("Hello, World!", metadata.ReadOutputAs()); - } - - [Fact] - public async Task ActivityChain() - { - TaskName orchestratorName = nameof(ActivityChain); - TaskName plusOneActivityName = "PlusOne"; - - await using HostTestLifetime server = await this.StartWorkerAsync(b => - { - b.AddTasks(tasks => tasks - .AddOrchestratorFunc(orchestratorName, async ctx => - { - int value = 0; - for (int i = 0; i < 10; i++) - { - value = await ctx.CallActivityAsync(plusOneActivityName, value); - } - - return value; - }) - .AddActivityFunc(plusOneActivityName, (ctx, input) => input + 1)); - }); - - string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName, input: "World"); - OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync( - instanceId, getInputsAndOutputs: true, this.TimeoutToken); - Assert.NotNull(metadata); - Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); - Assert.Equal(10, metadata.ReadOutputAs()); - } - - [Fact] - public async Task ActivityFanOut() - { - TaskName orchestratorName = nameof(ActivityFanOut); - TaskName toStringActivity = "ToString"; - - await using HostTestLifetime server = await this.StartWorkerAsync(b => - { - b.AddTasks(tasks => tasks - .AddOrchestratorFunc(orchestratorName, async ctx => - { - var tasks = new List>(); - for (int i = 0; i < 10; i++) - { - tasks.Add(ctx.CallActivityAsync(toStringActivity, i)); - } - - string[] results = await Task.WhenAll(tasks); - Array.Sort(results); - Array.Reverse(results); - return results; - }) - .AddActivityFunc(toStringActivity, (ctx, input) => input.ToString())); - }); - - string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); - OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync( - instanceId, getInputsAndOutputs: true, this.TimeoutToken); - Assert.NotNull(metadata); - Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); - - string[] expected = new[] { "9", "8", "7", "6", "5", "4", "3", "2", "1", "0" }; - Assert.Equal(expected, metadata.ReadOutputAs()); - } - - [Theory] - [InlineData(1)] - [InlineData(100)] - public async Task ExternalEvents(int eventCount) - { - TaskName orchestratorName = nameof(ExternalEvents); - await using HostTestLifetime server = await this.StartWorkerAsync(b => - { - b.AddTasks(tasks => tasks.AddOrchestratorFunc(orchestratorName, async ctx => - { - List events = new(); - for (int i = 0; i < eventCount; i++) - { - events.Add(await ctx.WaitForExternalEvent($"Event{i}")); - } - - return events; - })); - }); - - string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); - - // To ensure consistency, wait for the instance to start before sending the events - OrchestrationMetadata metadata = await server.Client.WaitForInstanceStartAsync( - instanceId, - this.TimeoutToken); - - // Send events one-at-a-time to that we can better ensure ordered processing. - for (int i = 0; i < eventCount; i++) - { - await server.Client.RaiseEventAsync(metadata.InstanceId, $"Event{i}", eventPayload: i); - } - - // Once the orchestration receives all the events it is expecting, it should complete. - metadata = await server.Client.WaitForInstanceCompletionAsync( - instanceId, getInputsAndOutputs: true, this.TimeoutToken); - Assert.NotNull(metadata); - Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); - - int[] expected = Enumerable.Range(0, eventCount).ToArray(); - Assert.Equal(expected, metadata.ReadOutputAs()); - } - - [Theory] - [InlineData(1)] - [InlineData(5)] - public async Task ExternalEventsInParallel(int eventCount) - { - TaskName orchestratorName = nameof(ExternalEvents); - await using HostTestLifetime server = await this.StartWorkerAsync(b => - { - b.AddTasks(tasks => tasks.AddOrchestratorFunc(orchestratorName, async ctx => - { - List> events = new(); - for (int i = 0; i < eventCount; i++) - { - events.Add(ctx.WaitForExternalEvent("Event")); - } - - return await Task.WhenAll(events); - })); - }); - - string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); - - // To ensure consistency, wait for the instance to start before sending the events - OrchestrationMetadata metadata = await server.Client.WaitForInstanceStartAsync( - instanceId, - this.TimeoutToken); - - // Send events one-at-a-time to that we can better ensure ordered processing. - for (int i = 0; i < eventCount; i++) - { - await server.Client.RaiseEventAsync(metadata.InstanceId, "Event", eventPayload: i); - } - - // Once the orchestration receives all the events it is expecting, it should complete. - metadata = await server.Client.WaitForInstanceCompletionAsync( - instanceId, getInputsAndOutputs: true); - Assert.NotNull(metadata); - Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); - - int[] expected = Enumerable.Range(0, eventCount).ToArray(); - Assert.Equal(expected, metadata.ReadOutputAs()); - } - - [Fact] - public async Task Termination() - { - TaskName orchestrationName = nameof(Termination); - await using HostTestLifetime server = await this.StartWorkerAsync(b => - { - b.AddTasks(tasks => tasks.AddOrchestratorFunc( - orchestrationName, ctx => ctx.CreateTimer(TimeSpan.FromSeconds(3), CancellationToken.None))); - }); - - string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestrationName); - OrchestrationMetadata metadata = await server.Client.WaitForInstanceStartAsync(instanceId, this.TimeoutToken); - - var expectedOutput = new { quote = "I'll be back." }; - await server.Client.TerminateInstanceAsync(instanceId, expectedOutput); - - metadata = await server.Client.WaitForInstanceCompletionAsync( - instanceId, getInputsAndOutputs: true, this.TimeoutToken); - Assert.NotNull(metadata); - Assert.Equal(instanceId, metadata.InstanceId); - Assert.Equal(OrchestrationRuntimeStatus.Terminated, metadata.RuntimeStatus); - - JsonElement actualOutput = metadata.ReadOutputAs(); - string? actualQuote = actualOutput.GetProperty("quote").GetString(); - Assert.NotNull(actualQuote); - Assert.Equal(expectedOutput.quote, actualQuote); - } - - [Fact] - public async Task ContinueAsNew() - { - TaskName orchestratorName = nameof(ContinueAsNew); - - await using HostTestLifetime server = await this.StartWorkerAsync(b => - { - b.AddTasks(tasks => tasks.AddOrchestratorFunc(orchestratorName, async (ctx, input) => - { - if (input < 10) - { - await ctx.CreateTimer(TimeSpan.Zero, CancellationToken.None); - ctx.ContinueAsNew(input + 1); - } - - return input; - })); - }); - - string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); - OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync( - instanceId, getInputsAndOutputs: true, this.TimeoutToken); - Assert.NotNull(metadata); - Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); - Assert.Equal(10, metadata.ReadOutputAs()); - } - - [Fact] - public async Task SubOrchestration() - { - TaskName orchestratorName = nameof(SubOrchestration); - - await using HostTestLifetime server = await this.StartWorkerAsync(b => - { - b.AddTasks(tasks => tasks.AddOrchestratorFunc(orchestratorName, async (ctx, input) => - { - int result = 5; - if (input < 3) - { - // recursively call this same orchestrator - result += await ctx.CallSubOrchestratorAsync(orchestratorName, input: input + 1); - } - - return result; - })); - }); - - string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName, input: 1); - OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync( - instanceId, getInputsAndOutputs: true, this.TimeoutToken); - Assert.NotNull(metadata); - Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); - Assert.Equal(15, metadata.ReadOutputAs()); - } - - [Fact] - public async Task SetCustomStatus() - { - TaskName orchestratorName = nameof(SetCustomStatus); - await using HostTestLifetime server = await this.StartWorkerAsync(b => - { - b.AddTasks(tasks => tasks.AddOrchestratorFunc(orchestratorName, async ctx => - { - ctx.SetCustomStatus("Started!"); - - object customStatus = await ctx.WaitForExternalEvent("StatusEvent"); - ctx.SetCustomStatus(customStatus); - })); - }); - - string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); - - // To ensure consistency, wait for the instance to start before sending the events - OrchestrationMetadata metadata = await server.Client.WaitForInstanceStartAsync( - instanceId, getInputsAndOutputs: true, this.TimeoutToken); - Assert.NotNull(metadata); - Assert.Equal("Started!", metadata.ReadCustomStatusAs()); - - // Send a tuple payload, which will be used as the custom status - (string, int) eventPayload = ("Hello", 42); - await server.Client.RaiseEventAsync( - metadata.InstanceId, - eventName: "StatusEvent", - eventPayload); - - // Once the orchestration receives all the events it is expecting, it should complete. - metadata = await server.Client.WaitForInstanceCompletionAsync( - instanceId, getInputsAndOutputs: true, this.TimeoutToken); - Assert.NotNull(metadata); - Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); - Assert.Equal(eventPayload, metadata.ReadCustomStatusAs<(string, int)>()); - } - - [Fact] - public async Task NewGuidTest() - { - TaskName orchestratorName = nameof(ContinueAsNew); - TaskName echoActivityName = "Echo"; - - await using HostTestLifetime server = await this.StartWorkerAsync(b => - { - b.AddTasks(tasks => tasks - .AddOrchestratorFunc(orchestratorName, async (ctx, input) => - { - // Test 1: Ensure two consecutively created GUIDs are unique - Guid currentGuid0 = ctx.NewGuid(); - Guid currentGuid1 = ctx.NewGuid(); - if (currentGuid0 == currentGuid1) - { - return false; - } - - // Test 2: Ensure that the same GUID values are created on each replay - Guid originalGuid1 = await ctx.CallActivityAsync(echoActivityName, currentGuid1); - if (currentGuid1 != originalGuid1) - { - return false; - } - - // Test 3: Ensure that the same GUID values are created on each replay even after an await - Guid currentGuid2 = ctx.NewGuid(); - Guid originalGuid2 = await ctx.CallActivityAsync(echoActivityName, currentGuid2); - if (currentGuid2 != originalGuid2) - { - return false; - } - - // Test 4: Finish confirming that every generated GUID is unique - return currentGuid1 != currentGuid2; - }) - .AddActivityFunc(echoActivityName, (ctx, input) => input)); - }); - - string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); - OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync( - instanceId, getInputsAndOutputs: true, this.TimeoutToken); - Assert.NotNull(metadata); - Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); - Assert.True(metadata.ReadOutputAs()); - } - - [Fact] - public async Task SpecialSerialization() - { - await using HostTestLifetime server = await this.StartWorkerAsync(b => - { - b.AddTasks(tasks => tasks - .AddOrchestratorFunc("SpecialSerialization_Orchestration", (ctx, input) => - { - if (input is null) - { - throw new ArgumentNullException(nameof(input)); - } - - return ctx.CallActivityAsync("SpecialSerialization_Activity", input); - }) - .AddActivityFunc("SpecialSerialization_Activity", (ctx, input) => - { - if (input is not null) - { - input["newProperty"] = "new value"; - } - - return Task.FromResult(input); - })); - }); - - JsonNode input = new JsonObject() { ["originalProperty"] = "original value" }; - string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync( - "SpecialSerialization_Orchestration", input: input); - OrchestrationMetadata result = await server.Client.WaitForInstanceCompletionAsync( - instanceId, getInputsAndOutputs: true, this.TimeoutToken); - JsonNode? output = result.ReadOutputAs(); - - Assert.NotNull(output); - Assert.Equal("original value", output?["originalProperty"]?.ToString()); - Assert.Equal("new value", output?["newProperty"]?.ToString()); - } - - // TODO: Additional versioning tests - [Fact] - public async Task OrchestrationVersionPassedThroughContext() - { - var version = "0.1"; - await using HostTestLifetime server = await this.StartWorkerAsync(b => - { - b.AddTasks(tasks => tasks - .AddOrchestratorFunc("Versioned_Orchestration", (ctx, input) => - { - return ctx.CallActivityAsync("Versioned_Activity", ctx.Version); - }) - .AddActivityFunc("Versioned_Activity", (ctx, input) => - { - return $"Orchestration version: {input}"; - })); - }, c => - { - c.UseDefaultVersion(version); - }); - - var instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync("Versioned_Orchestration", input: string.Empty); - var result = await server.Client.WaitForInstanceCompletionAsync(instanceId, getInputsAndOutputs: true, this.TimeoutToken); - var output = result.ReadOutputAs(); - - Assert.NotNull(output); - Assert.Equal(output, $"Orchestration version: {version}"); - + public async Task SingleTimer() + { + TaskName orchestratorName = nameof(SingleTimer); + TimeSpan delay = TimeSpan.FromSeconds(3); + + await using HostTestLifetime server = await this.StartWorkerAsync(b => + { + b.AddTasks(tasks => tasks.AddOrchestratorFunc( + orchestratorName, ctx => ctx.CreateTimer(delay, CancellationToken.None))); + }); + + string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); + OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync( + instanceId, this.TimeoutToken); + + Assert.NotNull(metadata); + Assert.Equal(instanceId, metadata.InstanceId); + Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); + + // Verify that the delay actually happened with a 1 second variation + Assert.True(metadata.CreatedAt.Add(delay) <= metadata.LastUpdatedAt.AddSeconds(1)); + } + + [Fact] + public async Task LongTimer() + { + TaskName orchestratorName = nameof(SingleTimer); + TimeSpan delay = TimeSpan.FromSeconds(7); + TimeSpan timerInterval = TimeSpan.FromSeconds(3); + const int ExpectedTimers = 3; // two for 3 seconds and one for 1 second + + await using HostTestLifetime server = await this.StartWorkerAsync(b => + { + b.Configure(opt => opt.MaximumTimerInterval = timerInterval); + b.AddTasks(tasks => tasks.AddOrchestratorFunc( + orchestratorName, ctx => ctx.CreateTimer(delay, CancellationToken.None))); + }); + + string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); + OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync(instanceId, this.TimeoutToken); + + Assert.NotNull(metadata); + Assert.Equal(instanceId, metadata.InstanceId); + Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); + + // Verify that the delay actually happened + Assert.True(metadata.CreatedAt.Add(delay) <= metadata.LastUpdatedAt.AddSeconds(1)); + + // Verify that the correct number of timers were created + IReadOnlyCollection logs = this.GetLogs(); + int timersCreated = logs.Count(log => log.Message.Contains("CreateTimer")); + Assert.Equal(ExpectedTimers, timersCreated); + } + + [Fact] + public async Task IsReplaying() + { + TaskName orchestratorName = nameof(IsReplaying); + + await using HostTestLifetime server = await this.StartWorkerAsync(b => + { + b.AddTasks(tasks => tasks.AddOrchestratorFunc(orchestratorName, async ctx => + { + var list = new List { ctx.IsReplaying }; + await ctx.CreateTimer(TimeSpan.Zero, CancellationToken.None); + list.Add(ctx.IsReplaying); + await ctx.CreateTimer(TimeSpan.Zero, CancellationToken.None); + list.Add(ctx.IsReplaying); + return list; + })); + }); + + string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); + OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync( + instanceId, getInputsAndOutputs: true, this.TimeoutToken); + Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); + List? results = metadata.ReadOutputAs>(); + Assert.NotNull(results); + Assert.Equal(3, results!.Count); + Assert.True(results[0]); + Assert.True(results[1]); + Assert.False(results[2]); + } + + [Fact] + public async Task CurrentDateTimeUtc() + { + TaskName orchestratorName = nameof(CurrentDateTimeUtc); + TaskName echoActivityName = "Echo"; + + await using HostTestLifetime server = await this.StartWorkerAsync(b => + { + b.AddTasks(tasks => tasks + .AddOrchestratorFunc(orchestratorName, async ctx => + { + DateTime currentDate1 = ctx.CurrentUtcDateTime; + DateTime originalDate1 = await ctx.CallActivityAsync(echoActivityName, currentDate1); + if (currentDate1 != originalDate1) + { + return false; + } + + DateTime currentDate2 = ctx.CurrentUtcDateTime; + DateTime originalDate2 = await ctx.CallActivityAsync(echoActivityName, currentDate2); + if (currentDate2 != originalDate2) + { + return false; + } + + return currentDate1 != currentDate2; + }) + .AddActivityFunc(echoActivityName, (ctx, input) => input)); + }); + + string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); + OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync( + instanceId, getInputsAndOutputs: true, this.TimeoutToken); + Assert.NotNull(metadata); + Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); + Assert.True(metadata.ReadOutputAs()); + } + + [Fact] + public async Task SingleActivity() + { + TaskName orchestratorName = nameof(SingleActivity); + TaskName sayHelloActivityName = "SayHello"; + + await using HostTestLifetime server = await this.StartWorkerAsync(b => + { + b.AddTasks(tasks => tasks + .AddOrchestratorFunc( + orchestratorName, (ctx, input) => ctx.CallActivityAsync(sayHelloActivityName, input)) + .AddActivityFunc(sayHelloActivityName, (ctx, name) => $"Hello, {name}!")); + }); + + string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName, input: "World"); + OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync( + instanceId, getInputsAndOutputs: true, this.TimeoutToken); + Assert.NotNull(metadata); + Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); + Assert.Equal("Hello, World!", metadata.ReadOutputAs()); + } + + [Fact] + public async Task SingleActivity_Async() + { + TaskName orchestratorName = nameof(SingleActivity); + TaskName sayHelloActivityName = "SayHello"; + + await using HostTestLifetime server = await this.StartWorkerAsync(b => + { + b.AddTasks(tasks => tasks + .AddOrchestratorFunc( + orchestratorName, (ctx, input) => ctx.CallActivityAsync(sayHelloActivityName, input)) + .AddActivityFunc( + sayHelloActivityName, async (ctx, name) => await Task.FromResult($"Hello, {name}!"))); + }); + + string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName, input: "World"); + OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync( + instanceId, getInputsAndOutputs: true, this.TimeoutToken); + Assert.NotNull(metadata); + Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); + Assert.Equal("Hello, World!", metadata.ReadOutputAs()); + } + + [Fact] + public async Task ActivityChain() + { + TaskName orchestratorName = nameof(ActivityChain); + TaskName plusOneActivityName = "PlusOne"; + + await using HostTestLifetime server = await this.StartWorkerAsync(b => + { + b.AddTasks(tasks => tasks + .AddOrchestratorFunc(orchestratorName, async ctx => + { + int value = 0; + for (int i = 0; i < 10; i++) + { + value = await ctx.CallActivityAsync(plusOneActivityName, value); + } + + return value; + }) + .AddActivityFunc(plusOneActivityName, (ctx, input) => input + 1)); + }); + + string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName, input: "World"); + OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync( + instanceId, getInputsAndOutputs: true, this.TimeoutToken); + Assert.NotNull(metadata); + Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); + Assert.Equal(10, metadata.ReadOutputAs()); + } + + [Fact] + public async Task ActivityFanOut() + { + TaskName orchestratorName = nameof(ActivityFanOut); + TaskName toStringActivity = "ToString"; + + await using HostTestLifetime server = await this.StartWorkerAsync(b => + { + b.AddTasks(tasks => tasks + .AddOrchestratorFunc(orchestratorName, async ctx => + { + var tasks = new List>(); + for (int i = 0; i < 10; i++) + { + tasks.Add(ctx.CallActivityAsync(toStringActivity, i)); + } + + string[] results = await Task.WhenAll(tasks); + Array.Sort(results); + Array.Reverse(results); + return results; + }) + .AddActivityFunc(toStringActivity, (ctx, input) => input.ToString())); + }); + + string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); + OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync( + instanceId, getInputsAndOutputs: true, this.TimeoutToken); + Assert.NotNull(metadata); + Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); + + string[] expected = new[] { "9", "8", "7", "6", "5", "4", "3", "2", "1", "0" }; + Assert.Equal(expected, metadata.ReadOutputAs()); + } + + [Theory] + [InlineData(1)] + [InlineData(100)] + public async Task ExternalEvents(int eventCount) + { + TaskName orchestratorName = nameof(ExternalEvents); + await using HostTestLifetime server = await this.StartWorkerAsync(b => + { + b.AddTasks(tasks => tasks.AddOrchestratorFunc(orchestratorName, async ctx => + { + List events = new(); + for (int i = 0; i < eventCount; i++) + { + events.Add(await ctx.WaitForExternalEvent($"Event{i}")); + } + + return events; + })); + }); + + string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); + + // To ensure consistency, wait for the instance to start before sending the events + OrchestrationMetadata metadata = await server.Client.WaitForInstanceStartAsync( + instanceId, + this.TimeoutToken); + + // Send events one-at-a-time to that we can better ensure ordered processing. + for (int i = 0; i < eventCount; i++) + { + await server.Client.RaiseEventAsync(metadata.InstanceId, $"Event{i}", eventPayload: i); + } + + // Once the orchestration receives all the events it is expecting, it should complete. + metadata = await server.Client.WaitForInstanceCompletionAsync( + instanceId, getInputsAndOutputs: true, this.TimeoutToken); + Assert.NotNull(metadata); + Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); + + int[] expected = Enumerable.Range(0, eventCount).ToArray(); + Assert.Equal(expected, metadata.ReadOutputAs()); + } + + [Theory] + [InlineData(1)] + [InlineData(5)] + public async Task ExternalEventsInParallel(int eventCount) + { + TaskName orchestratorName = nameof(ExternalEvents); + await using HostTestLifetime server = await this.StartWorkerAsync(b => + { + b.AddTasks(tasks => tasks.AddOrchestratorFunc(orchestratorName, async ctx => + { + List> events = new(); + for (int i = 0; i < eventCount; i++) + { + events.Add(ctx.WaitForExternalEvent("Event")); + } + + return await Task.WhenAll(events); + })); + }); + + string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); + + // To ensure consistency, wait for the instance to start before sending the events + OrchestrationMetadata metadata = await server.Client.WaitForInstanceStartAsync( + instanceId, + this.TimeoutToken); + + // Send events one-at-a-time to that we can better ensure ordered processing. + for (int i = 0; i < eventCount; i++) + { + await server.Client.RaiseEventAsync(metadata.InstanceId, "Event", eventPayload: i); + } + + // Once the orchestration receives all the events it is expecting, it should complete. + metadata = await server.Client.WaitForInstanceCompletionAsync( + instanceId, getInputsAndOutputs: true); + Assert.NotNull(metadata); + Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); + + int[] expected = Enumerable.Range(0, eventCount).ToArray(); + Assert.Equal(expected, metadata.ReadOutputAs()); + } + + [Fact] + public async Task Termination() + { + TaskName orchestrationName = nameof(Termination); + await using HostTestLifetime server = await this.StartWorkerAsync(b => + { + b.AddTasks(tasks => tasks.AddOrchestratorFunc( + orchestrationName, ctx => ctx.CreateTimer(TimeSpan.FromSeconds(3), CancellationToken.None))); + }); + + string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestrationName); + OrchestrationMetadata metadata = await server.Client.WaitForInstanceStartAsync(instanceId, this.TimeoutToken); + + var expectedOutput = new { quote = "I'll be back." }; + await server.Client.TerminateInstanceAsync(instanceId, expectedOutput); + + metadata = await server.Client.WaitForInstanceCompletionAsync( + instanceId, getInputsAndOutputs: true, this.TimeoutToken); + Assert.NotNull(metadata); + Assert.Equal(instanceId, metadata.InstanceId); + Assert.Equal(OrchestrationRuntimeStatus.Terminated, metadata.RuntimeStatus); + + JsonElement actualOutput = metadata.ReadOutputAs(); + string? actualQuote = actualOutput.GetProperty("quote").GetString(); + Assert.NotNull(actualQuote); + Assert.Equal(expectedOutput.quote, actualQuote); + } + + [Fact] + public async Task ContinueAsNew() + { + TaskName orchestratorName = nameof(ContinueAsNew); + + await using HostTestLifetime server = await this.StartWorkerAsync(b => + { + b.AddTasks(tasks => tasks.AddOrchestratorFunc(orchestratorName, async (ctx, input) => + { + if (input < 10) + { + await ctx.CreateTimer(TimeSpan.Zero, CancellationToken.None); + ctx.ContinueAsNew(input + 1); + } + + return input; + })); + }); + + string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); + OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync( + instanceId, getInputsAndOutputs: true, this.TimeoutToken); + Assert.NotNull(metadata); + Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); + Assert.Equal(10, metadata.ReadOutputAs()); + } + + [Fact] + public async Task SubOrchestration() + { + TaskName orchestratorName = nameof(SubOrchestration); + + await using HostTestLifetime server = await this.StartWorkerAsync(b => + { + b.AddTasks(tasks => tasks.AddOrchestratorFunc(orchestratorName, async (ctx, input) => + { + int result = 5; + if (input < 3) + { + // recursively call this same orchestrator + result += await ctx.CallSubOrchestratorAsync(orchestratorName, input: input + 1); + } + + return result; + })); + }); + + string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName, input: 1); + OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync( + instanceId, getInputsAndOutputs: true, this.TimeoutToken); + Assert.NotNull(metadata); + Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); + Assert.Equal(15, metadata.ReadOutputAs()); + } + + [Fact] + public async Task SetCustomStatus() + { + TaskName orchestratorName = nameof(SetCustomStatus); + await using HostTestLifetime server = await this.StartWorkerAsync(b => + { + b.AddTasks(tasks => tasks.AddOrchestratorFunc(orchestratorName, async ctx => + { + ctx.SetCustomStatus("Started!"); + + object customStatus = await ctx.WaitForExternalEvent("StatusEvent"); + ctx.SetCustomStatus(customStatus); + })); + }); + + string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); + + // To ensure consistency, wait for the instance to start before sending the events + OrchestrationMetadata metadata = await server.Client.WaitForInstanceStartAsync( + instanceId, getInputsAndOutputs: true, this.TimeoutToken); + Assert.NotNull(metadata); + Assert.Equal("Started!", metadata.ReadCustomStatusAs()); + + // Send a tuple payload, which will be used as the custom status + (string, int) eventPayload = ("Hello", 42); + await server.Client.RaiseEventAsync( + metadata.InstanceId, + eventName: "StatusEvent", + eventPayload); + + // Once the orchestration receives all the events it is expecting, it should complete. + metadata = await server.Client.WaitForInstanceCompletionAsync( + instanceId, getInputsAndOutputs: true, this.TimeoutToken); + Assert.NotNull(metadata); + Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); + Assert.Equal(eventPayload, metadata.ReadCustomStatusAs<(string, int)>()); + } + + [Fact] + public async Task NewGuidTest() + { + TaskName orchestratorName = nameof(ContinueAsNew); + TaskName echoActivityName = "Echo"; + + await using HostTestLifetime server = await this.StartWorkerAsync(b => + { + b.AddTasks(tasks => tasks + .AddOrchestratorFunc(orchestratorName, async (ctx, input) => + { + // Test 1: Ensure two consecutively created GUIDs are unique + Guid currentGuid0 = ctx.NewGuid(); + Guid currentGuid1 = ctx.NewGuid(); + if (currentGuid0 == currentGuid1) + { + return false; + } + + // Test 2: Ensure that the same GUID values are created on each replay + Guid originalGuid1 = await ctx.CallActivityAsync(echoActivityName, currentGuid1); + if (currentGuid1 != originalGuid1) + { + return false; + } + + // Test 3: Ensure that the same GUID values are created on each replay even after an await + Guid currentGuid2 = ctx.NewGuid(); + Guid originalGuid2 = await ctx.CallActivityAsync(echoActivityName, currentGuid2); + if (currentGuid2 != originalGuid2) + { + return false; + } + + // Test 4: Finish confirming that every generated GUID is unique + return currentGuid1 != currentGuid2; + }) + .AddActivityFunc(echoActivityName, (ctx, input) => input)); + }); + + string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); + OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync( + instanceId, getInputsAndOutputs: true, this.TimeoutToken); + Assert.NotNull(metadata); + Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); + Assert.True(metadata.ReadOutputAs()); + } + + [Fact] + public async Task SpecialSerialization() + { + await using HostTestLifetime server = await this.StartWorkerAsync(b => + { + b.AddTasks(tasks => tasks + .AddOrchestratorFunc("SpecialSerialization_Orchestration", (ctx, input) => + { + if (input is null) + { + throw new ArgumentNullException(nameof(input)); + } + + return ctx.CallActivityAsync("SpecialSerialization_Activity", input); + }) + .AddActivityFunc("SpecialSerialization_Activity", (ctx, input) => + { + if (input is not null) + { + input["newProperty"] = "new value"; + } + + return Task.FromResult(input); + })); + }); + + JsonNode input = new JsonObject() { ["originalProperty"] = "original value" }; + string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync( + "SpecialSerialization_Orchestration", input: input); + OrchestrationMetadata result = await server.Client.WaitForInstanceCompletionAsync( + instanceId, getInputsAndOutputs: true, this.TimeoutToken); + JsonNode? output = result.ReadOutputAs(); + + Assert.NotNull(output); + Assert.Equal("original value", output?["originalProperty"]?.ToString()); + Assert.Equal("new value", output?["newProperty"]?.ToString()); + } + + // TODO: Additional versioning tests + [Fact] + public async Task OrchestrationVersionPassedThroughContext() + { + var version = "0.1"; + await using HostTestLifetime server = await this.StartWorkerAsync(b => + { + b.AddTasks(tasks => tasks + .AddOrchestratorFunc("Versioned_Orchestration", (ctx, input) => + { + return ctx.CallActivityAsync("Versioned_Activity", ctx.Version); + }) + .AddActivityFunc("Versioned_Activity", (ctx, input) => + { + return $"Orchestration version: {input}"; + })); + }, c => + { + c.UseDefaultVersion(version); + }); + + var instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync("Versioned_Orchestration", input: string.Empty); + var result = await server.Client.WaitForInstanceCompletionAsync(instanceId, getInputsAndOutputs: true, this.TimeoutToken); + var output = result.ReadOutputAs(); + + Assert.NotNull(output); + Assert.Equal(output, $"Orchestration version: {version}"); } - // TODO: Test for multiple external events with the same name - // TODO: Test for ContinueAsNew with external events that carry over - // TODO: Test for catching activity exceptions of specific types -} + [Fact] + public async Task OrchestrationVersioning_MatchTypeNotSpecified_NoVersionFailure() + { + var workerVersion = "0.1"; + var clientVersion = "0.2"; + await using HostTestLifetime server = await this.StartWorkerAsync(b => + { + b.AddTasks(tasks => tasks + .AddOrchestratorFunc("Versioned_Orchestration", (ctx, input) => + { + return ctx.CallActivityAsync("Versioned_Activity", ctx.Version); + }) + .AddActivityFunc("Versioned_Activity", (ctx, input) => + { + return $"Orchestration version: {input}"; + })); + b.UseVersioning(new() + { + Version = workerVersion, + FailureStrategy = DurableTaskWorkerOptions.VersionFailureStrategy.Fail + }); + }, c => + { + c.UseDefaultVersion(clientVersion); + }); + + var instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync("Versioned_Orchestration", input: string.Empty); + var result = await server.Client.WaitForInstanceCompletionAsync(instanceId, getInputsAndOutputs: true, this.TimeoutToken); + var output = result.ReadOutputAs(); + + Assert.NotNull(output); + // The worker doesn't pass it's version through the context, so we check the client version. The fact that it passed indicates versioning was ignored. + Assert.Equal(output, $"Orchestration version: {clientVersion}"); + } + + [Fact] + public async Task OrchestrationVersioning_MatchTypeNone_NoVersionFailure() + { + var workerVersion = "0.1"; + var clientVersion = "0.2"; + await using HostTestLifetime server = await this.StartWorkerAsync(b => + { + b.AddTasks(tasks => tasks + .AddOrchestratorFunc("Versioned_Orchestration", (ctx, input) => + { + return ctx.CallActivityAsync("Versioned_Activity", ctx.Version); + }) + .AddActivityFunc("Versioned_Activity", (ctx, input) => + { + return $"Orchestration version: {input}"; + })); + b.UseVersioning(new() + { + Version = workerVersion, + MatchStrategy = DurableTaskWorkerOptions.VersionMatchStrategy.None, + FailureStrategy = DurableTaskWorkerOptions.VersionFailureStrategy.Fail + }); + }, c => + { + c.UseDefaultVersion(clientVersion); + }); + + var instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync("Versioned_Orchestration", input: string.Empty); + var result = await server.Client.WaitForInstanceCompletionAsync(instanceId, getInputsAndOutputs: true, this.TimeoutToken); + var output = result.ReadOutputAs(); + + Assert.NotNull(output); + // The worker doesn't pass it's version through the context, so we check the client version. The fact that it passed indicates versioning was ignored. + Assert.Equal(output, $"Orchestration version: {clientVersion}"); + } + + [Fact] + public async Task OrchestrationVersioning_MatchTypeStrict_VersionFailure() + { + var workerVersion = "0.1"; + var clientVersion = "0.2"; + await using HostTestLifetime server = await this.StartWorkerAsync(b => + { + b.AddTasks(tasks => tasks + .AddOrchestratorFunc("Versioned_Orchestration", (ctx, input) => + { + return ctx.CallActivityAsync("Versioned_Activity", ctx.Version); + }) + .AddActivityFunc("Versioned_Activity", (ctx, input) => + { + return $"Orchestration version: {input}"; + })); + b.UseVersioning(new() + { + Version = workerVersion, + MatchStrategy = DurableTaskWorkerOptions.VersionMatchStrategy.Strict, + FailureStrategy = DurableTaskWorkerOptions.VersionFailureStrategy.Fail + }); + }, c => + { + c.UseDefaultVersion(clientVersion); + }); + + var instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync("Versioned_Orchestration", input: string.Empty); + var result = await server.Client.WaitForInstanceCompletionAsync(instanceId, getInputsAndOutputs: true, this.TimeoutToken); + + Assert.NotNull(result); + Assert.Equal(OrchestrationRuntimeStatus.Failed, result.RuntimeStatus); + Assert.NotNull(result.FailureDetails); + Assert.Equal("VersionMismatch", result.FailureDetails.ErrorType); + } + + [Fact] + public async Task OrchestrationVersioning_MatchTypeCurrentOrOlder_VersionFailure() + { + var workerVersion = "0.1"; + var clientVersion = "0.2"; + await using HostTestLifetime server = await this.StartWorkerAsync(b => + { + b.AddTasks(tasks => tasks + .AddOrchestratorFunc("Versioned_Orchestration", (ctx, input) => + { + return ctx.CallActivityAsync("Versioned_Activity", ctx.Version); + }) + .AddActivityFunc("Versioned_Activity", (ctx, input) => + { + return $"Orchestration version: {input}"; + })); + b.UseVersioning(new() + { + Version = workerVersion, + MatchStrategy = DurableTaskWorkerOptions.VersionMatchStrategy.CurrentOrOlder, + FailureStrategy = DurableTaskWorkerOptions.VersionFailureStrategy.Fail + }); + }, c => + { + c.UseDefaultVersion(clientVersion); + }); + + var instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync("Versioned_Orchestration", input: string.Empty); + var result = await server.Client.WaitForInstanceCompletionAsync(instanceId, getInputsAndOutputs: true, this.TimeoutToken); + + Assert.NotNull(result); + Assert.Equal(OrchestrationRuntimeStatus.Failed, result.RuntimeStatus); + Assert.NotNull(result.FailureDetails); + Assert.Equal("VersionMismatch", result.FailureDetails.ErrorType); + } + + [Fact] + public async Task OrchestrationVersioning_MatchTypeCurrentOrOlder_VersionSuccess() + { + var workerVersion = "0.3"; + var clientVersion = "0.2"; + await using HostTestLifetime server = await this.StartWorkerAsync(b => + { + b.AddTasks(tasks => tasks + .AddOrchestratorFunc("Versioned_Orchestration", (ctx, input) => + { + return ctx.CallActivityAsync("Versioned_Activity", ctx.Version); + }) + .AddActivityFunc("Versioned_Activity", (ctx, input) => + { + return $"Orchestration version: {input}"; + })); + b.UseVersioning(new() + { + Version = workerVersion, + MatchStrategy = DurableTaskWorkerOptions.VersionMatchStrategy.CurrentOrOlder, + FailureStrategy = DurableTaskWorkerOptions.VersionFailureStrategy.Fail + }); + }, c => + { + c.UseDefaultVersion(clientVersion); + }); + + var instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync("Versioned_Orchestration", input: string.Empty); + var result = await server.Client.WaitForInstanceCompletionAsync(instanceId, getInputsAndOutputs: true, this.TimeoutToken); + var output = result.ReadOutputAs(); + + Assert.NotNull(output); + // The worker doesn't pass it's version through the context, so we check the client version. The fact that it passed indicates versioning was ignored. + Assert.Equal(output, $"Orchestration version: {clientVersion}"); + } + + // TODO: Test for multiple external events with the same name + // TODO: Test for ContinueAsNew with external events that carry over + // TODO: Test for catching activity exceptions of specific types +}