Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
<PackageReference Include="Microsoft.ApplicationInsights.WindowsServer.TelemetryChannel" Version="2.21.0" />
<PackageReference Include="Newtonsoft.Json" Version="13.0.2" />
<PackageReference Include="System.Collections.Immutable" Version="1.5.0" />
<PackageReference Include="StyleCop.Analyzers" Version="1.2.0-beta.435" PrivateAssets="all" />
<PackageReference Include="StyleCop.Analyzers" Version="1.2.0-beta.556" PrivateAssets="all" />
</ItemGroup>

</Project>
24 changes: 18 additions & 6 deletions src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -364,19 +364,31 @@ private void DispatchMessage(InboundGrpcEvent msg)

public bool IsChannelReadyForInvocations()
{
return !_disposing && !_disposed && _state.HasFlag(RpcWorkerChannelState.InvocationBuffersInitialized | RpcWorkerChannelState.Initialized);
return !_disposing && !_disposed
&& _state.HasFlag(
RpcWorkerChannelState.InvocationBuffersInitialized | RpcWorkerChannelState.Initialized);
}

public async Task StartWorkerProcessAsync(CancellationToken cancellationToken)
{
RegisterCallbackForNextGrpcMessage(MsgType.StartStream, _workerConfig.CountOptions.ProcessStartupTimeout, 1, SendWorkerInitRequest, HandleWorkerStartStreamError);
// note: it is important that the ^^^ StartStream is in place *before* we start process the loop, otherwise we get a race condition
RegisterCallbackForNextGrpcMessage(
MsgType.StartStream,
_workerConfig.CountOptions.ProcessStartupTimeout,
count: 1,
SendWorkerInitRequest,
HandleWorkerStartStreamError);

// note: it is important that the ^^^ StartStream is in place *before* we start process the loop,
// otherwise we get a race condition
_ = ProcessInbound();

_workerChannelLogger.LogDebug("Initiating Worker Process start up");
await _rpcWorkerProcess.StartProcessAsync();
_state = _state | RpcWorkerChannelState.Initializing;
await _workerInitTask.Task;
await _rpcWorkerProcess.StartProcessAsync(cancellationToken);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the important change: we now wait for either the worker to be fully initialized (gRPC connection established) or the worker process to exit (in which case we re-throw any failure the worker experienced).

_state |= RpcWorkerChannelState.Initializing;
Task winner = await Task.WhenAny(
_workerInitTask.Task, _rpcWorkerProcess.WaitForExitAsync(cancellationToken))
.WaitAsync(cancellationToken);
await winner;
}

public async Task<WorkerStatus> GetWorkerStatusAsync()
Expand Down
2 changes: 1 addition & 1 deletion src/WebJobs.Script.Grpc/WebJobs.Script.Grpc.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
<PackageReference Include="Microsoft.ApplicationInsights.WindowsServer.TelemetryChannel" Version="2.22.0" />
<PackageReference Include="Microsoft.Azure.WebJobs.Rpc.Core" Version="3.0.37" />
<PackageReference Include="System.IO.FileSystem.Primitives" Version="4.3.0" />
<PackageReference Include="StyleCop.Analyzers" Version="1.2.0-beta.435" PrivateAssets="all" />
<PackageReference Include="StyleCop.Analyzers" Version="1.2.0-beta.556" PrivateAssets="all" />
<PackageReference Include="System.Threading.Channels" Version="8.0.0" />
<PackageReference Include="Yarp.ReverseProxy" Version="2.0.1" />
</ItemGroup>
Expand Down
2 changes: 1 addition & 1 deletion src/WebJobs.Script.WebHost/WebJobs.Script.WebHost.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@
<PackageReference Include="Microsoft.IdentityModel.Protocols.OpenIdConnect" Version="$(IdentityDependencyVersion)" />
<PackageReference Include="Microsoft.Security.Utilities" Version="1.3.0" />
<PackageReference Include="Newtonsoft.Json" Version="13.0.3" />
<PackageReference Include="StyleCop.Analyzers" Version="1.2.0-beta.435" PrivateAssets="all" />
<PackageReference Include="StyleCop.Analyzers" Version="1.2.0-beta.556" PrivateAssets="all" />
<PackageReference Include="System.IdentityModel.Tokens.Jwt" Version="$(IdentityDependencyVersion)" />

