Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
2bb20bd
updated to use c# built in memory cache
Jul 10, 2025
afde88d
fixed bug where new events were not cleared
Jul 11, 2025
5001bb7
added a new field to the OrchestratorResponse, restored the old publi…
Jul 14, 2025
38d3a32
removed unused ExtendedSessions object
Jul 14, 2025
92bb2cd
fixing line endings
Jul 14, 2025
38a6011
Merge branch 'main' into stevosyan/extended-sessions-for-orchestratio…
Jul 14, 2025
c23f1f6
added comments to the ExtendedSessionState class and the memory cache…
Jul 14, 2025
d8b0332
fixing props line endings
Jul 14, 2025
2162650
Merge branch 'main' into stevosyan/extended-sessions-for-orchestratio…
Jul 15, 2025
044f780
added a wrapper ExtendedSessionsCache object
Aug 1, 2025
b5d8481
added comments
Aug 1, 2025
0def003
updated the expiration scan frequency
Aug 1, 2025
306dd39
addressing some comments
Aug 1, 2025
834711b
fixing indentation
Aug 1, 2025
3f24e5f
adding proto files
Aug 1, 2025
18f1cb7
Merge branch 'main' into stevosyan/extended-sessions-for-orchestratio…
Aug 1, 2025
206457d
added a max frequency to cache scan expiration
Aug 1, 2025
754948e
reverting to old implementation without a max
Aug 1, 2025
80165ea
addressing comments
Aug 1, 2025
eac9190
adding updated protos
Aug 1, 2025
298d911
added null check
Aug 2, 2025
2c21871
missed an earlier PR comment
Aug 2, 2025
e6860a9
added the durabletask packages from the test source
Aug 4, 2025
77794ac
updated reference to new preview package
Aug 4, 2025
8e1c6c1
pushing the updated yml
Aug 4, 2025
a5252de
forgot to update the release package name
Aug 4, 2025
3c40541
adding unit tests
Aug 6, 2025
1c38f81
had the wrong default timeout
Aug 6, 2025
5a762a3
fixed failing test
Aug 6, 2025
083a304
adding new dependencies and new package number
Aug 9, 2025
c0af286
pushing bug fix for not honoring a host restarting an extended session
Aug 21, 2025
527cf6d
updating version numbers
Aug 21, 2025
5972863
Merge branch 'main' into stevosyan/extended-sessions-for-orchestratio…
Aug 21, 2025
0604373
added proto changes from main branch
Aug 21, 2025
8e69683
downgrading the cache package to a version that is test with net6.0
Aug 21, 2025
547261e
reverting ci changes
Sep 11, 2025
44d654f
adding caching package
Sep 11, 2025
d11758c
addressing PR comments
Sep 12, 2025
bbe2d2d
missed one file
Sep 12, 2025
dcbf62b
had a mistake in the LoadAndRun path
Sep 12, 2025
9a573aa
slight update to extended session parameter name
Sep 12, 2025
6f8d8d8
Merge branch 'main' into stevosyan/extended-sessions-for-orchestratio…
Sep 15, 2025
6a0354a
slight change to avoid unnecessary initialization of the cache if pro…
Sep 16, 2025
11ab61d
slight change to make the idle timeout need to be >0 rather than >=0
Sep 16, 2025
f18a910
updating version numbers and protos
Sep 17, 2025
d5bcf8c
Merge branch 'main' into stevosyan/extended-sessions-for-orchestratio…
Sep 17, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

<!-- Microsoft.Extensions.* Packages -->
<ItemGroup>
<PackageVersion Include="Microsoft.Extensions.Caching.Memory" Version="6.0.3" />
<PackageVersion Include="Microsoft.Extensions.Configuration.Abstractions" Version="6.0.0" />
<PackageVersion Include="Microsoft.Extensions.DependencyInjection" Version="6.0.2" />
<PackageVersion Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="6.0.0" />
Expand All @@ -28,7 +29,7 @@

