diff --git a/src/Worker/Grpc/GrpcEntityRunner.cs b/src/Worker/Grpc/GrpcEntityRunner.cs index f35366dfc..e5f6763e4 100644 --- a/src/Worker/Grpc/GrpcEntityRunner.cs +++ b/src/Worker/Grpc/GrpcEntityRunner.cs @@ -5,7 +5,8 @@ using DurableTask.Core.Entities.OperationFormat; using Google.Protobuf; using Microsoft.DurableTask.Entities; -using Microsoft.DurableTask.Worker.Shims; +using Microsoft.DurableTask.Worker.Shims; +using Microsoft.Extensions.Caching.Memory; using Microsoft.Extensions.DependencyInjection; using P = Microsoft.DurableTask.Protobuf; @@ -25,7 +26,7 @@ namespace Microsoft.DurableTask.Worker.Grpc; /// /// public static class GrpcEntityRunner -{ +{ /// /// Deserializes entity batch request from and uses it to invoke the /// requested operations implemented by . @@ -51,24 +52,99 @@ public static class GrpcEntityRunner /// public static async Task LoadAndRunAsync( string encodedEntityRequest, ITaskEntity implementation, IServiceProvider? services = null) + { + return await LoadAndRunAsync(encodedEntityRequest, implementation, extendedSessionsCache: null, services: services); + } + + /// + /// Deserializes entity batch request from and uses it to invoke the + /// requested operations implemented by . + /// + /// + /// The encoded protobuf payload representing an entity batch request. This is a base64-encoded string. + /// + /// + /// An implementation that defines the entity logic. + /// + /// + /// The cache of entity states which can be used to retrieve the entity state if this request is from within an extended session. + /// + /// + /// Optional from which injected dependencies can be retrieved. + /// + /// + /// Returns a serialized result of the entity batch that should be used as the return value of the entity function + /// trigger. + /// + /// + /// Thrown if or is null. + /// + /// + /// Thrown if contains invalid data. + /// + public static async Task LoadAndRunAsync( + string encodedEntityRequest, ITaskEntity implementation, ExtendedSessionsCache? extendedSessionsCache, IServiceProvider? services = null) { Check.NotNullOrEmpty(encodedEntityRequest); Check.NotNull(implementation); P.EntityBatchRequest request = P.EntityBatchRequest.Parser.Base64Decode( - encodedEntityRequest); + encodedEntityRequest); + Dictionary properties = request.Properties.ToDictionary( + pair => pair.Key, + pair => ProtoUtils.ConvertValueToObject(pair.Value)); EntityBatchRequest batch = request.ToEntityBatchRequest(); EntityId id = EntityId.FromString(batch.InstanceId!); - TaskName entityName = new(id.Name); - + TaskName entityName = new(id.Name); + + bool addToExtendedSessions = false; + bool stateCached = false; + GrpcInstanceRunnerUtils.ParseRequestPropertiesAndInitializeCache( + properties, + extendedSessionsCache, + out double extendedSessionIdleTimeoutInSeconds, + out bool isExtendedSession, + out bool entityStateIncluded, + out MemoryCache? extendedSessions); + + if (isExtendedSession && extendedSessions != null) + { + addToExtendedSessions = true; + + // If an entity state was provided, even if we already have one stored, we always want to use the provided state. + if (!entityStateIncluded && extendedSessions.TryGetValue(request.InstanceId, out string? entityState)) + { + batch.EntityState = entityState; + stateCached = true; + } + } + + if (!stateCached && !entityStateIncluded) + { + // No state was provided, and we do not have one cached, so we cannot execute the batch request. + return Convert.ToBase64String(new P.EntityBatchResult { RequiresState = true }.ToByteArray()); + } + DurableTaskShimFactory factory = services is null ? DurableTaskShimFactory.Default - : ActivatorUtilities.GetServiceOrCreateInstance(services); - - TaskEntity entity = factory.CreateEntity(entityName, implementation, id); - EntityBatchResult result = await entity.ExecuteOperationBatchAsync(batch); - + : ActivatorUtilities.GetServiceOrCreateInstance(services); + + TaskEntity entity = factory.CreateEntity(entityName, implementation, id); + EntityBatchResult result = await entity.ExecuteOperationBatchAsync(batch); + + if (addToExtendedSessions) + { + extendedSessions.Set( + request.InstanceId, + result.EntityState, + new MemoryCacheEntryOptions { SlidingExpiration = TimeSpan.FromSeconds(extendedSessionIdleTimeoutInSeconds) }); + } + else + { + extendedSessions?.Remove(request.InstanceId); + } + P.EntityBatchResult response = result.ToEntityBatchResult(); byte[] responseBytes = response.ToByteArray(); return Convert.ToBase64String(responseBytes); diff --git a/src/Worker/Grpc/GrpcInstanceRunnerUtils.cs b/src/Worker/Grpc/GrpcInstanceRunnerUtils.cs new file mode 100644 index 000000000..6d395bc8d --- /dev/null +++ b/src/Worker/Grpc/GrpcInstanceRunnerUtils.cs @@ -0,0 +1,71 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using System; +using System.Collections.Generic; +using System.Text; +using Microsoft.Extensions.Caching.Memory; + +namespace Microsoft.DurableTask.Worker.Grpc; + +/// +/// Utility methods for the and classes. +/// +static class GrpcInstanceRunnerUtils +{ + /// + /// Parses request properties to determine extended session settings and initializes the extended sessions cache if + /// the settings are properly enabled. + /// + /// + /// If any request property is missing or invalid (i.e. the key is misspelled or the value is of the wrong type), + /// extended sessions are not enabled and default values are assigned are assigned to the returns. + /// + /// + /// A dictionary containing request properties used to configure extended session behavior. + /// + /// The extended sessions cache manager. + /// + /// When the method returns, contains the idle timeout value for extended sessions, in seconds. Cache entries that + /// have not been accessed in this timeframe are evicted from . + /// Set to zero if extended sessions are not enabled. + /// + /// When the method returns, indicates whether this request is from within an extended session. + /// When the method returns, indicates whether instance state is included in the request. + /// When the method returns, contains the extended sessions cache initialized from + /// if and + /// are correctly specified in the ; otherwise, null. + /// + internal static void ParseRequestPropertiesAndInitializeCache( + Dictionary properties, + ExtendedSessionsCache? extendedSessionsCache, + out double extendedSessionIdleTimeoutInSeconds, + out bool isExtendedSession, + out bool stateIncluded, + out MemoryCache? extendedSessions) + { + // If any of the request parameters are malformed, we assume the default - extended sessions are not enabled and the instance state is attached + extendedSessions = null; + stateIncluded = true; + isExtendedSession = false; + extendedSessionIdleTimeoutInSeconds = 0; + + // Only attempt to initialize the extended sessions cache if all the parameters are correctly specified + if (properties.TryGetValue("ExtendedSessionIdleTimeoutInSeconds", out object? extendedSessionIdleTimeoutObj) + && extendedSessionIdleTimeoutObj is double extendedSessionIdleTimeout + && extendedSessionIdleTimeout > 0 + && properties.TryGetValue("IsExtendedSession", out object? extendedSessionObj) + && extendedSessionObj is bool extendedSession) + { + extendedSessionIdleTimeoutInSeconds = extendedSessionIdleTimeout; + isExtendedSession = extendedSession; + extendedSessions = extendedSessionsCache?.GetOrInitializeCache(extendedSessionIdleTimeoutInSeconds); + } + + if (properties.TryGetValue("IncludeState", out object? includeStateObj) + && includeStateObj is bool includeState) + { + stateIncluded = includeState; + } + } +} diff --git a/src/Worker/Grpc/GrpcOrchestrationRunner.cs b/src/Worker/Grpc/GrpcOrchestrationRunner.cs index d8d04deaa..77e1c5fd5 100644 --- a/src/Worker/Grpc/GrpcOrchestrationRunner.cs +++ b/src/Worker/Grpc/GrpcOrchestrationRunner.cs @@ -131,32 +131,16 @@ public static string LoadAndRun( pair => ProtoUtils.ConvertValueToObject(pair.Value)); OrchestratorExecutionResult? result = null; - MemoryCache? extendedSessions = null; - // If any of the request parameters are malformed, we assume the default - extended sessions are not enabled and the orchestration history is attached bool addToExtendedSessions = false; bool requiresHistory = false; - bool pastEventsIncluded = true; - bool isExtendedSession = false; - double extendedSessionIdleTimeoutInSeconds = 0; - - // Only attempt to initialize the extended sessions cache if all the parameters are correctly specified - if (properties.TryGetValue("ExtendedSessionIdleTimeoutInSeconds", out object? extendedSessionIdleTimeoutObj) - && extendedSessionIdleTimeoutObj is double extendedSessionIdleTimeout - && extendedSessionIdleTimeout > 0 - && properties.TryGetValue("IsExtendedSession", out object? extendedSessionObj) - && extendedSessionObj is bool extendedSession) - { - extendedSessionIdleTimeoutInSeconds = extendedSessionIdleTimeout; - isExtendedSession = extendedSession; - extendedSessions = extendedSessionsCache?.GetOrInitializeCache(extendedSessionIdleTimeoutInSeconds); - } - - if (properties.TryGetValue("IncludePastEvents", out object? includePastEventsObj) - && includePastEventsObj is bool includePastEvents) - { - pastEventsIncluded = includePastEvents; - } + GrpcInstanceRunnerUtils.ParseRequestPropertiesAndInitializeCache( + properties, + extendedSessionsCache, + out double extendedSessionIdleTimeoutInSeconds, + out bool isExtendedSession, + out bool pastEventsIncluded, + out MemoryCache? extendedSessions); if (isExtendedSession && extendedSessions != null) { diff --git a/test/Worker/Grpc.Tests/GrpcEntityRunnerTests.cs b/test/Worker/Grpc.Tests/GrpcEntityRunnerTests.cs new file mode 100644 index 000000000..b63711d4f --- /dev/null +++ b/test/Worker/Grpc.Tests/GrpcEntityRunnerTests.cs @@ -0,0 +1,393 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using Google.Protobuf; +using Google.Protobuf.Collections; +using Google.Protobuf.WellKnownTypes; +using Microsoft.DurableTask.Entities; +using Microsoft.Extensions.Caching.Memory; + +namespace Microsoft.DurableTask.Worker.Grpc.Tests; + +public class GrpcEntityRunnerTests +{ + const string TestInstanceId = "@instance_id@my_entity"; + const int DefaultExtendedSessionIdleTimeoutInSeconds = 30; + static readonly Protobuf.OperationRequest setOperation = new() { RequestId = Guid.NewGuid().ToString(), Input = 1.ToString(), Operation = "Set" }; + static readonly Protobuf.OperationRequest addOperation = new() { RequestId = Guid.NewGuid().ToString(), Input = 1.ToString(), Operation = "Add" }; + + [Fact] + public async Task EmptyOrNullParameters_Throw_Exceptions() + { + Func act = async () => + await GrpcEntityRunner.LoadAndRunAsync(string.Empty, new SimpleEntity(), new ExtendedSessionsCache()); + await act.Should().ThrowExactlyAsync().WithParameterName("encodedEntityRequest"); + + act = () => + GrpcEntityRunner.LoadAndRunAsync(null!, new SimpleEntity(), new ExtendedSessionsCache()); + await act.Should().ThrowExactlyAsync().WithParameterName("encodedEntityRequest"); + + act = () => + GrpcEntityRunner.LoadAndRunAsync("request", null!, new ExtendedSessionsCache()); + await act.Should().ThrowExactlyAsync().WithParameterName("implementation"); + } + + [Fact] + public async Task EmptyHistory_Returns_NeedsHistoryInResponse_Async() + { + using var extendedSessions = new ExtendedSessionsCache(); + + // No state and without extended sessions enabled + Protobuf.EntityBatchRequest entityRequest = CreateEntityRequest([]); + entityRequest.Properties.Add(new MapField() { + { "IncludeState", Value.ForBool(false) }}); + byte[] requestBytes = entityRequest.ToByteArray(); + string requestString = Convert.ToBase64String(requestBytes); + string responseString = await GrpcEntityRunner.LoadAndRunAsync(requestString, new SimpleEntity(), extendedSessions); + Protobuf.EntityBatchResult response = Protobuf.EntityBatchResult.Parser.ParseFrom(Convert.FromBase64String(responseString)); + Assert.True(response.RequiresState); + Assert.False(extendedSessions.IsInitialized); + + // No state but with extended sessions enabled + entityRequest.Properties.Add(new MapField() { + { "IsExtendedSession", Value.ForBool(true) }, + { "ExtendedSessionIdleTimeoutInSeconds", Value.ForNumber(DefaultExtendedSessionIdleTimeoutInSeconds) } }); + requestBytes = entityRequest.ToByteArray(); + requestString = Convert.ToBase64String(requestBytes); + responseString = await GrpcEntityRunner.LoadAndRunAsync(requestString, new SimpleEntity(), extendedSessions); + response = Protobuf.EntityBatchResult.Parser.ParseFrom(Convert.FromBase64String(responseString)); + Assert.True(response.RequiresState); + Assert.True(extendedSessions.IsInitialized); + } + + [Fact] + public async Task MalformedRequestParameters_Means_CacheNotInitialized_Async() + { + using var extendedSessions = new ExtendedSessionsCache(); + Protobuf.EntityBatchRequest entityRequest = CreateEntityRequest([]); + + // Misspelled extended session timeout key + entityRequest.Properties.Add(new MapField() { + { "IncludeState", Value.ForBool(false) }, + { "IsExtendedSession", Value.ForBool(true) }, + { "ExtendedSessionsIdleTimeoutInSeconds", Value.ForNumber(DefaultExtendedSessionIdleTimeoutInSeconds) } }); + byte[] requestBytes = entityRequest.ToByteArray(); + string requestString = Convert.ToBase64String(requestBytes); + string responseString = await GrpcEntityRunner.LoadAndRunAsync(requestString, new SimpleEntity(), extendedSessions); + Protobuf.OrchestratorResponse response = Protobuf.OrchestratorResponse.Parser.ParseFrom(Convert.FromBase64String(responseString)); + Assert.False(extendedSessions.IsInitialized); + + // Wrong value type for extended session timeout key + entityRequest.Properties.Clear(); + entityRequest.Properties.Add(new MapField() { + { "IncludeState", Value.ForBool(false) }, + { "IsExtendedSession", Value.ForBool(true) }, + { "ExtendedSessionIdleTimeoutInSeconds", Value.ForString("hi") } }); + requestBytes = entityRequest.ToByteArray(); + requestString = Convert.ToBase64String(requestBytes); + await GrpcEntityRunner.LoadAndRunAsync(requestString, new SimpleEntity(), extendedSessions); + Assert.False(extendedSessions.IsInitialized); + + // Invalid number for extended session timeout key (must be > 0) + entityRequest.Properties.Clear(); + entityRequest.Properties.Add(new MapField() { + { "IncludeState", Value.ForBool(false) }, + { "IsExtendedSession", Value.ForBool(true) }, + { "ExtendedSessionIdleTimeoutInSeconds", Value.ForNumber(0) } }); + requestBytes = entityRequest.ToByteArray(); + requestString = Convert.ToBase64String(requestBytes); + await GrpcEntityRunner.LoadAndRunAsync(requestString, new SimpleEntity(), extendedSessions); + Assert.False(extendedSessions.IsInitialized); + + // Invalid number for extended session timeout key (must be > 0) + entityRequest.Properties.Clear(); + entityRequest.Properties.Add(new MapField() { + { "IncludeState", Value.ForBool(false) }, + { "IsExtendedSession", Value.ForBool(true) }, + { "ExtendedSessionIdleTimeoutInSeconds", Value.ForNumber(-1) } }); + requestBytes = entityRequest.ToByteArray(); + requestString = Convert.ToBase64String(requestBytes); + await GrpcEntityRunner.LoadAndRunAsync(requestString, new SimpleEntity(), extendedSessions); + Assert.False(extendedSessions.IsInitialized); + + // No extended session timeout key + entityRequest.Properties.Clear(); + entityRequest.Properties.Add(new MapField() { + { "IncludeState", Value.ForBool(false) }, + { "IsExtendedSession", Value.ForBool(true) } }); + requestBytes = entityRequest.ToByteArray(); + requestString = Convert.ToBase64String(requestBytes); + await GrpcEntityRunner.LoadAndRunAsync(requestString, new SimpleEntity(), extendedSessions); + Assert.False(extendedSessions.IsInitialized); + + // Misspelled extended session key + entityRequest.Properties.Clear(); + entityRequest.Properties.Add(new MapField() { + { "IncludeState", Value.ForBool(false) }, + { "isExtendedSession", Value.ForBool(true) }, + { "ExtendedSessionIdleTimeoutInSeconds", Value.ForNumber(DefaultExtendedSessionIdleTimeoutInSeconds) } }); + requestBytes = entityRequest.ToByteArray(); + requestString = Convert.ToBase64String(requestBytes); + await GrpcEntityRunner.LoadAndRunAsync(requestString, new SimpleEntity(), extendedSessions); + Assert.False(extendedSessions.IsInitialized); + + // Wrong value type for extended session key + entityRequest.Properties.Clear(); + entityRequest.Properties.Add(new MapField() { + { "IncludeState", Value.ForBool(false) }, + { "IsExtendedSession", Value.ForNumber(1) }, + { "ExtendedSessionIdleTimeoutInSeconds", Value.ForNumber(DefaultExtendedSessionIdleTimeoutInSeconds) } }); + requestBytes = entityRequest.ToByteArray(); + requestString = Convert.ToBase64String(requestBytes); + await GrpcEntityRunner.LoadAndRunAsync(requestString, new SimpleEntity(), extendedSessions); + Assert.False(extendedSessions.IsInitialized); + + // No extended session key + entityRequest.Properties.Clear(); + entityRequest.Properties.Add(new MapField() { + { "IncludeState", Value.ForBool(false) }, + { "ExtendedSessionIdleTimeoutInSeconds", Value.ForNumber(DefaultExtendedSessionIdleTimeoutInSeconds) } }); + requestBytes = entityRequest.ToByteArray(); + requestString = Convert.ToBase64String(requestBytes); + await GrpcEntityRunner.LoadAndRunAsync(requestString, new SimpleEntity(), extendedSessions); + Assert.False(extendedSessions.IsInitialized); + } + + [Fact] + public async Task IsExtendedSessionFalse_Means_NoExtendedSessionStored_Async() + { + using var extendedSessions = new ExtendedSessionsCache(); + Protobuf.EntityBatchRequest entityRequest = CreateEntityRequest([setOperation]); + + entityRequest.Properties.Add(new MapField() { + { "IncludeState", Value.ForBool(false) }, + { "IsExtendedSession", Value.ForBool(false) }, + { "ExtendedSessionIdleTimeoutInSeconds", Value.ForNumber(DefaultExtendedSessionIdleTimeoutInSeconds) } }); + byte[] requestBytes = entityRequest.ToByteArray(); + string requestString = Convert.ToBase64String(requestBytes); + await GrpcEntityRunner.LoadAndRunAsync(requestString, new SimpleEntity(), extendedSessions); + Assert.True(extendedSessions.IsInitialized); + Assert.False(extendedSessions.GetOrInitializeCache(DefaultExtendedSessionIdleTimeoutInSeconds).TryGetValue(TestInstanceId, out object? extendedSession)); + } + + /// + /// These tests verify that a malformed/nonexistent "IncludeState" parameter means that the worker will attempt to + /// fulfill the entity request and not request a state for it. However, it is of course very undesirable that a + /// state is not attached to the request, but no state is requested by the worker due to a malformed "IncludeState" parameter + /// even when it needs one to fulfill the request. This would need to be checked on whatever side is calling this SDK. + /// + [Fact] + public async Task MalformedIncludeStateParameter_Means_NoStateRequired_Async() + { + using var extendedSessions = new ExtendedSessionsCache(); + Protobuf.EntityBatchRequest entityRequest = CreateEntityRequest([addOperation], entityState: 1.ToString()); + + // Misspelled include entity state key + entityRequest.Properties.Add(new MapField() { + { "IncludeSTate", Value.ForBool(false) }, + { "IsExtendedSession", Value.ForBool(false) }, + { "ExtendedSessionIdleTimeoutInSeconds", Value.ForNumber(DefaultExtendedSessionIdleTimeoutInSeconds) } }); + byte[] requestBytes = entityRequest.ToByteArray(); + string requestString = Convert.ToBase64String(requestBytes); + string responseString = await GrpcEntityRunner.LoadAndRunAsync(requestString, new SimpleEntity(), extendedSessions); + Protobuf.EntityBatchResult response = Protobuf.EntityBatchResult.Parser.ParseFrom(Convert.FromBase64String(responseString)); + Assert.False(response.RequiresState); + Assert.Equal("2", response.EntityState); + + // Wrong value type for include entity state key + entityRequest.Properties.Clear(); + entityRequest.Properties.Add(new MapField() { + { "IncludeState", Value.ForString("no") }, + { "IsExtendedSession", Value.ForBool(false) }, + { "ExtendedSessionIdleTimeoutInSeconds", Value.ForNumber(DefaultExtendedSessionIdleTimeoutInSeconds) } }); + requestBytes = entityRequest.ToByteArray(); + requestString = Convert.ToBase64String(requestBytes); + responseString = await GrpcEntityRunner.LoadAndRunAsync(requestString, new SimpleEntity(), extendedSessions); + response = Protobuf.EntityBatchResult.Parser.ParseFrom(Convert.FromBase64String(responseString)); + Assert.False(response.RequiresState); + Assert.Equal("2", response.EntityState); + + // No include entity state key + entityRequest.Properties.Clear(); + entityRequest.Properties.Add(new MapField() { + { "IsExtendedSession", Value.ForBool(false) }, + { "ExtendedSessionIdleTimeoutInSeconds", Value.ForNumber(DefaultExtendedSessionIdleTimeoutInSeconds) } }); + requestBytes = entityRequest.ToByteArray(); + requestString = Convert.ToBase64String(requestBytes); + responseString = await GrpcEntityRunner.LoadAndRunAsync(requestString, new SimpleEntity(), extendedSessions); + response = Protobuf.EntityBatchResult.Parser.ParseFrom(Convert.FromBase64String(responseString)); + Assert.False(response.RequiresState); + Assert.Equal("2", response.EntityState); + } + + [Fact] + public async Task Entity_State_Stored_Async() + { + using var extendedSessions = new ExtendedSessionsCache(); + Protobuf.EntityBatchRequest entityRequest = CreateEntityRequest([setOperation]); + entityRequest.Properties.Add(new MapField() { + { "IncludeState", Value.ForBool(true) }, + { "IsExtendedSession", Value.ForBool(true) }, + { "ExtendedSessionIdleTimeoutInSeconds", Value.ForNumber(DefaultExtendedSessionIdleTimeoutInSeconds) } }); + byte[] requestBytes = entityRequest.ToByteArray(); + string requestString = Convert.ToBase64String(requestBytes); + string responseString = await GrpcEntityRunner.LoadAndRunAsync(requestString, new SimpleEntity(), extendedSessions); + Protobuf.EntityBatchResult response = Protobuf.EntityBatchResult.Parser.ParseFrom(Convert.FromBase64String(responseString)); + Assert.True(extendedSessions.IsInitialized); + Assert.True(extendedSessions.GetOrInitializeCache(DefaultExtendedSessionIdleTimeoutInSeconds).TryGetValue(TestInstanceId, out object? extendedSession)); + Assert.Equal("1", extendedSession); + Assert.Equal("1", response.EntityState); + } + + [Fact] + public async Task ExternallyEndedExtendedSession_Evicted_Async() + { + using var extendedSessions = new ExtendedSessionsCache(); + Protobuf.EntityBatchRequest entityRequest = CreateEntityRequest([setOperation]); + entityRequest.Properties.Add(new MapField() { + { "IncludeState", Value.ForBool(true) }, + { "IsExtendedSession", Value.ForBool(true) }, + { "ExtendedSessionIdleTimeoutInSeconds", Value.ForNumber(DefaultExtendedSessionIdleTimeoutInSeconds) } }); + byte[] requestBytes = entityRequest.ToByteArray(); + string requestString = Convert.ToBase64String(requestBytes); + string responseString = await GrpcEntityRunner.LoadAndRunAsync(requestString, new SimpleEntity(), extendedSessions); + Protobuf.EntityBatchResult response = Protobuf.EntityBatchResult.Parser.ParseFrom(Convert.FromBase64String(responseString)); + Assert.True(extendedSessions.IsInitialized); + Assert.True(extendedSessions.GetOrInitializeCache(DefaultExtendedSessionIdleTimeoutInSeconds).TryGetValue(TestInstanceId, out object? extendedSession)); + Assert.Equal("1", extendedSession); + Assert.Equal("1", response.EntityState); + + // Now set the extended session flag to false for this instance + entityRequest.Properties.Clear(); + entityRequest.Properties.Add(new MapField() { + { "IncludeState", Value.ForBool(true) }, + { "IsExtendedSession", Value.ForBool(false) }, + { "ExtendedSessionIdleTimeoutInSeconds", Value.ForNumber(DefaultExtendedSessionIdleTimeoutInSeconds) } }); + requestBytes = entityRequest.ToByteArray(); + requestString = Convert.ToBase64String(requestBytes); + await GrpcEntityRunner.LoadAndRunAsync(requestString, new SimpleEntity(), extendedSessions); + Assert.True(extendedSessions.IsInitialized); + Assert.False(extendedSessions.GetOrInitializeCache(DefaultExtendedSessionIdleTimeoutInSeconds).TryGetValue(TestInstanceId, out extendedSession)); + } + + [Fact] + public async Task Stale_ExtendedSessions_Evicted_Async() + { + using var extendedSessions = new ExtendedSessionsCache(); + int extendedSessionIdleTimeout = 5; + Protobuf.EntityBatchRequest entityRequest = CreateEntityRequest([setOperation]); + entityRequest.Properties.Add(new MapField() { + { "IncludeState", Value.ForBool(true) }, + { "IsExtendedSession", Value.ForBool(true) }, + { "ExtendedSessionIdleTimeoutInSeconds", Value.ForNumber(extendedSessionIdleTimeout) } }); + byte[] requestBytes = entityRequest.ToByteArray(); + string requestString = Convert.ToBase64String(requestBytes); + string responseString = await GrpcEntityRunner.LoadAndRunAsync(requestString, new SimpleEntity(), extendedSessions); + Protobuf.EntityBatchResult response = Protobuf.EntityBatchResult.Parser.ParseFrom(Convert.FromBase64String(responseString)); + Assert.True(extendedSessions.IsInitialized); + Assert.True(extendedSessions.GetOrInitializeCache(extendedSessionIdleTimeout).TryGetValue(TestInstanceId, out object? extendedSession)); + Assert.Equal("1", extendedSession); + Assert.Equal("1", response.EntityState); + + // Wait for longer than the timeout to account for finite cache scan for stale items frequency + await Task.Delay(extendedSessionIdleTimeout * 1000 * 2); + Assert.False(extendedSessions.GetOrInitializeCache(extendedSessionIdleTimeout).TryGetValue(TestInstanceId, out extendedSession)); + + // Now that the entry was evicted from the cache, the entity runner needs an entity state to complete the request + entityRequest.Properties.Clear(); + entityRequest.Properties.Add(new MapField() { + { "IncludeState", Value.ForBool(false) }, + { "IsExtendedSession", Value.ForBool(true) }, + { "ExtendedSessionIdleTimeoutInSeconds", Value.ForNumber(extendedSessionIdleTimeout) } }); + requestBytes = entityRequest.ToByteArray(); + requestString = Convert.ToBase64String(requestBytes); + responseString = await GrpcEntityRunner.LoadAndRunAsync(requestString, new SimpleEntity(), extendedSessions); + response = Protobuf.EntityBatchResult.Parser.ParseFrom(Convert.FromBase64String(responseString)); + Assert.True(response.RequiresState); + } + + [Fact] + public async Task EntityStateIncluded_Means_ExtendedSession_Evicted_Async() + { + using var extendedSessions = new ExtendedSessionsCache(); + Protobuf.EntityBatchRequest entityRequest = CreateEntityRequest([addOperation], entityState: 1.ToString()); + entityRequest.Properties.Add(new MapField() { + { "IncludeState", Value.ForBool(true) }, + { "IsExtendedSession", Value.ForBool(true) }, + { "ExtendedSessionIdleTimeoutInSeconds", Value.ForNumber(DefaultExtendedSessionIdleTimeoutInSeconds) } }); + byte[] requestBytes = entityRequest.ToByteArray(); + string requestString = Convert.ToBase64String(requestBytes); + await GrpcEntityRunner.LoadAndRunAsync(requestString, new SimpleEntity(), extendedSessions); + Assert.True(extendedSessions.IsInitialized); + Assert.True(extendedSessions.GetOrInitializeCache(DefaultExtendedSessionIdleTimeoutInSeconds).TryGetValue(TestInstanceId, out object? extendedSession)); + + // Now we will retry the same request, but with a different value for the state. If the extended session is not evicted, then the result will be incorrect. + entityRequest = CreateEntityRequest([addOperation], entityState: 5.ToString()); + entityRequest.Properties.Add(new MapField() { + { "IncludeState", Value.ForBool(true) }, + { "IsExtendedSession", Value.ForBool(true) }, + { "ExtendedSessionIdleTimeoutInSeconds", Value.ForNumber(DefaultExtendedSessionIdleTimeoutInSeconds) } }); + requestBytes = entityRequest.ToByteArray(); + requestString = Convert.ToBase64String(requestBytes); + string responseString = await GrpcEntityRunner.LoadAndRunAsync(requestString, new SimpleEntity(), extendedSessions); + Protobuf.EntityBatchResult response = Protobuf.EntityBatchResult.Parser.ParseFrom(Convert.FromBase64String(responseString)); + Assert.True(extendedSessions.GetOrInitializeCache(DefaultExtendedSessionIdleTimeoutInSeconds).TryGetValue(TestInstanceId, out extendedSession)); + Assert.Equal("6", extendedSession); + Assert.Equal("6", response.EntityState); + } + + [Fact] + public async Task Null_ExtendedSessionsCache_IsOkay_Async() + { + Protobuf.EntityBatchRequest entityRequest = CreateEntityRequest([setOperation]); + + // Set up the parameters as if extended sessions are enabled, but do not pass an extended session cache to the request. + // The request should still be successful. + entityRequest.Properties.Add(new MapField() { + { "IncludeState", Value.ForBool(true) }, + { "IsExtendedSession", Value.ForBool(true) }, + { "ExtendedSessionIdleTimeoutInSeconds", Value.ForNumber(DefaultExtendedSessionIdleTimeoutInSeconds) } }); + byte[] requestBytes = entityRequest.ToByteArray(); + string requestString = Convert.ToBase64String(requestBytes); + string responseString = await GrpcEntityRunner.LoadAndRunAsync(requestString, new SimpleEntity()); + Protobuf.EntityBatchResult response = Protobuf.EntityBatchResult.Parser.ParseFrom(Convert.FromBase64String(responseString)); + Assert.Equal("1", response.EntityState); + + // Now try it again without any properties specified. The request should still be successful. + entityRequest.Properties.Clear(); + requestBytes = entityRequest.ToByteArray(); + requestString = Convert.ToBase64String(requestBytes); + responseString = await GrpcEntityRunner.LoadAndRunAsync(requestString, new SimpleEntity()); + response = Protobuf.EntityBatchResult.Parser.ParseFrom(Convert.FromBase64String(responseString)); + Assert.Equal("1", response.EntityState); + } + + static Protobuf.EntityBatchRequest CreateEntityRequest(IEnumerable requests, string? entityState = null) + { + var entityBatchRequest = new Protobuf.EntityBatchRequest() + { + InstanceId = TestInstanceId, + EntityState = entityState, + Operations = { requests } + }; + return entityBatchRequest; + } + + class SimpleEntity : TaskEntity + { + public void Set(int value) + { + this.State = value; + } + + public void Add(int value) + { + this.State += value; + } + + public void Delete() + { + this.State = null; + } + } +} diff --git a/test/Worker/Grpc.Tests/GrpcOrchestrationRunnerTests.cs b/test/Worker/Grpc.Tests/GrpcOrchestrationRunnerTests.cs index 6c3179211..983deaef9 100644 --- a/test/Worker/Grpc.Tests/GrpcOrchestrationRunnerTests.cs +++ b/test/Worker/Grpc.Tests/GrpcOrchestrationRunnerTests.cs @@ -4,6 +4,7 @@ using Google.Protobuf; using Google.Protobuf.Collections; using Google.Protobuf.WellKnownTypes; +using Microsoft.Extensions.Caching.Memory; namespace Microsoft.DurableTask.Worker.Grpc.Tests; @@ -37,11 +38,11 @@ public void EmptyHistory_Returns_NeedsHistoryInResponse() // No history and without extended sessions enabled Protobuf.OrchestratorRequest orchestratorRequest = CreateOrchestratorRequest([]); orchestratorRequest.Properties.Add(new MapField() { - { "IncludePastEvents", Value.ForBool(false) }}); + { "IncludeState", Value.ForBool(false) }}); byte[] requestBytes = orchestratorRequest.ToByteArray(); string requestString = Convert.ToBase64String(requestBytes); - string stringResponse = GrpcOrchestrationRunner.LoadAndRun(requestString, new SimpleOrchestrator(), extendedSessions); - Protobuf.OrchestratorResponse response = Protobuf.OrchestratorResponse.Parser.ParseFrom(Convert.FromBase64String(stringResponse)); + string responseString = GrpcOrchestrationRunner.LoadAndRun(requestString, new SimpleOrchestrator(), extendedSessions); + Protobuf.OrchestratorResponse response = Protobuf.OrchestratorResponse.Parser.ParseFrom(Convert.FromBase64String(responseString)); Assert.True(response.RequiresHistory); Assert.False(extendedSessions.IsInitialized); @@ -51,12 +52,61 @@ public void EmptyHistory_Returns_NeedsHistoryInResponse() { "ExtendedSessionIdleTimeoutInSeconds", Value.ForNumber(DefaultExtendedSessionIdleTimeoutInSeconds) } }); requestBytes = orchestratorRequest.ToByteArray(); requestString = Convert.ToBase64String(requestBytes); - stringResponse = GrpcOrchestrationRunner.LoadAndRun(requestString, new SimpleOrchestrator(), extendedSessions); - response = Protobuf.OrchestratorResponse.Parser.ParseFrom(Convert.FromBase64String(stringResponse)); + responseString = GrpcOrchestrationRunner.LoadAndRun(requestString, new SimpleOrchestrator(), extendedSessions); + response = Protobuf.OrchestratorResponse.Parser.ParseFrom(Convert.FromBase64String(responseString)); Assert.True(response.RequiresHistory); Assert.True(extendedSessions.IsInitialized); } + [Fact] + public void NullExtendedSessionStored_Means_NeedsExtendedSessionNotUsed() + { + using var extendedSessions = new ExtendedSessionsCache(); + extendedSessions.GetOrInitializeCache(DefaultExtendedSessionIdleTimeoutInSeconds).Set( + TestInstanceId, + null!, + new MemoryCacheEntryOptions { SlidingExpiration = TimeSpan.FromSeconds(DefaultExtendedSessionIdleTimeoutInSeconds) }); + + // No history, so response indicates that a history is required + Protobuf.OrchestratorRequest orchestratorRequest = CreateOrchestratorRequest([]); + orchestratorRequest.Properties.Add(new MapField() { + { "IncludeState", Value.ForBool(false) }, + { "IsExtendedSession", Value.ForBool(true) }, + { "ExtendedSessionIdleTimeoutInSeconds", Value.ForNumber(DefaultExtendedSessionIdleTimeoutInSeconds) } }); + byte[] requestBytes = orchestratorRequest.ToByteArray(); + string requestString = Convert.ToBase64String(requestBytes); + string responseString = GrpcOrchestrationRunner.LoadAndRun(requestString, new SimpleOrchestrator(), extendedSessions); + Protobuf.OrchestratorResponse response = Protobuf.OrchestratorResponse.Parser.ParseFrom(Convert.FromBase64String(responseString)); + Assert.True(response.RequiresHistory); + + // History provided so the request can be fulfilled and an extended session is stored + var historyEvent = new Protobuf.HistoryEvent + { + EventId = -1, + Timestamp = Timestamp.FromDateTime(DateTime.UtcNow), + ExecutionStarted = new Protobuf.ExecutionStartedEvent() + { + OrchestrationInstance = new Protobuf.OrchestrationInstance + { + InstanceId = TestInstanceId, + ExecutionId = TestExecutionId, + }, + } + }; + orchestratorRequest = CreateOrchestratorRequest([historyEvent]); + orchestratorRequest.Properties.Add(new MapField() { + { "IncludeState", Value.ForBool(true) }, + { "IsExtendedSession", Value.ForBool(true) }, + { "ExtendedSessionIdleTimeoutInSeconds", Value.ForNumber(DefaultExtendedSessionIdleTimeoutInSeconds) } }); + requestBytes = orchestratorRequest.ToByteArray(); + requestString = Convert.ToBase64String(requestBytes); + responseString = GrpcOrchestrationRunner.LoadAndRun(requestString, new CallSubOrchestrationOrchestrator(), extendedSessions); + response = Protobuf.OrchestratorResponse.Parser.ParseFrom(Convert.FromBase64String(responseString)); + Assert.False(response.RequiresHistory); + Assert.True(extendedSessions.GetOrInitializeCache(DefaultExtendedSessionIdleTimeoutInSeconds).TryGetValue(TestInstanceId, out object? extendedSession)); + Assert.NotNull(extendedSession); + } + [Fact] public void MalformedRequestParameters_Means_CacheNotInitialized() { @@ -65,19 +115,19 @@ public void MalformedRequestParameters_Means_CacheNotInitialized() // Misspelled extended session timeout key orchestratorRequest.Properties.Add(new MapField() { - { "IncludePastEvents", Value.ForBool(false) }, + { "IncludeState", Value.ForBool(false) }, { "IsExtendedSession", Value.ForBool(true) }, { "ExtendedSessionsIdleTimeoutInSeconds", Value.ForNumber(DefaultExtendedSessionIdleTimeoutInSeconds) } }); byte[] requestBytes = orchestratorRequest.ToByteArray(); string requestString = Convert.ToBase64String(requestBytes); - string stringResponse = GrpcOrchestrationRunner.LoadAndRun(requestString, new SimpleOrchestrator(), extendedSessions); - Protobuf.OrchestratorResponse response = Protobuf.OrchestratorResponse.Parser.ParseFrom(Convert.FromBase64String(stringResponse)); + string responseString = GrpcOrchestrationRunner.LoadAndRun(requestString, new SimpleOrchestrator(), extendedSessions); + Protobuf.OrchestratorResponse response = Protobuf.OrchestratorResponse.Parser.ParseFrom(Convert.FromBase64String(responseString)); Assert.False(extendedSessions.IsInitialized); // Wrong value type for extended session timeout key orchestratorRequest.Properties.Clear(); orchestratorRequest.Properties.Add(new MapField() { - { "IncludePastEvents", Value.ForBool(false) }, + { "IncludeState", Value.ForBool(false) }, { "IsExtendedSession", Value.ForBool(true) }, { "ExtendedSessionIdleTimeoutInSeconds", Value.ForString("hi") } }); requestBytes = orchestratorRequest.ToByteArray(); @@ -88,7 +138,7 @@ public void MalformedRequestParameters_Means_CacheNotInitialized() // Invalid number for extended session timeout key (must be > 0) orchestratorRequest.Properties.Clear(); orchestratorRequest.Properties.Add(new MapField() { - { "IncludePastEvents", Value.ForBool(false) }, + { "IncludeState", Value.ForBool(false) }, { "IsExtendedSession", Value.ForBool(true) }, { "ExtendedSessionIdleTimeoutInSeconds", Value.ForNumber(0) } }); requestBytes = orchestratorRequest.ToByteArray(); @@ -99,7 +149,7 @@ public void MalformedRequestParameters_Means_CacheNotInitialized() // Invalid number for extended session timeout key (must be > 0) orchestratorRequest.Properties.Clear(); orchestratorRequest.Properties.Add(new MapField() { - { "IncludePastEvents", Value.ForBool(false) }, + { "IncludeState", Value.ForBool(false) }, { "IsExtendedSession", Value.ForBool(true) }, { "ExtendedSessionIdleTimeoutInSeconds", Value.ForNumber(-1) } }); requestBytes = orchestratorRequest.ToByteArray(); @@ -110,7 +160,7 @@ public void MalformedRequestParameters_Means_CacheNotInitialized() // No extended session timeout key orchestratorRequest.Properties.Clear(); orchestratorRequest.Properties.Add(new MapField() { - { "IncludePastEvents", Value.ForBool(false) }, + { "IncludeState", Value.ForBool(false) }, { "IsExtendedSession", Value.ForBool(true) } }); requestBytes = orchestratorRequest.ToByteArray(); requestString = Convert.ToBase64String(requestBytes); @@ -120,7 +170,7 @@ public void MalformedRequestParameters_Means_CacheNotInitialized() // Misspelled extended session key orchestratorRequest.Properties.Clear(); orchestratorRequest.Properties.Add(new MapField() { - { "IncludePastEvents", Value.ForBool(false) }, + { "IncludeState", Value.ForBool(false) }, { "isExtendedSession", Value.ForBool(true) }, { "ExtendedSessionIdleTimeoutInSeconds", Value.ForNumber(DefaultExtendedSessionIdleTimeoutInSeconds) } }); requestBytes = orchestratorRequest.ToByteArray(); @@ -131,7 +181,7 @@ public void MalformedRequestParameters_Means_CacheNotInitialized() // Wrong value type for extended session key orchestratorRequest.Properties.Clear(); orchestratorRequest.Properties.Add(new MapField() { - { "IncludePastEvents", Value.ForBool(false) }, + { "IncludeState", Value.ForBool(false) }, { "IsExtendedSession", Value.ForNumber(1) }, { "ExtendedSessionIdleTimeoutInSeconds", Value.ForNumber(DefaultExtendedSessionIdleTimeoutInSeconds) } }); requestBytes = orchestratorRequest.ToByteArray(); @@ -142,7 +192,7 @@ public void MalformedRequestParameters_Means_CacheNotInitialized() // No extended session key orchestratorRequest.Properties.Clear(); orchestratorRequest.Properties.Add(new MapField() { - { "IncludePastEvents", Value.ForBool(false) }, + { "IncludeState", Value.ForBool(false) }, { "ExtendedSessionIdleTimeoutInSeconds", Value.ForNumber(DefaultExtendedSessionIdleTimeoutInSeconds) } }); requestBytes = orchestratorRequest.ToByteArray(); requestString = Convert.ToBase64String(requestBytes); @@ -157,7 +207,7 @@ public void IsExtendedSessionFalse_Means_NoExtendedSessionStored() Protobuf.OrchestratorRequest orchestratorRequest = CreateOrchestratorRequest([]); orchestratorRequest.Properties.Add(new MapField() { - { "IncludePastEvents", Value.ForBool(false) }, + { "IncludeState", Value.ForBool(false) }, { "IsExtendedSession", Value.ForBool(false) }, { "ExtendedSessionIdleTimeoutInSeconds", Value.ForNumber(DefaultExtendedSessionIdleTimeoutInSeconds) } }); byte[] requestBytes = orchestratorRequest.ToByteArray(); @@ -168,9 +218,9 @@ public void IsExtendedSessionFalse_Means_NoExtendedSessionStored() } /// - /// These tests verify that a malformed/nonexistent "IncludePastEvents" parameter means that the worker will attempt to + /// These tests verify that a malformed/nonexistent "IncludeState" parameter means that the worker will attempt to /// fulfill the orchestration request and not request a history for it. However, it is of course very undesirable that a - /// history is not attached to the request, but no history is requested by the worker due to a malformed "IncludePastEvents" parameter + /// history is not attached to the request, but no history is requested by the worker due to a malformed "IncludeState" parameter /// even when it needs one to fulfill the request. This would need to be checked on whatever side is calling this SDK. /// [Fact] @@ -194,25 +244,25 @@ public void MalformedPastEventsParameter_Means_NoHistoryRequired() // Misspelled include past events key orchestratorRequest.Properties.Add(new MapField() { - { "INcludePastEvents", Value.ForBool(false) }, + { "IncludeSTate", Value.ForBool(false) }, { "IsExtendedSession", Value.ForBool(false) }, { "ExtendedSessionIdleTimeoutInSeconds", Value.ForNumber(DefaultExtendedSessionIdleTimeoutInSeconds) } }); byte[] requestBytes = orchestratorRequest.ToByteArray(); string requestString = Convert.ToBase64String(requestBytes); - string stringResponse = GrpcOrchestrationRunner.LoadAndRun(requestString, new SimpleOrchestrator(), extendedSessions); - Protobuf.OrchestratorResponse response = Protobuf.OrchestratorResponse.Parser.ParseFrom(Convert.FromBase64String(stringResponse)); + string responseString = GrpcOrchestrationRunner.LoadAndRun(requestString, new SimpleOrchestrator(), extendedSessions); + Protobuf.OrchestratorResponse response = Protobuf.OrchestratorResponse.Parser.ParseFrom(Convert.FromBase64String(responseString)); Assert.False(response.RequiresHistory); // Wrong value type for include past events key orchestratorRequest.Properties.Clear(); orchestratorRequest.Properties.Add(new MapField() { - { "IncludePastEvents", Value.ForString("no") }, + { "IncludeState", Value.ForString("no") }, { "IsExtendedSession", Value.ForBool(false) }, { "ExtendedSessionIdleTimeoutInSeconds", Value.ForNumber(DefaultExtendedSessionIdleTimeoutInSeconds) } }); requestBytes = orchestratorRequest.ToByteArray(); requestString = Convert.ToBase64String(requestBytes); - stringResponse = GrpcOrchestrationRunner.LoadAndRun(requestString, new SimpleOrchestrator(), extendedSessions); - response = Protobuf.OrchestratorResponse.Parser.ParseFrom(Convert.FromBase64String(stringResponse)); + responseString = GrpcOrchestrationRunner.LoadAndRun(requestString, new SimpleOrchestrator(), extendedSessions); + response = Protobuf.OrchestratorResponse.Parser.ParseFrom(Convert.FromBase64String(responseString)); Assert.False(response.RequiresHistory); // No include past events key @@ -222,8 +272,8 @@ public void MalformedPastEventsParameter_Means_NoHistoryRequired() { "ExtendedSessionIdleTimeoutInSeconds", Value.ForNumber(DefaultExtendedSessionIdleTimeoutInSeconds) } }); requestBytes = orchestratorRequest.ToByteArray(); requestString = Convert.ToBase64String(requestBytes); - stringResponse = GrpcOrchestrationRunner.LoadAndRun(requestString, new SimpleOrchestrator(), extendedSessions); - response = Protobuf.OrchestratorResponse.Parser.ParseFrom(Convert.FromBase64String(stringResponse)); + responseString = GrpcOrchestrationRunner.LoadAndRun(requestString, new SimpleOrchestrator(), extendedSessions); + response = Protobuf.OrchestratorResponse.Parser.ParseFrom(Convert.FromBase64String(responseString)); Assert.False(response.RequiresHistory); } @@ -246,7 +296,7 @@ public void Incomplete_Orchestration_Stored() }; Protobuf.OrchestratorRequest orchestratorRequest = CreateOrchestratorRequest([historyEvent]); orchestratorRequest.Properties.Add(new MapField() { - { "IncludePastEvents", Value.ForBool(true) }, + { "IncludeState", Value.ForBool(true) }, { "IsExtendedSession", Value.ForBool(true) }, { "ExtendedSessionIdleTimeoutInSeconds", Value.ForNumber(DefaultExtendedSessionIdleTimeoutInSeconds) } }); byte[] requestBytes = orchestratorRequest.ToByteArray(); @@ -275,7 +325,7 @@ public void Complete_Orchestration_NotStored() }; Protobuf.OrchestratorRequest orchestratorRequest = CreateOrchestratorRequest([historyEvent]); orchestratorRequest.Properties.Add(new MapField() { - { "IncludePastEvents", Value.ForBool(true) }, + { "IncludeState", Value.ForBool(true) }, { "IsExtendedSession", Value.ForBool(true) }, { "ExtendedSessionIdleTimeoutInSeconds", Value.ForNumber(DefaultExtendedSessionIdleTimeoutInSeconds) } }); byte[] requestBytes = orchestratorRequest.ToByteArray(); @@ -304,7 +354,7 @@ public void ExternallyEndedExtendedSession_Evicted() }; Protobuf.OrchestratorRequest orchestratorRequest = CreateOrchestratorRequest([historyEvent]); orchestratorRequest.Properties.Add(new MapField() { - { "IncludePastEvents", Value.ForBool(true) }, + { "IncludeState", Value.ForBool(true) }, { "IsExtendedSession", Value.ForBool(true) }, { "ExtendedSessionIdleTimeoutInSeconds", Value.ForNumber(DefaultExtendedSessionIdleTimeoutInSeconds) } }); byte[] requestBytes = orchestratorRequest.ToByteArray(); @@ -316,7 +366,7 @@ public void ExternallyEndedExtendedSession_Evicted() // Now set the extended session flag to false for this instance orchestratorRequest.Properties.Clear(); orchestratorRequest.Properties.Add(new MapField() { - { "IncludePastEvents", Value.ForBool(true) }, + { "IncludeState", Value.ForBool(true) }, { "IsExtendedSession", Value.ForBool(false) }, { "ExtendedSessionIdleTimeoutInSeconds", Value.ForNumber(DefaultExtendedSessionIdleTimeoutInSeconds) } }); requestBytes = orchestratorRequest.ToByteArray(); @@ -346,7 +396,7 @@ public async void Stale_ExtendedSessions_Evicted_Async() }; Protobuf.OrchestratorRequest orchestratorRequest = CreateOrchestratorRequest([historyEvent]); orchestratorRequest.Properties.Add(new MapField() { - { "IncludePastEvents", Value.ForBool(true) }, + { "IncludeState", Value.ForBool(true) }, { "IsExtendedSession", Value.ForBool(true) }, { "ExtendedSessionIdleTimeoutInSeconds", Value.ForNumber(extendedSessionIdleTimeout) } }); byte[] requestBytes = orchestratorRequest.ToByteArray(); @@ -362,13 +412,13 @@ public async void Stale_ExtendedSessions_Evicted_Async() // Now that the entry was evicted from the cache, the orchestration runner needs an orchestration history to complete the request orchestratorRequest.Properties.Clear(); orchestratorRequest.Properties.Add(new MapField() { - { "IncludePastEvents", Value.ForBool(false) }, + { "IncludeState", Value.ForBool(false) }, { "IsExtendedSession", Value.ForBool(true) }, { "ExtendedSessionIdleTimeoutInSeconds", Value.ForNumber(extendedSessionIdleTimeout) } }); requestBytes = orchestratorRequest.ToByteArray(); requestString = Convert.ToBase64String(requestBytes); - string stringResponse = GrpcOrchestrationRunner.LoadAndRun(requestString, new CallSubOrchestrationOrchestrator(), extendedSessions); - Protobuf.OrchestratorResponse response = Protobuf.OrchestratorResponse.Parser.ParseFrom(Convert.FromBase64String(stringResponse)); + string responseString = GrpcOrchestrationRunner.LoadAndRun(requestString, new CallSubOrchestrationOrchestrator(), extendedSessions); + Protobuf.OrchestratorResponse response = Protobuf.OrchestratorResponse.Parser.ParseFrom(Convert.FromBase64String(responseString)); Assert.True(response.RequiresHistory); } @@ -376,7 +426,6 @@ public async void Stale_ExtendedSessions_Evicted_Async() public void PastEventIncluded_Means_ExtendedSession_Evicted() { using var extendedSessions = new ExtendedSessionsCache(); - int extendedSessionIdleTimeout = 5; var historyEvent = new Protobuf.HistoryEvent { EventId = -1, @@ -392,19 +441,19 @@ public void PastEventIncluded_Means_ExtendedSession_Evicted() }; Protobuf.OrchestratorRequest orchestratorRequest = CreateOrchestratorRequest([historyEvent]); orchestratorRequest.Properties.Add(new MapField() { - { "IncludePastEvents", Value.ForBool(true) }, + { "IncludeState", Value.ForBool(true) }, { "IsExtendedSession", Value.ForBool(true) }, - { "ExtendedSessionIdleTimeoutInSeconds", Value.ForNumber(extendedSessionIdleTimeout) } }); + { "ExtendedSessionIdleTimeoutInSeconds", Value.ForNumber(DefaultExtendedSessionIdleTimeoutInSeconds) } }); byte[] requestBytes = orchestratorRequest.ToByteArray(); string requestString = Convert.ToBase64String(requestBytes); GrpcOrchestrationRunner.LoadAndRun(requestString, new CallSubOrchestrationOrchestrator(), extendedSessions); Assert.True(extendedSessions.IsInitialized); - Assert.True(extendedSessions.GetOrInitializeCache(extendedSessionIdleTimeout).TryGetValue(TestInstanceId, out object? extendedSession)); + Assert.True(extendedSessions.GetOrInitializeCache(DefaultExtendedSessionIdleTimeoutInSeconds).TryGetValue(TestInstanceId, out object? extendedSession)); // Now we will retry the same exact request. If the extended session is not evicted, then the request will fail due to duplicate ExecutionStarted events being detected - // If the extended session is evicted because IncludePastEvents is true, then the request will succeed and a new extended session will be stored + // If the extended session is evicted because IncludeState is true, then the request will succeed and a new extended session will be stored GrpcOrchestrationRunner.LoadAndRun(requestString, new CallSubOrchestrationOrchestrator(), extendedSessions); - Assert.True(extendedSessions.GetOrInitializeCache(extendedSessionIdleTimeout).TryGetValue(TestInstanceId, out extendedSession)); + Assert.True(extendedSessions.GetOrInitializeCache(DefaultExtendedSessionIdleTimeoutInSeconds).TryGetValue(TestInstanceId, out extendedSession)); } [Fact] @@ -428,13 +477,13 @@ public void Null_ExtendedSessionsCache_IsOkay() // Set up the parameters as if extended sessions are enabled, but do not pass an extended session cache to the request. // The request should still be successful. orchestratorRequest.Properties.Add(new MapField() { - { "IncludePastEvents", Value.ForBool(true) }, + { "IncludeState", Value.ForBool(true) }, { "IsExtendedSession", Value.ForBool(true) }, { "ExtendedSessionIdleTimeoutInSeconds", Value.ForNumber(DefaultExtendedSessionIdleTimeoutInSeconds) } }); byte[] requestBytes = orchestratorRequest.ToByteArray(); string requestString = Convert.ToBase64String(requestBytes); - string stringResponse = GrpcOrchestrationRunner.LoadAndRun(requestString, new SimpleOrchestrator()); - Protobuf.OrchestratorResponse response = Protobuf.OrchestratorResponse.Parser.ParseFrom(Convert.FromBase64String(stringResponse)); + string responseString = GrpcOrchestrationRunner.LoadAndRun(requestString, new SimpleOrchestrator()); + Protobuf.OrchestratorResponse response = Protobuf.OrchestratorResponse.Parser.ParseFrom(Convert.FromBase64String(responseString)); Assert.Single(response.Actions); Assert.NotNull(response.Actions[0].CompleteOrchestration); Assert.Equal(Protobuf.OrchestrationStatus.Completed, response.Actions[0].CompleteOrchestration.OrchestrationStatus); @@ -443,8 +492,8 @@ public void Null_ExtendedSessionsCache_IsOkay() orchestratorRequest.Properties.Clear(); requestBytes = orchestratorRequest.ToByteArray(); requestString = Convert.ToBase64String(requestBytes); - stringResponse = GrpcOrchestrationRunner.LoadAndRun(requestString, new SimpleOrchestrator()); - response = Protobuf.OrchestratorResponse.Parser.ParseFrom(Convert.FromBase64String(stringResponse)); + responseString = GrpcOrchestrationRunner.LoadAndRun(requestString, new SimpleOrchestrator()); + response = Protobuf.OrchestratorResponse.Parser.ParseFrom(Convert.FromBase64String(responseString)); Assert.Single(response.Actions); Assert.NotNull(response.Actions[0].CompleteOrchestration); Assert.Equal(Protobuf.OrchestrationStatus.Completed, response.Actions[0].CompleteOrchestration.OrchestrationStatus);