<!--
Expand Down
21 changes: 16 additions & 5 deletions src/WebJobs.Script/Extensions/ExceptionExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System.Reflection;
using System.Collections.Generic;
using System.Runtime.ExceptionServices;
using System.Runtime.InteropServices;
using System.Threading;
using Microsoft.Azure.WebJobs.Host.Diagnostics;
Expand Down Expand Up @@ -32,12 +33,22 @@ or SEHException

public static string ToFormattedString(this Exception exception)
{
if (exception == null)
ArgumentNullException.ThrowIfNull(exception);
return ExceptionFormatter.GetFormattedException(exception);
}

public static void ThrowIfErrorsPresent(IList<Exception> exceptions, string message = null)
{
switch (exceptions)
{
throw new ArgumentNullException(nameof(exception));
case null or []:
return;
case [Exception e]:
ExceptionDispatchInfo.Capture(e).Throw();
return;
default:
throw new AggregateException(message, exceptions);
}

return ExceptionFormatter.GetFormattedException(exception);
}
}
}
1 change: 0 additions & 1 deletion src/WebJobs.Script/Host/IFunctionMetadataManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
using System.Collections.Generic;
using System.Collections.Immutable;
using Microsoft.Azure.WebJobs.Script.Description;
using Microsoft.Azure.WebJobs.Script.Workers;
using Microsoft.Azure.WebJobs.Script.Workers.Rpc;

