diff --git a/src/WebJobs.Extensions.DurableTask/Bindings/EntityTriggerAttributeBindingProvider.cs b/src/WebJobs.Extensions.DurableTask/Bindings/EntityTriggerAttributeBindingProvider.cs index 91e6d6a64..92fd1ed5b 100644 --- a/src/WebJobs.Extensions.DurableTask/Bindings/EntityTriggerAttributeBindingProvider.cs +++ b/src/WebJobs.Extensions.DurableTask/Bindings/EntityTriggerAttributeBindingProvider.cs @@ -130,7 +130,7 @@ public Task BindAsync(object value, ValueBindingContext context) { // Generate a byte array which is the serialized protobuf payload // https://developers.google.com/protocol-buffers/docs/csharptutorial#parsing_and_serialization - var entityBatchRequest = remoteContext.Request.ToEntityBatchRequest(); + var entityBatchRequest = remoteContext.Request.ToEntityBatchRequest(remoteContext.Configurations); // We convert the binary payload into a base64 string because that seems to be the most commonly supported // format for Azure Functions language workers. Attempts to send unencoded byte[] payloads were unsuccessful. diff --git a/src/WebJobs.Extensions.DurableTask/Bindings/OrchestrationTriggerAttributeBindingProvider.cs b/src/WebJobs.Extensions.DurableTask/Bindings/OrchestrationTriggerAttributeBindingProvider.cs index f72bce0bf..8263d0499 100644 --- a/src/WebJobs.Extensions.DurableTask/Bindings/OrchestrationTriggerAttributeBindingProvider.cs +++ b/src/WebJobs.Extensions.DurableTask/Bindings/OrchestrationTriggerAttributeBindingProvider.cs @@ -170,7 +170,7 @@ public Task BindAsync(object? value, ValueBindingContext context) var orchestratorRequest = new Microsoft.DurableTask.Protobuf.OrchestratorRequest() { InstanceId = remoteContext.InstanceId, - PastEvents = { remoteContext.Configurations.IncludePastEvents ? remoteContext.PastEvents.Select(ProtobufUtils.ToHistoryEventProto) : Enumerable.Empty() }, + PastEvents = { remoteContext.Configurations.IncludeState ? remoteContext.PastEvents.Select(ProtobufUtils.ToHistoryEventProto) : Enumerable.Empty() }, NewEvents = { remoteContext.NewEvents.Select(ProtobufUtils.ToHistoryEventProto) }, EntityParameters = remoteContext.EntityParameters.ToProtobuf(), }; diff --git a/src/WebJobs.Extensions.DurableTask/ContextImplementations/RemoteEntityContext.cs b/src/WebJobs.Extensions.DurableTask/ContextImplementations/RemoteEntityContext.cs index c2bf96369..3d7b0e3bf 100644 --- a/src/WebJobs.Extensions.DurableTask/ContextImplementations/RemoteEntityContext.cs +++ b/src/WebJobs.Extensions.DurableTask/ContextImplementations/RemoteEntityContext.cs @@ -5,19 +5,32 @@ using System.Collections.Generic; using DurableTask.Core.Entities; using DurableTask.Core.Entities.OperationFormat; +using Microsoft.Extensions.Options; using Newtonsoft.Json; namespace Microsoft.Azure.WebJobs.Extensions.DurableTask { internal class RemoteEntityContext { - public RemoteEntityContext(EntityBatchRequest batchRequest) + public RemoteEntityContext(EntityBatchRequest batchRequest, DurableTaskOptions options, bool isExtendedSession, bool includeEntityState) { - this.Request = batchRequest; + this.Request = batchRequest; + if (options.ExtendedSessionsEnabled) + { + this.Configurations = new RemoteInstanceConfiguration + { + IsExtendedSession = isExtendedSession, + IncludeState = includeEntityState, + ExtendedSessionIdleTimeoutInSeconds = options.ExtendedSessionIdleTimeoutInSeconds, + }; + } } [JsonProperty("request")] - internal EntityBatchRequest Request { get; private set; } + internal EntityBatchRequest Request { get; private set; } + + [JsonProperty("configurations")] + public RemoteInstanceConfiguration? Configurations { get; private set; } [JsonIgnore] internal EntityBatchResult? Result { get; set; } diff --git a/src/WebJobs.Extensions.DurableTask/ContextImplementations/RemoteOrchestratorContext.cs b/src/WebJobs.Extensions.DurableTask/ContextImplementations/RemoteOrchestratorContext.cs index 16150bdd8..d54c6ef03 100644 --- a/src/WebJobs.Extensions.DurableTask/ContextImplementations/RemoteOrchestratorContext.cs +++ b/src/WebJobs.Extensions.DurableTask/ContextImplementations/RemoteOrchestratorContext.cs @@ -26,14 +26,14 @@ public RemoteOrchestratorContext(OrchestrationRuntimeState runtimeState, TaskOrc { this.runtimeState = runtimeState ?? throw new ArgumentNullException(nameof(runtimeState)); this.EntityParameters = entityParameters; - this.Configurations = new RemoteOrchestratorConfiguration + this.Configurations = new RemoteInstanceConfiguration { HttpDefaultAsyncRequestSleepTimeMilliseconds = options.HttpSettings.DefaultAsyncRequestSleepTimeMilliseconds, }; if (options.ExtendedSessionsEnabled) { this.Configurations.IsExtendedSession = isExtendedSession; - this.Configurations.IncludePastEvents = includePastEvents; + this.Configurations.IncludeState = includePastEvents; this.Configurations.ExtendedSessionIdleTimeoutInSeconds = options.ExtendedSessionIdleTimeoutInSeconds; } } @@ -51,7 +51,7 @@ public RemoteOrchestratorContext(OrchestrationRuntimeState runtimeState, TaskOrc internal int UpperSchemaVersion { get; } = 4; [JsonProperty("configurations")] - public RemoteOrchestratorConfiguration Configurations { get; private set; } + public RemoteInstanceConfiguration Configurations { get; private set; } [JsonIgnore] internal bool OrchestratorCompleted { get; private set; } diff --git a/src/WebJobs.Extensions.DurableTask/OutOfProcMiddleware.cs b/src/WebJobs.Extensions.DurableTask/OutOfProcMiddleware.cs index 394e0f996..cfcd07553 100644 --- a/src/WebJobs.Extensions.DurableTask/OutOfProcMiddleware.cs +++ b/src/WebJobs.Extensions.DurableTask/OutOfProcMiddleware.cs @@ -111,10 +111,13 @@ await this.LifeCycleNotificationHelper.OrchestratorStartingAsync( } WorkItemMetadata workItemMetadata = dispatchContext.GetProperty(); - bool isExtendedSession = workItemMetadata.IsExtendedSession; - bool includePastEvents = workItemMetadata.IncludePastEvents; + var context = new RemoteOrchestratorContext( + runtimeState, + entityParameters, + this.Options, + workItemMetadata.IsExtendedSession, + workItemMetadata.IncludeState); - var context = new RemoteOrchestratorContext(runtimeState, entityParameters, this.extension.Options, isExtendedSession, includePastEvents); bool workerRequiresHistory = false; var input = new TriggeredFunctionData @@ -351,9 +354,16 @@ void SetErrorResult(FailureDetails failureDetails) batchRequest.InstanceId, batchRequest.EntityState, functionType: FunctionType.Entity, - isReplay: false); + isReplay: false); + + WorkItemMetadata workItemMetadata = dispatchContext.GetProperty(); + var context = new RemoteEntityContext( + batchRequest, + this.Options, + workItemMetadata.IsExtendedSession, + workItemMetadata.IncludeState); - var context = new RemoteEntityContext(batchRequest); + bool workerRequiresEntityState = false; var input = new TriggeredFunctionData { @@ -381,6 +391,7 @@ void SetErrorResult(FailureDetails failureDetails) byte[] triggerReturnValueBytes = Convert.FromBase64String(triggerReturnValue); P.EntityBatchResult response = P.EntityBatchResult.Parser.ParseFrom(triggerReturnValueBytes); + workerRequiresEntityState = response.RequiresState; context.Result = response.ToEntityBatchResult(); context.ThrowIfFailed(); @@ -449,6 +460,11 @@ void SetErrorResult(FailureDetails failureDetails) } return; + } + + if (workerRequiresEntityState) + { + throw new SessionAbortedException("The worker has since ended the extended session and needs an entity state to execute the request."); } EntityBatchResult batchResult = context.Result diff --git a/src/WebJobs.Extensions.DurableTask/ProtobufUtils.cs b/src/WebJobs.Extensions.DurableTask/ProtobufUtils.cs index 0975c5e98..c28bc8e80 100644 --- a/src/WebJobs.Extensions.DurableTask/ProtobufUtils.cs +++ b/src/WebJobs.Extensions.DurableTask/ProtobufUtils.cs @@ -486,7 +486,7 @@ internal static P.PurgeInstancesResponse CreatePurgeInstancesResponse(PurgeResul /// The operation request to convert. /// The converted operation request. [return: NotNullIfNotNull("entityBatchRequest")] - internal static P.EntityBatchRequest? ToEntityBatchRequest(this EntityBatchRequest? entityBatchRequest) + internal static P.EntityBatchRequest? ToEntityBatchRequest(this EntityBatchRequest? entityBatchRequest, RemoteInstanceConfiguration? configurations) { if (entityBatchRequest == null) { @@ -496,9 +496,11 @@ internal static P.PurgeInstancesResponse CreatePurgeInstancesResponse(PurgeResul var batchRequest = new P.EntityBatchRequest() { InstanceId = entityBatchRequest.InstanceId, - EntityState = entityBatchRequest.EntityState, + EntityState = configurations?.IncludeState == false ? null : entityBatchRequest.EntityState, }; + batchRequest.Properties.Add(ProtobufUtils.ConvertPocoToProtoMap(configurations)); + foreach (var operation in entityBatchRequest.Operations ?? Enumerable.Empty()) { batchRequest.Operations.Add(operation.ToOperationRequest()); diff --git a/src/WebJobs.Extensions.DurableTask/RemoteOrchestratorConfiguration.cs b/src/WebJobs.Extensions.DurableTask/RemoteInstanceConfiguration.cs similarity index 57% rename from src/WebJobs.Extensions.DurableTask/RemoteOrchestratorConfiguration.cs rename to src/WebJobs.Extensions.DurableTask/RemoteInstanceConfiguration.cs index 5b38f88ab..b1325b62b 100644 --- a/src/WebJobs.Extensions.DurableTask/RemoteOrchestratorConfiguration.cs +++ b/src/WebJobs.Extensions.DurableTask/RemoteInstanceConfiguration.cs @@ -4,30 +4,31 @@ namespace Microsoft.Azure.WebJobs.Extensions.DurableTask { /// - /// Configuration settings for RemoteOrchestratorContext in out-of-process mode, transmitted via gRPC. + /// Configuration settings for and + /// in out-of-process mode, transmitted via gRPC. /// - public class RemoteOrchestratorConfiguration + internal class RemoteInstanceConfiguration { /// /// Gets or sets the default number of milliseconds between async HTTP status poll requests. /// - public int HttpDefaultAsyncRequestSleepTimeMilliseconds { get; set; } = 30000; + internal int HttpDefaultAsyncRequestSleepTimeMilliseconds { get; set; } = 30000; /// - /// Gets or sets whether or not to include the past history events in the orchestration request. + /// Gets or sets whether or not to include the instance state in the instance request. /// True by default. /// - public bool IncludePastEvents { get; set; } = true; + internal bool IncludeState { get; set; } = true; /// /// Gets or sets whether or not the orchestration request is within an extended session. /// False by default. /// - public bool IsExtendedSession { get; set; } = false; + internal bool IsExtendedSession { get; set; } = false; /// /// Gets or sets the amount of time in seconds before an idle extended session times out. /// - public int ExtendedSessionIdleTimeoutInSeconds { get; set; } + internal int ExtendedSessionIdleTimeoutInSeconds { get; set; } } } diff --git a/src/Worker.Extensions.DurableTask/Execution/DurableFunctionExecutor.Entity.cs b/src/Worker.Extensions.DurableTask/Execution/DurableFunctionExecutor.Entity.cs index 0e7eaa64b..32e198421 100644 --- a/src/Worker.Extensions.DurableTask/Execution/DurableFunctionExecutor.Entity.cs +++ b/src/Worker.Extensions.DurableTask/Execution/DurableFunctionExecutor.Entity.cs @@ -37,7 +37,7 @@ private async ValueTask RunEntityAsync(FunctionContext context, BindingMetadata return; } - TaskEntityDispatcher dispatcher = new(encodedEntityBatch, context.InstanceServices); + TaskEntityDispatcher dispatcher = new(encodedEntityBatch, context.InstanceServices, extendedSessionsCache); triggerInputData.Value = dispatcher; await inner.ExecuteAsync(context); context.GetInvocationResult().Value = dispatcher.Result; diff --git a/src/Worker.Extensions.DurableTask/TaskEntityDispatcher.cs b/src/Worker.Extensions.DurableTask/TaskEntityDispatcher.cs index 0b90c1c17..abfbaa3aa 100644 --- a/src/Worker.Extensions.DurableTask/TaskEntityDispatcher.cs +++ b/src/Worker.Extensions.DurableTask/TaskEntityDispatcher.cs @@ -4,7 +4,9 @@ using System; using System.Threading.Tasks; using Microsoft.DurableTask.Entities; +using Microsoft.DurableTask.Worker; using Microsoft.DurableTask.Worker.Grpc; +using Microsoft.Extensions.Caching.Memory; using Microsoft.Extensions.DependencyInjection; namespace Microsoft.Azure.Functions.Worker; @@ -19,11 +21,13 @@ public sealed class TaskEntityDispatcher { private readonly string request; private readonly IServiceProvider services; + private readonly ExtendedSessionsCache extendedSessionsCache; - internal TaskEntityDispatcher(string request, IServiceProvider services) + internal TaskEntityDispatcher(string request, IServiceProvider services, ExtendedSessionsCache extendedSessionsCache) { this.request = request; this.services = services; + this.extendedSessionsCache = extendedSessionsCache; } internal string Result { get; private set; } = string.Empty; @@ -40,7 +44,7 @@ public async Task DispatchAsync(ITaskEntity entity) throw new ArgumentNullException(nameof(entity)); } - this.Result = await GrpcEntityRunner.LoadAndRunAsync(this.request, entity, this.services); + this.Result = await GrpcEntityRunner.LoadAndRunAsync(this.request, entity, this.extendedSessionsCache, this.services); } ///