Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<PackageVersion Include="Microsoft.Azure.Functions.Worker.Core" Version="2.2.0" />
<PackageVersion Include="Microsoft.Azure.Functions.Worker.Extensions.Abstractions" Version="1.3.0" />
<PackageVersion Include="Microsoft.Azure.Functions.Worker.Extensions.Storage.Blobs" Version="6.7.0" />
<PackageVersion Include="Microsoft.Azure.WebJobs" Version="3.0.39" />
<PackageVersion Include="Microsoft.Azure.WebJobs" Version="3.0.45" />
<PackageVersion Include="Microsoft.Azure.WebJobs.Extensions.DurableTask.Analyzers" Version="0.5.0" />
<PackageVersion Include="Microsoft.Azure.WebJobs.Extensions.Rpc" Version="3.0.39" />
<PackageVersion Include="Microsoft.Bcl.AsyncInterfaces" Version="10.0.1" />
Expand Down
45 changes: 30 additions & 15 deletions src/WebJobs.Extensions.DurableTask/OutOfProcMiddleware.cs
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See LICENSE in the project root for license information.
#nullable enable
using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Threading.Tasks;
using DurableTask.Core;
using DurableTask.Core.Entities;
using DurableTask.Core.Entities.OperationFormat;
using DurableTask.Core.Exceptions;
using DurableTask.Core.History;
using DurableTask.Core.Middleware;
using Google.Protobuf;
using Microsoft.Azure.WebJobs.Host.Executors;
using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Threading.Tasks;
using DurableTask.Core;
using DurableTask.Core.Entities;
using DurableTask.Core.Entities.OperationFormat;
using DurableTask.Core.Exceptions;
using DurableTask.Core.History;
using DurableTask.Core.Middleware;
using Google.Protobuf;
using Microsoft.Azure.WebJobs.Host.Executors;
using P = Microsoft.DurableTask.Protobuf;

namespace Microsoft.Azure.WebJobs.Extensions.DurableTask
Expand Down Expand Up @@ -405,7 +405,14 @@ void SetErrorResult(FailureDetails failureDetails)
cancellationToken: this.HostLifetimeService.OnStopping);

