Skip to content

Commit c0af286

Browse files
author
Sophia Tevosyan
committed
pushing bug fix for not honoring a host restarting an extended session
1 parent 083a304 commit c0af286

File tree

2 files changed

+54
-11
lines changed

2 files changed

+54
-11
lines changed

src/Worker/Grpc/GrpcOrchestrationRunner.cs

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -170,11 +170,14 @@ public static string LoadAndRun(
170170
pair => pair.Key,
171171
pair => ProtoUtils.ConvertValueToObject(pair.Value));
172172

173-
OrchestratorExecutionResult? result = null;
173+
OrchestratorExecutionResult? result = null;
174+
MemoryCache? extendedSessions = null;
175+
176+
// If any of the request parameters are malformed, we assume the default - extended sessions are not enabled and the orchestration history is attached
174177
bool addToExtendedSessions = false;
175178
bool requiresHistory = false;
179+
bool pastEventsIncluded = true;
176180
double extendedSessionIdleTimeoutInSeconds = 0;
177-
MemoryCache? extendedSessions = null;
178181

179182
if (properties.TryGetValue("ExtendedSessionIdleTimeoutInSeconds", out object? extendedSessionIdleTimeoutObj)
180183
&& extendedSessionIdleTimeoutObj is double extendedSessionIdleTimeout
@@ -183,13 +186,21 @@ public static string LoadAndRun(
183186
extendedSessionIdleTimeoutInSeconds = extendedSessionIdleTimeout;
184187
extendedSessions = extendedSessionsCache.GetOrInitializeCache(extendedSessionIdleTimeoutInSeconds);
185188
}
189+
190+
if (properties.TryGetValue("IncludePastEvents", out object? includePastEventsObj)
191+
&& includePastEventsObj is bool includePastEvents)
192+
{
193+
pastEventsIncluded = includePastEvents;
194+
}
186195

187196
if (properties.TryGetValue("ExtendedSession", out object? isExtendedSessionObj)
188197
&& isExtendedSessionObj is bool isExtendedSession
189198
&& isExtendedSession
190199
&& extendedSessions != null)
191-
{
192-
if (extendedSessions.TryGetValue(request.InstanceId, out ExtendedSessionState? extendedSessionState) && extendedSessionState is not null)
200+
{
201+
// If a history was provided, even if we already have an extended session stored, we always want to evict whatever state is in the cache and replace it with a new extended
202+
// session based on the provided history
203+
if (!pastEventsIncluded && extendedSessions.TryGetValue(request.InstanceId, out ExtendedSessionState? extendedSessionState) && extendedSessionState is not null)
193204
{
194205
OrchestrationRuntimeState runtimeState = extendedSessionState!.RuntimeState;
195206
runtimeState.NewEvents.Clear();
@@ -205,20 +216,17 @@ public static string LoadAndRun(
205216
}
206217
}
207218
else
208-
{
219+
{
220+
extendedSessions.Remove(request.InstanceId);
209221
addToExtendedSessions = true;
210222
}
211223
}
212224

213225
if (result == null)
214226
{
215-
// If this is the first orchestration execution, then the past events count will be 0 but includePastEvents will be true (there are just none to include).
216-
// Otherwise, there is an orchestration history but DurableTask.Core did not attach it since the extended session is still active on its end, but we have since evicted the
227+
// DurableTask.Core did not attach the orchestration history since the extended session is still active on its end, but we have since evicted the
217228
// session and lost the orchestration history so we cannot replay the orchestration.
218-
if (pastEvents.Count == 0
219-
&& (properties.TryGetValue("IncludePastEvents", out object? pastEventsIncludedObj)
220-
&& pastEventsIncludedObj is bool pastEventsIncluded
221-
&& !pastEventsIncluded))
229+
if (!pastEventsIncluded)
222230
{
223231
requiresHistory = true;
224232
}

test/Worker/Core.Tests/GrpcOrchestrationRunnerTests.cs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -346,6 +346,41 @@ public async void Stale_ExtendedSessions_Evicted_Async()
346346
Assert.True(response.RequiresHistory);
347347
}
348348

349+
[Fact]
350+
public void PastEventIncludes_Means_ExtendedSession_Evicted()
351+
{
352+
using var extendedSessions = new ExtendedSessionsCache();
353+
int extendedSessionIdleTimeout = 5;
354+
var historyEvent = new Protobuf.HistoryEvent
355+
{
356+
EventId = -1,
357+
Timestamp = Timestamp.FromDateTime(DateTime.UtcNow),
358+
ExecutionStarted = new Protobuf.ExecutionStartedEvent()
359+
{
360+
OrchestrationInstance = new Protobuf.OrchestrationInstance
361+
{
362+
InstanceId = TestInstanceId,
363+
ExecutionId = TestExecutionId,
364+
},
365+
}
366+
};
367+
Protobuf.OrchestratorRequest orchestratorRequest = CreateOrchestratorRequest([historyEvent]);
368+
orchestratorRequest.Properties.Add(new MapField<string, Value>() {
369+
{ "IncludePastEvents", Value.ForBool(true) },
370+
{ "ExtendedSession", Value.ForBool(true) },
371+
{ "ExtendedSessionIdleTimeoutInSeconds", Value.ForNumber(extendedSessionIdleTimeout) } });
372+
byte[] requestBytes = orchestratorRequest.ToByteArray();
373+
string requestString = Convert.ToBase64String(requestBytes);
374+
GrpcOrchestrationRunner.LoadAndRun(requestString, new CallSubOrchestrationOrchestrator(), extendedSessions);
375+
Assert.True(extendedSessions.IsInitialized());
376+
Assert.True(extendedSessions.GetOrInitializeCache(extendedSessionIdleTimeout).TryGetValue(TestInstanceId, out object? extendedSession));
377+
378+
// Now we will retry the same exact request. If the extended session is not evicted, then the request will fail due to duplicate ExecutionStarted events being detected
379+
// If the extended session is evicted because IncludePastEvents is true, then the request will succeed and a new extended session will be stored
380+
GrpcOrchestrationRunner.LoadAndRun(requestString, new CallSubOrchestrationOrchestrator(), extendedSessions);
381+
Assert.True(extendedSessions.GetOrInitializeCache(extendedSessionIdleTimeout).TryGetValue(TestInstanceId, out extendedSession));
382+
}
383+
349384
static Protobuf.OrchestratorRequest CreateOrchestratorRequest(IEnumerable<Protobuf.HistoryEvent> newEvents)
350385
{
351386
var orchestratorRequest = new Protobuf.OrchestratorRequest()

0 commit comments

Comments
 (0)