Skip to content

Fail in-flight invocations when worker channel shuts down #11159

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 17 commits into
base: dev
Choose a base branch
from
Open
15 changes: 10 additions & 5 deletions src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@
using System.Threading.Tasks.Dataflow;
using Google.Protobuf.Collections;
using Google.Protobuf.WellKnownTypes;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Azure.WebJobs.Logging;
using Microsoft.Azure.WebJobs.Script.Config;
using Microsoft.Azure.WebJobs.Script.Description;
using Microsoft.Azure.WebJobs.Script.Diagnostics;
using Microsoft.Azure.WebJobs.Script.Diagnostics.OpenTelemetry;
using Microsoft.Azure.WebJobs.Script.Eventing;
using Microsoft.Azure.WebJobs.Script.Exceptions;
using Microsoft.Azure.WebJobs.Script.Extensions;
using Microsoft.Azure.WebJobs.Script.Grpc.Eventing;
using Microsoft.Azure.WebJobs.Script.Grpc.Extensions;
Expand Down Expand Up @@ -1555,21 +1557,24 @@ public bool IsExecutingInvocation(string invocationId)
return _executingInvocations.ContainsKey(invocationId);
}

public bool TryFailExecutions(Exception workerException)
public void Shutdown(Exception workerException)
{
if (workerException == null)
var shutdownException = workerException;

if (workerException is null || workerException is FunctionTimeoutException)
{
return false;
shutdownException = new FunctionAbortedException(workerException?.Message ?? "Worker channel is shutting down. Aborting function.", workerException);
}

foreach (var invocation in _executingInvocations?.Values)
{
string invocationId = invocation.Context?.ExecutionContext?.InvocationId.ToString();
_workerChannelLogger.LogDebug("Worker '{workerId}' encountered a fatal error. Failing invocation: '{invocationId}'", _workerId, invocationId);
invocation.Context?.ResultSource?.TrySetException(workerException);

invocation.Context?.ResultSource?.TrySetException(shutdownException);

RemoveExecutingInvocation(invocationId);
}
return true;
}

