Skip to content

Commit 0f0e98a

Browse files
Fix bug in entity batch processing that caused entity state size explosion (#2390)
* add more detailed tracing to entity processing logic * fix bug that caused resume message to be delayed * add mitigation to ensure entities can never get stuck in a suspended state * update release notes * address PR feedback
1 parent 7bdae9d commit 0f0e98a

File tree

10 files changed

+339
-22
lines changed

10 files changed

+339
-22
lines changed

release_notes.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,12 @@
22

33
### New Features
44

5+
- Added more detail to entity batch processing instrumentation, to help diagnose issues
6+
57
### Bug Fixes
68

9+
- Fix bug in entity batch processing that caused entity state size explosion
10+
711
### Breaking Changes
812

913
### Dependency Updates

src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableEntityContext.cs

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -119,9 +119,20 @@ bool IDurableEntityContext.HasState
119119
public FunctionBindingContext FunctionBindingContext { get; set; }
120120
#endif
121121

122-
public void CaptureInternalError(Exception e)
122+
public void CaptureInternalError(Exception e, TaskEntityShim shim)
123123
{
124+
// first, try to get a quick ETW message out to help us diagnose what happened
125+
string details = Utils.IsFatal(e) ? e.GetType().Name : e.ToString();
126+
this.Config.TraceHelper.EntityBatchFailed(
127+
this.HubName,
128+
this.Name,
129+
this.InstanceId,
130+
shim.TraceFlags,
131+
details);
132+
133+
// then, record the error for additional reporting and tracking in other places
124134
this.InternalError = ExceptionDispatchInfo.Capture(e);
135+
shim.AddTraceFlag(EntityTraceFlags.InternalError);
125136
}
126137

127138
public void CaptureApplicationError(Exception e)
@@ -134,11 +145,11 @@ public void CaptureApplicationError(Exception e)
134145
this.ApplicationErrors.Add(ExceptionDispatchInfo.Capture(e));
135146
}
136147

137-
public void AbortOnInternalError()
148+
public void AbortOnInternalError(string traceFlags)
138149
{
139150
if (this.InternalError != null)
140151
{
141-
throw new SessionAbortedException($"Session aborted because of {this.InternalError.SourceException.GetType().Name}", this.InternalError.SourceException);
152+
throw new SessionAbortedException($"Session aborted because of {this.InternalError.SourceException.GetType().Name}, traceFlags={traceFlags}", this.InternalError.SourceException);
142153
}
143154
}
144155

