-
Notifications
You must be signed in to change notification settings - Fork 464
Fail in-flight invocations when worker channel shuts down #11159
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: dev
Are you sure you want to change the base?
Changes from 16 commits
99d2e23
78f3a99
d249c4a
eff32c2
ad650ed
16875db
31e0473
4b64ece
5ab7a94
dba3f31
b084a14
8c5cd2f
c904db4
04409f3
8ad875d
dd5017a
22660a4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -29,6 +29,13 @@ public async Task OnTimeoutExceptionAsync(ExceptionDispatchInfo exceptionInfo, T | |
{ | ||
FunctionTimeoutException timeoutException = exceptionInfo.SourceException as FunctionTimeoutException; | ||
|
||
// this seems to happen when the worker channel is already shutting down. Ex. One timeout is being handled and another comes in during shutdown. | ||
satvu marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if (timeoutException is null) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If it's not There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah this seems odd, the method is There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've only reproduced this twice, but timeoutException has ended up |
||
{ | ||
_logger.LogDebug("A timeout exception has occurred, but worker channel is already shutting down", exceptionInfo.SourceException); | ||
satvu marked this conversation as resolved.
Show resolved
Hide resolved
satvu marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return; | ||
} | ||
|
||
if (timeoutException?.Task != null) | ||
{ | ||
// We may double the timeoutGracePeriod here by first waiting to see if the initial | ||
|
@@ -48,13 +55,13 @@ public async Task OnTimeoutExceptionAsync(ExceptionDispatchInfo exceptionInfo, T | |
// explicitly in order to pass the timeout. | ||
// Task ignoreTask = _hostManager.StopAsync(); | ||
// Give the manager and all running tasks some time to shut down gracefully. | ||
//await Task.Delay(timeoutGracePeriod); | ||
// await Task.Delay(timeoutGracePeriod); | ||
IFunctionInvocationDispatcher functionInvocationDispatcher = _functionInvocationDispatcherFactory.GetFunctionDispatcher(); | ||
if (!functionInvocationDispatcher.State.Equals(FunctionInvocationDispatcherState.Default)) | ||
{ | ||
_logger.LogWarning($"A function timeout has occurred. Restarting worker process executing invocationId '{timeoutException.InstanceId}'.", exceptionInfo.SourceException); | ||
// If invocation id is not found in any of the workers => worker is already disposed. No action needed. | ||
await functionInvocationDispatcher.RestartWorkerWithInvocationIdAsync(timeoutException.InstanceId.ToString()); | ||
await functionInvocationDispatcher.RestartWorkerWithInvocationIdAsync(timeoutException.InstanceId.ToString(), timeoutException); | ||
_logger.LogWarning("Restart of language worker process(es) completed.", exceptionInfo.SourceException); | ||
} | ||
else | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
// Copyright (c) .NET Foundation. All rights reserved. | ||
// Licensed under the MIT License. See License.txt in the project root for license information. | ||
|
||
using System; | ||
|
||
namespace Microsoft.Azure.WebJobs.Script.Exceptions | ||
{ | ||
internal sealed class WorkerShutdownException : Exception | ||
{ | ||
public WorkerShutdownException() { } | ||
|
||
public WorkerShutdownException(string message) : base(message) { } | ||
|
||
satvu marked this conversation as resolved.
Show resolved
Hide resolved
|
||
public WorkerShutdownException(string message, Exception innerException) : base(message, innerException) | ||
{ | ||
Reason = innerException?.Message ?? string.Empty; | ||
} | ||
|
||
public string Reason { get; set; } | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
// Copyright (c) .NET Foundation. All rights reserved. | ||
// Licensed under the MIT License. See License.txt in the project root for license information. | ||
|
||
using System.Collections.Generic; | ||
using System.Net.Http; | ||
using System.Threading; | ||
using System.Threading.Tasks; | ||
using Microsoft.AspNetCore.Http; | ||
using Microsoft.Azure.WebJobs.Script.Description; | ||
using Yarp.ReverseProxy.Forwarder; | ||
|
||
namespace Microsoft.Azure.WebJobs.Script.Http | ||
{ | ||
internal class ScriptInvocationRequestTransformer : HttpTransformer | ||
{ | ||
public override async ValueTask TransformRequestAsync(HttpContext httpContext, HttpRequestMessage proxyRequest, string destinationPrefix, CancellationToken cancellationToken) | ||
{ | ||
// this preserves previous behavior (which called the default transformer) - base method is also called inside of here | ||
await HttpTransformer.Default.TransformRequestAsync(httpContext, proxyRequest, destinationPrefix, cancellationToken); | ||
|
||
if (httpContext.Items.TryGetValue(ScriptConstants.HttpProxyScriptInvocationContext, out object result) | ||
&& result is ScriptInvocationContext scriptContext) | ||
{ | ||
proxyRequest.Options.TryAdd(ScriptConstants.HttpProxyScriptInvocationContext, scriptContext); | ||
} | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -206,7 +206,7 @@ public Task ShutdownAsync() | |||||
return Task.CompletedTask; | ||||||
} | ||||||
|
||||||
public Task<bool> RestartWorkerWithInvocationIdAsync(string invocationId) | ||||||
public Task<bool> RestartWorkerWithInvocationIdAsync(string invocationId, Exception exception = null) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The new
Suggested change
Copilot uses AI. Check for mistakes. Positive FeedbackNegative Feedback There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The unused parameter is here because this class implements |
||||||
{ | ||||||
// Since there's only one channel for httpworker | ||||||
DisposeAndRestartWorkerChannel(_httpWorkerChannel.Id); | ||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
// Copyright (c) .NET Foundation. All rights reserved. | ||
// Licensed under the MIT License. See License.txt in the project root for license information. | ||
|
||
using System; | ||
satvu marked this conversation as resolved.
Show resolved
Hide resolved
|
||
using System.Runtime.ExceptionServices; | ||
using System.Threading.Tasks; | ||
using Microsoft.AspNetCore.Hosting; | ||
using Microsoft.Azure.WebJobs.Host; | ||
using Microsoft.Azure.WebJobs.Script.WebHost; | ||
using Microsoft.Azure.WebJobs.Script.Workers; | ||
using Microsoft.Extensions.Logging; | ||
using Moq; | ||
using Xunit; | ||
|
||
namespace Microsoft.Azure.WebJobs.Script.Tests.Handlers | ||
{ | ||
public class WebScriptHostExceptionHandlerTests | ||
{ | ||
private readonly Mock<IApplicationLifetime> _mockApplicationLifetime; | ||
private readonly Mock<ILogger<WebScriptHostExceptionHandler>> _mockLogger; | ||
private readonly Mock<IFunctionInvocationDispatcherFactory> _mockDispatcherFactory; | ||
private readonly Mock<IFunctionInvocationDispatcher> _mockDispatcher; | ||
private readonly WebScriptHostExceptionHandler _exceptionHandler; | ||
|
||
public WebScriptHostExceptionHandlerTests() | ||
{ | ||
_mockApplicationLifetime = new Mock<IApplicationLifetime>(); | ||
_mockLogger = new Mock<ILogger<WebScriptHostExceptionHandler>>(); | ||
_mockDispatcherFactory = new Mock<IFunctionInvocationDispatcherFactory>(); | ||
_mockDispatcher = new Mock<IFunctionInvocationDispatcher>(); | ||
|
||
_mockDispatcherFactory.Setup(f => f.GetFunctionDispatcher()) | ||
.Returns(_mockDispatcher.Object); | ||
|
||
_exceptionHandler = new WebScriptHostExceptionHandler( | ||
_mockApplicationLifetime.Object, | ||
_mockLogger.Object, | ||
_mockDispatcherFactory.Object); | ||
} | ||
|
||
[Fact] | ||
public async Task OnTimeoutExceptionAsync_CallsRestartWorkerWithInvocationIdAsync_WithTimeoutException() | ||
{ | ||
var task = Task.CompletedTask; | ||
var timeoutException = new FunctionTimeoutException("Test timeout"); | ||
var exceptionInfo = ExceptionDispatchInfo.Capture(timeoutException); | ||
var timeoutGracePeriod = TimeSpan.FromSeconds(5); | ||
|
||
_mockDispatcher.Setup(d => d.State) | ||
.Returns(FunctionInvocationDispatcherState.Initialized); | ||
_mockDispatcher.Setup(d => d.RestartWorkerWithInvocationIdAsync(It.IsAny<string>(), It.IsAny<Exception>())) | ||
.Returns(Task.FromResult(true)); | ||
|
||
await _exceptionHandler.OnTimeoutExceptionAsync(exceptionInfo, timeoutGracePeriod); | ||
|
||
_mockDispatcher.Verify(d => d.RestartWorkerWithInvocationIdAsync( | ||
It.IsAny<string>(), | ||
timeoutException), Times.Once); | ||
} | ||
|
||
[Fact] | ||
public async Task OnTimeoutExceptionAsync_WhenTaskDoesNotCompleteWithinGracePeriod_RestartsWorker() | ||
{ | ||
// Arrange | ||
var invocationId = Guid.NewGuid(); | ||
var taskCompletionSource = new TaskCompletionSource<bool>(); | ||
var timeoutException = new FunctionTimeoutException("Test timeout"); | ||
var exceptionInfo = ExceptionDispatchInfo.Capture(timeoutException); | ||
var timeoutGracePeriod = TimeSpan.FromMilliseconds(100); // Short grace period | ||
|
||
_mockDispatcher.Setup(d => d.State) | ||
.Returns(FunctionInvocationDispatcherState.Initialized); | ||
_mockDispatcher.Setup(d => d.RestartWorkerWithInvocationIdAsync(It.IsAny<string>(), It.IsAny<Exception>())) | ||
.Returns(Task.FromResult(true)); | ||
|
||
// Don't complete the task to simulate it not finishing within the grace period | ||
|
||
// Act | ||
await _exceptionHandler.OnTimeoutExceptionAsync(exceptionInfo, timeoutGracePeriod); | ||
|
||
// Assert | ||
_mockDispatcher.Verify(d => d.RestartWorkerWithInvocationIdAsync( | ||
It.IsAny<string>(), | ||
timeoutException), Times.Once); | ||
} | ||
} | ||
} |
Uh oh!
There was an error while loading. Please reload this page.