/// <summary>
Expand Down
60 changes: 33 additions & 27 deletions src/WebJobs.Script.WebHost/WebScriptHostExceptionHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,40 +27,46 @@ public WebScriptHostExceptionHandler(IApplicationLifetime applicationLifetime, I

public async Task OnTimeoutExceptionAsync(ExceptionDispatchInfo exceptionInfo, TimeSpan timeoutGracePeriod)
{
FunctionTimeoutException timeoutException = exceptionInfo.SourceException as FunctionTimeoutException;

if (timeoutException?.Task != null)
if (exceptionInfo.SourceException is FunctionTimeoutException timeoutException)
{
// We may double the timeoutGracePeriod here by first waiting to see if the initial
// function task that started the exception has completed.
Task completedTask = await Task.WhenAny(timeoutException.Task, Task.Delay(timeoutGracePeriod));

// If the function task has completed, simply return. The host has already logged the timeout.
if (completedTask == timeoutException.Task)
if (timeoutException?.Task != null)
{
return;
// We may double the timeoutGracePeriod here by first waiting to see if the initial
// function task that started the exception has completed.
Task completedTask = await Task.WhenAny(timeoutException.Task, Task.Delay(timeoutGracePeriod));

// If the function task has completed, simply return. The host has already logged the timeout.
if (completedTask == timeoutException.Task)
{
return;
}
}
}

// We can't wait on this as it may cause a deadlock if the timeout was fired
// by a Listener that cannot stop until it has completed.
// TODO: DI (FACAVAL) The shutdown call will invoke the host stop... but we may need to do this
// explicitly in order to pass the timeout.
// Task ignoreTask = _hostManager.StopAsync();
// Give the manager and all running tasks some time to shut down gracefully.
//await Task.Delay(timeoutGracePeriod);
IFunctionInvocationDispatcher functionInvocationDispatcher = _functionInvocationDispatcherFactory.GetFunctionDispatcher();
if (!functionInvocationDispatcher.State.Equals(FunctionInvocationDispatcherState.Default))
{
_logger.LogWarning($"A function timeout has occurred. Restarting worker process executing invocationId '{timeoutException.InstanceId}'.", exceptionInfo.SourceException);
// If invocation id is not found in any of the workers => worker is already disposed. No action needed.
await functionInvocationDispatcher.RestartWorkerWithInvocationIdAsync(timeoutException.InstanceId.ToString());
_logger.LogWarning("Restart of language worker process(es) completed.", exceptionInfo.SourceException);
// We can't wait on this as it may cause a deadlock if the timeout was fired
// by a Listener that cannot stop until it has completed.
// TODO: DI (FACAVAL) The shutdown call will invoke the host stop... but we may need to do this
// explicitly in order to pass the timeout.
// Task ignoreTask = _hostManager.StopAsync();
// Give the manager and all running tasks some time to shut down gracefully.
// await Task.Delay(timeoutGracePeriod);
IFunctionInvocationDispatcher functionInvocationDispatcher = _functionInvocationDispatcherFactory.GetFunctionDispatcher();
if (!functionInvocationDispatcher.State.Equals(FunctionInvocationDispatcherState.Default))
{
_logger.LogWarning($"A function timeout has occurred. Restarting worker process executing invocationId '{timeoutException.InstanceId}'.", exceptionInfo.SourceException);
// If invocation id is not found in any of the workers => worker is already disposed. No action needed.
await functionInvocationDispatcher.RestartWorkerWithInvocationIdAsync(timeoutException.InstanceId.ToString(), timeoutException);
_logger.LogWarning("Restart of language worker process(es) completed.", exceptionInfo.SourceException);
}
else
{
LogErrorAndFlush("A function timeout has occurred. Host is shutting down.", exceptionInfo.SourceException);
_applicationLifetime.StopApplication();
}
}
else
{
LogErrorAndFlush("A function timeout has occurred. Host is shutting down.", exceptionInfo.SourceException);
_applicationLifetime.StopApplication();
// still testing why this can occur, leaving a placeholder log message for now.
LogErrorAndFlush("An unexpected timeout exception has occurred. Host is shutting down.", exceptionInfo.SourceException);
}
}

Expand Down
19 changes: 19 additions & 0 deletions src/WebJobs.Script/Exceptions/FunctionAbortedException.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using Microsoft.Azure.WebJobs.Host;

namespace Microsoft.Azure.WebJobs.Script.Exceptions
{
internal sealed class FunctionAbortedException : FunctionTimeoutException
{
public FunctionAbortedException() { }

public FunctionAbortedException(string message) : base(message) { }

public FunctionAbortedException(string message, Exception innerException) : base(message, innerException)
{
}
}
}
9 changes: 8 additions & 1 deletion src/WebJobs.Script/Http/DefaultHttpProxyService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ internal class DefaultHttpProxyService : IHttpProxyService, IDisposable
private readonly HttpMessageInvoker _messageInvoker;
private readonly ForwarderRequestConfig _forwarderRequestConfig;
private readonly ILogger<DefaultHttpProxyService> _logger;
private readonly HttpTransformer _httpTransformer;

public DefaultHttpProxyService(IHttpForwarder httpForwarder, ILogger<DefaultHttpProxyService> logger)
{
Expand All @@ -39,6 +40,8 @@ public DefaultHttpProxyService(IHttpForwarder httpForwarder, ILogger<DefaultHttp
{
ActivityTimeout = TimeSpan.FromSeconds(240)
};

_httpTransformer = new ScriptInvocationRequestTransformer();
}

public void Dispose()
Expand Down Expand Up @@ -98,7 +101,11 @@ public void StartForwarding(ScriptInvocationContext context, Uri httpUri)
// add invocation id as correlation id, override existing header if present
httpRequest.Headers[ScriptConstants.HttpProxyCorrelationHeader] = context.ExecutionContext.InvocationId.ToString();

var forwardingTask = _httpForwarder.SendAsync(httpContext, httpUri.ToString(), _messageInvoker, _forwarderRequestConfig).AsTask();
// Add the script invocation context for later observation of the ScriptInvocationResult task.
// This helps track failures/cancellations that should halt retrying the http request.
httpContext.Items[ScriptConstants.HttpProxyScriptInvocationContext] = context;

var forwardingTask = _httpForwarder.SendAsync(httpContext, httpUri.ToString(), _messageInvoker, _forwarderRequestConfig, _httpTransformer).AsTask();
context.Properties[ScriptConstants.HttpProxyTask] = forwardingTask;
}
}
Expand Down
18 changes: 18 additions & 0 deletions src/WebJobs.Script/Http/RetryProxyHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Script.Description;
using Microsoft.Azure.WebJobs.Script.Exceptions;
using Microsoft.Extensions.Logging;

namespace Microsoft.Azure.WebJobs.Script.Http
Expand All @@ -30,11 +32,22 @@ public RetryProxyHandler(HttpMessageHandler innerHandler, ILogger logger)

protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
{
TaskCompletionSource<ScriptInvocationResult> resultSource = null;
if (request.Options.TryGetValue(ScriptConstants.HttpProxyScriptInvocationContext, out ScriptInvocationContext scriptInvocationContext))
{
resultSource = scriptInvocationContext.ResultSource;
}

var currentDelay = InitialDelay;
for (int attemptCount = 1; attemptCount <= MaxRetries; attemptCount++)
{
try
{
if (resultSource is not null && resultSource.Task.IsFaulted)
{
throw resultSource.Task.Exception?.InnerException ?? new HttpRequestException("The function invocation tied to this HTTP request failed.");
}

return await base.SendAsync(request, cancellationToken);
}
catch (TaskCanceledException) when (cancellationToken.IsCancellationRequested)
Expand All @@ -51,6 +64,11 @@ protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage

currentDelay = Math.Min(currentDelay * 2, MaximumDelay);
}
catch (FunctionAbortedException)
{
_logger.LogDebug("Function invocation aborted. Request will not be retried.");
throw;
}
catch (Exception ex)
{
var message = attemptCount == MaxRetries
Expand Down
28 changes: 28 additions & 0 deletions src/WebJobs.Script/Http/ScriptInvocationRequestTransformer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System.Collections.Generic;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.Azure.WebJobs.Script.Description;
using Yarp.ReverseProxy.Forwarder;

namespace Microsoft.Azure.WebJobs.Script.Http
{
internal class ScriptInvocationRequestTransformer : HttpTransformer
{
public override async ValueTask TransformRequestAsync(HttpContext httpContext, HttpRequestMessage proxyRequest, string destinationPrefix, CancellationToken cancellationToken)
{
// this preserves previous behavior (which called the default transformer) - base method is also called inside of here
await HttpTransformer.Default.TransformRequestAsync(httpContext, proxyRequest, destinationPrefix, cancellationToken);

if (httpContext.Items.TryGetValue(ScriptConstants.HttpProxyScriptInvocationContext, out object result)
&& result is ScriptInvocationContext scriptContext)
{
proxyRequest.Options.TryAdd(ScriptConstants.HttpProxyScriptInvocationContext, scriptContext);
}
}
}
}
1 change: 1 addition & 0 deletions src/WebJobs.Script/ScriptConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ public static class ScriptConstants
public static readonly string HttpProxyingEnabled = "HttpProxyingEnabled";
public static readonly string HttpProxyCorrelationHeader = "x-ms-invocation-id";
public static readonly string HttpProxyTask = "HttpProxyTask";
public static readonly string HttpProxyScriptInvocationContext = "HttpProxyScriptInvocationContext";

public static readonly string OperationNameKey = "OperationName";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ public Task ShutdownAsync()
return Task.CompletedTask;
}

public Task<bool> RestartWorkerWithInvocationIdAsync(string invocationId)
public Task<bool> RestartWorkerWithInvocationIdAsync(string invocationId, Exception exception = null)
Copy link
Preview

Copilot AI Jul 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new exception parameter is not used in the method body, so shutdown context is never forwarded. Consider passing the exception to the restart logic or removing the unused parameter.

Suggested change
public Task<bool> RestartWorkerWithInvocationIdAsync(string invocationId, Exception exception = null)
public Task<bool> RestartWorkerWithInvocationIdAsync(string invocationId)

Copilot uses AI. Check for mistakes.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The unused parameter is here because this class implements IFunctionInvocationDispatcher (which includes this method) - this parameter is used in other implementations of this interface.

{
// Since there's only one channel for httpworker
DisposeAndRestartWorkerChannel(_httpWorkerChannel.Id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public interface IFunctionInvocationDispatcher : IDisposable

Task ShutdownAsync();

Task<bool> RestartWorkerWithInvocationIdAsync(string invocationId);
Task<bool> RestartWorkerWithInvocationIdAsync(string invocationId, Exception exception);

Task StartWorkerChannel();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,7 @@ public void Dispose()
Dispose(true);
}

public async Task<bool> RestartWorkerWithInvocationIdAsync(string invocationId)
public async Task<bool> RestartWorkerWithInvocationIdAsync(string invocationId, Exception exception = null)
{
// Dispose and restart errored channel with the particular invocation id
var channels = await GetInitializedWorkerChannelsAsync();
Expand All @@ -685,7 +685,7 @@ public async Task<bool> RestartWorkerWithInvocationIdAsync(string invocationId)
if (channel.IsExecutingInvocation(invocationId))
{
_logger.LogDebug($"Restarting channel with workerId: '{channel.Id}' that is executing invocation: '{invocationId}' and timed out.");
await DisposeAndRestartWorkerChannel(_workerRuntime, channel.Id);
await DisposeAndRestartWorkerChannel(_workerRuntime, channel.Id, exception);
return true;
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/WebJobs.Script/Workers/Rpc/IRpcWorkerChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,6 @@ public interface IRpcWorkerChannel : IWorkerChannel

bool IsExecutingInvocation(string invocationId);

bool TryFailExecutions(Exception workerException);
void Shutdown(Exception workerException);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public Task<bool> ShutdownChannelIfExistsAsync(string channelId, Exception worke
{
string id = rpcChannel.Id;
_logger.LogDebug("Disposing language worker channel with id:{workerId}", id);
rpcChannel.TryFailExecutions(workerException);
rpcChannel.Shutdown(workerException);

(rpcChannel as IDisposable)?.Dispose();
_logger.LogDebug("Disposed language worker channel with id:{workerId}", id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ public Task<bool> ShutdownChannelIfExistsAsync(string language, string workerId,
if (workerChannel != null)
{
_logger.LogDebug("Disposing WebHost channel for workerId: {channelId}, for runtime:{language}", workerId, language);
workerChannel.TryFailExecutions(workerException);
workerChannel.Shutdown(workerException);
(channelTask.Result as IDisposable)?.Dispose();
}
}
Expand All @@ -295,7 +295,7 @@ public Task<bool> ShutdownChannelIfExistsAsync(string language, string workerId,
if (workerChannel != null)
{
_logger.LogDebug("Disposing WebHost channel for workerId: {channelId}, for runtime:{language}", workerId, language);
workerChannel.TryFailExecutions(workerException);
workerChannel.Shutdown(workerException);
(channelTask.Result as IDisposable)?.Dispose();
}
}
Expand Down
Loading
Loading