diff --git a/Directory.Packages.props b/Directory.Packages.props index 8388030fa..34509fe65 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -9,6 +9,7 @@ + @@ -28,7 +29,7 @@ - + diff --git a/eng/targets/Release.props b/eng/targets/Release.props index c731d3f9f..f4c376c63 100644 --- a/eng/targets/Release.props +++ b/eng/targets/Release.props @@ -17,7 +17,7 @@ - 1.14.0 + 1.15.0 diff --git a/src/Grpc/orchestrator_service.proto b/src/Grpc/orchestrator_service.proto index 3b9c4f408..df5143bc9 100644 --- a/src/Grpc/orchestrator_service.proto +++ b/src/Grpc/orchestrator_service.proto @@ -342,8 +342,10 @@ message OrchestratorResponse { // The number of work item events that were processed by the orchestrator. // This field is optional. If not set, the service should assume that the orchestrator processed all events. google.protobuf.Int32Value numEventsProcessed = 5; - OrchestrationTraceContext orchestrationTraceContext = 6; + + // Whether or not a history is required to complete the original OrchestratorRequest and none was provided. + bool requiresHistory = 7; } message CreateInstanceRequest { @@ -678,6 +680,18 @@ message AbandonEntityTaskResponse { // Empty. } +message SkipGracefulOrchestrationTerminationsRequest { + // A maximum of 500 instance IDs can be provided in this list. + repeated string instanceIds = 1; + google.protobuf.StringValue reason = 2; +} + +message SkipGracefulOrchestrationTerminationsResponse { + // Those instances which could not be terminated because they had locked entities at the time of this termination call, + // are already in a terminal state (completed, failed, terminated, etc.), are not orchestrations, or do not exist (i.e. have been purged) + repeated string unterminatedInstanceIds = 1; +} + service TaskHubSidecarService { // Sends a hello request to the sidecar service. rpc Hello(google.protobuf.Empty) returns (google.protobuf.Empty); @@ -751,6 +765,10 @@ service TaskHubSidecarService { // Abandon an entity work item rpc AbandonTaskEntityWorkItem(AbandonEntityTaskRequest) returns (AbandonEntityTaskResponse); + + // "Skip" graceful termination of orchestrations by immediately changing their status in storage to "terminated". + // Note that a maximum of 500 orchestrations can be terminated at a time using this method. + rpc SkipGracefulOrchestrationTerminations(SkipGracefulOrchestrationTerminationsRequest) returns (SkipGracefulOrchestrationTerminationsResponse); } message GetWorkItemsRequest { diff --git a/src/Grpc/versions.txt b/src/Grpc/versions.txt index e9f651378..3e4d1b210 100644 --- a/src/Grpc/versions.txt +++ b/src/Grpc/versions.txt @@ -1,2 +1,2 @@ -# The following files were downloaded from branch main at 2025-09-10 22:50:45 UTC -https://raw.githubusercontent.com/microsoft/durabletask-protobuf/985035a0890575ae18be0eb2a3ac93c10824498a/protos/orchestrator_service.proto +# The following files were downloaded from branch main at 2025-09-17 01:45:58 UTC +https://raw.githubusercontent.com/microsoft/durabletask-protobuf/f5745e0d83f608d77871c1894d9260ceaae08967/protos/orchestrator_service.proto diff --git a/src/Shared/Grpc/ProtoUtils.cs b/src/Shared/Grpc/ProtoUtils.cs index 54957e72f..e3e331f77 100644 --- a/src/Shared/Grpc/ProtoUtils.cs +++ b/src/Shared/Grpc/ProtoUtils.cs @@ -279,18 +279,19 @@ internal static Timestamp ToTimestamp(this DateTime dateTime) /// /// The entity conversion state, or null if no conversion is required. /// The that represents orchestration execution. + /// Whether or not a history is required to complete the orchestration request and none was provided. /// The orchestrator response. /// When an orchestrator action is unknown. internal static P.OrchestratorResponse ConstructOrchestratorResponse( string instanceId, string executionId, string? customStatus, - IEnumerable actions, + IEnumerable? actions, string completionToken, EntityConversionState? entityConversionState, - Activity? orchestrationActivity) + Activity? orchestrationActivity, + bool requiresHistory = false) { - Check.NotNull(actions); var response = new P.OrchestratorResponse { InstanceId = instanceId, @@ -302,8 +303,16 @@ internal static P.OrchestratorResponse ConstructOrchestratorResponse( SpanID = orchestrationActivity?.SpanId.ToString(), SpanStartTime = orchestrationActivity?.StartTimeUtc.ToTimestamp(), }, + RequiresHistory = requiresHistory, }; + // If a history is required and the orchestration request was not completed, then there is no list of actions. + if (requiresHistory) + { + return response; + } + + Check.NotNull(actions); foreach (OrchestratorAction action in actions) { var protoAction = new P.OrchestratorAction { Id = action.Id }; diff --git a/src/Worker/Core/ExtendedSessionState.cs b/src/Worker/Core/ExtendedSessionState.cs new file mode 100644 index 000000000..4d67e0476 --- /dev/null +++ b/src/Worker/Core/ExtendedSessionState.cs @@ -0,0 +1,40 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using DurableTask.Core; + +namespace Microsoft.DurableTask.Worker; + +/// +/// Represents the state of an extended session for an orchestration. +/// +public class ExtendedSessionState +{ + /// + /// Initializes a new instance of the class. + /// + /// The orchestration's runtime state. + /// The TaskOrchestration implementation of the orchestration. + /// The TaskOrchestrationExecutor for the orchestration. + public ExtendedSessionState(OrchestrationRuntimeState state, TaskOrchestration taskOrchestration, TaskOrchestrationExecutor orchestrationExecutor) + { + this.RuntimeState = state; + this.TaskOrchestration = taskOrchestration; + this.OrchestrationExecutor = orchestrationExecutor; + } + + /// + /// Gets or sets the saved runtime state of the orchestration. + /// + public OrchestrationRuntimeState RuntimeState { get; set; } + + /// + /// Gets or sets the saved TaskOrchestration implementation of the orchestration. + /// + public TaskOrchestration TaskOrchestration { get; set; } + + /// + /// Gets or sets the saved TaskOrchestrationExecutor. + /// + public TaskOrchestrationExecutor OrchestrationExecutor { get; set; } +} diff --git a/src/Worker/Core/ExtendedSessionsCache.cs b/src/Worker/Core/ExtendedSessionsCache.cs new file mode 100644 index 000000000..59df25366 --- /dev/null +++ b/src/Worker/Core/ExtendedSessionsCache.cs @@ -0,0 +1,48 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using Microsoft.Extensions.Caching.Memory; + +namespace Microsoft.DurableTask.Worker; + +/// +/// A cache for extended sessions that wraps a instance. +/// Responsible for holding for orchestrations that are running within extended sessions. +/// +public class ExtendedSessionsCache : IDisposable +{ + MemoryCache? extendedSessions; + + /// + /// Gets a value indicating whether returns whether or not the cache has been initialized. + /// + /// True if the cache has been initialized, false otherwise. + public bool IsInitialized => this.extendedSessions is not null; + + /// + /// Dispose the cache and release all resources. + /// + public void Dispose() + { + this.extendedSessions?.Dispose(); + GC.SuppressFinalize(this); + } + + /// + /// Gets the cache for extended sessions if it has already been initialized, or otherwise initializes it with the given expiration scan frequency. + /// + /// + /// The expiration scan frequency of the cache, in seconds. + /// This specifies how often the cache checks for stale items, and evicts them. + /// + /// The IMemoryCache that holds the cached . + public MemoryCache GetOrInitializeCache(double expirationScanFrequencyInSeconds) + { + this.extendedSessions ??= new MemoryCache(new MemoryCacheOptions + { + ExpirationScanFrequency = TimeSpan.FromSeconds(expirationScanFrequencyInSeconds / 5), + }); + + return this.extendedSessions; + } +} diff --git a/src/Worker/Core/Worker.csproj b/src/Worker/Core/Worker.csproj index 5791edf73..daed82b15 100644 --- a/src/Worker/Core/Worker.csproj +++ b/src/Worker/Core/Worker.csproj @@ -9,6 +9,7 @@ The worker is responsible for processing durable task work items. + diff --git a/src/Worker/Grpc/GrpcOrchestrationRunner.cs b/src/Worker/Grpc/GrpcOrchestrationRunner.cs index 064b38c65..74a92006a 100644 --- a/src/Worker/Grpc/GrpcOrchestrationRunner.cs +++ b/src/Worker/Grpc/GrpcOrchestrationRunner.cs @@ -5,6 +5,7 @@ using DurableTask.Core.History; using Google.Protobuf; using Microsoft.DurableTask.Worker.Shims; +using Microsoft.Extensions.Caching.Memory; using Microsoft.Extensions.DependencyInjection; using P = Microsoft.DurableTask.Protobuf; @@ -82,6 +83,40 @@ public static string LoadAndRun( string encodedOrchestratorRequest, ITaskOrchestrator implementation, IServiceProvider? services = null) + { + return LoadAndRun(encodedOrchestratorRequest, implementation, extendedSessionsCache: null, services: services); + } + + /// + /// Deserializes orchestration history from and uses it to resume the + /// orchestrator implemented by . + /// + /// + /// The encoded protobuf payload representing an orchestration execution request. This is a base64-encoded string. + /// + /// + /// An implementation that defines the orchestrator logic. + /// + /// + /// The cache of extended sessions which can be used to retrieve the if this orchestration request is from within an extended session. + /// + /// + /// Optional from which injected dependencies can be retrieved. + /// + /// + /// Returns a serialized set of orchestrator actions that should be used as the return value of the orchestrator function trigger. + /// + /// + /// Thrown if or is null. + /// + /// + /// Thrown if contains invalid data. + /// + public static string LoadAndRun( + string encodedOrchestratorRequest, + ITaskOrchestrator implementation, + ExtendedSessionsCache? extendedSessionsCache, + IServiceProvider? services = null) { Check.NotNullOrEmpty(encodedOrchestratorRequest); Check.NotNull(implementation); @@ -95,34 +130,114 @@ public static string LoadAndRun( pair => pair.Key, pair => ProtoUtils.ConvertValueToObject(pair.Value)); - // Re-construct the orchestration state from the history. - // New events must be added using the AddEvent method. - OrchestrationRuntimeState runtimeState = new(pastEvents); - foreach (HistoryEvent newEvent in newEvents) - { - runtimeState.AddEvent(newEvent); + OrchestratorExecutionResult? result = null; + MemoryCache? extendedSessions = null; + + // If any of the request parameters are malformed, we assume the default - extended sessions are not enabled and the orchestration history is attached + bool addToExtendedSessions = false; + bool requiresHistory = false; + bool pastEventsIncluded = true; + bool isExtendedSession = false; + double extendedSessionIdleTimeoutInSeconds = 0; + + // Only attempt to initialize the extended sessions cache if all the parameters are correctly specified + if (properties.TryGetValue("ExtendedSessionIdleTimeoutInSeconds", out object? extendedSessionIdleTimeoutObj) + && extendedSessionIdleTimeoutObj is double extendedSessionIdleTimeout + && extendedSessionIdleTimeout > 0 + && properties.TryGetValue("IsExtendedSession", out object? extendedSessionObj) + && extendedSessionObj is bool extendedSession) + { + extendedSessionIdleTimeoutInSeconds = extendedSessionIdleTimeout; + isExtendedSession = extendedSession; + extendedSessions = extendedSessionsCache?.GetOrInitializeCache(extendedSessionIdleTimeoutInSeconds); + } + + if (properties.TryGetValue("IncludePastEvents", out object? includePastEventsObj) + && includePastEventsObj is bool includePastEvents) + { + pastEventsIncluded = includePastEvents; + } + + if (isExtendedSession && extendedSessions != null) + { + // If a history was provided, even if we already have an extended session stored, we always want to evict whatever state is in the cache and replace it with a new extended + // session based on the provided history + if (!pastEventsIncluded && extendedSessions.TryGetValue(request.InstanceId, out ExtendedSessionState? extendedSessionState) && extendedSessionState is not null) + { + OrchestrationRuntimeState runtimeState = extendedSessionState!.RuntimeState; + runtimeState.NewEvents.Clear(); + foreach (HistoryEvent newEvent in newEvents) + { + runtimeState.AddEvent(newEvent); + } + + result = extendedSessionState.OrchestrationExecutor.ExecuteNewEvents(); + if (extendedSessionState.OrchestrationExecutor.IsCompleted) + { + extendedSessions.Remove(request.InstanceId); + } + } + else + { + extendedSessions.Remove(request.InstanceId); + addToExtendedSessions = true; + } } - TaskName orchestratorName = new(runtimeState.Name); - ParentOrchestrationInstance? parent = runtimeState.ParentInstance is ParentInstance p - ? new(new(p.Name), p.OrchestrationInstance.InstanceId) - : null; + if (result == null) + { + // DurableTask.Core did not attach the orchestration history since the extended session is still active on its end, but we have since evicted the + // session and lost the orchestration history so we cannot replay the orchestration. + if (!pastEventsIncluded) + { + requiresHistory = true; + } + else + { + // Re-construct the orchestration state from the history. + // New events must be added using the AddEvent method. + OrchestrationRuntimeState runtimeState = new(pastEvents); + + foreach (HistoryEvent newEvent in newEvents) + { + runtimeState.AddEvent(newEvent); + } + + TaskName orchestratorName = new(runtimeState.Name); + ParentOrchestrationInstance? parent = runtimeState.ParentInstance is ParentInstance p + ? new(new(p.Name), p.OrchestrationInstance.InstanceId) + : null; - DurableTaskShimFactory factory = services is null - ? DurableTaskShimFactory.Default - : ActivatorUtilities.GetServiceOrCreateInstance(services); - TaskOrchestration shim = factory.CreateOrchestration(orchestratorName, implementation, properties, parent); - TaskOrchestrationExecutor executor = new(runtimeState, shim, BehaviorOnContinueAsNew.Carryover, request.EntityParameters.ToCore(), ErrorPropagationMode.UseFailureDetails); - OrchestratorExecutionResult result = executor.Execute(); + DurableTaskShimFactory factory = services is null + ? DurableTaskShimFactory.Default + : ActivatorUtilities.GetServiceOrCreateInstance(services); + TaskOrchestration shim = factory.CreateOrchestration(orchestratorName, implementation, properties, parent); + TaskOrchestrationExecutor executor = new(runtimeState, shim, BehaviorOnContinueAsNew.Carryover, request.EntityParameters.ToCore(), ErrorPropagationMode.UseFailureDetails); + result = executor.Execute(); + + if (addToExtendedSessions && !executor.IsCompleted) + { + extendedSessions.Set( + request.InstanceId, + new(runtimeState, shim, executor), + new MemoryCacheEntryOptions { SlidingExpiration = TimeSpan.FromSeconds(extendedSessionIdleTimeoutInSeconds) }); + } + else + { + extendedSessions?.Remove(request.InstanceId); + } + } + } P.OrchestratorResponse response = ProtoUtils.ConstructOrchestratorResponse( - request.InstanceId, + request.InstanceId, request.ExecutionId, - result.CustomStatus, - result.Actions, + result?.CustomStatus, + result?.Actions, completionToken: string.Empty, /* doesn't apply */ - entityConversionState: null, - orchestrationActivity: null); + entityConversionState: null, + orchestrationActivity: null, + requiresHistory: requiresHistory); byte[] responseBytes = response.ToByteArray(); return Convert.ToBase64String(responseBytes); } diff --git a/test/Worker/Grpc.Tests/GrpcOrchestrationRunnerTests.cs b/test/Worker/Grpc.Tests/GrpcOrchestrationRunnerTests.cs new file mode 100644 index 000000000..6c3179211 --- /dev/null +++ b/test/Worker/Grpc.Tests/GrpcOrchestrationRunnerTests.cs @@ -0,0 +1,484 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using Google.Protobuf; +using Google.Protobuf.Collections; +using Google.Protobuf.WellKnownTypes; + +namespace Microsoft.DurableTask.Worker.Grpc.Tests; + +public class GrpcOrchestrationRunnerTests +{ + const string TestInstanceId = "instance_id"; + const string TestExecutionId = "execution_id"; + const int DefaultExtendedSessionIdleTimeoutInSeconds = 30; + + [Fact] + public void EmptyOrNullParameters_Throw_Exceptions() + { + Action act = () => + GrpcOrchestrationRunner.LoadAndRun(string.Empty, new SimpleOrchestrator(), new ExtendedSessionsCache()); + act.Should().ThrowExactly().WithParameterName("encodedOrchestratorRequest"); + + act = () => + GrpcOrchestrationRunner.LoadAndRun(null!, new SimpleOrchestrator(), new ExtendedSessionsCache()); + act.Should().ThrowExactly().WithParameterName("encodedOrchestratorRequest"); + + act = () => + GrpcOrchestrationRunner.LoadAndRun("request", null!, new ExtendedSessionsCache()); + act.Should().ThrowExactly().WithParameterName("implementation"); + } + + [Fact] + public void EmptyHistory_Returns_NeedsHistoryInResponse() + { + using var extendedSessions = new ExtendedSessionsCache(); + + // No history and without extended sessions enabled + Protobuf.OrchestratorRequest orchestratorRequest = CreateOrchestratorRequest([]); + orchestratorRequest.Properties.Add(new MapField() { + { "IncludePastEvents", Value.ForBool(false) }}); + byte[] requestBytes = orchestratorRequest.ToByteArray(); + string requestString = Convert.ToBase64String(requestBytes); + string stringResponse = GrpcOrchestrationRunner.LoadAndRun(requestString, new SimpleOrchestrator(), extendedSessions); + Protobuf.OrchestratorResponse response = Protobuf.OrchestratorResponse.Parser.ParseFrom(Convert.FromBase64String(stringResponse)); + Assert.True(response.RequiresHistory); + Assert.False(extendedSessions.IsInitialized); + + // No history but with extended sessions enabled + orchestratorRequest.Properties.Add(new MapField() { + { "IsExtendedSession", Value.ForBool(true) }, + { "ExtendedSessionIdleTimeoutInSeconds", Value.ForNumber(DefaultExtendedSessionIdleTimeoutInSeconds) } }); + requestBytes = orchestratorRequest.ToByteArray(); + requestString = Convert.ToBase64String(requestBytes); + stringResponse = GrpcOrchestrationRunner.LoadAndRun(requestString, new SimpleOrchestrator(), extendedSessions); + response = Protobuf.OrchestratorResponse.Parser.ParseFrom(Convert.FromBase64String(stringResponse)); + Assert.True(response.RequiresHistory); + Assert.True(extendedSessions.IsInitialized); + } + + [Fact] + public void MalformedRequestParameters_Means_CacheNotInitialized() + { + using var extendedSessions = new ExtendedSessionsCache(); + Protobuf.OrchestratorRequest orchestratorRequest = CreateOrchestratorRequest([]); + + // Misspelled extended session timeout key + orchestratorRequest.Properties.Add(new MapField() { + { "IncludePastEvents", Value.ForBool(false) }, + { "IsExtendedSession", Value.ForBool(true) }, + { "ExtendedSessionsIdleTimeoutInSeconds", Value.ForNumber(DefaultExtendedSessionIdleTimeoutInSeconds) } }); + byte[] requestBytes = orchestratorRequest.ToByteArray(); + string requestString = Convert.ToBase64String(requestBytes); + string stringResponse = GrpcOrchestrationRunner.LoadAndRun(requestString, new SimpleOrchestrator(), extendedSessions); + Protobuf.OrchestratorResponse response = Protobuf.OrchestratorResponse.Parser.ParseFrom(Convert.FromBase64String(stringResponse)); + Assert.False(extendedSessions.IsInitialized); + + // Wrong value type for extended session timeout key + orchestratorRequest.Properties.Clear(); + orchestratorRequest.Properties.Add(new MapField() { + { "IncludePastEvents", Value.ForBool(false) }, + { "IsExtendedSession", Value.ForBool(true) }, + { "ExtendedSessionIdleTimeoutInSeconds", Value.ForString("hi") } }); + requestBytes = orchestratorRequest.ToByteArray(); + requestString = Convert.ToBase64String(requestBytes); + GrpcOrchestrationRunner.LoadAndRun(requestString, new SimpleOrchestrator(), extendedSessions); + Assert.False(extendedSessions.IsInitialized); + + // Invalid number for extended session timeout key (must be > 0) + orchestratorRequest.Properties.Clear(); + orchestratorRequest.Properties.Add(new MapField() { + { "IncludePastEvents", Value.ForBool(false) }, + { "IsExtendedSession", Value.ForBool(true) }, + { "ExtendedSessionIdleTimeoutInSeconds", Value.ForNumber(0) } }); + requestBytes = orchestratorRequest.ToByteArray(); + requestString = Convert.ToBase64String(requestBytes); + GrpcOrchestrationRunner.LoadAndRun(requestString, new SimpleOrchestrator(), extendedSessions); + Assert.False(extendedSessions.IsInitialized); + + // Invalid number for extended session timeout key (must be > 0) + orchestratorRequest.Properties.Clear(); + orchestratorRequest.Properties.Add(new MapField() { + { "IncludePastEvents", Value.ForBool(false) }, + { "IsExtendedSession", Value.ForBool(true) }, + { "ExtendedSessionIdleTimeoutInSeconds", Value.ForNumber(-1) } }); + requestBytes = orchestratorRequest.ToByteArray(); + requestString = Convert.ToBase64String(requestBytes); + GrpcOrchestrationRunner.LoadAndRun(requestString, new SimpleOrchestrator(), extendedSessions); + Assert.False(extendedSessions.IsInitialized); + + // No extended session timeout key + orchestratorRequest.Properties.Clear(); + orchestratorRequest.Properties.Add(new MapField() { + { "IncludePastEvents", Value.ForBool(false) }, + { "IsExtendedSession", Value.ForBool(true) } }); + requestBytes = orchestratorRequest.ToByteArray(); + requestString = Convert.ToBase64String(requestBytes); + GrpcOrchestrationRunner.LoadAndRun(requestString, new SimpleOrchestrator(), extendedSessions); + Assert.False(extendedSessions.IsInitialized); + + // Misspelled extended session key + orchestratorRequest.Properties.Clear(); + orchestratorRequest.Properties.Add(new MapField() { + { "IncludePastEvents", Value.ForBool(false) }, + { "isExtendedSession", Value.ForBool(true) }, + { "ExtendedSessionIdleTimeoutInSeconds", Value.ForNumber(DefaultExtendedSessionIdleTimeoutInSeconds) } }); + requestBytes = orchestratorRequest.ToByteArray(); + requestString = Convert.ToBase64String(requestBytes); + GrpcOrchestrationRunner.LoadAndRun(requestString, new SimpleOrchestrator(), extendedSessions); + Assert.False(extendedSessions.IsInitialized); + + // Wrong value type for extended session key + orchestratorRequest.Properties.Clear(); + orchestratorRequest.Properties.Add(new MapField() { + { "IncludePastEvents", Value.ForBool(false) }, + { "IsExtendedSession", Value.ForNumber(1) }, + { "ExtendedSessionIdleTimeoutInSeconds", Value.ForNumber(DefaultExtendedSessionIdleTimeoutInSeconds) } }); + requestBytes = orchestratorRequest.ToByteArray(); + requestString = Convert.ToBase64String(requestBytes); + GrpcOrchestrationRunner.LoadAndRun(requestString, new SimpleOrchestrator(), extendedSessions); + Assert.False(extendedSessions.IsInitialized); + + // No extended session key + orchestratorRequest.Properties.Clear(); + orchestratorRequest.Properties.Add(new MapField() { + { "IncludePastEvents", Value.ForBool(false) }, + { "ExtendedSessionIdleTimeoutInSeconds", Value.ForNumber(DefaultExtendedSessionIdleTimeoutInSeconds) } }); + requestBytes = orchestratorRequest.ToByteArray(); + requestString = Convert.ToBase64String(requestBytes); + GrpcOrchestrationRunner.LoadAndRun(requestString, new SimpleOrchestrator(), extendedSessions); + Assert.False(extendedSessions.IsInitialized); + } + + [Fact] + public void IsExtendedSessionFalse_Means_NoExtendedSessionStored() + { + using var extendedSessions = new ExtendedSessionsCache(); + Protobuf.OrchestratorRequest orchestratorRequest = CreateOrchestratorRequest([]); + + orchestratorRequest.Properties.Add(new MapField() { + { "IncludePastEvents", Value.ForBool(false) }, + { "IsExtendedSession", Value.ForBool(false) }, + { "ExtendedSessionIdleTimeoutInSeconds", Value.ForNumber(DefaultExtendedSessionIdleTimeoutInSeconds) } }); + byte[] requestBytes = orchestratorRequest.ToByteArray(); + string requestString = Convert.ToBase64String(requestBytes); + GrpcOrchestrationRunner.LoadAndRun(requestString, new CallSubOrchestrationOrchestrator(), extendedSessions); + Assert.True(extendedSessions.IsInitialized); + Assert.False(extendedSessions.GetOrInitializeCache(DefaultExtendedSessionIdleTimeoutInSeconds).TryGetValue(TestInstanceId, out object? extendedSession)); + } + + /// + /// These tests verify that a malformed/nonexistent "IncludePastEvents" parameter means that the worker will attempt to + /// fulfill the orchestration request and not request a history for it. However, it is of course very undesirable that a + /// history is not attached to the request, but no history is requested by the worker due to a malformed "IncludePastEvents" parameter + /// even when it needs one to fulfill the request. This would need to be checked on whatever side is calling this SDK. + /// + [Fact] + public void MalformedPastEventsParameter_Means_NoHistoryRequired() + { + using var extendedSessions = new ExtendedSessionsCache(); + var historyEvent = new Protobuf.HistoryEvent + { + EventId = -1, + Timestamp = Timestamp.FromDateTime(DateTime.UtcNow), + ExecutionStarted = new Protobuf.ExecutionStartedEvent() + { + OrchestrationInstance = new Protobuf.OrchestrationInstance + { + InstanceId = TestInstanceId, + ExecutionId = TestExecutionId, + }, + } + }; + Protobuf.OrchestratorRequest orchestratorRequest = CreateOrchestratorRequest([historyEvent]); + + // Misspelled include past events key + orchestratorRequest.Properties.Add(new MapField() { + { "INcludePastEvents", Value.ForBool(false) }, + { "IsExtendedSession", Value.ForBool(false) }, + { "ExtendedSessionIdleTimeoutInSeconds", Value.ForNumber(DefaultExtendedSessionIdleTimeoutInSeconds) } }); + byte[] requestBytes = orchestratorRequest.ToByteArray(); + string requestString = Convert.ToBase64String(requestBytes); + string stringResponse = GrpcOrchestrationRunner.LoadAndRun(requestString, new SimpleOrchestrator(), extendedSessions); + Protobuf.OrchestratorResponse response = Protobuf.OrchestratorResponse.Parser.ParseFrom(Convert.FromBase64String(stringResponse)); + Assert.False(response.RequiresHistory); + + // Wrong value type for include past events key + orchestratorRequest.Properties.Clear(); + orchestratorRequest.Properties.Add(new MapField() { + { "IncludePastEvents", Value.ForString("no") }, + { "IsExtendedSession", Value.ForBool(false) }, + { "ExtendedSessionIdleTimeoutInSeconds", Value.ForNumber(DefaultExtendedSessionIdleTimeoutInSeconds) } }); + requestBytes = orchestratorRequest.ToByteArray(); + requestString = Convert.ToBase64String(requestBytes); + stringResponse = GrpcOrchestrationRunner.LoadAndRun(requestString, new SimpleOrchestrator(), extendedSessions); + response = Protobuf.OrchestratorResponse.Parser.ParseFrom(Convert.FromBase64String(stringResponse)); + Assert.False(response.RequiresHistory); + + // No include past events key + orchestratorRequest.Properties.Clear(); + orchestratorRequest.Properties.Add(new MapField() { + { "IsExtendedSession", Value.ForBool(false) }, + { "ExtendedSessionIdleTimeoutInSeconds", Value.ForNumber(DefaultExtendedSessionIdleTimeoutInSeconds) } }); + requestBytes = orchestratorRequest.ToByteArray(); + requestString = Convert.ToBase64String(requestBytes); + stringResponse = GrpcOrchestrationRunner.LoadAndRun(requestString, new SimpleOrchestrator(), extendedSessions); + response = Protobuf.OrchestratorResponse.Parser.ParseFrom(Convert.FromBase64String(stringResponse)); + Assert.False(response.RequiresHistory); + } + + [Fact] + public void Incomplete_Orchestration_Stored() + { + using var extendedSessions = new ExtendedSessionsCache(); + var historyEvent = new Protobuf.HistoryEvent + { + EventId = -1, + Timestamp = Timestamp.FromDateTime(DateTime.UtcNow), + ExecutionStarted = new Protobuf.ExecutionStartedEvent() + { + OrchestrationInstance = new Protobuf.OrchestrationInstance + { + InstanceId = TestInstanceId, + ExecutionId = TestExecutionId, + }, + } + }; + Protobuf.OrchestratorRequest orchestratorRequest = CreateOrchestratorRequest([historyEvent]); + orchestratorRequest.Properties.Add(new MapField() { + { "IncludePastEvents", Value.ForBool(true) }, + { "IsExtendedSession", Value.ForBool(true) }, + { "ExtendedSessionIdleTimeoutInSeconds", Value.ForNumber(DefaultExtendedSessionIdleTimeoutInSeconds) } }); + byte[] requestBytes = orchestratorRequest.ToByteArray(); + string requestString = Convert.ToBase64String(requestBytes); + GrpcOrchestrationRunner.LoadAndRun(requestString, new CallSubOrchestrationOrchestrator(), extendedSessions); + Assert.True(extendedSessions.IsInitialized); + Assert.True(extendedSessions.GetOrInitializeCache(DefaultExtendedSessionIdleTimeoutInSeconds).TryGetValue(TestInstanceId, out object? extendedSession)); + } + + [Fact] + public void Complete_Orchestration_NotStored() + { + using var extendedSessions = new ExtendedSessionsCache(); + var historyEvent = new Protobuf.HistoryEvent + { + EventId = -1, + Timestamp = Timestamp.FromDateTime(DateTime.UtcNow), + ExecutionStarted = new Protobuf.ExecutionStartedEvent() + { + OrchestrationInstance = new Protobuf.OrchestrationInstance + { + InstanceId = TestInstanceId, + ExecutionId = TestExecutionId, + }, + } + }; + Protobuf.OrchestratorRequest orchestratorRequest = CreateOrchestratorRequest([historyEvent]); + orchestratorRequest.Properties.Add(new MapField() { + { "IncludePastEvents", Value.ForBool(true) }, + { "IsExtendedSession", Value.ForBool(true) }, + { "ExtendedSessionIdleTimeoutInSeconds", Value.ForNumber(DefaultExtendedSessionIdleTimeoutInSeconds) } }); + byte[] requestBytes = orchestratorRequest.ToByteArray(); + string requestString = Convert.ToBase64String(requestBytes); + GrpcOrchestrationRunner.LoadAndRun(requestString, new SimpleOrchestrator(), extendedSessions); + Assert.True(extendedSessions.IsInitialized); + Assert.False(extendedSessions.GetOrInitializeCache(DefaultExtendedSessionIdleTimeoutInSeconds).TryGetValue(TestInstanceId, out object? extendedSession)); + } + + [Fact] + public void ExternallyEndedExtendedSession_Evicted() + { + using var extendedSessions = new ExtendedSessionsCache(); + var historyEvent = new Protobuf.HistoryEvent + { + EventId = -1, + Timestamp = Timestamp.FromDateTime(DateTime.UtcNow), + ExecutionStarted = new Protobuf.ExecutionStartedEvent() + { + OrchestrationInstance = new Protobuf.OrchestrationInstance + { + InstanceId = TestInstanceId, + ExecutionId = TestExecutionId, + }, + } + }; + Protobuf.OrchestratorRequest orchestratorRequest = CreateOrchestratorRequest([historyEvent]); + orchestratorRequest.Properties.Add(new MapField() { + { "IncludePastEvents", Value.ForBool(true) }, + { "IsExtendedSession", Value.ForBool(true) }, + { "ExtendedSessionIdleTimeoutInSeconds", Value.ForNumber(DefaultExtendedSessionIdleTimeoutInSeconds) } }); + byte[] requestBytes = orchestratorRequest.ToByteArray(); + string requestString = Convert.ToBase64String(requestBytes); + GrpcOrchestrationRunner.LoadAndRun(requestString, new CallSubOrchestrationOrchestrator(), extendedSessions); + Assert.True(extendedSessions.IsInitialized); + Assert.True(extendedSessions.GetOrInitializeCache(DefaultExtendedSessionIdleTimeoutInSeconds).TryGetValue(TestInstanceId, out object? extendedSession)); + + // Now set the extended session flag to false for this instance + orchestratorRequest.Properties.Clear(); + orchestratorRequest.Properties.Add(new MapField() { + { "IncludePastEvents", Value.ForBool(true) }, + { "IsExtendedSession", Value.ForBool(false) }, + { "ExtendedSessionIdleTimeoutInSeconds", Value.ForNumber(DefaultExtendedSessionIdleTimeoutInSeconds) } }); + requestBytes = orchestratorRequest.ToByteArray(); + requestString = Convert.ToBase64String(requestBytes); + GrpcOrchestrationRunner.LoadAndRun(requestString, new CallSubOrchestrationOrchestrator(), extendedSessions); + Assert.True(extendedSessions.IsInitialized); + Assert.False(extendedSessions.GetOrInitializeCache(DefaultExtendedSessionIdleTimeoutInSeconds).TryGetValue(TestInstanceId, out extendedSession)); + } + + [Fact] + public async void Stale_ExtendedSessions_Evicted_Async() + { + using var extendedSessions = new ExtendedSessionsCache(); + int extendedSessionIdleTimeout = 5; + var historyEvent = new Protobuf.HistoryEvent + { + EventId = -1, + Timestamp = Timestamp.FromDateTime(DateTime.UtcNow), + ExecutionStarted = new Protobuf.ExecutionStartedEvent() + { + OrchestrationInstance = new Protobuf.OrchestrationInstance + { + InstanceId = TestInstanceId, + ExecutionId = TestExecutionId, + }, + } + }; + Protobuf.OrchestratorRequest orchestratorRequest = CreateOrchestratorRequest([historyEvent]); + orchestratorRequest.Properties.Add(new MapField() { + { "IncludePastEvents", Value.ForBool(true) }, + { "IsExtendedSession", Value.ForBool(true) }, + { "ExtendedSessionIdleTimeoutInSeconds", Value.ForNumber(extendedSessionIdleTimeout) } }); + byte[] requestBytes = orchestratorRequest.ToByteArray(); + string requestString = Convert.ToBase64String(requestBytes); + GrpcOrchestrationRunner.LoadAndRun(requestString, new CallSubOrchestrationOrchestrator(), extendedSessions); + Assert.True(extendedSessions.IsInitialized); + Assert.True(extendedSessions.GetOrInitializeCache(extendedSessionIdleTimeout).TryGetValue(TestInstanceId, out object? extendedSession)); + + // Wait for longer than the timeout to account for finite cache scan for stale items frequency + await Task.Delay(extendedSessionIdleTimeout * 1000 * 2); + Assert.False(extendedSessions.GetOrInitializeCache(extendedSessionIdleTimeout).TryGetValue(TestInstanceId, out extendedSession)); + + // Now that the entry was evicted from the cache, the orchestration runner needs an orchestration history to complete the request + orchestratorRequest.Properties.Clear(); + orchestratorRequest.Properties.Add(new MapField() { + { "IncludePastEvents", Value.ForBool(false) }, + { "IsExtendedSession", Value.ForBool(true) }, + { "ExtendedSessionIdleTimeoutInSeconds", Value.ForNumber(extendedSessionIdleTimeout) } }); + requestBytes = orchestratorRequest.ToByteArray(); + requestString = Convert.ToBase64String(requestBytes); + string stringResponse = GrpcOrchestrationRunner.LoadAndRun(requestString, new CallSubOrchestrationOrchestrator(), extendedSessions); + Protobuf.OrchestratorResponse response = Protobuf.OrchestratorResponse.Parser.ParseFrom(Convert.FromBase64String(stringResponse)); + Assert.True(response.RequiresHistory); + } + + [Fact] + public void PastEventIncluded_Means_ExtendedSession_Evicted() + { + using var extendedSessions = new ExtendedSessionsCache(); + int extendedSessionIdleTimeout = 5; + var historyEvent = new Protobuf.HistoryEvent + { + EventId = -1, + Timestamp = Timestamp.FromDateTime(DateTime.UtcNow), + ExecutionStarted = new Protobuf.ExecutionStartedEvent() + { + OrchestrationInstance = new Protobuf.OrchestrationInstance + { + InstanceId = TestInstanceId, + ExecutionId = TestExecutionId, + }, + } + }; + Protobuf.OrchestratorRequest orchestratorRequest = CreateOrchestratorRequest([historyEvent]); + orchestratorRequest.Properties.Add(new MapField() { + { "IncludePastEvents", Value.ForBool(true) }, + { "IsExtendedSession", Value.ForBool(true) }, + { "ExtendedSessionIdleTimeoutInSeconds", Value.ForNumber(extendedSessionIdleTimeout) } }); + byte[] requestBytes = orchestratorRequest.ToByteArray(); + string requestString = Convert.ToBase64String(requestBytes); + GrpcOrchestrationRunner.LoadAndRun(requestString, new CallSubOrchestrationOrchestrator(), extendedSessions); + Assert.True(extendedSessions.IsInitialized); + Assert.True(extendedSessions.GetOrInitializeCache(extendedSessionIdleTimeout).TryGetValue(TestInstanceId, out object? extendedSession)); + + // Now we will retry the same exact request. If the extended session is not evicted, then the request will fail due to duplicate ExecutionStarted events being detected + // If the extended session is evicted because IncludePastEvents is true, then the request will succeed and a new extended session will be stored + GrpcOrchestrationRunner.LoadAndRun(requestString, new CallSubOrchestrationOrchestrator(), extendedSessions); + Assert.True(extendedSessions.GetOrInitializeCache(extendedSessionIdleTimeout).TryGetValue(TestInstanceId, out extendedSession)); + } + + [Fact] + public void Null_ExtendedSessionsCache_IsOkay() + { + var historyEvent = new Protobuf.HistoryEvent + { + EventId = -1, + Timestamp = Timestamp.FromDateTime(DateTime.UtcNow), + ExecutionStarted = new Protobuf.ExecutionStartedEvent() + { + OrchestrationInstance = new Protobuf.OrchestrationInstance + { + InstanceId = TestInstanceId, + ExecutionId = TestExecutionId, + }, + } + }; + Protobuf.OrchestratorRequest orchestratorRequest = CreateOrchestratorRequest([historyEvent]); + + // Set up the parameters as if extended sessions are enabled, but do not pass an extended session cache to the request. + // The request should still be successful. + orchestratorRequest.Properties.Add(new MapField() { + { "IncludePastEvents", Value.ForBool(true) }, + { "IsExtendedSession", Value.ForBool(true) }, + { "ExtendedSessionIdleTimeoutInSeconds", Value.ForNumber(DefaultExtendedSessionIdleTimeoutInSeconds) } }); + byte[] requestBytes = orchestratorRequest.ToByteArray(); + string requestString = Convert.ToBase64String(requestBytes); + string stringResponse = GrpcOrchestrationRunner.LoadAndRun(requestString, new SimpleOrchestrator()); + Protobuf.OrchestratorResponse response = Protobuf.OrchestratorResponse.Parser.ParseFrom(Convert.FromBase64String(stringResponse)); + Assert.Single(response.Actions); + Assert.NotNull(response.Actions[0].CompleteOrchestration); + Assert.Equal(Protobuf.OrchestrationStatus.Completed, response.Actions[0].CompleteOrchestration.OrchestrationStatus); + + // Now try it again without any properties specified. The request should still be successful. + orchestratorRequest.Properties.Clear(); + requestBytes = orchestratorRequest.ToByteArray(); + requestString = Convert.ToBase64String(requestBytes); + stringResponse = GrpcOrchestrationRunner.LoadAndRun(requestString, new SimpleOrchestrator()); + response = Protobuf.OrchestratorResponse.Parser.ParseFrom(Convert.FromBase64String(stringResponse)); + Assert.Single(response.Actions); + Assert.NotNull(response.Actions[0].CompleteOrchestration); + Assert.Equal(Protobuf.OrchestrationStatus.Completed, response.Actions[0].CompleteOrchestration.OrchestrationStatus); + } + + static Protobuf.OrchestratorRequest CreateOrchestratorRequest(IEnumerable newEvents) + { + var orchestratorRequest = new Protobuf.OrchestratorRequest() + { + InstanceId = TestInstanceId, + PastEvents = { Enumerable.Empty() }, + NewEvents = { newEvents }, + EntityParameters = new Protobuf.OrchestratorEntityParameters + { + EntityMessageReorderWindow = Duration.FromTimeSpan(TimeSpan.Zero), + }, + }; + return orchestratorRequest; + } + + class SimpleOrchestrator : TaskOrchestrator + { + public override Task RunAsync(TaskOrchestrationContext context, string input) + { + return Task.FromResult(input); + } + } + + class CallSubOrchestrationOrchestrator : TaskOrchestrator + { + public override async Task RunAsync(TaskOrchestrationContext context, string input) + { + await context.CallSubOrchestratorAsync(nameof(SimpleOrchestrator)); + return input; + } + } +}