namespace Microsoft.Azure.WebJobs.Script
Expand Down
4 changes: 2 additions & 2 deletions src/WebJobs.Script/Host/IWorkerFunctionMetadataProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@
namespace Microsoft.Azure.WebJobs.Script
{
/// <summary>
/// Defines an interface for fetching function metadata from Out-of-Proc language workers
/// Defines an interface for fetching function metadata from Out-of-Proc language workers.
/// </summary>
internal interface IWorkerFunctionMetadataProvider
{
ImmutableDictionary<string, ImmutableArray<string>> FunctionErrors { get; }

/// <summary>
/// Attempts to get function metadata from Out-of-Proc language workers
/// Attempts to get function metadata from Out-of-Proc language workers.
/// </summary>
/// <returns>FunctionMetadataResult that either contains the function metadata or indicates that a fall back option for fetching metadata should be used</returns>
Task<FunctionMetadataResult> GetFunctionMetadataAsync(IEnumerable<RpcWorkerConfig> workerConfigs, bool forceRefresh = false);
Expand Down
6 changes: 3 additions & 3 deletions src/WebJobs.Script/Host/ScriptHostState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ namespace Microsoft.Azure.WebJobs.Script
public enum ScriptHostState
{
/// <summary>
/// The host has not yet been created
/// The host has not yet been created.
/// </summary>
Default,

Expand All @@ -28,7 +28,7 @@ public enum ScriptHostState
Running,

/// <summary>
/// The host is in an error state
/// The host is in an error state.
/// </summary>
Error,

Expand All @@ -43,7 +43,7 @@ public enum ScriptHostState
Stopped,

/// <summary>
/// The host is offline
/// The host is offline.
/// </summary>
Offline
}
Expand Down
9 changes: 7 additions & 2 deletions src/WebJobs.Script/Host/WorkerFunctionMetadataProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public async Task<FunctionMetadataResult> GetFunctionMetadataAsync(IEnumerable<R
// forceRefresh will be false when bundle is not used (dotnet and dotnet-isolated).
if (!_environment.IsPlaceholderModeEnabled() && forceRefresh && !_scriptOptions.CurrentValue.IsFileSystemReadOnly)
{
_channelManager.ShutdownChannelsAsync().GetAwaiter().GetResult();
await _channelManager.ShutdownChannelsAsync();
}

var channels = _channelManager.GetChannels(_workerRuntime);
Expand Down Expand Up @@ -108,6 +108,7 @@ public async Task<FunctionMetadataResult> GetFunctionMetadataAsync(IEnumerable<R
throw new InvalidOperationException($"No initialized language worker channel found for runtime: {_workerRuntime}.");
}

List<Exception> errors = null;
foreach (string workerId in channels.Keys.ToList())
{
if (channels.TryGetValue(workerId, out TaskCompletionSource<IRpcWorkerChannel> languageWorkerChannelTask))
Expand All @@ -130,7 +131,7 @@ public async Task<FunctionMetadataResult> GetFunctionMetadataAsync(IEnumerable<R
}

_functions = functions.ToImmutableArray();
_logger.FunctionsReturnedByProvider(_functions.IsDefault ? 0 : _functions.Count(), _metadataProviderName);
_logger.FunctionsReturnedByProvider(_functions.Length, _metadataProviderName);

// Validate if the app has functions in legacy format and add in logs to inform about the mixed app
_ = Task.Delay(TimeSpan.FromMinutes(1)).ContinueWith(t => ValidateFunctionAppFormat(_scriptOptions.CurrentValue.ScriptPath, _logger, _environment));
Expand All @@ -141,9 +142,13 @@ public async Task<FunctionMetadataResult> GetFunctionMetadataAsync(IEnumerable<R
{
_logger.LogWarning(ex, "Removing errored webhost language worker channel for runtime: {workerRuntime} workerId:{workerId}", _workerRuntime, workerId);
await _channelManager.ShutdownChannelIfExistsAsync(_workerRuntime, workerId, ex);
errors ??= [];
errors.Add(ex);
}
}
}

ExceptionExtensions.ThrowIfErrorsPresent(errors, "Errors getting function metadata from workers.");
}

return new FunctionMetadataResult(useDefaultMetadataIndexing: false, _functions);
Expand Down
2 changes: 1 addition & 1 deletion src/WebJobs.Script/WebJobs.Script.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
<PackageReference Include="OpenTelemetry.Extensions.Hosting" Version="1.8.0" />
<PackageReference Include="OpenTelemetry.Instrumentation.AspNetCore" Version="1.8.1" />
<PackageReference Include="OpenTelemetry.Instrumentation.Http" Version="1.8.1" />
<PackageReference Include="StyleCop.Analyzers" Version="1.2.0-beta.435" PrivateAssets="all" />
<PackageReference Include="StyleCop.Analyzers" Version="1.2.0-beta.556" PrivateAssets="all" />
<PackageReference Include="System.Diagnostics.DiagnosticSource" Version="8.0.0" />
<PackageReference Include="System.Drawing.Common" Version="8.0.0" />
<PackageReference Include="System.Formats.Asn1" Version="6.0.1" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
// Licensed under the MIT License. See License.txt in the project root for license information.

using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Script.Scale;

namespace Microsoft.Azure.WebJobs.Script.Workers
{
Expand All @@ -13,7 +13,9 @@ public interface IWorkerProcess

Process Process { get; }

Task StartProcessAsync();
Task StartProcessAsync(CancellationToken cancellationToken = default);

Task WaitForExitAsync(CancellationToken cancellationToken = default);

void WaitForProcessExitInMilliSeconds(int waitTime);
}
Expand Down
74 changes: 53 additions & 21 deletions src/WebJobs.Script/Workers/ProcessManagement/WorkerProcess.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.IO;
using System.Linq;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Host.Scale;
using Microsoft.Azure.WebJobs.Logging;
Expand All @@ -33,10 +34,11 @@ internal abstract class WorkerProcess : IWorkerProcess, IDisposable
private readonly IEnvironment _environment;
private readonly IOptionsMonitor<ScriptApplicationHostOptions> _scriptApplicationHostOptions;

private bool _useStdErrorStreamForErrorsOnly;
private Queue<string> _processStdErrDataQueue = new Queue<string>(3);
private readonly object _syncLock = new();
private readonly bool _useStdErrorStreamForErrorsOnly;
private Queue<string> _processStdErrDataQueue = new(3);
private IHostProcessMonitor _processMonitor;
private object _syncLock = new object();
private TaskCompletionSource _processExit; // used to hold custom exceptions on non-success exit.

internal WorkerProcess(IScriptEventManager eventManager, IProcessRegistry processRegistry, ILogger workerProcessLogger, IWorkerConsoleLogSource consoleLogSource, IMetricsLogger metricsLogger,
IServiceProvider serviceProvider, ILoggerFactory loggerFactory, IEnvironment environment, IOptionsMonitor<ScriptApplicationHostOptions> scriptApplicationHostOptions, bool useStdErrStreamForErrorsOnly = false)
Expand Down Expand Up @@ -69,8 +71,9 @@ internal WorkerProcess(IScriptEventManager eventManager, IProcessRegistry proces

internal abstract Process CreateWorkerProcess();

public Task StartProcessAsync()
public Task StartProcessAsync(CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
using (_metricsLogger.LatencyEvent(MetricEventNames.ProcessStart))
{
Process = CreateWorkerProcess();
Expand All @@ -79,6 +82,7 @@ public Task StartProcessAsync()
AssignUserExecutePermissionsIfNotExists();
}

_processExit = new();
try
{
Process.ErrorDataReceived += (sender, e) => OnErrorDataReceived(sender, e);
Expand All @@ -103,12 +107,24 @@ public Task StartProcessAsync()
}
catch (Exception ex)
{
_processExit.TrySetException(ex);
_workerProcessLogger.LogError(ex, $"Failed to start Worker Channel. Process fileName: {Process.StartInfo.FileName}");
return Task.FromException(ex);
}
}
}

/// <summary>
/// Returns a task that completes when the exit-tracking completion source is completed.
/// </summary>
/// <param name="cancellationToken">Token used to stop waiting on the returned task.</param>
/// <returns>The completion source's task, wrapped so the wait honors <paramref name="cancellationToken"/>.</returns>
/// <exception cref="InvalidOperationException">Thrown when no completion source has been created yet.</exception>
public Task WaitForExitAsync(CancellationToken cancellationToken = default)
{
    // Snapshot the field once so the null check and the use below see the same instance.
    TaskCompletionSource exitSource = _processExit;
    if (exitSource is null)
    {
        throw new InvalidOperationException("Process has not been started yet.");
    }

    // A TaskCompletionSource is used (rather than Process.WaitForExitAsync) so that
    // our custom exceptions can be propagated to the awaiter.
    return exitSource.Task.WaitAsync(cancellationToken);
}

private void OnErrorDataReceived(object sender, DataReceivedEventArgs e)
{
if (e.Data != null)
Expand Down Expand Up @@ -159,42 +175,58 @@ private void OnProcessExited(object sender, EventArgs e)

if (Disposing)
{
// No action needed
return;
}

try
{
if (Process.ExitCode == WorkerConstants.SuccessExitCode)
{
Process.WaitForExit();
Process.Close();
}
else if (Process.ExitCode == WorkerConstants.IntentionalRestartExitCode)
ThrowIfExitError();

Process.WaitForExit();
if (Process.ExitCode == WorkerConstants.IntentionalRestartExitCode)
{
HandleWorkerProcessRestart();
}
else
{
string exceptionMessage = string.Join(",", _processStdErrDataQueue.Where(s => !string.IsNullOrEmpty(s)));
string sanitizedExceptionMessage = Sanitizer.Sanitize(exceptionMessage);
var processExitEx = new WorkerProcessExitException($"{Process.StartInfo.FileName} exited with code {Process.ExitCode} (0x{Process.ExitCode.ToString("X")})", new Exception(sanitizedExceptionMessage));
processExitEx.ExitCode = Process.ExitCode;
processExitEx.Pid = Process.Id;
HandleWorkerProcessExitError(processExitEx);
}
}
catch (WorkerProcessExitException processExitEx)
{
_processExit.TrySetException(processExitEx);
HandleWorkerProcessExitError(processExitEx);
}
catch (Exception exc)
{
_workerProcessLogger?.LogDebug(exc, "Exception on worker process exit. Process id: {processId}", Process?.Id);
// ignore process is already disposed
_processExit.TrySetException(exc);
_workerProcessLogger?.LogDebug(exc, "Exception on worker process exit. Process id: {processId}", Process?.Id);
}
finally
{
_processExit.TrySetResult();
UnregisterFromProcessMonitor();
Process.Close();
}
}

/// <summary>
/// Throws a <see cref="WorkerProcessExitException"/> unless the process exit code equals
/// <c>WorkerConstants.SuccessExitCode</c> or <c>WorkerConstants.IntentionalRestartExitCode</c>.
/// </summary>
/// <exception cref="WorkerProcessExitException">
/// Thrown for any other exit code; carries the exit code, the process id, and an inner
/// exception whose message is the sanitized, comma-joined content of the stderr queue.
/// </exception>
private void ThrowIfExitError()
{
    int exitCode = Process.ExitCode;
    bool isExpectedExit = exitCode == WorkerConstants.SuccessExitCode
        || exitCode == WorkerConstants.IntentionalRestartExitCode;
    if (isExpectedExit)
    {
        return;
    }

    // Only non-empty queue entries contribute to the inner exception message.
    var stdErrEntries = _processStdErrDataQueue.Where(entry => !string.IsNullOrEmpty(entry));
    string innerMessage = Sanitizer.Sanitize(string.Join(",", stdErrEntries));
    string outerMessage = $"{Process.StartInfo.FileName} exited with code {exitCode} (0x{exitCode:X})";

    throw new WorkerProcessExitException(outerMessage, new Exception(innerMessage))
    {
        ExitCode = exitCode,
        Pid = Process.Id
    };
}

private void OnOutputDataReceived(object sender, DataReceivedEventArgs e)
{
if (e.Data != null)
Expand Down
6 changes: 2 additions & 4 deletions src/WebJobs.Script/Workers/Rpc/RpcWorkerProcess.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,12 @@ internal override Process CreateWorkerProcess()

internal override void HandleWorkerProcessExitError(WorkerProcessExitException rpcWorkerProcessExitException)
{
ArgumentNullException.ThrowIfNull(rpcWorkerProcessExitException);
if (Disposing)
{
return;
}
if (rpcWorkerProcessExitException == null)
{
throw new ArgumentNullException(nameof(rpcWorkerProcessExitException));
}

// The subscriber of WorkerErrorEvent is expected to Dispose() the errored channel
_workerProcessLogger.LogError(rpcWorkerProcessExitException, $"Language Worker Process exited. Pid={rpcWorkerProcessExitException.Pid}.", _workerProcessArguments.ExecutablePath);
_eventManager.Publish(new WorkerErrorEvent(_runtime, _workerId, rpcWorkerProcessExitException));
Expand Down
2 changes: 1 addition & 1 deletion test/WebJobs.Script.Tests/WebJobs.Script.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
<PackageReference Include="Microsoft.Azure.Functions.PythonWorker" Version="4.35.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.9.4" />
<PackageReference Include="Moq" Version="4.18.4" />
<PackageReference Include="StyleCop.Analyzers" Version="1.2.0-beta.435">
<PackageReference Include="StyleCop.Analyzers" Version="1.2.0-beta.556">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
Expand Down
Loading
Loading