Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 27 additions & 23 deletions src/DurableTask.Core/TaskOrchestrationDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -222,18 +222,7 @@ async Task OnProcessWorkItemSessionAsync(TaskOrchestrationWorkItem workItem)
return;
}

var isExtendedSession = false;

CorrelationTraceClient.Propagate(
() =>
{
// Check if it is extended session.
// TODO: Remove this code - it looks incorrect and dangerous
isExtendedSession = this.concurrentSessionLock.Acquire();
this.concurrentSessionLock.Release();
workItem.IsExtendedSession = isExtendedSession;
});

var concurrencyLockAcquired = false;
var processCount = 0;
try
{
Expand All @@ -242,6 +231,14 @@ async Task OnProcessWorkItemSessionAsync(TaskOrchestrationWorkItem workItem)
// If the provider provided work items, execute them.
if (workItem.NewMessages?.Count > 0)
{
// We only need to acquire the lock on the first execution within the extended session
if (!concurrencyLockAcquired)
{
concurrencyLockAcquired = this.concurrentSessionLock.Acquire();
}
workItem.IsExtendedSession = concurrencyLockAcquired;
// Regardless of whether or not we acquired the concurrent session lock, we will make sure to execute this work item.
// If we failed to acquire it, we will end the extended session after this execution.
bool isCompletedOrInterrupted = await this.OnProcessWorkItemAsync(workItem);
if (isCompletedOrInterrupted)
{
Expand All @@ -251,15 +248,11 @@ async Task OnProcessWorkItemSessionAsync(TaskOrchestrationWorkItem workItem)
processCount++;
}

// Fetches beyond the first require getting an extended session lock, used to prevent starvation.
if (processCount > 0 && !isExtendedSession)
// If we failed to acquire the concurrent session lock, we will end the extended session after the execution of the first work item
if (processCount > 0 && !concurrencyLockAcquired)
{
isExtendedSession = this.concurrentSessionLock.Acquire();
if (!isExtendedSession)
{
TraceHelper.Trace(TraceEventType.Verbose, "OnProcessWorkItemSession-MaxOperations", "Failed to acquire concurrent session lock.");
break;
}
TraceHelper.Trace(TraceEventType.Verbose, "OnProcessWorkItemSession-MaxOperations", "Failed to acquire concurrent session lock.");
break;
}

TraceHelper.Trace(TraceEventType.Verbose, "OnProcessWorkItemSession-StartFetch", "Starting fetch of existing session.");
Expand All @@ -282,7 +275,7 @@ async Task OnProcessWorkItemSessionAsync(TaskOrchestrationWorkItem workItem)
}
finally
{
if (isExtendedSession)
if (concurrencyLockAcquired)
{
TraceHelper.Trace(
TraceEventType.Verbose,
Expand Down Expand Up @@ -735,6 +728,7 @@ async Task<OrchestrationExecutionCursor> ExecuteOrchestrationAsync(Orchestration
dispatchContext.SetProperty(workItem);
dispatchContext.SetProperty(GetOrchestrationExecutionContext(runtimeState));
dispatchContext.SetProperty(this.entityParameters);
dispatchContext.SetProperty(new WorkItemMetadata { IsExtendedSession = workItem.IsExtendedSession, IncludePastEvents = true });

TaskOrchestrationExecutor? executor = null;

Expand Down Expand Up @@ -786,12 +780,22 @@ async Task ResumeOrchestrationAsync(TaskOrchestrationWorkItem workItem)
dispatchContext.SetProperty(cursor.TaskOrchestration);
dispatchContext.SetProperty(cursor.RuntimeState);
dispatchContext.SetProperty(workItem);
dispatchContext.SetProperty(new WorkItemMetadata { IsExtendedSession = true, IncludePastEvents = false });

cursor.LatestDecisions = Enumerable.Empty<OrchestratorAction>();
await this.dispatchPipeline.RunAsync(dispatchContext, _ =>
{
OrchestratorExecutionResult result = cursor.OrchestrationExecutor.ExecuteNewEvents();
dispatchContext.SetProperty(result);
// Check to see if the custom middleware intercepted and substituted the orchestration execution
// with its own execution behavior, providing us with the end results. If so, we can terminate
// the dispatch pipeline here.
var resultFromMiddleware = dispatchContext.GetProperty<OrchestratorExecutionResult>();
if (resultFromMiddleware != null)
{
return CompletedTask;
}

OrchestratorExecutionResult resultFromOrchestrator = cursor.OrchestrationExecutor.ExecuteNewEvents();
dispatchContext.SetProperty(resultFromOrchestrator);
return CompletedTask;
});

Expand Down
5 changes: 4 additions & 1 deletion src/DurableTask.Core/TaskOrchestrationExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,10 @@ public TaskOrchestrationExecutor(
{
}

internal bool IsCompleted => this.result != null && (this.result.IsCompleted || this.result.IsFaulted);
/// <summary>
/// Whether or not the orchestration has completed.
/// </summary>
public bool IsCompleted => this.result != null && (this.result.IsCompleted || this.result.IsFaulted);

/// <summary>
/// Executes an orchestration from the beginning.
Expand Down
19 changes: 19 additions & 0 deletions src/DurableTask.Core/WorkItemMetadata.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
namespace DurableTask.Core
{
/// <summary>
/// A class representing metadata information about a work item.
/// </summary>
public class WorkItemMetadata
{
/// <summary>
/// Gets or sets whether or not the execution of the work item is within an extended session.
/// </summary>
public bool IsExtendedSession { get; set; }

/// <summary>
/// Gets or sets whether or not to include past events in the orchestration history when executing the work item via middleware.
/// This assumes that the middleware is able to handle extended sessions and does not require history for replays.
/// </summary>
public bool IncludePastEvents { get; set; }
}
}
Loading