<!-- DurableTask Packages -->
<ItemGroup>
<PackageVersion Include="Microsoft.Azure.DurableTask.Core" Version="3.3.0" />
<PackageVersion Include="Microsoft.Azure.DurableTask.Core" Version="3.4.0" />
<PackageVersion Include="Microsoft.Azure.Functions.Worker.Extensions.DurableTask" Version="1.2.2" />
</ItemGroup>

Expand Down
2 changes: 1 addition & 1 deletion eng/targets/Release.props
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
</PropertyGroup>

<PropertyGroup>
<VersionPrefix>1.14.0</VersionPrefix>
<VersionPrefix>1.15.0</VersionPrefix>
<VersionSuffix></VersionSuffix>
</PropertyGroup>

Expand Down
20 changes: 19 additions & 1 deletion src/Grpc/orchestrator_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -342,8 +342,10 @@ message OrchestratorResponse {
// The number of work item events that were processed by the orchestrator.
// This field is optional. If not set, the service should assume that the orchestrator processed all events.
google.protobuf.Int32Value numEventsProcessed = 5;

OrchestrationTraceContext orchestrationTraceContext = 6;

// Whether or not a history is required to complete the original OrchestratorRequest and none was provided.
bool requiresHistory = 7;
}

message CreateInstanceRequest {
Expand Down Expand Up @@ -678,6 +680,18 @@ message AbandonEntityTaskResponse {
// Empty.
}

message SkipGracefulOrchestrationTerminationsRequest {
// A maximum of 500 instance IDs can be provided in this list.
repeated string instanceIds = 1;
google.protobuf.StringValue reason = 2;
}

message SkipGracefulOrchestrationTerminationsResponse {
// Those instances which could not be terminated because they had locked entities at the time of this termination call,
// are already in a terminal state (completed, failed, terminated, etc.), are not orchestrations, or do not exist (i.e. have been purged)
repeated string unterminatedInstanceIds = 1;
}

service TaskHubSidecarService {
// Sends a hello request to the sidecar service.
rpc Hello(google.protobuf.Empty) returns (google.protobuf.Empty);
Expand Down Expand Up @@ -751,6 +765,10 @@ service TaskHubSidecarService {

// Abandon an entity work item
rpc AbandonTaskEntityWorkItem(AbandonEntityTaskRequest) returns (AbandonEntityTaskResponse);

// "Skip" graceful termination of orchestrations by immediately changing their status in storage to "terminated".
// Note that a maximum of 500 orchestrations can be terminated at a time using this method.
rpc SkipGracefulOrchestrationTerminations(SkipGracefulOrchestrationTerminationsRequest) returns (SkipGracefulOrchestrationTerminationsResponse);
}

message GetWorkItemsRequest {
Expand Down
4 changes: 2 additions & 2 deletions src/Grpc/versions.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
# The following files were downloaded from branch main at 2025-09-10 22:50:45 UTC
https://raw.githubusercontent.com/microsoft/durabletask-protobuf/985035a0890575ae18be0eb2a3ac93c10824498a/protos/orchestrator_service.proto
# The following files were downloaded from branch main at 2025-09-17 01:45:58 UTC
https://raw.githubusercontent.com/microsoft/durabletask-protobuf/f5745e0d83f608d77871c1894d9260ceaae08967/protos/orchestrator_service.proto
15 changes: 12 additions & 3 deletions src/Shared/Grpc/ProtoUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -279,18 +279,19 @@ internal static Timestamp ToTimestamp(this DateTime dateTime)
/// </param>
/// <param name="entityConversionState">The entity conversion state, or null if no conversion is required.</param>
/// <param name="orchestrationActivity">The <see cref="Activity" /> that represents orchestration execution.</param>
/// <param name="requiresHistory">Whether or not a history is required to complete the orchestration request and none was provided.</param>
/// <returns>The orchestrator response.</returns>
/// <exception cref="NotSupportedException">When an orchestrator action is unknown.</exception>
internal static P.OrchestratorResponse ConstructOrchestratorResponse(
string instanceId,
string executionId,
string? customStatus,
IEnumerable<OrchestratorAction> actions,
IEnumerable<OrchestratorAction>? actions,
string completionToken,
EntityConversionState? entityConversionState,
Activity? orchestrationActivity)
Activity? orchestrationActivity,
bool requiresHistory = false)
{
Check.NotNull(actions);
var response = new P.OrchestratorResponse
{
InstanceId = instanceId,
Expand All @@ -302,8 +303,16 @@ internal static P.OrchestratorResponse ConstructOrchestratorResponse(
SpanID = orchestrationActivity?.SpanId.ToString(),
SpanStartTime = orchestrationActivity?.StartTimeUtc.ToTimestamp(),
},
RequiresHistory = requiresHistory,
};

// If a history is required and the orchestration request was not completed, then there is no list of actions.
if (requiresHistory)
{
return response;
}

Check.NotNull(actions);
foreach (OrchestratorAction action in actions)
{
var protoAction = new P.OrchestratorAction { Id = action.Id };
Expand Down
40 changes: 40 additions & 0 deletions src/Worker/Core/ExtendedSessionState.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using DurableTask.Core;

namespace Microsoft.DurableTask.Worker;

/// <summary>
/// Represents the state of an extended session for an orchestration.
/// </summary>
public class ExtendedSessionState
{
/// <summary>
/// Initializes a new instance of the <see cref="ExtendedSessionState"/> class.
/// </summary>
/// <param name="state">The orchestration's runtime state.</param>
/// <param name="taskOrchestration">The TaskOrchestration implementation of the orchestration.</param>
/// <param name="orchestrationExecutor">The TaskOrchestrationExecutor for the orchestration.</param>
public ExtendedSessionState(OrchestrationRuntimeState state, TaskOrchestration taskOrchestration, TaskOrchestrationExecutor orchestrationExecutor)
{
this.RuntimeState = state;
this.TaskOrchestration = taskOrchestration;
this.OrchestrationExecutor = orchestrationExecutor;
}

/// <summary>
/// Gets or sets the saved runtime state of the orchestration.
/// </summary>
public OrchestrationRuntimeState RuntimeState { get; set; }

/// <summary>
/// Gets or sets the saved TaskOrchestration implementation of the orchestration.
/// </summary>
public TaskOrchestration TaskOrchestration { get; set; }

/// <summary>
/// Gets or sets the saved TaskOrchestrationExecutor.
/// </summary>
public TaskOrchestrationExecutor OrchestrationExecutor { get; set; }
}
48 changes: 48 additions & 0 deletions src/Worker/Core/ExtendedSessionsCache.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using Microsoft.Extensions.Caching.Memory;

namespace Microsoft.DurableTask.Worker;

/// <summary>
/// A cache for extended sessions that wraps a <see cref="MemoryCache"/> instance.
/// Responsible for holding <see cref="ExtendedSessionState"/> for orchestrations that are running within extended sessions.
/// </summary>
public class ExtendedSessionsCache : IDisposable
{
MemoryCache? extendedSessions;

/// <summary>
/// Gets a value indicating whether returns whether or not the cache has been initialized.
/// </summary>
/// <returns>True if the cache has been initialized, false otherwise.</returns>
public bool IsInitialized => this.extendedSessions is not null;

/// <summary>
/// Dispose the cache and release all resources.
/// </summary>
public void Dispose()
{
this.extendedSessions?.Dispose();
GC.SuppressFinalize(this);
}

/// <summary>
/// Gets the cache for extended sessions if it has already been initialized, or otherwise initializes it with the given expiration scan frequency.
/// </summary>
/// <param name="expirationScanFrequencyInSeconds">
/// The expiration scan frequency of the cache, in seconds.
/// This specifies how often the cache checks for stale items, and evicts them.
/// </param>
/// <returns>The IMemoryCache that holds the cached <see cref="ExtendedSessionState"/>.</returns>
public MemoryCache GetOrInitializeCache(double expirationScanFrequencyInSeconds)
{
this.extendedSessions ??= new MemoryCache(new MemoryCacheOptions
{
ExpirationScanFrequency = TimeSpan.FromSeconds(expirationScanFrequencyInSeconds / 5),
});

return this.extendedSessions;
}
}
1 change: 1 addition & 0 deletions src/Worker/Core/Worker.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ The worker is responsible for processing durable task work items.</PackageDescri
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Caching.Memory" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" />
<PackageReference Include="Microsoft.Extensions.Options" />
<PackageReference Include="System.Text.Json" />
Expand Down
157 changes: 136 additions & 21 deletions src/Worker/Grpc/GrpcOrchestrationRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using DurableTask.Core.History;
using Google.Protobuf;
using Microsoft.DurableTask.Worker.Shims;
using Microsoft.Extensions.Caching.Memory;
using Microsoft.Extensions.DependencyInjection;
using P = Microsoft.DurableTask.Protobuf;

Expand Down Expand Up @@ -82,6 +83,40 @@ public static string LoadAndRun(
string encodedOrchestratorRequest,
ITaskOrchestrator implementation,
IServiceProvider? services = null)
{
return LoadAndRun(encodedOrchestratorRequest, implementation, extendedSessionsCache: null, services: services);
}

/// <summary>
/// Deserializes orchestration history from <paramref name="encodedOrchestratorRequest"/> and uses it to resume the
/// orchestrator implemented by <paramref name="implementation"/>.
/// </summary>
/// <param name="encodedOrchestratorRequest">
/// The encoded protobuf payload representing an orchestration execution request. This is a base64-encoded string.
/// </param>
/// <param name="implementation">
/// An <see cref="ITaskOrchestrator"/> implementation that defines the orchestrator logic.
/// </param>
/// <param name="extendedSessionsCache">
/// The cache of extended sessions which can be used to retrieve the <see cref="ExtendedSessionState"/> if this orchestration request is from within an extended session.
/// </param>
/// <param name="services">
/// Optional <see cref="IServiceProvider"/> from which injected dependencies can be retrieved.
/// </param>
/// <returns>
/// Returns a serialized set of orchestrator actions that should be used as the return value of the orchestrator function trigger.
/// </returns>
/// <exception cref="ArgumentNullException">
/// Thrown if <paramref name="encodedOrchestratorRequest"/> or <paramref name="implementation"/> is <c>null</c>.
/// </exception>
/// <exception cref="ArgumentException">
/// Thrown if <paramref name="encodedOrchestratorRequest"/> contains invalid data.
/// </exception>
public static string LoadAndRun(
string encodedOrchestratorRequest,
ITaskOrchestrator implementation,
ExtendedSessionsCache? extendedSessionsCache,
IServiceProvider? services = null)
{
Check.NotNullOrEmpty(encodedOrchestratorRequest);
Check.NotNull(implementation);
Expand All @@ -95,34 +130,114 @@ public static string LoadAndRun(
pair => pair.Key,
pair => ProtoUtils.ConvertValueToObject(pair.Value));

// Re-construct the orchestration state from the history.
// New events must be added using the AddEvent method.
OrchestrationRuntimeState runtimeState = new(pastEvents);
foreach (HistoryEvent newEvent in newEvents)
{
runtimeState.AddEvent(newEvent);
OrchestratorExecutionResult? result = null;
MemoryCache? extendedSessions = null;

// If any of the request parameters are malformed, we assume the default - extended sessions are not enabled and the orchestration history is attached
bool addToExtendedSessions = false;
bool requiresHistory = false;
bool pastEventsIncluded = true;
bool isExtendedSession = false;
double extendedSessionIdleTimeoutInSeconds = 0;

// Only attempt to initialize the extended sessions cache if all the parameters are correctly specified
if (properties.TryGetValue("ExtendedSessionIdleTimeoutInSeconds", out object? extendedSessionIdleTimeoutObj)
&& extendedSessionIdleTimeoutObj is double extendedSessionIdleTimeout
&& extendedSessionIdleTimeout > 0
&& properties.TryGetValue("IsExtendedSession", out object? extendedSessionObj)
&& extendedSessionObj is bool extendedSession)
{
extendedSessionIdleTimeoutInSeconds = extendedSessionIdleTimeout;
isExtendedSession = extendedSession;
extendedSessions = extendedSessionsCache?.GetOrInitializeCache(extendedSessionIdleTimeoutInSeconds);
}

if (properties.TryGetValue("IncludePastEvents", out object? includePastEventsObj)
&& includePastEventsObj is bool includePastEvents)
{
pastEventsIncluded = includePastEvents;
}

if (isExtendedSession && extendedSessions != null)
{
// If a history was provided, even if we already have an extended session stored, we always want to evict whatever state is in the cache and replace it with a new extended
// session based on the provided history
if (!pastEventsIncluded && extendedSessions.TryGetValue(request.InstanceId, out ExtendedSessionState? extendedSessionState) && extendedSessionState is not null)
{
OrchestrationRuntimeState runtimeState = extendedSessionState!.RuntimeState;
runtimeState.NewEvents.Clear();
foreach (HistoryEvent newEvent in newEvents)
{
runtimeState.AddEvent(newEvent);
}

result = extendedSessionState.OrchestrationExecutor.ExecuteNewEvents();
if (extendedSessionState.OrchestrationExecutor.IsCompleted)
{
extendedSessions.Remove(request.InstanceId);
}
}
else
{
extendedSessions.Remove(request.InstanceId);
addToExtendedSessions = true;
}
}

TaskName orchestratorName = new(runtimeState.Name);
ParentOrchestrationInstance? parent = runtimeState.ParentInstance is ParentInstance p
? new(new(p.Name), p.OrchestrationInstance.InstanceId)
: null;
if (result == null)
{
// DurableTask.Core did not attach the orchestration history since the extended session is still active on its end, but we have since evicted the
// session and lost the orchestration history so we cannot replay the orchestration.
if (!pastEventsIncluded)
{
requiresHistory = true;
}
else
{
// Re-construct the orchestration state from the history.
// New events must be added using the AddEvent method.
OrchestrationRuntimeState runtimeState = new(pastEvents);

foreach (HistoryEvent newEvent in newEvents)
{
runtimeState.AddEvent(newEvent);
}

TaskName orchestratorName = new(runtimeState.Name);
ParentOrchestrationInstance? parent = runtimeState.ParentInstance is ParentInstance p
? new(new(p.Name), p.OrchestrationInstance.InstanceId)
: null;

DurableTaskShimFactory factory = services is null
? DurableTaskShimFactory.Default
: ActivatorUtilities.GetServiceOrCreateInstance<DurableTaskShimFactory>(services);
TaskOrchestration shim = factory.CreateOrchestration(orchestratorName, implementation, properties, parent);
TaskOrchestrationExecutor executor = new(runtimeState, shim, BehaviorOnContinueAsNew.Carryover, request.EntityParameters.ToCore(), ErrorPropagationMode.UseFailureDetails);
OrchestratorExecutionResult result = executor.Execute();
DurableTaskShimFactory factory = services is null
? DurableTaskShimFactory.Default
: ActivatorUtilities.GetServiceOrCreateInstance<DurableTaskShimFactory>(services);
TaskOrchestration shim = factory.CreateOrchestration(orchestratorName, implementation, properties, parent);
TaskOrchestrationExecutor executor = new(runtimeState, shim, BehaviorOnContinueAsNew.Carryover, request.EntityParameters.ToCore(), ErrorPropagationMode.UseFailureDetails);
result = executor.Execute();

if (addToExtendedSessions && !executor.IsCompleted)
{
extendedSessions.Set<ExtendedSessionState>(
request.InstanceId,
new(runtimeState, shim, executor),
new MemoryCacheEntryOptions { SlidingExpiration = TimeSpan.FromSeconds(extendedSessionIdleTimeoutInSeconds) });
}
else
{
extendedSessions?.Remove(request.InstanceId);
}
}
}

P.OrchestratorResponse response = ProtoUtils.ConstructOrchestratorResponse(
request.InstanceId,
request.InstanceId,
request.ExecutionId,
result.CustomStatus,
result.Actions,
result?.CustomStatus,
result?.Actions,
completionToken: string.Empty, /* doesn't apply */
entityConversionState: null,
orchestrationActivity: null);
entityConversionState: null,
orchestrationActivity: null,
requiresHistory: requiresHistory);
byte[] responseBytes = response.ToByteArray();
return Convert.ToBase64String(responseBytes);
}
Expand Down
Loading
Loading