Skip to content

Commit b47e9c8

Browse files
Handle internal errors like OOM in entity processing by aborting the session (#2234)
* handle internal errors like OOM in entity processing by aborting the session
* update release_notes
* fix whitespace that breaks build
1 parent 7c7e640 commit b47e9c8

File tree

4 files changed

+76
-56
lines changed

4 files changed

+76
-56
lines changed

release_notes.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
## Bug Fixes
22
- Correctly Serialize HostStoppingEvent in ActivityShim (https://github.com/Azure/azure-functions-durable-extension/pull/2178)
33
- Fix NotImplementedException for management API calls from Java client (https://github.com/Azure/azure-functions-durable-extension/pull/2193)
4+
- Handle OOM and other exceptions in entity shim by aborting the session (https://github.com/Azure/azure-functions-durable-extension/pull/2234)
45

56
## Enhancements
67
- Add optional 'instanceIdPrefix' query parameter to the HTTP API for instance queries

src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableEntityContext.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
using System.Runtime.ExceptionServices;
99
using System.Threading.Tasks;
1010
using DurableTask.Core;
11+
using DurableTask.Core.Common;
12+
using DurableTask.Core.Exceptions;
1113
using Microsoft.Azure.WebJobs.Host.Bindings;
1214
using Newtonsoft.Json;
1315

@@ -132,6 +134,14 @@ public void CaptureApplicationError(Exception e)
132134
this.ApplicationErrors.Add(ExceptionDispatchInfo.Capture(e));
133135
}
134136

137+
public void AbortOnInternalError()
138+
{
139+
if (this.InternalError != null)
140+
{
141+
throw new SessionAbortedException($"Session aborted because of {this.InternalError.SourceException.GetType().Name}", this.InternalError.SourceException);
142+
}
143+
}
144+
135145
public void ThrowInternalExceptionIfAny()
136146
{
137147
if (this.InternalError != null)

src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1128,11 +1128,11 @@ private async Task EntityMiddleware(DispatchMiddlewareContext dispatchContext, F
11281128
}
11291129
}
11301130

1131-
await entityContext.RunDeferredTasks();
1132-
1133-
// If there were internal errors, do not commit the batch, but instead rethrow
1131+
// If there were internal errors, throw a SessionAbortedException
11341132
// here so DTFx can abort the batch and back off the work item
1135-
entityContext.ThrowInternalExceptionIfAny();
1133+
entityContext.AbortOnInternalError();
1134+
1135+
await entityContext.RunDeferredTasks();
11361136
}
11371137

11381138
internal string GetDefaultConnectionName()

src/WebJobs.Extensions.DurableTask/Listener/TaskEntityShim.cs

Lines changed: 61 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
using System.Threading;
99
using System.Threading.Tasks;
1010
using DurableTask.Core;
11+
using DurableTask.Core.Exceptions;
1112
using Newtonsoft.Json;
1213
using Newtonsoft.Json.Linq;
1314

@@ -185,70 +186,78 @@ public override async Task<string> Execute(OrchestrationContext innerContext, st
185186

186187
DurableTaskExtension.TagActivityWithOrchestrationStatus(status, this.context.InstanceId, true);
187188
#endif
188-
189-
if (this.operationBatch.Count == 0
190-
&& this.lockRequest == null
191-
&& (this.toBeRescheduled == null || this.toBeRescheduled.Count == 0)
192-
&& !this.suspendAndContinueWithDelay)
193-
{
194-
// we are idle after a ContinueAsNew - the batch is empty.
195-
// Wait for more messages to get here (via extended sessions)
196-
await this.doneProcessingMessages.Task;
197-
}
198-
199-
if (!this.messageDataConverter.IsDefault)
200-
{
201-
innerContext.MessageDataConverter = this.messageDataConverter;
202-
}
203-
204-
if (!this.errorDataConverter.IsDefault)
205-
{
206-
innerContext.ErrorDataConverter = this.errorDataConverter;
207-
}
208-
209-
if (this.NumberEventsToReceive > 0)
189+
try
210190
{
211-
await this.doneProcessingMessages.Task;
212-
}
191+
if (this.operationBatch.Count == 0
192+
&& this.lockRequest == null
193+
&& (this.toBeRescheduled == null || this.toBeRescheduled.Count == 0)
194+
&& !this.suspendAndContinueWithDelay)
195+
{
196+
// we are idle after a ContinueAsNew - the batch is empty.
197+
// Wait for more messages to get here (via extended sessions)
198+
await this.doneProcessingMessages.Task;
199+
}
213200

214-
// Commit the effects of this batch, if
215-
// we have not already run into an internal error
216-
// (in which case we will abort the batch instead of committing it)
217-
if (this.context.InternalError == null)
218-
{
219-
bool writeBackSuccessful = true;
220-
ResponseMessage serializationErrorMessage = null;
201+
if (!this.messageDataConverter.IsDefault)
202+
{
203+
innerContext.MessageDataConverter = this.messageDataConverter;
204+
}
221205

222-
if (this.RollbackFailedOperations)
206+
if (!this.errorDataConverter.IsDefault)
223207
{
224-
// the state has already been written back, since it is
225-
// done right after each operation.
208+
innerContext.ErrorDataConverter = this.errorDataConverter;
226209
}
227-
else
210+
211+
if (this.NumberEventsToReceive > 0)
228212
{
229-
// we are writing back the state here, after executing
230-
// the entire batch of operations.
231-
writeBackSuccessful = this.context.TryWriteback(out serializationErrorMessage, out var _);
213+
await this.doneProcessingMessages.Task;
232214
}
233215

234-
// Reschedule all signals that were received before their time
235-
this.context.RescheduleMessages(innerContext, this.toBeRescheduled);
216+
// Commit the effects of this batch, if
217+
// we have not already run into an internal error
218+
// (in which case we will abort the batch instead of committing it)
219+
if (this.context.InternalError == null)
220+
{
221+
bool writeBackSuccessful = true;
222+
ResponseMessage serializationErrorMessage = null;
236223

237-
// Send all buffered outgoing messages
238-
this.context.SendOutbox(innerContext, writeBackSuccessful, serializationErrorMessage);
224+
if (this.RollbackFailedOperations)
225+
{
226+
// the state has already been written back, since it is
227+
// done right after each operation.
228+
}
229+
else
230+
{
231+
// we are writing back the state here, after executing
232+
// the entire batch of operations.
233+
writeBackSuccessful = this.context.TryWriteback(out serializationErrorMessage, out var _);
234+
}
239235

240-
// send a continue signal
241-
if (this.suspendAndContinueWithDelay)
242-
{
243-
this.context.SendContinue(innerContext);
244-
this.suspendAndContinueWithDelay = false;
245-
this.context.State.Suspended = true;
246-
}
236+
// Reschedule all signals that were received before their time
237+
this.context.RescheduleMessages(innerContext, this.toBeRescheduled);
247238

248-
var jstate = JToken.FromObject(this.context.State);
239+
// Send all buffered outgoing messages
240+
this.context.SendOutbox(innerContext, writeBackSuccessful, serializationErrorMessage);
249241

250-
// continue as new
251-
innerContext.ContinueAsNew(jstate);
242+
// send a continue signal
243+
if (this.suspendAndContinueWithDelay)
244+
{
245+
this.context.SendContinue(innerContext);
246+
this.suspendAndContinueWithDelay = false;
247+
this.context.State.Suspended = true;
248+
}
249+
250+
var jstate = JToken.FromObject(this.context.State);
251+
252+
// continue as new
253+
innerContext.ContinueAsNew(jstate);
254+
}
255+
}
256+
catch (Exception e)
257+
{
258+
// We must catch unexpected exceptions here; otherwise the entity goes into a permanently failed state.
259+
// For example, an out-of-memory (OOM) exception can be thrown during serialization; see https://github.com/Azure/azure-functions-durable-extension/issues/2166
260+
this.context.CaptureInternalError(e);
252261
}
253262

254263
// The return value is not used.

0 commit comments

Comments
 (0)