diff --git a/Directory.Packages.props b/Directory.Packages.props index ac26f4282..d0d554ca6 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -22,7 +22,7 @@ - + diff --git a/src/WebJobs.Extensions.DurableTask/OutOfProcMiddleware.cs b/src/WebJobs.Extensions.DurableTask/OutOfProcMiddleware.cs index af78b758e..c6b9f10d0 100644 --- a/src/WebJobs.Extensions.DurableTask/OutOfProcMiddleware.cs +++ b/src/WebJobs.Extensions.DurableTask/OutOfProcMiddleware.cs @@ -1,19 +1,19 @@ // Copyright (c) .NET Foundation. All rights reserved. // Licensed under the MIT License. See LICENSE in the project root for license information. #nullable enable -using System; -using System.Collections.Generic; -using System.Diagnostics.CodeAnalysis; -using System.Linq; -using System.Threading.Tasks; -using DurableTask.Core; -using DurableTask.Core.Entities; -using DurableTask.Core.Entities.OperationFormat; -using DurableTask.Core.Exceptions; -using DurableTask.Core.History; -using DurableTask.Core.Middleware; -using Google.Protobuf; -using Microsoft.Azure.WebJobs.Host.Executors; +using System; +using System.Collections.Generic; +using System.Diagnostics.CodeAnalysis; +using System.Linq; +using System.Threading.Tasks; +using DurableTask.Core; +using DurableTask.Core.Entities; +using DurableTask.Core.Entities.OperationFormat; +using DurableTask.Core.Exceptions; +using DurableTask.Core.History; +using DurableTask.Core.Middleware; +using Google.Protobuf; +using Microsoft.Azure.WebJobs.Host.Executors; using P = Microsoft.DurableTask.Protobuf; namespace Microsoft.Azure.WebJobs.Extensions.DurableTask @@ -405,7 +405,14 @@ void SetErrorResult(FailureDetails failureDetails) cancellationToken: this.HostLifetimeService.OnStopping); if (!functionResult.Succeeded) - { + { + // This exception is thrown when another Function on the worker exceeded the Function timeout. + // In this case we want to make sure to retry this entity's execution rather than marking it as failed. + if (functionResult.Exception is Host.FunctionTimeoutAbortException) + { + throw functionResult.Exception; + } + // Shutdown can surface as a completed invocation in a failed state. // Re-throw so we can abort this invocation. this.HostLifetimeService.OnStopping.ThrowIfCancellationRequested(); @@ -554,7 +561,14 @@ public async Task CallActivityAsync(DispatchMiddlewareContext dispatchContext, F triggerInput, cancellationToken: this.HostLifetimeService.OnStopping); if (!result.Succeeded) - { + { + // This exception is thrown when another Function on the worker exceeded the Function timeout. + // In this case we want to make sure to retry this Activity's execution rather than marking it as failed. + if (result.Exception is Host.FunctionTimeoutAbortException) + { + throw result.Exception; + } + // Shutdown can surface as a completed invocation in a failed state. // Re-throw so we can abort this invocation. this.HostLifetimeService.OnStopping.ThrowIfCancellationRequested(); @@ -646,6 +660,7 @@ private static bool IsPlatformLevelException(Exception? exception) // TODO: the `WorkerProcessExitException` type is not exposed in our dependencies, it's part of WebJobs.Host.Script. // Should we add that dependency or should it be exposed in WebJobs.Host? return exception is Host.FunctionTimeoutException + || exception is Host.FunctionTimeoutAbortException || exception?.InnerException is SessionAbortedException // see RemoteOrchestrationContext.TrySetResultInternal for details on OOM-handling || (exception?.InnerException?.GetType().ToString().Contains("WorkerProcessExitException", StringComparison.Ordinal) ?? false) || (exception?.InnerException is InvalidOperationException ioe diff --git a/test/FunctionsV2/OutOfProcMiddlewareTests.cs b/test/FunctionsV2/OutOfProcMiddlewareTests.cs index b9edd7581..ed7cd3c23 100644 --- a/test/FunctionsV2/OutOfProcMiddlewareTests.cs +++ b/test/FunctionsV2/OutOfProcMiddlewareTests.cs @@ -7,13 +7,12 @@ using System.Threading; using System.Threading.Tasks; using DurableTask.Core; -using DurableTask.Core.Entities; using DurableTask.Core.Entities.OperationFormat; using DurableTask.Core.Exceptions; using DurableTask.Core.History; using DurableTask.Core.Middleware; +using Microsoft.Azure.WebJobs.Host; using Microsoft.Azure.WebJobs.Host.Executors; -using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; using Microsoft.Extensions.Options; using Moq; @@ -31,13 +30,13 @@ public async Task CallOrchestratorAsync_DifferentInvalidOperationException_DoesN var innerException = new InvalidOperationException("The internal function invoker returned a task that does not support return values!"); var outerException = new Exception("Function invocation failed.", innerException); - var (middleware, dispatchContext) = this.SetupOrchestratorTest(outerException); + (OutOfProcMiddleware middleware, DispatchMiddlewareContext dispatchContext) = this.SetupOrchestratorTest(outerException); // Act: should NOT throw SessionAbortedException — instead the orchestration should be marked as failed await middleware.CallOrchestratorAsync(dispatchContext, () => Task.CompletedTask); // Assert: the middleware should have set a failure result on the dispatch context - var result = dispatchContext.GetProperty(); + OrchestratorExecutionResult result = dispatchContext.GetProperty(); Assert.NotNull(result); } @@ -46,16 +45,43 @@ public async Task CallOrchestratorAsync_DifferentInvalidOperationException_DoesN [MemberData(nameof(PlatformLevelExceptions))] public async Task CallOrchestratorAsync_PlatformLevelException_ThrowsSessionAbortedException(Exception exception) { - var (middleware, dispatchContext) = this.SetupOrchestratorTest(exception); + (OutOfProcMiddleware middleware, DispatchMiddlewareContext dispatchContext) = this.SetupOrchestratorTest(exception); await Assert.ThrowsAsync( () => middleware.CallOrchestratorAsync(dispatchContext, () => Task.CompletedTask)); + } + + [Fact] + [Trait("Category", PlatformSpecificHelpers.TestCategory)] + public async Task CallEntityAsync_FunctionTimeoutAbortException_ThrowsSessionAbortedException() + { + var exception = new FunctionTimeoutAbortException("Activity A timed out! Worker channel closing"); + + (OutOfProcMiddleware middleware, DispatchMiddlewareContext dispatchContext) = this.SetupEntityTest(exception); + + await Assert.ThrowsAsync( + () => middleware.CallEntityAsync(dispatchContext, () => Task.CompletedTask)); + } + + [Fact] + [Trait("Category", PlatformSpecificHelpers.TestCategory)] + public async Task CallActivityAsync_FunctionTimeoutAbortException_ThrowsSessionAbortedException() + { + var exception = new FunctionTimeoutAbortException("Activity A timed out! Worker channel closing"); + + (OutOfProcMiddleware middleware, DispatchMiddlewareContext dispatchContext) = this.SetupActivityTest(exception); + + await Assert.ThrowsAsync( + () => middleware.CallActivityAsync(dispatchContext, () => Task.CompletedTask)); } public static IEnumerable PlatformLevelExceptions() { // FunctionTimeoutException (top-level) - yield return new object[] { new Host.FunctionTimeoutException("Function timed out.") }; + yield return new object[] { new Host.FunctionTimeoutException("Function timed out.") }; + + // FunctionTimeoutAbortException (top-level) + yield return new object[] { new Host.FunctionTimeoutAbortException("Function timed out.") }; // SessionAbortedException as InnerException (e.g. out-of-memory handling) yield return new object[] { new Exception("Function invocation failed.", new SessionAbortedException("Out of memory")) }; @@ -69,24 +95,50 @@ public static IEnumerable PlatformLevelExceptions() private (OutOfProcMiddleware middleware, DispatchMiddlewareContext context) SetupOrchestratorTest(Exception executorException) { - var (middleware, dispatchContext) = this.CreateMiddleware(executorException, "TestOrchestrator", FunctionType.Orchestrator); + (OutOfProcMiddleware middleware, DispatchMiddlewareContext dispatchContext) + = this.CreateMiddleware(executorException, "TestOrchestrator", FunctionType.Orchestrator); var orchestrationState = new OrchestrationRuntimeState( - new List - { + [ new ExecutionStartedEvent(-1, null) { Name = "TestOrchestrator" }, - }); + ]); dispatchContext.SetProperty(orchestrationState); dispatchContext.SetProperty(new OrchestrationInstance { InstanceId = "test-instance-id" }); + return (middleware, dispatchContext); + } + + private (OutOfProcMiddleware middleware, DispatchMiddlewareContext context) SetupEntityTest(Exception executorException) + { + (OutOfProcMiddleware middleware, DispatchMiddlewareContext dispatchContext) + = this.CreateMiddleware(executorException, "TestEntity", FunctionType.Entity); + + dispatchContext.SetProperty(new EntityBatchRequest + { + InstanceId = "@TestEntity@test-key", + EntityState = null, + Operations = new List(), + }); + + return (middleware, dispatchContext); + } + + private (OutOfProcMiddleware middleware, DispatchMiddlewareContext context) SetupActivityTest(Exception executorException) + { + (OutOfProcMiddleware middleware, DispatchMiddlewareContext dispatchContext) + = this.CreateMiddleware(executorException, "TestActivity", FunctionType.Activity); + + dispatchContext.SetProperty(new TaskScheduledEvent(-1) { Name = "TestActivity" }); + dispatchContext.SetProperty(new OrchestrationInstance { InstanceId = "test-instance-id" }); + return (middleware, dispatchContext); } private (OutOfProcMiddleware middleware, DispatchMiddlewareContext context) CreateMiddleware( Exception executorException, string functionName, FunctionType functionType) { - var extension = CreateDurableTaskExtension(); + DurableTaskExtension extension = CreateDurableTaskExtension(); var mockExecutor = new Mock(); mockExecutor @@ -131,15 +183,14 @@ private static DurableTaskExtension CreateDurableTaskExtension() new OptionsWrapper(options), NullLoggerFactory.Instance, TestHelpers.GetTestNameResolver(), - new[] - { + [ new AzureStorageDurabilityProviderFactory( new OptionsWrapper(options), new TestStorageServiceClientProviderFactory(), TestHelpers.GetTestNameResolver(), NullLoggerFactory.Instance, TestHelpers.GetMockPlatformInformationService()), - }, + ], new TestHostShutdownNotificationService(), new DurableHttpMessageHandlerFactory(), platformInformationService: TestHelpers.GetMockPlatformInformationService()); @@ -148,13 +199,13 @@ private static DurableTaskExtension CreateDurableTaskExtension() private static WorkItemMetadata CreateWorkItemMetadata(bool isExtendedSession, bool includeState) { // WorkItemMetadata has an internal constructor, so we use reflection to create it. - var ctor = typeof(WorkItemMetadata).GetConstructor( + ConstructorInfo ctor = typeof(WorkItemMetadata).GetConstructor( BindingFlags.Instance | BindingFlags.NonPublic, binder: null, - new[] { typeof(bool), typeof(bool) }, + [typeof(bool), typeof(bool)], modifiers: null); Assert.NotNull(ctor); - return (WorkItemMetadata)ctor.Invoke(new object[] { isExtendedSession, includeState }); + return (WorkItemMetadata)ctor.Invoke([isExtendedSession, includeState]); } /// @@ -163,12 +214,9 @@ private static WorkItemMetadata CreateWorkItemMetadata(bool isExtendedSession, b /// WorkerProcessExitException lives in Microsoft.Azure.WebJobs.Script /// (the Functions host runtime), which is too heavy to reference as a test dependency. /// - private class WorkerProcessExitExceptionStub : Exception + private class WorkerProcessExitExceptionStub(string message) + : Exception(message) { - public WorkerProcessExitExceptionStub(string message) - : base(message) - { - } } } }