Skip to content

Commit e9f8560

Browse files
sophiatevSophia Tevosyan
andauthored
Extended sessions for entities in .NET isolated (#507)
* first commit * updated implementation to match the orchestration implementation and added tests * updated tests * refactoring * removed assumption that null entity state is invalid * addressing copilot comments * addressing more comments * updated comments * pulled protos from main * copilot comments --------- Co-authored-by: Sophia Tevosyan <stevosyan@microsoft.com>
1 parent beb817d commit e9f8560

File tree

7 files changed

+655
-82
lines changed

7 files changed

+655
-82
lines changed

src/Grpc/orchestrator_service.proto

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -599,6 +599,7 @@ message EntityBatchRequest {
599599
string instanceId = 1;
600600
google.protobuf.StringValue entityState = 2;
601601
repeated OperationRequest operations = 3;
602+
map<string, google.protobuf.Value> properties = 4;
602603
}
603604

604605
message EntityBatchResult {
@@ -608,6 +609,8 @@ message EntityBatchResult {
608609
TaskFailureDetails failureDetails = 4;
609610
string completionToken = 5;
610611
repeated OperationInfo operationInfos = 6; // used only with DTS
612+
// Whether or not an entity state is required to complete the original EntityBatchRequest and none was provided.
613+
bool requiresState = 7;
611614
}
612615

613616
message EntityRequest {

src/Grpc/versions.txt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
# The following files were downloaded from branch main at 2025-12-12 01:49:06 UTC
2-
https://raw.githubusercontent.com/microsoft/durabletask-protobuf/b03c06dea21952dcf2a86551fd761b6b78c64d15/protos/orchestrator_service.proto
1+
# The following files were downloaded from branch main at 2025-12-29 22:13:55 UTC
2+
https://raw.githubusercontent.com/microsoft/durabletask-protobuf/b7e260ad7b84740a2ed5cb4600ce73bef702a979/protos/orchestrator_service.proto

src/Worker/Grpc/GrpcEntityRunner.cs

Lines changed: 87 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@
55
using DurableTask.Core.Entities.OperationFormat;
66
using Google.Protobuf;
77
using Microsoft.DurableTask.Entities;
8-
using Microsoft.DurableTask.Worker.Shims;
8+
using Microsoft.DurableTask.Worker.Shims;
9+
using Microsoft.Extensions.Caching.Memory;
910
using Microsoft.Extensions.DependencyInjection;
1011
using P = Microsoft.DurableTask.Protobuf;
1112

@@ -25,7 +26,7 @@ namespace Microsoft.DurableTask.Worker.Grpc;
2526
/// </para>
2627
/// </remarks>
2728
public static class GrpcEntityRunner
28-
{
29+
{
2930
/// <summary>
3031
/// Deserializes entity batch request from <paramref name="encodedEntityRequest"/> and uses it to invoke the
3132
/// requested operations implemented by <paramref name="implementation"/>.
@@ -51,24 +52,100 @@ public static class GrpcEntityRunner
5152
/// </exception>
5253
public static async Task<string> LoadAndRunAsync(
5354
string encodedEntityRequest, ITaskEntity implementation, IServiceProvider? services = null)
55+
{
56+
return await LoadAndRunAsync(encodedEntityRequest, implementation, extendedSessionsCache: null, services: services);
57+
}
58+
59+
/// <summary>
60+
/// Deserializes entity batch request from <paramref name="encodedEntityRequest"/> and uses it to invoke the
61+
/// requested operations implemented by <paramref name="implementation"/>.
62+
/// </summary>
63+
/// <param name="encodedEntityRequest">
64+
/// The encoded protobuf payload representing an entity batch request. This is a base64-encoded string.
65+
/// </param>
66+
/// <param name="implementation">
67+
/// An <see cref="ITaskEntity"/> implementation that defines the entity logic.
68+
/// </param>
69+
/// <param name="extendedSessionsCache">
70+
/// The cache of entity states which can be used to retrieve the entity state if this request is from within an extended session.
71+
/// </param>
72+
/// <param name="services">
73+
/// Optional <see cref="IServiceProvider"/> from which injected dependencies can be retrieved.
74+
/// </param>
75+
/// <returns>
76+
/// Returns a serialized result of the entity batch that should be used as the return value of the entity function
77+
/// trigger.
78+
/// </returns>
79+
/// <exception cref="ArgumentNullException">
80+
/// Thrown if <paramref name="encodedEntityRequest"/> or <paramref name="implementation"/> is <c>null</c>.
81+
/// </exception>
82+
/// <exception cref="ArgumentException">
83+
/// Thrown if <paramref name="encodedEntityRequest"/> contains invalid data.
84+
/// </exception>
85+
public static async Task<string> LoadAndRunAsync(
86+
string encodedEntityRequest, ITaskEntity implementation, ExtendedSessionsCache? extendedSessionsCache, IServiceProvider? services = null)
5487
{
5588
Check.NotNullOrEmpty(encodedEntityRequest);
5689
Check.NotNull(implementation);
5790

5891
P.EntityBatchRequest request = P.EntityBatchRequest.Parser.Base64Decode<P.EntityBatchRequest>(
59-
encodedEntityRequest);
92+
encodedEntityRequest);
93+
Dictionary<string, object?> properties = request.Properties.ToDictionary(
94+
pair => pair.Key,
95+
pair => ProtoUtils.ConvertValueToObject(pair.Value));
6096

6197
EntityBatchRequest batch = request.ToEntityBatchRequest();
6298
EntityId id = EntityId.FromString(batch.InstanceId!);
63-
TaskName entityName = new(id.Name);
64-
99+
TaskName entityName = new(id.Name);
100+
101+
bool addToExtendedSessions = false;
102+
bool stateCached = false;
103+
GrpcInstanceRunnerUtils.ParseRequestPropertiesAndInitializeCache(
104+
properties,
105+
extendedSessionsCache,
106+
out double extendedSessionIdleTimeoutInSeconds,
107+
out bool isExtendedSession,
108+
out bool entityStateIncluded,
109+
out MemoryCache? extendedSessions);
110+
111+
if (isExtendedSession && extendedSessions != null)
112+
{
113+
addToExtendedSessions = true;
114+
115+
// If an entity state was provided, even if we already have one stored, we always want to use the provided state.
116+
if (!entityStateIncluded && extendedSessions.TryGetValue(request.InstanceId, out string? entityState))
117+
{
118+
batch.EntityState = entityState;
119+
stateCached = true;
120+
}
121+
}
122+
123+
if (!stateCached && !entityStateIncluded)
124+
{
125+
// No state was provided, and we do not have one cached, so we cannot execute the batch request.
126+
return Convert.ToBase64String(new P.EntityBatchResult { RequiresState = true }.ToByteArray());
127+
}
128+
65129
DurableTaskShimFactory factory = services is null
66130
? DurableTaskShimFactory.Default
67-
: ActivatorUtilities.GetServiceOrCreateInstance<DurableTaskShimFactory>(services);
68-
69-
TaskEntity entity = factory.CreateEntity(entityName, implementation, id);
70-
EntityBatchResult result = await entity.ExecuteOperationBatchAsync(batch);
71-
131+
: ActivatorUtilities.GetServiceOrCreateInstance<DurableTaskShimFactory>(services);
132+
133+
TaskEntity entity = factory.CreateEntity(entityName, implementation, id);
134+
EntityBatchResult result = await entity.ExecuteOperationBatchAsync(batch);
135+
136+
if (addToExtendedSessions)
137+
{
138+
// addToExtendedSessions can only be set to true if extendedSessions is not null
139+
extendedSessions!.Set(
140+
request.InstanceId,
141+
result.EntityState,
142+
new MemoryCacheEntryOptions { SlidingExpiration = TimeSpan.FromSeconds(extendedSessionIdleTimeoutInSeconds) });
143+
}
144+
else
145+
{
146+
extendedSessions?.Remove(request.InstanceId);
147+
}
148+
72149
P.EntityBatchResult response = result.ToEntityBatchResult();
73150
byte[] responseBytes = response.ToByteArray();
74151
return Convert.ToBase64String(responseBytes);
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
// Copyright (c) Microsoft Corporation.
2+
// Licensed under the MIT License.
3+
4+
using Microsoft.Extensions.Caching.Memory;
5+
6+
namespace Microsoft.DurableTask.Worker.Grpc;
7+
8+
/// <summary>
9+
/// Utility methods for the <see cref="GrpcOrchestrationRunner"/> and <see cref="GrpcEntityRunner"/> classes.
10+
/// </summary>
11+
static class GrpcInstanceRunnerUtils
12+
{
13+
/// <summary>
14+
/// Parses request properties to determine extended session settings and initializes the extended sessions cache if
15+
/// the settings are properly enabled.
16+
/// </summary>
17+
/// <remarks>
18+
/// If any request property is missing or invalid (i.e. the key is misspelled or the value is of the wrong type),
19+
/// extended sessions are not enabled and default values are assigned to the returns.
20+
/// </remarks>
21+
/// <param name="properties">
22+
/// A dictionary containing request properties used to configure extended session behavior.
23+
/// </param>
24+
/// <param name="extendedSessionsCache">The extended sessions cache manager.</param>
25+
/// <param name="extendedSessionIdleTimeoutInSeconds">
26+
/// When the method returns, contains the idle timeout value for extended sessions, in seconds. Cache entries that
27+
/// have not been accessed in this timeframe are evicted from <paramref name="extendedSessionsCache"/>.
28+
/// Set to zero if extended sessions are not enabled.
29+
/// </param>
30+
/// <param name="isExtendedSession">When the method returns, indicates whether this request is from within an extended session.</param>
31+
/// <param name="stateIncluded">When the method returns, indicates whether instance state is included in the request.</param>
32+
/// <param name="extendedSessions">When the method returns, contains the extended sessions cache initialized from
33+
/// <paramref name="extendedSessionsCache"/> if <paramref name="isExtendedSession"/> and <paramref name="extendedSessionIdleTimeoutInSeconds"/>
34+
/// are correctly specified in the <paramref name="properties"/>; otherwise, null.
35+
/// </param>
36+
internal static void ParseRequestPropertiesAndInitializeCache(
37+
Dictionary<string, object?> properties,
38+
ExtendedSessionsCache? extendedSessionsCache,
39+
out double extendedSessionIdleTimeoutInSeconds,
40+
out bool isExtendedSession,
41+
out bool stateIncluded,
42+
out MemoryCache? extendedSessions)
43+
{
44+
// If any of the request parameters are malformed, we assume the default - extended sessions are not enabled and the instance state is attached
45+
extendedSessions = null;
46+
stateIncluded = true;
47+
isExtendedSession = false;
48+
extendedSessionIdleTimeoutInSeconds = 0;
49+
50+
// Only attempt to initialize the extended sessions cache if all the parameters are correctly specified
51+
if (properties.TryGetValue("ExtendedSessionIdleTimeoutInSeconds", out object? extendedSessionIdleTimeoutObj)
52+
&& extendedSessionIdleTimeoutObj is double extendedSessionIdleTimeout
53+
&& extendedSessionIdleTimeout > 0
54+
&& properties.TryGetValue("IsExtendedSession", out object? extendedSessionObj)
55+
&& extendedSessionObj is bool extendedSession)
56+
{
57+
extendedSessionIdleTimeoutInSeconds = extendedSessionIdleTimeout;
58+
isExtendedSession = extendedSession;
59+
extendedSessions = extendedSessionsCache?.GetOrInitializeCache(extendedSessionIdleTimeoutInSeconds);
60+
}
61+
62+
if (properties.TryGetValue("IncludeState", out object? includeStateObj)
63+
&& includeStateObj is bool includeState)
64+
{
65+
stateIncluded = includeState;
66+
}
67+
}
68+
}

src/Worker/Grpc/GrpcOrchestrationRunner.cs

Lines changed: 9 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -131,32 +131,16 @@ public static string LoadAndRun(
131131
pair => ProtoUtils.ConvertValueToObject(pair.Value));
132132

133133
OrchestratorExecutionResult? result = null;
134-
MemoryCache? extendedSessions = null;
135134

136-
// If any of the request parameters are malformed, we assume the default - extended sessions are not enabled and the orchestration history is attached
137135
bool addToExtendedSessions = false;
138136
bool requiresHistory = false;
139-
bool pastEventsIncluded = true;
140-
bool isExtendedSession = false;
141-
double extendedSessionIdleTimeoutInSeconds = 0;
142-
143-
// Only attempt to initialize the extended sessions cache if all the parameters are correctly specified
144-
if (properties.TryGetValue("ExtendedSessionIdleTimeoutInSeconds", out object? extendedSessionIdleTimeoutObj)
145-
&& extendedSessionIdleTimeoutObj is double extendedSessionIdleTimeout
146-
&& extendedSessionIdleTimeout > 0
147-
&& properties.TryGetValue("IsExtendedSession", out object? extendedSessionObj)
148-
&& extendedSessionObj is bool extendedSession)
149-
{
150-
extendedSessionIdleTimeoutInSeconds = extendedSessionIdleTimeout;
151-
isExtendedSession = extendedSession;
152-
extendedSessions = extendedSessionsCache?.GetOrInitializeCache(extendedSessionIdleTimeoutInSeconds);
153-
}
154-
155-
if (properties.TryGetValue("IncludePastEvents", out object? includePastEventsObj)
156-
&& includePastEventsObj is bool includePastEvents)
157-
{
158-
pastEventsIncluded = includePastEvents;
159-
}
137+
GrpcInstanceRunnerUtils.ParseRequestPropertiesAndInitializeCache(
138+
properties,
139+
extendedSessionsCache,
140+
out double extendedSessionIdleTimeoutInSeconds,
141+
out bool isExtendedSession,
142+
out bool pastEventsIncluded,
143+
out MemoryCache? extendedSessions);
160144

161145
if (isExtendedSession && extendedSessions != null)
162146
{
@@ -223,7 +207,8 @@ public static string LoadAndRun(
223207

224208
if (addToExtendedSessions && !executor.IsCompleted)
225209
{
226-
extendedSessions.Set<ExtendedSessionState>(
210+
// addToExtendedSessions can only be set to true if extendedSessions is not null
211+
extendedSessions!.Set<ExtendedSessionState>(
227212
request.InstanceId,
228213
new(runtimeState, shim, executor),
229214
new MemoryCacheEntryOptions { SlidingExpiration = TimeSpan.FromSeconds(extendedSessionIdleTimeoutInSeconds) });

0 commit comments

Comments
 (0)