src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -876,6 +876,8 @@ private async Task EntityMiddleware(DispatchMiddlewareContext dispatchContext, F
876876

877877
try
878878
{
879+
entityShim.AddTraceFlag('1'); // add a bread crumb for the entity batch tracing
880+
879881
// 1. First time through the history
880882
// we count events, add any under-lock op to the batch, and process lock releases
881883
foreach (HistoryEvent e in runtimeState.Events)
@@ -929,7 +931,8 @@ private async Task EntityMiddleware(DispatchMiddlewareContext dispatchContext, F
929931

930932
foreach (var message in deliverNow)
931933
{
932-
if (entityContext.State.LockedBy == message.ParentInstanceId)
934+
if (entityContext.State.LockedBy != null
935+
&& entityContext.State.LockedBy == message.ParentInstanceId)
933936
{
934937
if (lockHolderMessages == null)
935938
{
@@ -967,6 +970,7 @@ private async Task EntityMiddleware(DispatchMiddlewareContext dispatchContext, F
967970
// this is a continue message.
968971
// Resumes processing of previously queued operations, if any.
969972
entityContext.State.Suspended = false;
973+
entityShim.AddTraceFlag(EntityTraceFlags.Resumed);
970974
}
971975

972976
break;
@@ -979,8 +983,19 @@ private async Task EntityMiddleware(DispatchMiddlewareContext dispatchContext, F
979983
entityContext.State.PutBack(lockHolderMessages);
980984
}
981985

986+
// mitigation for ICM358210295 : if an entity has been in suspended state for at least 10 seconds, resume
987+
// (suspended state is never meant to last long, it is needed just so the history gets persisted to storage)
988+
if (entityContext.State.Suspended
989+
&& runtimeState.ExecutionStartedEvent?.Timestamp < DateTime.UtcNow - TimeSpan.FromSeconds(10))
990+
{
991+
entityContext.State.Suspended = false;
992+
entityShim.AddTraceFlag(EntityTraceFlags.MitigationResumed);
993+
}
994+
982995
if (!entityContext.State.Suspended)
983996
{
997+
entityShim.AddTraceFlag('2');
998+
984999
// 2. We add as many requests from the queue to the batch as possible,
9851000
// stopping at lock requests or when the maximum batch size is reached
9861001
while (entityContext.State.MayDequeue())
@@ -989,6 +1004,7 @@ private async Task EntityMiddleware(DispatchMiddlewareContext dispatchContext, F
9891004
{
9901005
// we have reached the maximum batch size already
9911006
// insert a delay after this batch to ensure write back
1007+
entityShim.AddTraceFlag(EntityTraceFlags.BatchSizeLimit);
9921008
entityShim.ToBeContinuedWithDelay();
9931009
break;
9941010
}
@@ -1009,14 +1025,14 @@ private async Task EntityMiddleware(DispatchMiddlewareContext dispatchContext, F
10091025
}
10101026
catch (Exception e)
10111027
{
1012-
entityContext.CaptureInternalError(e);
1028+
entityContext.CaptureInternalError(e, entityShim);
10131029
}
10141030

10151031
WrappedFunctionResult result;
10161032

10171033
if (entityShim.OperationBatch.Count > 0 && !this.HostLifetimeService.OnStopping.IsCancellationRequested)
10181034
{
1019-
// 3a. Start the functions invocation pipeline (billing, logging, bindings, and timeout tracking).
1035+
// 3a. (function execution) Start the functions invocation pipeline (billing, logging, bindings, and timeout tracking).
10201036
result = await FunctionExecutionHelper.ExecuteFunctionInOrchestrationMiddleware(
10211037
entityShim.GetFunctionInfo().Executor,
10221038
new TriggeredFunctionData
@@ -1037,6 +1053,8 @@ private async Task EntityMiddleware(DispatchMiddlewareContext dispatchContext, F
10371053
FunctionType.Entity,
10381054
isReplay: false);
10391055

1056+
entityShim.AddTraceFlag('3');
1057+
10401058
// 3. Run all the operations in the batch
10411059
if (entityContext.InternalError == null)
10421060
{
@@ -1046,10 +1064,12 @@ private async Task EntityMiddleware(DispatchMiddlewareContext dispatchContext, F
10461064
}
10471065
catch (Exception e)
10481066
{
1049-
entityContext.CaptureInternalError(e);
1067+
entityContext.CaptureInternalError(e, entityShim);
10501068
}
10511069
}
10521070

1071+
entityShim.AddTraceFlag('4');
1072+
10531073
// 4. Run the DTFx orchestration to persist the effects,
10541074
// send the outbox, and continue as new
10551075
await next();
@@ -1110,7 +1130,9 @@ private async Task EntityMiddleware(DispatchMiddlewareContext dispatchContext, F
11101130
}
11111131
else
11121132
{
1113-
// 3b. We do not need to call into user code because we are not going to run any operations.
1133+
entityShim.AddTraceFlag(EntityTraceFlags.DirectExecution);
1134+
1135+
// 3b. (direct execution) We do not need to call into user code because we are not going to run any operations.
11141136
// In this case we can execute without involving the functions runtime.
11151137
if (entityContext.InternalError == null)
11161138
{
@@ -1121,14 +1143,14 @@ private async Task EntityMiddleware(DispatchMiddlewareContext dispatchContext, F
11211143
}
11221144
catch (Exception e)
11231145
{
1124-
entityContext.CaptureInternalError(e);
1146+
entityContext.CaptureInternalError(e, entityShim);
11251147
}
11261148
}
11271149
}
11281150

11291151
// If there were internal errors, throw a SessionAbortedException
11301152
// here so DTFx can abort the batch and back off the work item
1131-
entityContext.AbortOnInternalError();
1153+
entityContext.AbortOnInternalError(entityShim.TraceFlags);
11321154

11331155
await entityContext.RunDeferredTasks();
11341156
}

src/WebJobs.Extensions.DurableTask/EndToEndTraceHelper.cs

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -755,6 +755,79 @@ public void EntityLockReleased(
755755
}
756756
}
757757

758+
public void EntityBatchCompleted(
759+
string hubName,
760+
string functionName,
761+
string instanceId,
762+
int eventsReceived,
763+
int operationsInBatch,
764+
int operationsExecuted,
765+
int? outOfOrderMessages,
766+
int queuedMessages,
767+
int userStateSize,
768+
int? sources,
769+
int? destinations,
770+
string lockedBy,
771+
bool suspended,
772+
string traceFlags)
773+
{
774+
FunctionType functionType = FunctionType.Entity;
775+
776+
EtwEventSource.Instance.EntityBatchCompleted(
777+
hubName,
778+
LocalAppName,
779+
LocalSlotName,
780+
functionName,
781+
instanceId,
782+
eventsReceived,
783+
operationsInBatch,
784+
operationsExecuted,
785+
outOfOrderMessages?.ToString() ?? "",
786+
queuedMessages,
787+
userStateSize,
788+
sources?.ToString() ?? "",
789+
destinations?.ToString() ?? "",
790+
lockedBy ?? "",
791+
suspended,
792+
traceFlags,
793+
functionType.ToString(),
794+
ExtensionVersion,
795+
IsReplay: false);
796+
797+
this.logger.LogInformation(
798+
"{instanceId}: Function '{functionName} ({functionType})' received {eventsReceived} events and processed {operationsExecuted}/{operationsInBatch} entity operations. OutOfOrderMessages: {outOfOrderMessages}. QueuedMessages: {queuedMessages}. UserStateSize: {userStateSize}. Sources: {sources}. Destinations: {destinations}. LockedBy: {lockedBy}. Suspended: {suspended}. TraceFlags: {traceFlags}. State: {state}. HubName: {hubName}. AppName: {appName}. SlotName: {slotName}. ExtensionVersion: {extensionVersion}. SequenceNumber: {sequenceNumber}.",
799+
instanceId, functionName, functionType,
800+
eventsReceived, operationsExecuted, operationsInBatch, outOfOrderMessages, queuedMessages, userStateSize, sources, destinations, lockedBy, suspended, traceFlags,
801+
FunctionState.EntityBatch, hubName, LocalAppName, LocalSlotName, ExtensionVersion, this.sequenceNumber++);
802+
}
803+
804+
public void EntityBatchFailed(
805+
string hubName,
806+
string functionName,
807+
string instanceId,
808+
string traceFlags,
809+
string details)
810+
{
811+
FunctionType functionType = FunctionType.Entity;
812+
813+
EtwEventSource.Instance.EntityBatchFailed(
814+
hubName,
815+
LocalAppName,
816+
LocalSlotName,
817+
functionName,
818+
instanceId,
819+
traceFlags,
820+
details,
821+
functionType.ToString(),
822+
ExtensionVersion);
823+
824+
this.logger.LogError(
825+
"{instanceId}: Function '{functionName} ({functionType})' failed. TraceFlags: {traceFlags}. Details: {details}. HubName: {hubName}. AppName: {appName}. SlotName: {slotName}. ExtensionVersion: {extensionVersion}. SequenceNumber: {sequenceNumber}.",
826+
instanceId, functionName, functionType,
827+
traceFlags, details,
828+
hubName, LocalAppName, LocalSlotName, ExtensionVersion, this.sequenceNumber++);
829+
}
830+
758831
public void EventGridSuccess(
759832
string hubName,
760833
string functionName,
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the MIT License. See LICENSE in the project root for license information.
3+
4+
using System;
5+
using System.Text;
6+
7+
namespace Microsoft.Azure.WebJobs.Extensions.DurableTask
8+
{
9+
internal struct EntityTraceFlags
10+
{
11+
// state was rehydrated
12+
public const char Rehydrated = 'Y';
13+
14+
// the execution was suspended (to be continued in a fresh batch), or resumed
15+
public const char Suspended = 'S';
16+
public const char Resumed = 'R';
17+
public const char MitigationResumed = 'M';
18+
19+
// reasons for suspending execution
20+
public const char TimedOut = 'T';
21+
public const char HostShutdown = 'H';
22+
public const char SignificantTimeElapsed = 'E';
23+
public const char BatchSizeLimit = 'L';
24+
25+
// execution is waiting for new messages after a continue-as-new
26+
public const char WaitForEvents = 'W';
27+
28+
// the execution bypassed the functions middleware because no user code is called
29+
public const char DirectExecution = 'D';
30+
31+
// an internal error was captured
32+
public const char InternalError = '!';
33+
34+
// trace flags
35+
private StringBuilder traceFlags;
36+
37+
// The bread-crumb characters recorded so far, in the order they were added.
// The builder is created lazily by AddFlag, so it is still null on a default-initialized
// struct; return an empty string in that case rather than throwing a NullReferenceException
// (this getter is read on error-reporting paths, where a secondary exception would mask the original one).
public string TraceFlags => this.traceFlags?.ToString() ?? string.Empty;
38+
39+
public void AddFlag(char flag)
40+
{
41+
// we concatenate the trace flag characters, they serve as a 'trail of bread crumbs' to reconstruct code path
42+
(this.traceFlags ??= new StringBuilder()).Append(flag);
43+
}
44+
}
45+
}

0 commit comments

Comments
 (0)