Skip to content

Commit 35318d1

Browse files
sophiatevSophia Tevosyan
andauthored
Extended Sessions for Isolated (Orchestrations) (#1232)
* first commit * added some constants regarding extended sessions * removed the chunk of code i commented out * modified one comment slightly * slight variable name update as per PR comment * added private package names * added a no warn to the application insights csproj to allow building the private packages * had the wrong suffix in two packages * updated the app insights csproj to build the preview package correctly * updating the preview package version * moving acquiring of the extended session lock before execution of the first work item to ensure that the worker does not store the extended session state unnecessarily in the isolated model * updating package numbers * fixing bug related to reacquiring the concurrent session lock unnecessarily * pushing the etag bug fix here for now * updating version number * reverting csproj changes * reverting azure storage bug fix too * addressing PR feedback --------- Co-authored-by: Sophia Tevosyan <[email protected]>
1 parent 3cb8d43 commit 35318d1

File tree

3 files changed

+50
-24
lines changed

3 files changed

+50
-24
lines changed

src/DurableTask.Core/TaskOrchestrationDispatcher.cs

Lines changed: 27 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -222,18 +222,7 @@ async Task OnProcessWorkItemSessionAsync(TaskOrchestrationWorkItem workItem)
222222
return;
223223
}
224224

225-
var isExtendedSession = false;
226-
227-
CorrelationTraceClient.Propagate(
228-
() =>
229-
{
230-
// Check if it is extended session.
231-
// TODO: Remove this code - it looks incorrect and dangerous
232-
isExtendedSession = this.concurrentSessionLock.Acquire();
233-
this.concurrentSessionLock.Release();
234-
workItem.IsExtendedSession = isExtendedSession;
235-
});
236-
225+
var concurrencyLockAcquired = false;
237226
var processCount = 0;
238227
try
239228
{
@@ -242,6 +231,14 @@ async Task OnProcessWorkItemSessionAsync(TaskOrchestrationWorkItem workItem)
242231
// If the provider provided work items, execute them.
243232
if (workItem.NewMessages?.Count > 0)
244233
{
234+
// We only need to acquire the lock on the first execution within the extended session
235+
if (!concurrencyLockAcquired)
236+
{
237+
concurrencyLockAcquired = this.concurrentSessionLock.Acquire();
238+
}
239+
workItem.IsExtendedSession = concurrencyLockAcquired;
240+
// Regardless of whether or not we acquired the concurrent session lock, we will make sure to execute this work item.
241+
// If we failed to acquire it, we will end the extended session after this execution.
245242
bool isCompletedOrInterrupted = await this.OnProcessWorkItemAsync(workItem);
246243
if (isCompletedOrInterrupted)
247244
{
@@ -251,15 +248,11 @@ async Task OnProcessWorkItemSessionAsync(TaskOrchestrationWorkItem workItem)
251248
processCount++;
252249
}
253250

254-
// Fetches beyond the first require getting an extended session lock, used to prevent starvation.
255-
if (processCount > 0 && !isExtendedSession)
251+
// If we failed to acquire the concurrent session lock, we will end the extended session after the execution of the first work item
252+
if (processCount > 0 && !concurrencyLockAcquired)
256253
{
257-
isExtendedSession = this.concurrentSessionLock.Acquire();
258-
if (!isExtendedSession)
259-
{
260-
TraceHelper.Trace(TraceEventType.Verbose, "OnProcessWorkItemSession-MaxOperations", "Failed to acquire concurrent session lock.");
261-
break;
262-
}
254+
TraceHelper.Trace(TraceEventType.Verbose, "OnProcessWorkItemSession-MaxOperations", "Failed to acquire concurrent session lock.");
255+
break;
263256
}
264257

265258
TraceHelper.Trace(TraceEventType.Verbose, "OnProcessWorkItemSession-StartFetch", "Starting fetch of existing session.");
@@ -282,7 +275,7 @@ async Task OnProcessWorkItemSessionAsync(TaskOrchestrationWorkItem workItem)
282275
}
283276
finally
284277
{
285-
if (isExtendedSession)
278+
if (concurrencyLockAcquired)
286279
{
287280
TraceHelper.Trace(
288281
TraceEventType.Verbose,
@@ -735,6 +728,7 @@ async Task<OrchestrationExecutionCursor> ExecuteOrchestrationAsync(Orchestration
735728
dispatchContext.SetProperty(workItem);
736729
dispatchContext.SetProperty(GetOrchestrationExecutionContext(runtimeState));
737730
dispatchContext.SetProperty(this.entityParameters);
731+
dispatchContext.SetProperty(new WorkItemMetadata { IsExtendedSession = workItem.IsExtendedSession, IncludePastEvents = true });
738732

739733
TaskOrchestrationExecutor? executor = null;
740734

@@ -786,12 +780,22 @@ async Task ResumeOrchestrationAsync(TaskOrchestrationWorkItem workItem)
786780
dispatchContext.SetProperty(cursor.TaskOrchestration);
787781
dispatchContext.SetProperty(cursor.RuntimeState);
788782
dispatchContext.SetProperty(workItem);
783+
dispatchContext.SetProperty(new WorkItemMetadata { IsExtendedSession = true, IncludePastEvents = false });
789784

790785
cursor.LatestDecisions = Enumerable.Empty<OrchestratorAction>();
791786
await this.dispatchPipeline.RunAsync(dispatchContext, _ =>
792787
{
793-
OrchestratorExecutionResult result = cursor.OrchestrationExecutor.ExecuteNewEvents();
794-
dispatchContext.SetProperty(result);
788+
// Check to see if the custom middleware intercepted and substituted the orchestration execution
789+
// with its own execution behavior, providing us with the end results. If so, we can terminate
790+
// the dispatch pipeline here.
791+
var resultFromMiddleware = dispatchContext.GetProperty<OrchestratorExecutionResult>();
792+
if (resultFromMiddleware != null)
793+
{
794+
return CompletedTask;
795+
}
796+
797+
OrchestratorExecutionResult resultFromOrchestrator = cursor.OrchestrationExecutor.ExecuteNewEvents();
798+
dispatchContext.SetProperty(resultFromOrchestrator);
795799
return CompletedTask;
796800
});
797801

src/DurableTask.Core/TaskOrchestrationExecutor.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,10 @@ public TaskOrchestrationExecutor(
8080
{
8181
}
8282

83-
internal bool IsCompleted => this.result != null && (this.result.IsCompleted || this.result.IsFaulted);
83+
/// <summary>
84+
/// Whether or not the orchestration has completed.
85+
/// </summary>
86+
public bool IsCompleted => this.result != null && (this.result.IsCompleted || this.result.IsFaulted);
8487

8588
/// <summary>
8689
/// Executes an orchestration from the beginning.
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
namespace DurableTask.Core
2+
{
3+
/// <summary>
4+
/// A class representing metadata information about a work item.
5+
/// </summary>
6+
public class WorkItemMetadata
7+
{
8+
/// <summary>
9+
/// Gets or sets whether or not the execution of the work item is within an extended session.
10+
/// </summary>
11+
public bool IsExtendedSession { get; set; }
12+
13+
/// <summary>
14+
/// Gets or sets whether or not to include past events in the orchestration history when executing the work item via middleware.
15+
/// This assumes that the middleware is able to handle extended sessions and does not require history for replays.
16+
/// </summary>
17+
public bool IncludePastEvents { get; set; }
18+
}
19+
}

0 commit comments

Comments
 (0)