Skip to content

Fail in-flight invocations when worker channel shuts down #11159

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 17 commits into
base: dev
Choose a base branch
from
Open
14 changes: 9 additions & 5 deletions src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
using Microsoft.Azure.WebJobs.Script.Diagnostics;
using Microsoft.Azure.WebJobs.Script.Diagnostics.OpenTelemetry;
using Microsoft.Azure.WebJobs.Script.Eventing;
using Microsoft.Azure.WebJobs.Script.Exceptions;
using Microsoft.Azure.WebJobs.Script.Extensions;
using Microsoft.Azure.WebJobs.Script.Grpc.Eventing;
using Microsoft.Azure.WebJobs.Script.Grpc.Extensions;
Expand Down Expand Up @@ -1555,21 +1556,24 @@ public bool IsExecutingInvocation(string invocationId)
return _executingInvocations.ContainsKey(invocationId);
}

public bool TryFailExecutions(Exception workerException)
public void Shutdown(Exception workerException)
{
if (workerException == null)
WorkerShutdownException shutdownException = new WorkerShutdownException("Worker encountered a fatal error and is shutting down.");

if (workerException is not null)
{
return false;
shutdownException.Reason = workerException.Message;
}

foreach (var invocation in _executingInvocations?.Values)
{
string invocationId = invocation.Context?.ExecutionContext?.InvocationId.ToString();
_workerChannelLogger.LogDebug("Worker '{workerId}' encountered a fatal error. Failing invocation: '{invocationId}'", _workerId, invocationId);
invocation.Context?.ResultSource?.TrySetException(workerException);

invocation.Context?.ResultSource?.TrySetException(shutdownException);

RemoveExecutingInvocation(invocationId);
}
return true;
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public async Task OnTimeoutExceptionAsync(ExceptionDispatchInfo exceptionInfo, T
// explicitly in order to pass the timeout.
// Task ignoreTask = _hostManager.StopAsync();
// Give the manager and all running tasks some time to shut down gracefully.
//await Task.Delay(timeoutGracePeriod);
// await Task.Delay(timeoutGracePeriod);
IFunctionInvocationDispatcher functionInvocationDispatcher = _functionInvocationDispatcherFactory.GetFunctionDispatcher();
if (!functionInvocationDispatcher.State.Equals(FunctionInvocationDispatcherState.Default))
{
Expand Down
21 changes: 21 additions & 0 deletions src/WebJobs.Script/Exceptions/WorkerShutdownException.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;

namespace Microsoft.Azure.WebJobs.Script.Exceptions
{
internal class WorkerShutdownException : Exception
{
public WorkerShutdownException() { }

public WorkerShutdownException(string message) : base(message) { }

public WorkerShutdownException(string message, string reason) : base(message)
{
Reason = reason;
}

public string Reason { get; set; }
}
}
9 changes: 8 additions & 1 deletion src/WebJobs.Script/Http/DefaultHttpProxyService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ internal class DefaultHttpProxyService : IHttpProxyService, IDisposable
private readonly HttpMessageInvoker _messageInvoker;
private readonly ForwarderRequestConfig _forwarderRequestConfig;
private readonly ILogger<DefaultHttpProxyService> _logger;
private readonly HttpTransformer _httpTransformer;

public DefaultHttpProxyService(IHttpForwarder httpForwarder, ILogger<DefaultHttpProxyService> logger)
{
Expand All @@ -39,6 +40,8 @@ public DefaultHttpProxyService(IHttpForwarder httpForwarder, ILogger<DefaultHttp
{
ActivityTimeout = TimeSpan.FromSeconds(240)
};

_httpTransformer = new ScriptInvocationRequestTransformer();
}

public void Dispose()
Expand Down Expand Up @@ -98,7 +101,11 @@ public void StartForwarding(ScriptInvocationContext context, Uri httpUri)
// add invocation id as correlation id, override existing header if present
httpRequest.Headers[ScriptConstants.HttpProxyCorrelationHeader] = context.ExecutionContext.InvocationId.ToString();

var forwardingTask = _httpForwarder.SendAsync(httpContext, httpUri.ToString(), _messageInvoker, _forwarderRequestConfig).AsTask();
// Add the script invocation context for later observation of the ScriptInvocationResult task.
// This helps track failures/cancellations that should halt retrying the http request.
httpContext.Items[ScriptConstants.HttpProxyScriptInvocationContext] = context;

var forwardingTask = _httpForwarder.SendAsync(httpContext, httpUri.ToString(), _messageInvoker, _forwarderRequestConfig, _httpTransformer).AsTask();
context.Properties[ScriptConstants.HttpProxyTask] = forwardingTask;
}
}
Expand Down
18 changes: 18 additions & 0 deletions src/WebJobs.Script/Http/RetryProxyHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Script.Description;
using Microsoft.Azure.WebJobs.Script.Exceptions;
using Microsoft.Extensions.Logging;

namespace Microsoft.Azure.WebJobs.Script.Http
Expand All @@ -30,11 +32,22 @@ public RetryProxyHandler(HttpMessageHandler innerHandler, ILogger logger)

protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
{
TaskCompletionSource<ScriptInvocationResult> resultSource = null;
if (request.Options.TryGetValue(ScriptConstants.HttpProxyScriptInvocationContext, out ScriptInvocationContext scriptInvocationContext))
{
resultSource = scriptInvocationContext.ResultSource;
}

var currentDelay = InitialDelay;
for (int attemptCount = 1; attemptCount <= MaxRetries; attemptCount++)
{
try
{
if (resultSource is not null && resultSource.Task.IsFaulted)
{
throw resultSource.Task.Exception.InnerException;
}

return await base.SendAsync(request, cancellationToken);
}
catch (TaskCanceledException) when (cancellationToken.IsCancellationRequested)
Expand All @@ -51,6 +64,11 @@ protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage

currentDelay = Math.Min(currentDelay * 2, MaximumDelay);
}
catch (WorkerShutdownException)
{
_logger.LogInformation("Language worker channel is shutting down. Request will not be retried.");
throw;
}
catch (Exception ex)
{
var message = attemptCount == MaxRetries
Expand Down
28 changes: 28 additions & 0 deletions src/WebJobs.Script/Http/ScriptInvocationRequestTransformer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System.Collections.Generic;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.Azure.WebJobs.Script;
using Microsoft.Azure.WebJobs.Script.Description;
using Yarp.ReverseProxy.Forwarder;

public class ScriptInvocationRequestTransformer : HttpTransformer
{
public override async ValueTask TransformRequestAsync(HttpContext httpContext, HttpRequestMessage proxyRequest, string destinationPrefix, CancellationToken cancellationToken)
{
var defaultTransformer = HttpTransformer.Default;

// this preserves previous behavior (which called the default transformer) - base method is also called inside of here
await defaultTransformer.TransformRequestAsync(httpContext, proxyRequest, destinationPrefix, cancellationToken);

if (httpContext.Items.TryGetValue(ScriptConstants.HttpProxyScriptInvocationContext, out object result)
&& result is ScriptInvocationContext scriptContext)
{
proxyRequest.Options.TryAdd(ScriptConstants.HttpProxyScriptInvocationContext, scriptContext);
}
}
}
1 change: 1 addition & 0 deletions src/WebJobs.Script/ScriptConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ public static class ScriptConstants
public static readonly string HttpProxyingEnabled = "HttpProxyingEnabled";
public static readonly string HttpProxyCorrelationHeader = "x-ms-invocation-id";
public static readonly string HttpProxyTask = "HttpProxyTask";
public static readonly string HttpProxyScriptInvocationContext = "HttpProxyScriptInvocationContext";

public static readonly string OperationNameKey = "OperationName";

Expand Down
2 changes: 1 addition & 1 deletion src/WebJobs.Script/Workers/Rpc/IRpcWorkerChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,6 @@ public interface IRpcWorkerChannel : IWorkerChannel

bool IsExecutingInvocation(string invocationId);

bool TryFailExecutions(Exception workerException);
void Shutdown(Exception workerException);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public Task<bool> ShutdownChannelIfExistsAsync(string channelId, Exception worke
{
string id = rpcChannel.Id;
_logger.LogDebug("Disposing language worker channel with id:{workerId}", id);
rpcChannel.TryFailExecutions(workerException);
rpcChannel.Shutdown(workerException);

(rpcChannel as IDisposable)?.Dispose();
_logger.LogDebug("Disposed language worker channel with id:{workerId}", id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ public Task<bool> ShutdownChannelIfExistsAsync(string language, string workerId,
if (workerChannel != null)
{
_logger.LogDebug("Disposing WebHost channel for workerId: {channelId}, for runtime:{language}", workerId, language);
workerChannel.TryFailExecutions(workerException);
workerChannel.Shutdown(workerException);
(channelTask.Result as IDisposable)?.Dispose();
}
}
Expand All @@ -295,7 +295,7 @@ public Task<bool> ShutdownChannelIfExistsAsync(string language, string workerId,
if (workerChannel != null)
{
_logger.LogDebug("Disposing WebHost channel for workerId: {channelId}, for runtime:{language}", workerId, language);
workerChannel.TryFailExecutions(workerException);
workerChannel.Shutdown(workerException);
(channelTask.Result as IDisposable)?.Dispose();
}
}
Expand Down
Loading