Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<PackageVersion Include="Microsoft.Azure.Functions.Worker.Core" Version="2.2.0" />
<PackageVersion Include="Microsoft.Azure.Functions.Worker.Extensions.Abstractions" Version="1.3.0" />
<PackageVersion Include="Microsoft.Azure.Functions.Worker.Extensions.Storage.Blobs" Version="6.7.0" />
<PackageVersion Include="Microsoft.Azure.WebJobs" Version="3.0.39" />
<PackageVersion Include="Microsoft.Azure.WebJobs" Version="3.0.45" />
<PackageVersion Include="Microsoft.Azure.WebJobs.Extensions.DurableTask.Analyzers" Version="0.5.0" />
<PackageVersion Include="Microsoft.Azure.WebJobs.Extensions.Rpc" Version="3.0.39" />
<PackageVersion Include="Microsoft.Bcl.AsyncInterfaces" Version="10.0.1" />
Expand Down
45 changes: 30 additions & 15 deletions src/WebJobs.Extensions.DurableTask/OutOfProcMiddleware.cs
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See LICENSE in the project root for license information.
#nullable enable
using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Threading.Tasks;
using DurableTask.Core;
using DurableTask.Core.Entities;
using DurableTask.Core.Entities.OperationFormat;
using DurableTask.Core.Exceptions;
using DurableTask.Core.History;
using DurableTask.Core.Middleware;
using Google.Protobuf;
using Microsoft.Azure.WebJobs.Host.Executors;
using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Threading.Tasks;
using DurableTask.Core;
using DurableTask.Core.Entities;
using DurableTask.Core.Entities.OperationFormat;
using DurableTask.Core.Exceptions;
using DurableTask.Core.History;
using DurableTask.Core.Middleware;
using Google.Protobuf;
using Microsoft.Azure.WebJobs.Host.Executors;
using P = Microsoft.DurableTask.Protobuf;

namespace Microsoft.Azure.WebJobs.Extensions.DurableTask
Expand Down Expand Up @@ -405,7 +405,14 @@ void SetErrorResult(FailureDetails failureDetails)
cancellationToken: this.HostLifetimeService.OnStopping);

