Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 86 additions & 10 deletions src/Worker/Grpc/GrpcEntityRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
using DurableTask.Core.Entities.OperationFormat;
using Google.Protobuf;
using Microsoft.DurableTask.Entities;
using Microsoft.DurableTask.Worker.Shims;
using Microsoft.DurableTask.Worker.Shims;
using Microsoft.Extensions.Caching.Memory;
using Microsoft.Extensions.DependencyInjection;
using P = Microsoft.DurableTask.Protobuf;

Expand All @@ -25,7 +26,7 @@
/// </para>
/// </remarks>
public static class GrpcEntityRunner
{
{
/// <summary>
/// Deserializes entity batch request from <paramref name="encodedEntityRequest"/> and uses it to invoke the
/// requested operations implemented by <paramref name="implementation"/>.
Expand All @@ -51,24 +52,99 @@
/// </exception>
public static async Task<string> LoadAndRunAsync(
string encodedEntityRequest, ITaskEntity implementation, IServiceProvider? services = null)
{
return await LoadAndRunAsync(encodedEntityRequest, implementation, extendedSessionsCache: null, services: services);
}

/// <summary>
/// Deserializes entity batch request from <paramref name="encodedEntityRequest"/> and uses it to invoke the
/// requested operations implemented by <paramref name="implementation"/>.
/// </summary>
/// <param name="encodedEntityRequest">
/// The encoded protobuf payload representing an entity batch request. This is a base64-encoded string.
/// </param>
/// <param name="implementation">
/// An <see cref="ITaskEntity"/> implementation that defines the entity logic.
/// </param>
/// <param name="extendedSessionsCache">
/// The cache of entity states which can be used to retrieve the entity state if this request is from within an extended session.
/// </param>
/// <param name="services">
/// Optional <see cref="IServiceProvider"/> from which injected dependencies can be retrieved.
/// </param>
/// <returns>
/// Returns a serialized result of the entity batch that should be used as the return value of the entity function
/// trigger.
/// </returns>
/// <exception cref="ArgumentNullException">
/// Thrown if <paramref name="encodedEntityRequest"/> or <paramref name="implementation"/> is <c>null</c>.
/// </exception>
/// <exception cref="ArgumentException">
/// Thrown if <paramref name="encodedEntityRequest"/> contains invalid data.
/// </exception>
public static async Task<string> LoadAndRunAsync(
string encodedEntityRequest, ITaskEntity implementation, ExtendedSessionsCache? extendedSessionsCache, IServiceProvider? services = null)
{
Check.NotNullOrEmpty(encodedEntityRequest);
Check.NotNull(implementation);

P.EntityBatchRequest request = P.EntityBatchRequest.Parser.Base64Decode<P.EntityBatchRequest>(
encodedEntityRequest);
encodedEntityRequest);
Dictionary<string, object?> properties = request.Properties.ToDictionary(

Check failure on line 93 in src/Worker/Grpc/GrpcEntityRunner.cs

View workflow job for this annotation

GitHub Actions / build

'EntityBatchRequest' does not contain a definition for 'Properties' and no accessible extension method 'Properties' accepting a first argument of type 'EntityBatchRequest' could be found (are you missing a using directive or an assembly reference?)

Check failure on line 93 in src/Worker/Grpc/GrpcEntityRunner.cs

View workflow job for this annotation

GitHub Actions / build

'EntityBatchRequest' does not contain a definition for 'Properties' and no accessible extension method 'Properties' accepting a first argument of type 'EntityBatchRequest' could be found (are you missing a using directive or an assembly reference?)

Check failure on line 93 in src/Worker/Grpc/GrpcEntityRunner.cs

View workflow job for this annotation

GitHub Actions / build

'EntityBatchRequest' does not contain a definition for 'Properties' and no accessible extension method 'Properties' accepting a first argument of type 'EntityBatchRequest' could be found (are you missing a using directive or an assembly reference?)

Check failure on line 93 in src/Worker/Grpc/GrpcEntityRunner.cs

View workflow job for this annotation

GitHub Actions / build

'EntityBatchRequest' does not contain a definition for 'Properties' and no accessible extension method 'Properties' accepting a first argument of type 'EntityBatchRequest' could be found (are you missing a using directive or an assembly reference?)

Check failure on line 93 in src/Worker/Grpc/GrpcEntityRunner.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

'EntityBatchRequest' does not contain a definition for 'Properties' and no accessible extension method 'Properties' accepting a first argument of type 'EntityBatchRequest' could be found (are you missing a using directive or an assembly reference?)

Check failure on line 93 in src/Worker/Grpc/GrpcEntityRunner.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

'EntityBatchRequest' does not contain a definition for 'Properties' and no accessible extension method 'Properties' accepting a first argument of type 'EntityBatchRequest' could be found (are you missing a using directive or an assembly reference?)

Check failure on line 93 in src/Worker/Grpc/GrpcEntityRunner.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

'EntityBatchRequest' does not contain a definition for 'Properties' and no accessible extension method 'Properties' accepting a first argument of type 'EntityBatchRequest' could be found (are you missing a using directive or an assembly reference?)

Check failure on line 93 in src/Worker/Grpc/GrpcEntityRunner.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

'EntityBatchRequest' does not contain a definition for 'Properties' and no accessible extension method 'Properties' accepting a first argument of type 'EntityBatchRequest' could be found (are you missing a using directive or an assembly reference?)
pair => pair.Key,
pair => ProtoUtils.ConvertValueToObject(pair.Value));

EntityBatchRequest batch = request.ToEntityBatchRequest();
EntityId id = EntityId.FromString(batch.InstanceId!);
TaskName entityName = new(id.Name);

TaskName entityName = new(id.Name);

bool addToExtendedSessions = false;
bool stateCached = false;
GrpcInstanceRunnerUtils.ParseRequestPropertiesAndInitializeCache(
properties,
extendedSessionsCache,
out double extendedSessionIdleTimeoutInSeconds,
out bool isExtendedSession,
out bool entityStateIncluded,
out MemoryCache? extendedSessions);

if (isExtendedSession && extendedSessions != null)
{
addToExtendedSessions = true;

// If an entity state was provided, even if we already have one stored, we always want to use the provided state.
if (!entityStateIncluded && extendedSessions.TryGetValue(request.InstanceId, out string? entityState))
Copy link

Copilot AI Nov 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing null check for cached entity state. If a null value is stored in the cache (similar to the scenario tested in GrpcOrchestrationRunnerTests.NullExtendedSessionStored_Means_NeedsExtendedSessionNotUsed), this code will incorrectly set stateCached = true and use the null state. The orchestration runner has an explicit && extendedSessionState is not null check on line 149 of GrpcOrchestrationRunner.cs that should be mirrored here. The condition should be:

if (!entityStateIncluded && extendedSessions.TryGetValue(request.InstanceId, out string? entityState) && entityState is not null)
Suggested change
if (!entityStateIncluded && extendedSessions.TryGetValue(request.InstanceId, out string? entityState))
if (!entityStateIncluded && extendedSessions.TryGetValue(request.InstanceId, out string? entityState) && entityState is not null)

Copilot uses AI. Check for mistakes.
{
batch.EntityState = entityState;
stateCached = true;
}
}

if (!stateCached && !entityStateIncluded)
{
// No state was provided, and we do not have one cached, so we cannot execute the batch request.
return Convert.ToBase64String(new P.EntityBatchResult { RequiresState = true }.ToByteArray());

Check failure on line 126 in src/Worker/Grpc/GrpcEntityRunner.cs

View workflow job for this annotation

GitHub Actions / build

'EntityBatchResult' does not contain a definition for 'RequiresState'

Check failure on line 126 in src/Worker/Grpc/GrpcEntityRunner.cs

View workflow job for this annotation

GitHub Actions / build

'EntityBatchResult' does not contain a definition for 'RequiresState'

Check failure on line 126 in src/Worker/Grpc/GrpcEntityRunner.cs

View workflow job for this annotation

GitHub Actions / build

'EntityBatchResult' does not contain a definition for 'RequiresState'

Check failure on line 126 in src/Worker/Grpc/GrpcEntityRunner.cs

View workflow job for this annotation

GitHub Actions / build

'EntityBatchResult' does not contain a definition for 'RequiresState'

Check failure on line 126 in src/Worker/Grpc/GrpcEntityRunner.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

'EntityBatchResult' does not contain a definition for 'RequiresState'

Check failure on line 126 in src/Worker/Grpc/GrpcEntityRunner.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

'EntityBatchResult' does not contain a definition for 'RequiresState'

Check failure on line 126 in src/Worker/Grpc/GrpcEntityRunner.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

'EntityBatchResult' does not contain a definition for 'RequiresState'

Check failure on line 126 in src/Worker/Grpc/GrpcEntityRunner.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

'EntityBatchResult' does not contain a definition for 'RequiresState'
}

DurableTaskShimFactory factory = services is null
? DurableTaskShimFactory.Default
: ActivatorUtilities.GetServiceOrCreateInstance<DurableTaskShimFactory>(services);

TaskEntity entity = factory.CreateEntity(entityName, implementation, id);
EntityBatchResult result = await entity.ExecuteOperationBatchAsync(batch);

: ActivatorUtilities.GetServiceOrCreateInstance<DurableTaskShimFactory>(services);

TaskEntity entity = factory.CreateEntity(entityName, implementation, id);
EntityBatchResult result = await entity.ExecuteOperationBatchAsync(batch);

if (addToExtendedSessions)
{
extendedSessions.Set(
Copy link

Copilot AI Nov 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Variable extendedSessions may be null at this access as suggested by this null check.
Variable extendedSessions may be null at this access as suggested by this null check.

Suggested change
extendedSessions.Set(
extendedSessions?.Set(

Copilot uses AI. Check for mistakes.
request.InstanceId,
result.EntityState,
new MemoryCacheEntryOptions { SlidingExpiration = TimeSpan.FromSeconds(extendedSessionIdleTimeoutInSeconds) });
}
else
{
extendedSessions?.Remove(request.InstanceId);
}

P.EntityBatchResult response = result.ToEntityBatchResult();
byte[] responseBytes = response.ToByteArray();
return Convert.ToBase64String(responseBytes);
Expand Down
71 changes: 71 additions & 0 deletions src/Worker/Grpc/GrpcInstanceRunnerUtils.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using System;
using System.Collections.Generic;
using System.Text;
Copy link

Copilot AI Nov 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unused using statement for System.Text. This namespace appears to be included but not used anywhere in the file.

Suggested change
using System.Text;

Copilot uses AI. Check for mistakes.
using Microsoft.Extensions.Caching.Memory;

namespace Microsoft.DurableTask.Worker.Grpc;

/// <summary>
/// Utility methods for the <see cref="GrpcOrchestrationRunner"/> and <see cref="GrpcEntityRunner"/> classes.
/// </summary>
static class GrpcInstanceRunnerUtils
{
/// <summary>
/// Parses request properties to determine extended session settings and initializes the extended sessions cache if
/// the settings are properly enabled.
/// </summary>
/// <remarks>
/// If any request property is missing or invalid (i.e. the key is misspelled or the value is of the wrong type),
/// extended sessions are not enabled and default values are assigned are assigned to the returns.
Copy link

Copilot AI Nov 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Duplicate word "are assigned" in the documentation. Should be "extended sessions are not enabled and default values are assigned to the returns."

Suggested change
/// extended sessions are not enabled and default values are assigned are assigned to the returns.
/// extended sessions are not enabled and default values are assigned to the returns.

Copilot uses AI. Check for mistakes.
/// </remarks>
/// <param name="properties">
/// A dictionary containing request properties used to configure extended session behavior.
/// </param>
/// <param name="extendedSessionsCache">The extended sessions cache manager.</param>
/// <param name="extendedSessionIdleTimeoutInSeconds">
/// When the method returns, contains the idle timeout value for extended sessions, in seconds. Cache entries that
/// have not been accessed in this timeframe are evicted from <paramref name="extendedSessionsCache"/>.
/// Set to zero if extended sessions are not enabled.
/// </param>
/// <param name="isExtendedSession">When the method returns, indicates whether this request is from within an extended session.</param>
/// <param name="stateIncluded">When the method returns, indicates whether instance state is included in the request.</param>
/// <param name="extendedSessions">When the method returns, contains the extended sessions cache initialized from
/// <paramref name="extendedSessionsCache"/> if <paramref name="isExtendedSession"/> and <paramref name="extendedSessionIdleTimeoutInSeconds"/>
/// are correctly specified in the <paramref name="properties"/>; otherwise, null.
/// </param>
internal static void ParseRequestPropertiesAndInitializeCache(
Dictionary<string, object?> properties,
ExtendedSessionsCache? extendedSessionsCache,
out double extendedSessionIdleTimeoutInSeconds,
out bool isExtendedSession,
out bool stateIncluded,
out MemoryCache? extendedSessions)
{
// If any of the request parameters are malformed, we assume the default - extended sessions are not enabled and the instance state is attached
extendedSessions = null;
stateIncluded = true;
isExtendedSession = false;
extendedSessionIdleTimeoutInSeconds = 0;

// Only attempt to initialize the extended sessions cache if all the parameters are correctly specified
if (properties.TryGetValue("ExtendedSessionIdleTimeoutInSeconds", out object? extendedSessionIdleTimeoutObj)
&& extendedSessionIdleTimeoutObj is double extendedSessionIdleTimeout
&& extendedSessionIdleTimeout > 0
&& properties.TryGetValue("IsExtendedSession", out object? extendedSessionObj)
&& extendedSessionObj is bool extendedSession)
{
extendedSessionIdleTimeoutInSeconds = extendedSessionIdleTimeout;
isExtendedSession = extendedSession;
extendedSessions = extendedSessionsCache?.GetOrInitializeCache(extendedSessionIdleTimeoutInSeconds);
}

if (properties.TryGetValue("IncludeState", out object? includeStateObj)
&& includeStateObj is bool includeState)
{
stateIncluded = includeState;
}
}
}
30 changes: 7 additions & 23 deletions src/Worker/Grpc/GrpcOrchestrationRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -131,32 +131,16 @@ public static string LoadAndRun(
pair => ProtoUtils.ConvertValueToObject(pair.Value));

OrchestratorExecutionResult? result = null;
MemoryCache? extendedSessions = null;

// If any of the request parameters are malformed, we assume the default - extended sessions are not enabled and the orchestration history is attached
bool addToExtendedSessions = false;
bool requiresHistory = false;
bool pastEventsIncluded = true;
bool isExtendedSession = false;
double extendedSessionIdleTimeoutInSeconds = 0;

// Only attempt to initialize the extended sessions cache if all the parameters are correctly specified
if (properties.TryGetValue("ExtendedSessionIdleTimeoutInSeconds", out object? extendedSessionIdleTimeoutObj)
&& extendedSessionIdleTimeoutObj is double extendedSessionIdleTimeout
&& extendedSessionIdleTimeout > 0
&& properties.TryGetValue("IsExtendedSession", out object? extendedSessionObj)
&& extendedSessionObj is bool extendedSession)
{
extendedSessionIdleTimeoutInSeconds = extendedSessionIdleTimeout;
isExtendedSession = extendedSession;
extendedSessions = extendedSessionsCache?.GetOrInitializeCache(extendedSessionIdleTimeoutInSeconds);
}

if (properties.TryGetValue("IncludePastEvents", out object? includePastEventsObj)
&& includePastEventsObj is bool includePastEvents)
{
pastEventsIncluded = includePastEvents;
}
GrpcInstanceRunnerUtils.ParseRequestPropertiesAndInitializeCache(
properties,
extendedSessionsCache,
out double extendedSessionIdleTimeoutInSeconds,
out bool isExtendedSession,
out bool pastEventsIncluded,
out MemoryCache? extendedSessions);

if (isExtendedSession && extendedSessions != null)
{
Expand Down
Loading
Loading