diff --git a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs index 42564cddf..44adbbbd7 100644 --- a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs +++ b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs @@ -222,18 +222,7 @@ async Task OnProcessWorkItemSessionAsync(TaskOrchestrationWorkItem workItem) return; } - var isExtendedSession = false; - - CorrelationTraceClient.Propagate( - () => - { - // Check if it is extended session. - // TODO: Remove this code - it looks incorrect and dangerous - isExtendedSession = this.concurrentSessionLock.Acquire(); - this.concurrentSessionLock.Release(); - workItem.IsExtendedSession = isExtendedSession; - }); - + var concurrencyLockAcquired = false; var processCount = 0; try { @@ -242,6 +231,14 @@ async Task OnProcessWorkItemSessionAsync(TaskOrchestrationWorkItem workItem) // If the provider provided work items, execute them. if (workItem.NewMessages?.Count > 0) { + // We only need to acquire the lock on the first execution within the extended session + if (!concurrencyLockAcquired) + { + concurrencyLockAcquired = this.concurrentSessionLock.Acquire(); + } + workItem.IsExtendedSession = concurrencyLockAcquired; + // Regardless of whether or not we acquired the concurrent session lock, we will make sure to execute this work item. + // If we failed to acquire it, we will end the extended session after this execution. bool isCompletedOrInterrupted = await this.OnProcessWorkItemAsync(workItem); if (isCompletedOrInterrupted) { @@ -251,15 +248,11 @@ async Task OnProcessWorkItemSessionAsync(TaskOrchestrationWorkItem workItem) processCount++; } - // Fetches beyond the first require getting an extended session lock, used to prevent starvation. - if (processCount > 0 && !isExtendedSession) + // If we failed to acquire the concurrent session lock, we will end the extended session after the execution of the first work item + if (processCount > 0 && !concurrencyLockAcquired) { - isExtendedSession = this.concurrentSessionLock.Acquire(); - if (!isExtendedSession) - { - TraceHelper.Trace(TraceEventType.Verbose, "OnProcessWorkItemSession-MaxOperations", "Failed to acquire concurrent session lock."); - break; - } + TraceHelper.Trace(TraceEventType.Verbose, "OnProcessWorkItemSession-MaxOperations", "Failed to acquire concurrent session lock."); + break; } TraceHelper.Trace(TraceEventType.Verbose, "OnProcessWorkItemSession-StartFetch", "Starting fetch of existing session."); @@ -282,7 +275,7 @@ async Task OnProcessWorkItemSessionAsync(TaskOrchestrationWorkItem workItem) } finally { - if (isExtendedSession) + if (concurrencyLockAcquired) { TraceHelper.Trace( TraceEventType.Verbose, @@ -735,6 +728,7 @@ async Task ExecuteOrchestrationAsync(Orchestration dispatchContext.SetProperty(workItem); dispatchContext.SetProperty(GetOrchestrationExecutionContext(runtimeState)); dispatchContext.SetProperty(this.entityParameters); + dispatchContext.SetProperty(new WorkItemMetadata { IsExtendedSession = workItem.IsExtendedSession, IncludePastEvents = true }); TaskOrchestrationExecutor? executor = null; @@ -786,12 +780,22 @@ async Task ResumeOrchestrationAsync(TaskOrchestrationWorkItem workItem) dispatchContext.SetProperty(cursor.TaskOrchestration); dispatchContext.SetProperty(cursor.RuntimeState); dispatchContext.SetProperty(workItem); + dispatchContext.SetProperty(new WorkItemMetadata { IsExtendedSession = true, IncludePastEvents = false }); cursor.LatestDecisions = Enumerable.Empty(); await this.dispatchPipeline.RunAsync(dispatchContext, _ => { - OrchestratorExecutionResult result = cursor.OrchestrationExecutor.ExecuteNewEvents(); - dispatchContext.SetProperty(result); + // Check to see if the custom middleware intercepted and substituted the orchestration execution + // with its own execution behavior, providing us with the end results. If so, we can terminate + // the dispatch pipeline here. + var resultFromMiddleware = dispatchContext.GetProperty(); + if (resultFromMiddleware != null) + { + return CompletedTask; + } + + OrchestratorExecutionResult resultFromOrchestrator = cursor.OrchestrationExecutor.ExecuteNewEvents(); + dispatchContext.SetProperty(resultFromOrchestrator); return CompletedTask; }); diff --git a/src/DurableTask.Core/TaskOrchestrationExecutor.cs b/src/DurableTask.Core/TaskOrchestrationExecutor.cs index c5e100a2f..e3dc6fc49 100644 --- a/src/DurableTask.Core/TaskOrchestrationExecutor.cs +++ b/src/DurableTask.Core/TaskOrchestrationExecutor.cs @@ -80,7 +80,10 @@ public TaskOrchestrationExecutor( { } - internal bool IsCompleted => this.result != null && (this.result.IsCompleted || this.result.IsFaulted); + /// + /// Whether or not the orchestration has completed. + /// + public bool IsCompleted => this.result != null && (this.result.IsCompleted || this.result.IsFaulted); /// /// Executes an orchestration from the beginning. diff --git a/src/DurableTask.Core/WorkItemMetadata.cs b/src/DurableTask.Core/WorkItemMetadata.cs new file mode 100644 index 000000000..ae3de4651 --- /dev/null +++ b/src/DurableTask.Core/WorkItemMetadata.cs @@ -0,0 +1,19 @@ +namespace DurableTask.Core +{ + /// + /// A class representing metadata information about a work item. + /// + public class WorkItemMetadata + { + /// + /// Gets or sets whether or not the execution of the work item is within an extended session. + /// + public bool IsExtendedSession { get; set; } + + /// + /// Gets or sets whether or not to include past events in the orchestration history when executing the work item via middleware. + /// This assumes that the middleware is able to handle extended sessions and does not require history for replays. + /// + public bool IncludePastEvents { get; set; } + } +}