if (!functionResult.Succeeded)
{
{
// This exception is thrown when another Function on the worker exceeded the Function timeout.
// In this case we want to make sure to retry this entity's execution rather than marking it as failed.
if (functionResult.Exception is Host.FunctionTimeoutAbortException)
{
throw functionResult.Exception;
}

// Shutdown can surface as a completed invocation in a failed state.
// Re-throw so we can abort this invocation.
this.HostLifetimeService.OnStopping.ThrowIfCancellationRequested();
Expand Down Expand Up @@ -554,7 +561,14 @@ public async Task CallActivityAsync(DispatchMiddlewareContext dispatchContext, F
triggerInput,
cancellationToken: this.HostLifetimeService.OnStopping);
if (!result.Succeeded)
{
{
// This exception is thrown when another Function on the worker exceeded the Function timeout.
// In this case we want to make sure to retry this Activity's execution rather than marking it as failed.
if (result.Exception is Host.FunctionTimeoutAbortException)
{
throw result.Exception;
}

// Shutdown can surface as a completed invocation in a failed state.
// Re-throw so we can abort this invocation.
this.HostLifetimeService.OnStopping.ThrowIfCancellationRequested();
Expand Down Expand Up @@ -646,6 +660,7 @@ private static bool IsPlatformLevelException(Exception? exception)
// TODO: the `WorkerProcessExitException` type is not exposed in our dependencies, it's part of WebJobs.Host.Script.
// Should we add that dependency or should it be exposed in WebJobs.Host?
return exception is Host.FunctionTimeoutException
|| exception is Host.FunctionTimeoutAbortException
|| exception?.InnerException is SessionAbortedException // see RemoteOrchestrationContext.TrySetResultInternal for details on OOM-handling
|| (exception?.InnerException?.GetType().ToString().Contains("WorkerProcessExitException", StringComparison.Ordinal) ?? false)
|| (exception?.InnerException is InvalidOperationException ioe
Expand Down
92 changes: 70 additions & 22 deletions test/FunctionsV2/OutOfProcMiddlewareTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,12 @@
using System.Threading;
using System.Threading.Tasks;
using DurableTask.Core;
using DurableTask.Core.Entities;
using DurableTask.Core.Entities.OperationFormat;
using DurableTask.Core.Exceptions;
using DurableTask.Core.History;
using DurableTask.Core.Middleware;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Azure.WebJobs.Host.Executors;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using Moq;
Expand All @@ -31,13 +30,13 @@ public async Task CallOrchestratorAsync_DifferentInvalidOperationException_DoesN
var innerException = new InvalidOperationException("The internal function invoker returned a task that does not support return values!");
var outerException = new Exception("Function invocation failed.", innerException);

var (middleware, dispatchContext) = this.SetupOrchestratorTest(outerException);
(OutOfProcMiddleware middleware, DispatchMiddlewareContext dispatchContext) = this.SetupOrchestratorTest(outerException);

// Act: should NOT throw SessionAbortedException — instead the orchestration should be marked as failed
await middleware.CallOrchestratorAsync(dispatchContext, () => Task.CompletedTask);

// Assert: the middleware should have set a failure result on the dispatch context
var result = dispatchContext.GetProperty<OrchestratorExecutionResult>();
OrchestratorExecutionResult result = dispatchContext.GetProperty<OrchestratorExecutionResult>();
Assert.NotNull(result);
}

Expand All @@ -46,16 +45,43 @@ public async Task CallOrchestratorAsync_DifferentInvalidOperationException_DoesN
[MemberData(nameof(PlatformLevelExceptions))]
public async Task CallOrchestratorAsync_PlatformLevelException_ThrowsSessionAbortedException(Exception exception)
{
var (middleware, dispatchContext) = this.SetupOrchestratorTest(exception);
(OutOfProcMiddleware middleware, DispatchMiddlewareContext dispatchContext) = this.SetupOrchestratorTest(exception);

await Assert.ThrowsAsync<SessionAbortedException>(
() => middleware.CallOrchestratorAsync(dispatchContext, () => Task.CompletedTask));
}

[Fact]
[Trait("Category", PlatformSpecificHelpers.TestCategory)]
public async Task CallEntityAsync_FunctionTimeoutAbortException_ThrowsSessionAbortedException()
{
var exception = new FunctionTimeoutAbortException("Activity A timed out! Worker channel closing");

(OutOfProcMiddleware middleware, DispatchMiddlewareContext dispatchContext) = this.SetupEntityTest(exception);

await Assert.ThrowsAsync<SessionAbortedException>(
() => middleware.CallEntityAsync(dispatchContext, () => Task.CompletedTask));
}

[Fact]
[Trait("Category", PlatformSpecificHelpers.TestCategory)]
public async Task CallActivityAsync_FunctionTimeoutAbortException_ThrowsSessionAbortedException()
{
var exception = new FunctionTimeoutAbortException("Activity A timed out! Worker channel closing");

(OutOfProcMiddleware middleware, DispatchMiddlewareContext dispatchContext) = this.SetupActivityTest(exception);

await Assert.ThrowsAsync<SessionAbortedException>(
() => middleware.CallActivityAsync(dispatchContext, () => Task.CompletedTask));
}

public static IEnumerable<object[]> PlatformLevelExceptions()
{
// FunctionTimeoutException (top-level)
yield return new object[] { new Host.FunctionTimeoutException("Function timed out.") };
yield return new object[] { new Host.FunctionTimeoutException("Function timed out.") };

// FunctionTimeoutAbortException (top-level)
yield return new object[] { new Host.FunctionTimeoutAbortException("Function timed out.") };

// SessionAbortedException as InnerException (e.g. out-of-memory handling)
yield return new object[] { new Exception("Function invocation failed.", new SessionAbortedException("Out of memory")) };
Expand All @@ -69,24 +95,50 @@ public static IEnumerable<object[]> PlatformLevelExceptions()

private (OutOfProcMiddleware middleware, DispatchMiddlewareContext context) SetupOrchestratorTest(Exception executorException)
{
var (middleware, dispatchContext) = this.CreateMiddleware(executorException, "TestOrchestrator", FunctionType.Orchestrator);
(OutOfProcMiddleware middleware, DispatchMiddlewareContext dispatchContext)
= this.CreateMiddleware(executorException, "TestOrchestrator", FunctionType.Orchestrator);

var orchestrationState = new OrchestrationRuntimeState(
new List<HistoryEvent>
{
[
new ExecutionStartedEvent(-1, null) { Name = "TestOrchestrator" },
});
]);

dispatchContext.SetProperty(orchestrationState);
dispatchContext.SetProperty(new OrchestrationInstance { InstanceId = "test-instance-id" });

return (middleware, dispatchContext);
}

private (OutOfProcMiddleware middleware, DispatchMiddlewareContext context) SetupEntityTest(Exception executorException)
{
(OutOfProcMiddleware middleware, DispatchMiddlewareContext dispatchContext)
= this.CreateMiddleware(executorException, "TestEntity", FunctionType.Entity);

dispatchContext.SetProperty(new EntityBatchRequest
{
InstanceId = "@TestEntity@test-key",
EntityState = null,
Operations = new List<OperationRequest>(),
});

return (middleware, dispatchContext);
}

private (OutOfProcMiddleware middleware, DispatchMiddlewareContext context) SetupActivityTest(Exception executorException)
{
(OutOfProcMiddleware middleware, DispatchMiddlewareContext dispatchContext)
= this.CreateMiddleware(executorException, "TestActivity", FunctionType.Activity);

dispatchContext.SetProperty(new TaskScheduledEvent(-1) { Name = "TestActivity" });
dispatchContext.SetProperty(new OrchestrationInstance { InstanceId = "test-instance-id" });

return (middleware, dispatchContext);
}

private (OutOfProcMiddleware middleware, DispatchMiddlewareContext context) CreateMiddleware(
Exception executorException, string functionName, FunctionType functionType)
{
var extension = CreateDurableTaskExtension();
DurableTaskExtension extension = CreateDurableTaskExtension();

var mockExecutor = new Mock<ITriggeredFunctionExecutor>();
mockExecutor
Expand Down Expand Up @@ -131,15 +183,14 @@ private static DurableTaskExtension CreateDurableTaskExtension()
new OptionsWrapper<DurableTaskOptions>(options),
NullLoggerFactory.Instance,
TestHelpers.GetTestNameResolver(),
new[]
{
[
new AzureStorageDurabilityProviderFactory(
new OptionsWrapper<DurableTaskOptions>(options),
new TestStorageServiceClientProviderFactory(),
TestHelpers.GetTestNameResolver(),
NullLoggerFactory.Instance,
TestHelpers.GetMockPlatformInformationService()),
},
],
new TestHostShutdownNotificationService(),
new DurableHttpMessageHandlerFactory(),
platformInformationService: TestHelpers.GetMockPlatformInformationService());
Expand All @@ -148,13 +199,13 @@ private static DurableTaskExtension CreateDurableTaskExtension()
private static WorkItemMetadata CreateWorkItemMetadata(bool isExtendedSession, bool includeState)
{
// WorkItemMetadata has an internal constructor, so we use reflection to create it.
var ctor = typeof(WorkItemMetadata).GetConstructor(
ConstructorInfo ctor = typeof(WorkItemMetadata).GetConstructor(
BindingFlags.Instance | BindingFlags.NonPublic,
binder: null,
new[] { typeof(bool), typeof(bool) },
[typeof(bool), typeof(bool)],
modifiers: null);
Assert.NotNull(ctor);
return (WorkItemMetadata)ctor.Invoke(new object[] { isExtendedSession, includeState });
return (WorkItemMetadata)ctor.Invoke([isExtendedSession, includeState]);
}

/// <summary>
Expand All @@ -163,12 +214,9 @@ private static WorkItemMetadata CreateWorkItemMetadata(bool isExtendedSession, b
/// <c>WorkerProcessExitException</c> lives in <c>Microsoft.Azure.WebJobs.Script</c>
/// (the Functions host runtime), which is too heavy to reference as a test dependency.
/// </summary>
private class WorkerProcessExitExceptionStub : Exception
private class WorkerProcessExitExceptionStub(string message)
: Exception(message)
{
public WorkerProcessExitExceptionStub(string message)
: base(message)
{
}
}
}
}
Loading