if (!functionResult.Succeeded)
{
{
// This exception is thrown when another Function on the worker exceeded the Function timeout.
// In this case we want to make sure to retry this entity's execution rather than marking it as failed.
if (functionResult.Exception is Host.FunctionTimeoutAbortException)
{
throw functionResult.Exception;
}

// Shutdown can surface as a completed invocation in a failed state.
// Re-throw so we can abort this invocation.
this.HostLifetimeService.OnStopping.ThrowIfCancellationRequested();
Expand Down Expand Up @@ -554,7 +561,14 @@ public async Task CallActivityAsync(DispatchMiddlewareContext dispatchContext, F
triggerInput,
cancellationToken: this.HostLifetimeService.OnStopping);
if (!result.Succeeded)
{
{
// This exception is thrown when another Function on the worker exceeded the Function timeout.
// In this case we want to make sure to retry this Activity's execution rather than marking it as failed.
if (result.Exception is Host.FunctionTimeoutAbortException)
{
throw result.Exception;
}

// Shutdown can surface as a completed invocation in a failed state.
// Re-throw so we can abort this invocation.
this.HostLifetimeService.OnStopping.ThrowIfCancellationRequested();
Expand Down Expand Up @@ -646,6 +660,7 @@ private static bool IsPlatformLevelException(Exception? exception)
// TODO: the `WorkerProcessExitException` type is not exposed in our dependencies, it's part of WebJobs.Host.Script.
// Should we add that dependency or should it be exposed in WebJobs.Host?
return exception is Host.FunctionTimeoutException
|| exception is Host.FunctionTimeoutAbortException
|| exception?.InnerException is SessionAbortedException // see RemoteOrchestrationContext.TrySetResultInternal for details on OOM-handling
|| (exception?.InnerException?.GetType().ToString().Contains("WorkerProcessExitException", StringComparison.Ordinal) ?? false)
|| (exception?.InnerException is InvalidOperationException ioe
Expand Down
91 changes: 69 additions & 22 deletions test/FunctionsV2/OutOfProcMiddlewareTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,12 @@
using System.Threading;
using System.Threading.Tasks;
using DurableTask.Core;
using DurableTask.Core.Entities;
using DurableTask.Core.Entities.OperationFormat;
using DurableTask.Core.Exceptions;
using DurableTask.Core.History;
using DurableTask.Core.Middleware;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Azure.WebJobs.Host.Executors;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using Moq;
Expand All @@ -31,13 +30,13 @@
var innerException = new InvalidOperationException("The internal function invoker returned a task that does not support return values!");
var outerException = new Exception("Function invocation failed.", innerException);

var (middleware, dispatchContext) = this.SetupOrchestratorTest(outerException);
(OutOfProcMiddleware middleware, DispatchMiddlewareContext dispatchContext) = this.SetupOrchestratorTest(outerException);

// Act: should NOT throw SessionAbortedException — instead the orchestration should be marked as failed
await middleware.CallOrchestratorAsync(dispatchContext, () => Task.CompletedTask);

// Assert: the middleware should have set a failure result on the dispatch context
var result = dispatchContext.GetProperty<OrchestratorExecutionResult>();
OrchestratorExecutionResult result = dispatchContext.GetProperty<OrchestratorExecutionResult>();
Assert.NotNull(result);
}

Expand All @@ -46,16 +45,43 @@
[MemberData(nameof(PlatformLevelExceptions))]
public async Task CallOrchestratorAsync_PlatformLevelException_ThrowsSessionAbortedException(Exception exception)
{
var (middleware, dispatchContext) = this.SetupOrchestratorTest(exception);
(OutOfProcMiddleware middleware, DispatchMiddlewareContext dispatchContext) = this.SetupOrchestratorTest(exception);

await Assert.ThrowsAsync<SessionAbortedException>(
() => middleware.CallOrchestratorAsync(dispatchContext, () => Task.CompletedTask));
}

[Fact]
[Trait("Category", PlatformSpecificHelpers.TestCategory)]
public async Task CallEntityAsync_FunctionTimeoutAbortException_ThrowsSessionAbortedException()
{
var exception = new FunctionTimeoutAbortException("Activity A timed out! Worker channel closing");

(OutOfProcMiddleware middleware, DispatchMiddlewareContext dispatchContext) = this.SetupEntityTest(exception);

await Assert.ThrowsAsync<SessionAbortedException>(
() => middleware.CallEntityAsync(dispatchContext, () => Task.CompletedTask));
}

[Fact]
[Trait("Category", PlatformSpecificHelpers.TestCategory)]
public async Task CallActivityAsync_FunctionTimeoutAbortException_ThrowsSessionAbortedException()
{
var exception = new FunctionTimeoutAbortException("Activity A timed out! Worker channel closing");

(OutOfProcMiddleware middleware, DispatchMiddlewareContext dispatchContext) = this.SetupActivityTest(exception);

await Assert.ThrowsAsync<SessionAbortedException>(
() => middleware.CallActivityAsync(dispatchContext, () => Task.CompletedTask));
}

public static IEnumerable<object[]> PlatformLevelExceptions()
{
// FunctionTimeoutException (top-level)
yield return new object[] { new Host.FunctionTimeoutException("Function timed out.") };
yield return new object[] { new Host.FunctionTimeoutException("Function timed out.") };

// FunctionTimeoutAbortException (top-level)
yield return new object[] { new Host.FunctionTimeoutAbortException("Function timed out.") };

// SessionAbortedException as InnerException (e.g. out-of-memory handling)
yield return new object[] { new Exception("Function invocation failed.", new SessionAbortedException("Out of memory")) };
Expand All @@ -69,24 +95,50 @@

private (OutOfProcMiddleware middleware, DispatchMiddlewareContext context) SetupOrchestratorTest(Exception executorException)
{
var (middleware, dispatchContext) = this.CreateMiddleware(executorException, "TestOrchestrator", FunctionType.Orchestrator);
(OutOfProcMiddleware middleware, DispatchMiddlewareContext dispatchContext)
= this.CreateMiddleware(executorException, "TestOrchestrator", FunctionType.Orchestrator);

var orchestrationState = new OrchestrationRuntimeState(
new List<HistoryEvent>
{
[
new ExecutionStartedEvent(-1, null) { Name = "TestOrchestrator" },
});
]);

dispatchContext.SetProperty(orchestrationState);
dispatchContext.SetProperty(new OrchestrationInstance { InstanceId = "test-instance-id" });

return (middleware, dispatchContext);
}

private (OutOfProcMiddleware middleware, DispatchMiddlewareContext context) SetupEntityTest(Exception executorException)
{
(OutOfProcMiddleware middleware, DispatchMiddlewareContext dispatchContext)
= this.CreateMiddleware(executorException, "TestEntity", FunctionType.Entity);

dispatchContext.SetProperty(new EntityBatchRequest
{
InstanceId = "@TestEntity@test-key",
EntityState = null,
Operations = new List<OperationRequest>(),
});

return (middleware, dispatchContext);
}

private (OutOfProcMiddleware middleware, DispatchMiddlewareContext context) SetupActivityTest(Exception executorException)
{
(OutOfProcMiddleware middleware, DispatchMiddlewareContext dispatchContext)
= this.CreateMiddleware(executorException, "TestActivity", FunctionType.Activity);

dispatchContext.SetProperty(new TaskScheduledEvent(-1) { Name = "TestActivity" });
dispatchContext.SetProperty(new OrchestrationInstance { InstanceId = "test-instance-id" });

return (middleware, dispatchContext);
}

private (OutOfProcMiddleware middleware, DispatchMiddlewareContext context) CreateMiddleware(
Exception executorException, string functionName, FunctionType functionType)
{
var extension = CreateDurableTaskExtension();
DurableTaskExtension extension = CreateDurableTaskExtension();

var mockExecutor = new Mock<ITriggeredFunctionExecutor>();
mockExecutor
Expand Down Expand Up @@ -131,15 +183,14 @@
new OptionsWrapper<DurableTaskOptions>(options),
NullLoggerFactory.Instance,
TestHelpers.GetTestNameResolver(),
new[]
{
[
new AzureStorageDurabilityProviderFactory(
new OptionsWrapper<DurableTaskOptions>(options),
new TestStorageServiceClientProviderFactory(),
TestHelpers.GetTestNameResolver(),
NullLoggerFactory.Instance,
TestHelpers.GetMockPlatformInformationService()),
},
],
new TestHostShutdownNotificationService(),
new DurableHttpMessageHandlerFactory(),
platformInformationService: TestHelpers.GetMockPlatformInformationService());
Expand All @@ -148,13 +199,13 @@
private static WorkItemMetadata CreateWorkItemMetadata(bool isExtendedSession, bool includeState)
{
// WorkItemMetadata has an internal constructor, so we use reflection to create it.
var ctor = typeof(WorkItemMetadata).GetConstructor(
ConstructorInfo ctor = typeof(WorkItemMetadata).GetConstructor(
BindingFlags.Instance | BindingFlags.NonPublic,
binder: null,
new[] { typeof(bool), typeof(bool) },
[typeof(bool), typeof(bool)],
modifiers: null);
Assert.NotNull(ctor);
return (WorkItemMetadata)ctor.Invoke(new object[] { isExtendedSession, includeState });
return (WorkItemMetadata)ctor.Invoke([isExtendedSession, includeState]);
}

/// <summary>
Expand All @@ -163,12 +214,8 @@
/// <c>WorkerProcessExitException</c> lives in <c>Microsoft.Azure.WebJobs.Script</c>
/// (the Functions host runtime), which is too heavy to reference as a test dependency.
/// </summary>
private class WorkerProcessExitExceptionStub : Exception
private class WorkerProcessExitExceptionStub(string message) : Exception(message)

Check warning on line 217 in test/FunctionsV2/OutOfProcMiddlewareTests.cs

View workflow job for this annotation

GitHub Actions / build

Check warning on line 217 in test/FunctionsV2/OutOfProcMiddlewareTests.cs

View workflow job for this annotation

GitHub Actions / build

Check warning on line 217 in test/FunctionsV2/OutOfProcMiddlewareTests.cs

View workflow job for this annotation

GitHub Actions / build

Check warning on line 217 in test/FunctionsV2/OutOfProcMiddlewareTests.cs

View workflow job for this annotation

GitHub Actions / build

Check warning on line 217 in test/FunctionsV2/OutOfProcMiddlewareTests.cs

View workflow job for this annotation

GitHub Actions / build

Check warning on line 217 in test/FunctionsV2/OutOfProcMiddlewareTests.cs

View workflow job for this annotation

GitHub Actions / build

Check warning on line 217 in test/FunctionsV2/OutOfProcMiddlewareTests.cs

View workflow job for this annotation

GitHub Actions / build

Check warning on line 217 in test/FunctionsV2/OutOfProcMiddlewareTests.cs

View workflow job for this annotation

GitHub Actions / build

{
public WorkerProcessExitExceptionStub(string message)
: base(message)
{
}
}
}
}
Loading