diff --git a/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.Log.cs b/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.Log.cs deleted file mode 100644 index 6a284f1c06..0000000000 --- a/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.Log.cs +++ /dev/null @@ -1,30 +0,0 @@ -// Copyright (c) .NET Foundation. All rights reserved. -// Licensed under the MIT License. See License.txt in the project root for license information. - -using System; -using Microsoft.Extensions.Logging; -using static Microsoft.Azure.WebJobs.Script.Grpc.Messages.StreamingMessage; - -namespace Microsoft.Azure.WebJobs.Script.Grpc -{ - internal partial class GrpcWorkerChannel - { - // EventId range is 800-899 - private static class Logger - { - private static readonly Action _channelReceivedMessage = LoggerMessage.Define( - LogLevel.Debug, - new EventId(820, nameof(ChannelReceivedMessage)), - "[channel] received {workerId}: {msgType}"); - - private static readonly Action _invocationResponseReceived = LoggerMessage.Define( - LogLevel.Debug, - new EventId(821, nameof(InvocationResponseReceived)), - "InvocationResponse received for invocation: '{invocationId}'"); - - internal static void ChannelReceivedMessage(ILogger logger, string workerId, ContentOneofCase msgType) => _channelReceivedMessage(logger, workerId, msgType, null); - - internal static void InvocationResponseReceived(ILogger logger, string invocationId) => _invocationResponseReceived(logger, invocationId, null); - } - } -} diff --git a/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs b/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs index 83d0d97da7..5914d34ae9 100644 --- a/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs +++ b/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs @@ -2,98 +2,27 @@ // Licensed under the MIT License. See License.txt in the project root for license information. using System; -using System.Collections; -using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Diagnostics; using System.IO; -using System.Linq; using System.Reactive.Linq; -using System.Text; using System.Threading; -using System.Threading.Channels; using System.Threading.Tasks; -using System.Threading.Tasks.Dataflow; -using Google.Protobuf.Collections; using Google.Protobuf.WellKnownTypes; -using Microsoft.Azure.WebJobs.Host; -using Microsoft.Azure.WebJobs.Logging; using Microsoft.Azure.WebJobs.Script.Config; -using Microsoft.Azure.WebJobs.Script.Description; using Microsoft.Azure.WebJobs.Script.Diagnostics; -using Microsoft.Azure.WebJobs.Script.Diagnostics.OpenTelemetry; using Microsoft.Azure.WebJobs.Script.Eventing; -using Microsoft.Azure.WebJobs.Script.Extensions; -using Microsoft.Azure.WebJobs.Script.Grpc.Eventing; -using Microsoft.Azure.WebJobs.Script.Grpc.Extensions; using Microsoft.Azure.WebJobs.Script.Grpc.Messages; using Microsoft.Azure.WebJobs.Script.Http; -using Microsoft.Azure.WebJobs.Script.ManagedDependencies; using Microsoft.Azure.WebJobs.Script.Workers; using Microsoft.Azure.WebJobs.Script.Workers.Rpc; using Microsoft.Azure.WebJobs.Script.Workers.SharedMemoryDataTransfer; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; -using OpenTelemetry; -using static Microsoft.Azure.WebJobs.Script.Grpc.Messages.RpcLog.Types; -using FunctionMetadata = Microsoft.Azure.WebJobs.Script.Description.FunctionMetadata; -using MsgType = Microsoft.Azure.WebJobs.Script.Grpc.Messages.StreamingMessage.ContentOneofCase; -using ParameterBindingType = Microsoft.Azure.WebJobs.Script.Grpc.Messages.ParameterBinding.RpcDataOneofCase; namespace Microsoft.Azure.WebJobs.Script.Grpc { - internal partial class GrpcWorkerChannel : IRpcWorkerChannel, IDisposable + internal class GrpcWorkerChannel : WorkerChannel { - private readonly IScriptEventManager _eventManager; - private readonly IScriptHostManager _scriptHostManager; - private readonly RpcWorkerConfig _workerConfig; - private readonly string _runtime; - private readonly IEnvironment _environment; - private readonly IOptionsMonitor _applicationHostOptions; - private readonly ISharedMemoryManager _sharedMemoryManager; - private readonly List _workerStatusLatencyHistory = new List(); - private readonly IOptions _workerConcurrencyOptions; - private readonly WaitCallback _processInbound; - private readonly IInvocationMessageDispatcherFactory _messageDispatcherFactory; - private readonly object _syncLock = new object(); - private readonly object _metadataLock = new object(); - private readonly Dictionary> _pendingActions = new(); - private readonly ChannelWriter _outbound; - private readonly ChannelReader _inbound; - private readonly IOptions _hostingConfigOptions; - private readonly string _workerInvocationSucccededMetric; - private readonly string _workerInvocationFailedMetric; - private readonly string _workerId; - private IDisposable _functionLoadRequestResponseEvent; - private bool _disposed; - private bool _disposing; - private WorkerInitResponse _initMessage; - private RpcWorkerChannelState _state; - private IDictionary _functionLoadErrors = new ConcurrentDictionary(); - private IDictionary _metadataRequestErrors = new ConcurrentDictionary(); - private ConcurrentDictionary _executingInvocations = new(); - private IDictionary> _functionInputBuffers = new ConcurrentDictionary>(); - private ConcurrentDictionary> _workerStatusRequests = new ConcurrentDictionary>(); - private ConcurrentBag _inputLinks = new ConcurrentBag(); - private List _eventSubscriptions = new List(); - private IDisposable _startLatencyMetric; - private IEnumerable _functions; - private GrpcCapabilities _workerCapabilities; - private ILogger _workerChannelLogger; - private IMetricsLogger _metricsLogger; private IWorkerProcess _rpcWorkerProcess; - private TaskCompletionSource _reloadTask = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - private TaskCompletionSource _workerInitTask = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - private TaskCompletionSource> _functionsIndexingTask = new TaskCompletionSource>(TaskCreationOptions.RunContinuationsAsynchronously); - private TimeSpan _functionLoadTimeout = TimeSpan.FromMinutes(1); - private bool _isSharedMemoryDataTransferEnabled; - private bool _isHandlesInvocationCancelMessageCapabilityEnabled; - private bool _isWorkerApplicationInsightsLoggingEnabled; - private IHttpProxyService _httpProxyService; - private Uri _httpProxyEndpoint; - private System.Timers.Timer _timer; - private bool _functionMetadataRequestSent = false; - private IOptions _scriptHostOptions; internal GrpcWorkerChannel( string workerId, @@ -110,1435 +39,71 @@ internal GrpcWorkerChannel( IOptions workerConcurrencyOptions, IOptions hostingConfigOptions, IHttpProxyService httpProxyService) + : base( + workerId, + eventManager, + hostManager, + workerConfig, + logger, + metricsLogger, + attemptCount, + environment, + applicationHostOptions, + sharedMemoryManager, + workerConcurrencyOptions, + hostingConfigOptions, + httpProxyService) { - _workerId = workerId; - _eventManager = eventManager; - _scriptHostManager = hostManager; - _workerConfig = workerConfig; - _runtime = workerConfig.Description.Language; _rpcWorkerProcess = rpcWorkerProcess; - _workerChannelLogger = logger; - _metricsLogger = metricsLogger; - _environment = environment; - _applicationHostOptions = applicationHostOptions; - _sharedMemoryManager = sharedMemoryManager; - _workerConcurrencyOptions = workerConcurrencyOptions; - _processInbound = state => ProcessItem((InboundGrpcEvent)state); - _hostingConfigOptions = hostingConfigOptions; - _httpProxyService = httpProxyService; - _workerCapabilities = new GrpcCapabilities(_workerChannelLogger); - - if (!_eventManager.TryGetGrpcChannels(workerId, out var inbound, out var outbound)) - { - throw new InvalidOperationException("Could not get gRPC channels for worker ID: " + workerId); - } - - _outbound = outbound.Writer; - _inbound = inbound.Reader; - // note: we don't start the read loop until StartWorkerProcessAsync is called - - _eventSubscriptions.Add(_eventManager.OfType() - .Where(msg => _workerConfig.Description.Extensions.Contains(Path.GetExtension(msg.FileChangeArguments.FullPath))) - .Throttle(TimeSpan.FromMilliseconds(300)) // debounce - .Subscribe(msg => _eventManager.Publish(new HostRestartEvent($"Worker monitored file changed: '{msg.FileChangeArguments.Name}'.")))); - - _startLatencyMetric = metricsLogger?.LatencyEvent(string.Format(MetricEventNames.WorkerInitializeLatency, workerConfig.Description.Language, attemptCount)); - - _state = RpcWorkerChannelState.Default; - - // Temporary switch to allow fully testing new algorithm in production - _messageDispatcherFactory = GetProcessorFactory(); - - _scriptHostManager.ActiveHostChanged += HandleActiveHostChange; - - // Reused metrics strings - _workerInvocationSucccededMetric = string.Format(MetricEventNames.WorkerInvokeSucceeded, Id); - _workerInvocationFailedMetric = string.Format(MetricEventNames.WorkerInvokeFailed, Id); - - LoadScriptJobHostOptions(_scriptHostManager as IServiceProvider); - } - - private bool IsHttpProxyingWorker => _httpProxyEndpoint is not null; - - public string Id => _workerId; - - public IDictionary> FunctionInputBuffers => _functionInputBuffers; - - public IWorkerProcess WorkerProcess => _rpcWorkerProcess; - - public RpcWorkerConfig WorkerConfig => _workerConfig; - - public IOptions JobHostOptions => _scriptHostOptions; - - internal bool IsSharedMemoryDataTransferEnabled => _isSharedMemoryDataTransferEnabled; - - private void HandleActiveHostChange(object sender, ActiveHostChangedEventArgs e) - { - if (e.NewHost is null) - { - return; - } - - LoadScriptJobHostOptions(e.NewHost.Services); - } - - private void LoadScriptJobHostOptions(IServiceProvider provider) - { - if (provider?.GetService(typeof(IOptions)) is IOptions scriptHostOptions) - { - _scriptHostOptions = scriptHostOptions; - } - else - { - _workerChannelLogger.LogDebug("Unable to resolve ScriptJobHostOptions"); - } - } - - // Temporary switch that allows us to move between the "old" ThreadPool-only processor - // and a "new" Channel processor (for proper ordering of messages). - private IInvocationMessageDispatcherFactory GetProcessorFactory() - { - // Ordered invocations is the default, but allow explicit disabling - if (!_hostingConfigOptions.Value.EnableOrderedInvocationMessages || - FeatureFlags.IsEnabled(ScriptConstants.FeatureFlagDisableOrderedInvocationMessages, _environment)) - { - _workerChannelLogger.LogDebug($"Using {nameof(ThreadPoolInvocationProcessorFactory)}."); - return new ThreadPoolInvocationProcessorFactory(_processInbound); - } - else - { - _workerChannelLogger.LogDebug($"Using {nameof(OrderedInvocationMessageDispatcherFactory)}."); - return new OrderedInvocationMessageDispatcherFactory(ProcessItem, _workerChannelLogger); - } - } - - private void ProcessItem(InboundGrpcEvent msg) - { - // note this method is a thread-pool (QueueUserWorkItem) entry-point - try - { - switch (msg.MessageType) - { - case MsgType.RpcLog when msg.Message.RpcLog.LogCategory == RpcLogCategory.System: - SystemLog(msg); - break; - case MsgType.RpcLog: - Log(msg); - break; - case MsgType.WorkerStatusResponse: - ReceiveWorkerStatusResponse(msg.Message.RequestId, msg.Message.WorkerStatusResponse); - break; - case MsgType.InvocationResponse: - _ = InvokeResponse(msg.Message.InvocationResponse); - break; - default: - ProcessRegisteredGrpcCallbacks(msg); - break; - } - } - catch (Exception ex) - { - _workerChannelLogger.LogError(ex, "Error processing InboundGrpcEvent: " + ex.Message); - } - } - - private void ProcessRegisteredGrpcCallbacks(InboundGrpcEvent message) - { - Queue queue; - lock (_pendingActions) - { - if (!_pendingActions.TryGetValue(message.MessageType, out queue)) - { - return; // nothing to do - } - } - PendingItem next; - lock (queue) - { - do - { - if (!queue.TryDequeue(out next)) - { - return; // nothing to do - } - } - while (next.IsComplete); - } - next.SetResult(message); - } - - private void RegisterCallbackForNextGrpcMessage(MsgType messageType, TimeSpan timeout, int count, Action callback, Action faultHandler) - { - Queue queue; - lock (_pendingActions) - { - if (!_pendingActions.TryGetValue(messageType, out queue)) - { - queue = new Queue(); - _pendingActions.Add(messageType, queue); - } - } - - lock (queue) - { - // while we have the lock, discard any dead items (to prevent unbounded growth on stall) - while (queue.TryPeek(out var next) && next.IsComplete) - { - queue.Dequeue(); - } - for (int i = 0; i < count; i++) - { - var newItem = (i == count - 1) && (timeout != TimeSpan.Zero) - ? new PendingItem(callback, faultHandler, timeout) - : new PendingItem(callback, faultHandler); - queue.Enqueue(newItem); - } - } - } - - private async Task ProcessInbound() - { - try - { - await Task.Yield(); // free up the caller - bool debug = _workerChannelLogger.IsEnabled(LogLevel.Debug); - if (debug) - { - _workerChannelLogger.LogDebug("[channel] processing reader loop for worker {0}:", _workerId); - } - while (await _inbound.WaitToReadAsync()) - { - while (_inbound.TryRead(out var msg)) - { - if (debug && msg.MessageType != MsgType.RpcLog) - { - Logger.ChannelReceivedMessage(_workerChannelLogger, msg.WorkerId, msg.MessageType); - } - - DispatchMessage(msg); - } - } - } - catch (Exception ex) - { - _workerChannelLogger.LogError(ex, "Error processing inbound messages"); - } - finally - { - // we're not listening any more! shut down the channels - _eventManager.RemoveGrpcChannels(_workerId); - } - } - - private void DispatchMessage(InboundGrpcEvent msg) - { - // RpcLog and InvocationResponse messages are special. They need to be handled by the InvocationMessageDispatcher - switch (msg.MessageType) - { - case MsgType.RpcLog when msg.Message.RpcLog.LogCategory == RpcLogCategory.User || msg.Message.RpcLog.LogCategory == RpcLogCategory.CustomMetric: - if (_executingInvocations.TryGetValue(msg.Message.RpcLog.InvocationId, out var invocation)) - { - invocation.Dispatcher.DispatchRpcLog(msg); - } - else - { - // We received a log outside of a invocation - ThreadPool.QueueUserWorkItem(_processInbound, msg); - } - break; - case MsgType.InvocationResponse: - if (_executingInvocations.TryGetValue(msg.Message.InvocationResponse.InvocationId, out invocation)) - { - invocation.Dispatcher.DispatchInvocationResponse(msg); - } - else - { - // This should never happen, but if it does, just send it to the ThreadPool. - ThreadPool.QueueUserWorkItem(_processInbound, msg); - } - break; - default: - // All other messages can go to the thread pool. - ThreadPool.QueueUserWorkItem(_processInbound, msg); - break; - } - } - - public bool IsChannelReadyForInvocations() - { - return !_disposing && !_disposed - && _state.HasFlag( - RpcWorkerChannelState.InvocationBuffersInitialized | RpcWorkerChannelState.Initialized); - } - - public async Task StartWorkerProcessAsync(CancellationToken cancellationToken) - { - RegisterCallbackForNextGrpcMessage( - MsgType.StartStream, - _workerConfig.CountOptions.ProcessStartupTimeout, - count: 1, - SendWorkerInitRequest, - HandleWorkerStartStreamError); - - // note: it is important that the ^^^ StartStream is in place *before* we start process the loop, - // otherwise we get a race condition - _ = ProcessInbound(); - - _workerChannelLogger.LogDebug("Initiating Worker Process start up"); - await _rpcWorkerProcess.StartProcessAsync(cancellationToken); - _state |= RpcWorkerChannelState.Initializing; - Task exited = _rpcWorkerProcess.WaitForExitAsync(cancellationToken); - Task winner = await Task.WhenAny(_workerInitTask.Task, exited).WaitAsync(cancellationToken); - await winner; - - if (winner == exited) - { - // Process exited without throwing. We need to throw to indicate process is not running. - throw new WorkerProcessExitException("Worker process exited before initializing.") - { - ExitCode = await exited, - }; - } - } - - public async Task GetWorkerStatusAsync() - { - var workerStatus = new WorkerStatus(); - - if (!string.IsNullOrEmpty(_workerCapabilities.GetCapabilityState(RpcWorkerConstants.WorkerStatus))) - { - // get the worker's current status - // this will include the OOP worker's channel latency in the request, which can be used upstream - // to make scale decisions - var message = new StreamingMessage - { - RequestId = Guid.NewGuid().ToString(), - WorkerStatusRequest = new WorkerStatusRequest() - }; - - var sw = ValueStopwatch.StartNew(); - var tcs = new TaskCompletionSource(); - if (_workerStatusRequests.TryAdd(message.RequestId, tcs)) - { - await SendStreamingMessageAsync(message); - await tcs.Task; - var elapsed = sw.GetElapsedTime(); - workerStatus.Latency = elapsed; - _workerChannelLogger.LogDebug("[HostMonitor] Worker status request took {totalMs}ms", elapsed.TotalMilliseconds); - } - } - - workerStatus.IsReady = IsChannelReadyForInvocations(); - if (_environment.IsWorkerDynamicConcurrencyEnabled()) - { - workerStatus.LatencyHistory = GetLatencies(); - } - - return workerStatus; - } - - // send capabilities to worker, wait for WorkerInitResponse - internal void SendWorkerInitRequest(GrpcEvent startEvent) - { - _workerChannelLogger.LogDebug("Worker Process started. Received StartStream message"); - RegisterCallbackForNextGrpcMessage(MsgType.WorkerInitResponse, _workerConfig.CountOptions.InitializationTimeout, 1, WorkerInitResponse, HandleWorkerInitError); - - WorkerInitRequest initRequest = GetWorkerInitRequest(); - - // Run as Functions Host V2 compatible - if (_environment.IsV2CompatibilityMode()) - { - _workerChannelLogger.LogDebug("Worker and host running in V2 compatibility mode"); - initRequest.Capabilities.Add(RpcWorkerConstants.V2Compatable, "true"); - } - - if (ScriptHost.IsFunctionDataCacheEnabled) - { - // FunctionDataCache is available from the host side - we send this to the worker. - // As long as the worker replies back with the SharedMemoryDataTransfer capability, the cache - // can be used. - initRequest.Capabilities.Add(RpcWorkerConstants.FunctionDataCache, "true"); - } - - // advertise that we support multiple streams, and hint at a number; with this flag, we allow - // clients to connect multiple back-hauls *with the same workerid*, and rely on the internal - // plumbing to make sure we don't process everything N times - initRequest.Capabilities.Add(RpcWorkerConstants.MultiStream, "10"); // TODO: make this configurable - - SendStreamingMessage(new StreamingMessage - { - WorkerInitRequest = initRequest - }); - } - - internal WorkerInitRequest GetWorkerInitRequest() - { - return new WorkerInitRequest() - { - HostVersion = ScriptHost.Version, - WorkerDirectory = _workerConfig.Description.WorkerDirectory, - FunctionAppDirectory = _applicationHostOptions.CurrentValue.ScriptPath - }; - } - - internal void FunctionEnvironmentReloadResponse(FunctionEnvironmentReloadResponse res, IDisposable latencyEvent) - { - _workerChannelLogger.LogDebug("Received FunctionEnvironmentReloadResponse from WorkerProcess with Pid: '{0}'", _rpcWorkerProcess.Id); - - res.WorkerMetadata?.UpdateWorkerMetadata(_workerConfig); - - Utility.ExecuteAfterColdStartDelay(_environment, () => LogWorkerMetadata(res.WorkerMetadata)); - - _workerConfig.Description.DefaultRuntimeVersion = _workerConfig.Description.DefaultRuntimeVersion ?? res?.WorkerMetadata?.RuntimeVersion; - _workerConfig.Description.DefaultRuntimeName = _workerConfig.Description.DefaultRuntimeName ?? res?.WorkerMetadata?.RuntimeName; - - ApplyCapabilities(res.Capabilities, res.CapabilitiesUpdateStrategy.ToGrpcCapabilitiesUpdateStrategy()); - - if (res.Result.IsFailure(IsUserCodeExceptionCapabilityEnabled(), out var reloadEnvironmentVariablesException)) - { - if (res.Result.Exception is not null && reloadEnvironmentVariablesException is not null) - { - _workerChannelLogger.LogWarning(reloadEnvironmentVariablesException, reloadEnvironmentVariablesException.Message); - } - _reloadTask.SetResult(false); - } - else - { - _reloadTask.SetResult(true); - } - latencyEvent.Dispose(); - } - - internal void WorkerInitResponse(GrpcEvent initEvent) - { - _startLatencyMetric?.Dispose(); - _startLatencyMetric = null; - - _workerChannelLogger.LogDebug("Received WorkerInitResponse. Worker process initialized"); - _initMessage = initEvent.Message.WorkerInitResponse; - _workerChannelLogger.LogDebug("Worker capabilities: {capabilities}", _initMessage.Capabilities); - - _initMessage.WorkerMetadata?.UpdateWorkerMetadata(_workerConfig); - - Utility.ExecuteAfterColdStartDelay(_environment, () => LogWorkerMetadata(_initMessage.WorkerMetadata)); - - _workerConfig.Description.DefaultRuntimeVersion = _workerConfig.Description.DefaultRuntimeVersion ?? _initMessage?.WorkerMetadata?.RuntimeVersion; - _workerConfig.Description.DefaultRuntimeName = _workerConfig.Description.DefaultRuntimeName ?? _initMessage?.WorkerMetadata?.RuntimeName; - - if (_initMessage.Result.IsFailure(out Exception exc)) - { - HandleWorkerInitError(exc); - _workerInitTask.TrySetResult(false); - return; - } - - _state = _state | RpcWorkerChannelState.Initialized; - - ApplyCapabilities(_initMessage.Capabilities); - - _workerInitTask.TrySetResult(true); - } - - private bool IsUserCodeExceptionCapabilityEnabled() - { - var enableUserCodeExceptionCapability = string.Equals( - _workerCapabilities.GetCapabilityState(RpcWorkerConstants.EnableUserCodeException), bool.TrueString, - StringComparison.OrdinalIgnoreCase); - - return enableUserCodeExceptionCapability; - } - - private void LogWorkerMetadata(WorkerMetadata workerMetadata) - { - if (workerMetadata == null) - { - return; - } - - var workerMetadataString = workerMetadata.ToString(); - _metricsLogger.LogEvent(MetricEventNames.WorkerMetadata, functionName: null, Sanitizer.Sanitize(workerMetadataString)); - _workerChannelLogger.LogDebug("Worker metadata: {workerMetadata}", workerMetadataString); - } - - // Allow tests to add capabilities, even if not directly supported by the worker. - internal virtual void UpdateCapabilities(IDictionary fields, GrpcCapabilitiesUpdateStrategy strategy) - { - _workerCapabilities.UpdateCapabilities(fields, strategy); - } - - // Helper method that updates and applies capabilities - // Used at worker initialization and environment reload (placeholder scenarios) - // The default strategy for updating capabilities is merge - internal void ApplyCapabilities(IDictionary capabilities, GrpcCapabilitiesUpdateStrategy strategy = GrpcCapabilitiesUpdateStrategy.Merge) - { - UpdateCapabilities(capabilities, strategy); - - _isSharedMemoryDataTransferEnabled = ResolveSharedTransferEnablementState(_workerCapabilities, _environment, _workerChannelLogger); - _isHandlesInvocationCancelMessageCapabilityEnabled = !string.IsNullOrEmpty(_workerCapabilities.GetCapabilityState(RpcWorkerConstants.HandlesInvocationCancelMessage)); - - if (!_isSharedMemoryDataTransferEnabled) - { - // If the worker does not support using shared memory data transfer, caching must also be disabled - ScriptHost.IsFunctionDataCacheEnabled = false; - } - - if (_environment.IsApplicationInsightsAgentEnabled() || - (bool.TryParse(_workerCapabilities.GetCapabilityState(RpcWorkerConstants.WorkerApplicationInsightsLoggingEnabled), out bool appInsightsWorkerEnabled) && - appInsightsWorkerEnabled)) - { - _isWorkerApplicationInsightsLoggingEnabled = true; - } - - if (bool.TryParse(_workerCapabilities.GetCapabilityState(RpcWorkerConstants.WorkerOpenTelemetryEnabled), out bool otelEnabled) && - otelEnabled) - { - ScriptHost.WorkerOpenTelemetryEnabled = true; - } - - // If http proxying is enabled, we need to get the proxying endpoint of this worker - var httpUri = _workerCapabilities.GetCapabilityState(RpcWorkerConstants.HttpUri); - if (!string.IsNullOrEmpty(httpUri)) - { - try - { - _httpProxyEndpoint = new Uri(httpUri); - } - catch (Exception ex) - { - HandleWorkerInitError(ex); - } - } - } - - public void SetupFunctionInvocationBuffers(IEnumerable functions) - { - _functions = functions; - foreach (FunctionMetadata metadata in functions) - { - string functionId = metadata.GetFunctionId(); - _workerChannelLogger.LogDebug("Setting up FunctionInvocationBuffer for function: '{functionName}' with functionId: '{functionId}'", metadata.Name, functionId); - _functionInputBuffers[functionId] = new BufferBlock(); - } - _state |= RpcWorkerChannelState.InvocationBuffersInitialized; - } - - public void SendFunctionLoadRequests(ManagedDependencyOptions managedDependencyOptions, TimeSpan? functionTimeout) - { - if (_functions != null) - { - // Load Request is also sent for disabled function as it is invocable using the portal and admin endpoints - // Loading disabled functions at the end avoids unnecessary performance issues. Refer PR #5072 and commit #38b57883be28524fa6ee67a457fa47e96663094c - _functions = _functions.OrderBy(metadata => metadata.IsDisabled()); - - // Check if the worker supports this feature - bool capabilityEnabled = !string.IsNullOrEmpty(_workerCapabilities.GetCapabilityState(RpcWorkerConstants.SupportsLoadResponseCollection)); - TimeSpan timeout = TimeSpan.Zero; - if (functionTimeout.HasValue) - { - _functionLoadTimeout = functionTimeout.Value > _functionLoadTimeout ? functionTimeout.Value : _functionLoadTimeout; - timeout = _functionLoadTimeout; - } - - var count = _functions.Count(); - if (capabilityEnabled) - { - RegisterCallbackForNextGrpcMessage(MsgType.FunctionLoadResponseCollection, timeout, count, msg => LoadResponse(msg.Message.FunctionLoadResponseCollection), HandleWorkerFunctionLoadError); - - SendFunctionLoadRequestCollection(_functions, managedDependencyOptions); - } - else - { - RegisterCallbackForNextGrpcMessage(MsgType.FunctionLoadResponse, timeout, count, msg => LoadResponse(msg.Message.FunctionLoadResponse), HandleWorkerFunctionLoadError); - - foreach (FunctionMetadata metadata in _functions) - { - SendFunctionLoadRequest(metadata, managedDependencyOptions); - } - } - } - } - - internal void SendFunctionLoadRequestCollection(IEnumerable functions, ManagedDependencyOptions managedDependencyOptions) - { - _functionLoadRequestResponseEvent = _metricsLogger.LatencyEvent(MetricEventNames.FunctionLoadRequestResponse); - - FunctionLoadRequestCollection functionLoadRequestCollection = GetFunctionLoadRequestCollection(functions, managedDependencyOptions); - - _workerChannelLogger.LogDebug("Sending FunctionLoadRequestCollection with number of functions: '{count}'", functionLoadRequestCollection.FunctionLoadRequests.Count); - - // send load requests for the registered functions - SendStreamingMessage(new StreamingMessage - { - FunctionLoadRequestCollection = functionLoadRequestCollection - }); - } - - internal FunctionLoadRequestCollection GetFunctionLoadRequestCollection(IEnumerable functions, ManagedDependencyOptions managedDependencyOptions) - { - var functionLoadRequestCollection = new FunctionLoadRequestCollection(); - - foreach (FunctionMetadata metadata in functions) - { - var functionLoadRequest = GetFunctionLoadRequest(metadata, managedDependencyOptions); - functionLoadRequestCollection.FunctionLoadRequests.Add(functionLoadRequest); - } - - return functionLoadRequestCollection; - } - - public Task SendFunctionEnvironmentReloadRequest() - { - _functionsIndexingTask = new TaskCompletionSource>(TaskCreationOptions.RunContinuationsAsynchronously); - _functionMetadataRequestSent = false; - - _workerChannelLogger.LogDebug("Sending FunctionEnvironmentReloadRequest to WorkerProcess with Pid: '{0}'", _rpcWorkerProcess.Id); - IDisposable latencyEvent = _metricsLogger.LatencyEvent(MetricEventNames.SpecializationEnvironmentReloadRequestResponse); - - RegisterCallbackForNextGrpcMessage(MsgType.FunctionEnvironmentReloadResponse, _workerConfig.CountOptions.EnvironmentReloadTimeout, 1, - msg => FunctionEnvironmentReloadResponse(msg.Message.FunctionEnvironmentReloadResponse, latencyEvent), HandleWorkerEnvReloadError); - - IDictionary processEnv = Environment.GetEnvironmentVariables(); - - FunctionEnvironmentReloadRequest request = GetFunctionEnvironmentReloadRequest(processEnv); - - SendStreamingMessage(new StreamingMessage - { - FunctionEnvironmentReloadRequest = request - }); - - return _reloadTask.Task; - } - - public void SendWorkerWarmupRequest() - { - bool capabilityEnabled = !string.IsNullOrEmpty(_workerCapabilities.GetCapabilityState(RpcWorkerConstants.HandlesWorkerWarmupMessage)); - if (!capabilityEnabled) - { - _workerChannelLogger.LogDebug("Worker warmup capability not enabled"); - } - else - { - _workerChannelLogger.LogDebug("Sending WorkerWarmupRequest to WorkerProcess with Pid: '{0}'", _rpcWorkerProcess.Id); - - RegisterCallbackForNextGrpcMessage(MsgType.WorkerWarmupResponse, TimeSpan.FromMinutes(1.5), 1, - msg => ProcessWorkerWarmupResponse(msg.Message.WorkerWarmupResponse), HandleWorkerWarmupError); - - var request = new WorkerWarmupRequest() - { - WorkerDirectory = _workerConfig.Description.WorkerDirectory, - }; - - SendStreamingMessage(new StreamingMessage - { - WorkerWarmupRequest = request - }); - } - } - - internal void ProcessWorkerWarmupResponse(WorkerWarmupResponse response) - { - _workerChannelLogger.LogDebug("Received WorkerWarmupResponse from WorkerProcess with Pid: '{0}'", _rpcWorkerProcess.Id); - if (response.Result.IsFailure(out Exception workerWarmupException)) - { - _workerChannelLogger.LogError(workerWarmupException, "Worker warmup failed"); - } - } - - internal FunctionEnvironmentReloadRequest GetFunctionEnvironmentReloadRequest(IDictionary processEnv) - { - foreach (var pair in _hostingConfigOptions.Value.Features) - { - processEnv[pair.Key] = pair.Value; - } - - FunctionEnvironmentReloadRequest request = new FunctionEnvironmentReloadRequest(); - foreach (DictionaryEntry entry in processEnv) - { - // Do not add environment variables with empty or null values (see issue #4488 for context) - if (!string.IsNullOrEmpty(entry.Value?.ToString())) - { - request.EnvironmentVariables.Add(entry.Key.ToString(), entry.Value.ToString()); - } - } - - string scriptRoot = _applicationHostOptions.CurrentValue.ScriptPath; - request.EnvironmentVariables.TryAdd(WorkerConstants.FunctionsWorkerDirectorySettingName, _workerConfig.Description.WorkerDirectory); - request.EnvironmentVariables.TryAdd(WorkerConstants.FunctionsApplicationDirectorySettingName, scriptRoot); - request.FunctionAppDirectory = scriptRoot; - - return request; - } - - internal void SendFunctionLoadRequest(FunctionMetadata metadata, ManagedDependencyOptions managedDependencyOptions) - { - _functionLoadRequestResponseEvent = _metricsLogger.LatencyEvent(MetricEventNames.FunctionLoadRequestResponse); - - if (_workerChannelLogger.IsEnabled(LogLevel.Debug)) - { - _workerChannelLogger.LogDebug("Sending FunctionLoadRequest for function: '{functionName}' with functionId: '{functionId}'", metadata.Name, metadata.GetFunctionId()); - } - - // send a load request for the registered function - SendStreamingMessage(new StreamingMessage - { - FunctionLoadRequest = GetFunctionLoadRequest(metadata, managedDependencyOptions) - }); - } - - internal FunctionLoadRequest GetFunctionLoadRequest(FunctionMetadata metadata, ManagedDependencyOptions managedDependencyOptions) - { - FunctionLoadRequest request = new FunctionLoadRequest() - { - FunctionId = metadata.GetFunctionId(), - Metadata = new RpcFunctionMetadata() - { - Name = metadata.Name, - Directory = metadata.FunctionDirectory ?? string.Empty, - EntryPoint = metadata.EntryPoint ?? string.Empty, - ScriptFile = metadata.ScriptFile ?? string.Empty, - IsProxy = metadata.IsProxy() - } - }; - - if (managedDependencyOptions != null && managedDependencyOptions.Enabled) - { - _workerChannelLogger?.LogDebug("Adding dependency download request to {language} language worker", _workerConfig.Description.Language); - request.ManagedDependencyEnabled = managedDependencyOptions.Enabled; - } - - foreach (var binding in metadata.Bindings) - { - BindingInfo bindingInfo = binding.ToBindingInfo(); - - request.Metadata.Bindings.Add(binding.Name, bindingInfo); - - if (binding.SupportsDeferredBinding() && !binding.SkipDeferredBinding()) - { - _metricsLogger.LogEvent(MetricEventNames.FunctionBindingDeferred, functionName: Sanitizer.Sanitize(metadata.Name)); - } - } - - foreach (var property in metadata.Properties) - { - // worker properties are expected to be string values - request.Metadata.Properties.Add(property.Key, property.Value?.ToString()); - } - - return request; - } - - internal void LoadResponse(FunctionLoadResponse loadResponse) - { - _functionLoadRequestResponseEvent?.Dispose(); - string functionName = _functions.SingleOrDefault(m => m.GetFunctionId().Equals(loadResponse.FunctionId, StringComparison.OrdinalIgnoreCase))?.Name; - _workerChannelLogger.LogDebug("Received FunctionLoadResponse for function: '{functionName}' with functionId: '{functionId}'.", functionName, loadResponse.FunctionId); - if (loadResponse.Result.IsFailure(out Exception functionLoadEx)) - { - if (functionLoadEx == null) - { - _workerChannelLogger?.LogError("Worker failed to to load function: '{functionName}' with functionId: '{functionId}'. Function load exception is not set by the worker.", functionName, loadResponse.FunctionId); - } - else - { - _workerChannelLogger?.LogError(functionLoadEx, "Worker failed to load function: '{functionName}' with functionId: '{functionId}'.", functionName, loadResponse.FunctionId); - } - //Cache function load errors to replay error messages on invoking failed functions - _functionLoadErrors[loadResponse.FunctionId] = functionLoadEx; - } - - if (loadResponse.IsDependencyDownloaded) - { - _workerChannelLogger?.LogDebug("Managed dependency successfully downloaded by the {workerLanguage} language worker", _workerConfig.Description.Language); - } - - // link the invocation inputs to the invoke call - var invokeBlock = new ActionBlock(async ctx => await SendInvocationRequest(ctx)); - // associate the invocation input buffer with the function - var disposableLink = _functionInputBuffers[loadResponse.FunctionId].LinkTo(invokeBlock); - _inputLinks.Add(disposableLink); - } - - internal void LoadResponse(FunctionLoadResponseCollection loadResponseCollection) - { - _workerChannelLogger.LogDebug("Received FunctionLoadResponseCollection with number of functions: '{count}'.", loadResponseCollection.FunctionLoadResponses.Count); - - foreach (FunctionLoadResponse loadResponse in loadResponseCollection.FunctionLoadResponses) - { - LoadResponse(loadResponse); - } - } - - internal async Task SendInvocationRequest(ScriptInvocationContext context) - { - try - { - string invocationId = context.ExecutionContext.InvocationId.ToString(); - string functionId = context.FunctionMetadata.GetFunctionId(); - - // do not send an invocation request for functions that failed to load or could not be indexed by the worker - if (_functionLoadErrors.TryGetValue(functionId, out Exception exception)) - { - _workerChannelLogger.LogDebug("Function {functionName} failed to load", context.FunctionMetadata.Name); - context.SetException(exception); - RemoveExecutingInvocation(invocationId); - return; - } - else if (_metadataRequestErrors.TryGetValue(functionId, out exception)) - { - _workerChannelLogger.LogDebug("Worker failed to load metadata for {functionName}", context.FunctionMetadata.Name); - context.SetException(exception); - RemoveExecutingInvocation(invocationId); - return; - } - - if (context.CancellationToken.IsCancellationRequested) - { - _workerChannelLogger.LogDebug("Cancellation was requested prior to the invocation request ('{invocationId}') being sent to the worker.", invocationId); - - // If the worker does not support handling InvocationCancel grpc messages, or if cancellation is supported and the customer opts-out - // of sending cancelled invocations to the worker, we will cancel the result source and not send the invocation to the worker. - if (!_isHandlesInvocationCancelMessageCapabilityEnabled || !JobHostOptions.Value.SendCanceledInvocationsToWorker) - { - _workerChannelLogger.LogInformation("Cancelling invocation '{invocationId}' due to cancellation token being signaled. " - + "This invocation was not sent to the worker. Read more about this here: https://aka.ms/azure-functions-cancellations", invocationId); - - // This will result in an invocation failure with a "FunctionInvocationCanceled" exception. - context.ResultSource.TrySetCanceled(); - return; - } - } - - var invocationRequest = await context.ToRpcInvocationRequest(_workerChannelLogger, _workerCapabilities, _isSharedMemoryDataTransferEnabled, _sharedMemoryManager); - AddAdditionalTraceContext(invocationRequest, context); - _executingInvocations.TryAdd(invocationRequest.InvocationId, new(context, _messageDispatcherFactory.Create(invocationRequest.InvocationId))); - _metricsLogger.LogEvent(string.Format(MetricEventNames.WorkerInvoked, Id), functionName: Sanitizer.Sanitize(context.FunctionMetadata.Name)); - - // If the worker supports HTTP proxying, ensure this request is forwarded prior - // to sending the invocation request to the worker, as this will ensure the context - // is properly set up. - if (IsHttpProxyingWorker && context.FunctionMetadata.IsHttpTriggerFunction()) - { - _httpProxyService.StartForwarding(context, _httpProxyEndpoint); - } - - await SendStreamingMessageAsync(new StreamingMessage - { - InvocationRequest = invocationRequest - }); - - if (_isHandlesInvocationCancelMessageCapabilityEnabled) - { - var cancellationCtr = context.CancellationToken.Register(() => SendInvocationCancel(invocationRequest.InvocationId)); - context.Properties.Add(ScriptConstants.CancellationTokenRegistration, cancellationCtr); - } - } - catch (Exception invokeEx) - { - context.SetException(invokeEx); - } - } - - internal void SendInvocationCancel(string invocationId) - { - _workerChannelLogger.LogDebug("Sending InvocationCancel request for invocation: '{invocationId}'", invocationId); - - var invocationCancel = new InvocationCancel - { - InvocationId = invocationId - }; - - SendStreamingMessage(new StreamingMessage - { - InvocationCancel = invocationCancel - }); - } - - // gets metadata from worker - public Task> GetFunctionMetadata() - { - return SendFunctionMetadataRequest(); - } - - internal Task> SendFunctionMetadataRequest() - { - _workerChannelLogger.LogDebug("Fetching worker metadata, FunctionMetadataReceived set to: {functionMetadataReceived}", _functionMetadataRequestSent); - if (!_functionMetadataRequestSent) - { - lock (_metadataLock) - { - if (!_functionMetadataRequestSent) - { - RegisterCallbackForNextGrpcMessage(MsgType.FunctionMetadataResponse, _functionLoadTimeout, 1, - msg => ProcessFunctionMetadataResponses(msg.Message.FunctionMetadataResponse), HandleWorkerMetadataRequestError); - - _workerChannelLogger.LogDebug("Sending WorkerMetadataRequest to {language} worker with worker ID {workerID}", _runtime, _workerId); - - // sends the function app directory path to worker for indexing - SendStreamingMessage(new StreamingMessage - { - FunctionsMetadataRequest = new FunctionsMetadataRequest() - { - FunctionAppDirectory = _applicationHostOptions.CurrentValue.ScriptPath - } - }); - - _functionMetadataRequestSent = true; - } - } - } - - return _functionsIndexingTask.Task; - } - - // parse metadata response into RawFunctionMetadata objects for AggregateFunctionMetadataProvider to further parse and validate - internal void ProcessFunctionMetadataResponses(FunctionMetadataResponse functionMetadataResponse) - { - _workerChannelLogger.LogDebug("Received the worker function metadata response from worker {worker_id}", _workerId); - - if (functionMetadataResponse.Result.IsFailure(out Exception metadataResponseEx)) - { - _workerChannelLogger?.LogError(metadataResponseEx, "Worker failed to index functions"); - } - - var functions = new List(); - - if (functionMetadataResponse.UseDefaultMetadataIndexing == false) - { - foreach (var metadata in functionMetadataResponse.FunctionMetadataResults) - { - if (metadata == null) - { - continue; - } - if (metadata.Status != null && metadata.Status.IsFailure(out Exception metadataRequestEx)) - { - _workerChannelLogger.LogError("Worker failed to index function {functionId}", metadata.FunctionId); - _metadataRequestErrors[metadata.FunctionId] = metadataRequestEx; - } - - var functionMetadata = new FunctionMetadata() - { - FunctionDirectory = metadata.Directory, - ScriptFile = metadata.ScriptFile, - EntryPoint = metadata.EntryPoint, - Name = metadata.Name, - Language = metadata.Language - }; - - if (metadata.RetryOptions is not null) - { - functionMetadata.Retry = new RetryOptions - { - MaxRetryCount = metadata.RetryOptions.MaxRetryCount, - Strategy = metadata.RetryOptions.RetryStrategy.ToRetryStrategy() - }; - - if (functionMetadata.Retry.Strategy is RetryStrategy.FixedDelay) - { - functionMetadata.Retry.DelayInterval = metadata.RetryOptions.DelayInterval?.ToTimeSpan(); - } - else - { - functionMetadata.Retry.MinimumInterval = metadata.RetryOptions.MinimumInterval?.ToTimeSpan(); - functionMetadata.Retry.MaximumInterval = metadata.RetryOptions.MaximumInterval?.ToTimeSpan(); - } - } - - functionMetadata.SetFunctionId(metadata.FunctionId); - - foreach (var property in metadata.Properties) - { - if (!functionMetadata.Properties.TryAdd(property.Key, property.Value?.ToString())) - { - _workerChannelLogger?.LogDebug("{metadataPropertyKey} is already a part of metadata properties for {functionId}", property.Key, metadata.FunctionId); - } - } - - var bindings = new List(); - foreach (string binding in metadata.RawBindings) - { - bindings.Add(binding); - } - - functions.Add(new RawFunctionMetadata() - { - Metadata = functionMetadata, - Bindings = bindings, - UseDefaultMetadataIndexing = functionMetadataResponse.UseDefaultMetadataIndexing - }); - } - } - else - { - functions.Add(new RawFunctionMetadata() - { - UseDefaultMetadataIndexing = functionMetadataResponse.UseDefaultMetadataIndexing - }); - } - - // set it as task result because we cannot directly return from SendWorkerMetadataRequest - _functionsIndexingTask.SetResult(functions); - } - - private async Task GetBindingDataAsync(ParameterBinding binding, string invocationId) - { - switch (binding.RpcDataCase) - { - case ParameterBindingType.RpcSharedMemory: - // Data was transferred by the worker using shared memory - return await binding.RpcSharedMemory.ToObjectAsync(_workerChannelLogger, invocationId, _sharedMemoryManager, ScriptHost.IsFunctionDataCacheEnabled); - case ParameterBindingType.Data: - // Data was transferred by the worker using RPC - return binding.Data.ToObject(); - case ParameterBindingType.None: - return null; - default: - throw new InvalidOperationException($"Unknown ParameterBindingType of type {binding.RpcDataCase}"); - } - } - - /// - /// From the output data produced by the worker, get a list of the shared memory maps that were created for this invocation. - /// - /// List of produced by the worker as output. - /// List of names of shared memory maps produced by the worker. - private IList GetOutputMaps(IList bindings) - { - IList outputMaps = new List(); - foreach (ParameterBinding binding in bindings) - { - if (binding.RpcSharedMemory != null) - { - outputMaps.Add(binding.RpcSharedMemory.Name); - } - } - - return outputMaps; - } - - internal async Task InvokeResponse(InvocationResponse invokeResponse) - { - Logger.InvocationResponseReceived(_workerChannelLogger, invokeResponse.InvocationId); - - // Check if the worker supports logging user-code-thrown exceptions to app insights - bool userCodeExceptionHandlingEnabled = !string.IsNullOrEmpty(_workerCapabilities.GetCapabilityState(RpcWorkerConstants.EnableUserCodeException)); - - if (_executingInvocations.TryRemove(invokeResponse.InvocationId, out var invocation)) - { - var context = invocation.Context; - - if (context.Properties.TryGetValue(ScriptConstants.CancellationTokenRegistration, out CancellationTokenRegistration ctr)) - { - await ctr.DisposeAsync(); - context.Properties.Remove(ScriptConstants.CancellationTokenRegistration); - } - - try - { - if (IsHttpProxyingWorker && context.FunctionMetadata.IsHttpTriggerFunction()) - { - await _httpProxyService.EnsureSuccessfulForwardingAsync(context); - } - - LogSharedMemoryUsage(invokeResponse); - AddWorkerTraceAttributes(invokeResponse, context); - - if (invokeResponse.Result.IsInvocationSuccess()) - { - ScriptInvocationResult result = await CreateScriptInvocationResult(invokeResponse); - context.ResultSource.SetResult(result); - - _metricsLogger.LogEvent(_workerInvocationSucccededMetric); - } - else - { - var rpcException = invokeResponse.Result.GetRpcException(userCodeExceptionHandlingEnabled); - context.SetException(rpcException); - - _metricsLogger.LogEvent(_workerInvocationFailedMetric); - } - } - catch (Exception exc) - { - context.SetException(exc); - } - finally - { - TryCloseSharedMemoryResources(invokeResponse); - - invocation.Dispose(); - } - } - } - - private async Task CreateScriptInvocationResult(InvocationResponse invokeResponse) - { - IDictionary bindingsDictionary = await invokeResponse.OutputData - .ToDictionaryAsync(binding => binding.Name, binding => GetBindingDataAsync(binding, invokeResponse.InvocationId)); - - var result = new ScriptInvocationResult() - { - Outputs = bindingsDictionary, - Return = invokeResponse?.ReturnValue?.ToObject() - }; - - return result; - } - - private void TryCloseSharedMemoryResources(InvocationResponse invokeResponse) - { - try - { - if (!_isSharedMemoryDataTransferEnabled) - { - return; - } - - // Free memory allocated by the host (for input bindings) which was needed only for the duration of this invocation - if (!_sharedMemoryManager.TryFreeSharedMemoryMapsForInvocation(invokeResponse.InvocationId)) - { - _workerChannelLogger.LogWarning("Cannot free all shared memory resources for invocation: {invocationId}", invokeResponse.InvocationId); - } - - // List of shared memory maps that were produced by the worker (for output bindings) - IList outputMaps = GetOutputMaps(invokeResponse.OutputData); - if (outputMaps.Count > 0) - { - // If this invocation was using any shared memory maps produced by the worker, close them to free memory - SendCloseSharedMemoryResourcesForInvocationRequest(outputMaps); - } - } - catch (Exception exc) - { - _workerChannelLogger.LogWarning(exc, "Failed to cleanup shared memory resources"); - } - } - - private void LogSharedMemoryUsage(InvocationResponse invokeResponse) - { - // Return early if debug logging is not enabled. - if (!_isSharedMemoryDataTransferEnabled - || !_workerChannelLogger.IsEnabled(LogLevel.Debug)) - { - return; - } - - StringBuilder sharedMemoryLogBuilder = null; - - foreach (ParameterBinding binding in invokeResponse.OutputData) - { - switch (binding.RpcDataCase) - { - case ParameterBindingType.RpcSharedMemory: - sharedMemoryLogBuilder ??= new StringBuilder(); - sharedMemoryLogBuilder.AppendFormat("{0}:{1},", binding.Name, binding.RpcSharedMemory.Count); - break; - default: - break; - } - } - - if (sharedMemoryLogBuilder != null) - { - _workerChannelLogger.LogDebug("Shared memory usage for response of invocation '{invocationId}' is {SharedMemoryUsage}", invokeResponse.InvocationId, sharedMemoryLogBuilder); - } - } - - /// - /// Request to free memory allocated by the worker (for output bindings). - /// - /// List of names of shared memory maps to close from the worker. - internal void SendCloseSharedMemoryResourcesForInvocationRequest(IList outputMaps) - { - // Request the worker to drop its references to any shared memory maps that it had produced. - // This is because the host has read them (or holds a reference to them if caching is enabled.) - // The worker will not delete the resources allocated for the memory maps; it will only drop its reference - // so that the worker process does not prevent the OS from freeing the memory maps when the host attempts - // to free them (either right away after reading them or if caching is enabled, then when the cache decides - // to evict that object based on its eviction policy). - CloseSharedMemoryResourcesRequest closeSharedMemoryResourcesRequest = new CloseSharedMemoryResourcesRequest(); - closeSharedMemoryResourcesRequest.MapNames.AddRange(outputMaps); - - SendStreamingMessage(new StreamingMessage() - { - CloseSharedMemoryResourcesRequest = closeSharedMemoryResourcesRequest - }); - } - - internal void Log(GrpcEvent msg) - { - var rpcLog = msg.Message.RpcLog; - if (_executingInvocations.TryGetValue(rpcLog.InvocationId, out var invocation)) - { - var context = invocation.Context; - - // Restore the execution context from the original invocation. This allows AsyncLocal state to flow to loggers. - System.Threading.ExecutionContext.Run(context.AsyncExecutionContext, static (state) => - { - var stateTuple = ((ScriptInvocationContext Context, RpcLog RpcLog, bool AppInsightsEnabledOnWorker))state; - - var rpcLog = stateTuple.RpcLog; - LogLevel logLevel = (LogLevel)rpcLog.Level; - - var context = stateTuple.Context; - - if (rpcLog.LogCategory == RpcLogCategory.CustomMetric) - { - if (rpcLog.PropertiesMap.TryGetValue(LogConstants.NameKey, out var metricName) - && rpcLog.PropertiesMap.TryGetValue(LogConstants.MetricValueKey, out var metricValue)) - { - // Strip off the name/value entries in the dictionary passed to Log Message and include the rest as the property bag passed to the backing ILogger - var rpcLogProperties = rpcLog.PropertiesMap - .Where(i => i.Key != LogConstants.NameKey && i.Key != LogConstants.MetricValueKey) - .ToDictionary(i => i.Key, i => i.Value.ToObject()); - context.Logger.LogMetric(metricName.String, metricValue.Double, rpcLogProperties); - } - } - else - { - try - { - WorkerTraceFilterTelemetryProcessor.FilterApplicationInsightsFromWorker.Value = stateTuple.AppInsightsEnabledOnWorker; - - if (rpcLog.Exception != null) - { - // TODO fix RpcException catch all https://github.com/Azure/azure-functions-dotnet-worker/issues/370 - var exception = new Workers.Rpc.RpcException(rpcLog.Message, rpcLog.Exception.Message, rpcLog.Exception.StackTrace); - context.Logger.Log(logLevel, new EventId(0, rpcLog.EventId), rpcLog.Message, exception, (state, exc) => state); - } - else - { - context.Logger.Log(logLevel, new EventId(0, rpcLog.EventId), rpcLog.Message, null, (state, exc) => state); - } - } - finally - { - WorkerTraceFilterTelemetryProcessor.FilterApplicationInsightsFromWorker.Value = false; - } - } - }, (context, rpcLog, _isWorkerApplicationInsightsLoggingEnabled)); - } - } - - internal void SystemLog(GrpcEvent msg) - { - RpcLog systemLog = msg.Message.RpcLog; - LogLevel logLevel = (LogLevel)systemLog.Level; - switch (logLevel) - { - case LogLevel.Warning: - _workerChannelLogger.LogWarning(systemLog.Message); - break; - - case LogLevel.Information: - _workerChannelLogger.LogInformation(systemLog.Message); - break; - - case LogLevel.Error: - { - if (systemLog.Exception != null) - { - Workers.Rpc.RpcException exception = new Workers.Rpc.RpcException(systemLog.Message, systemLog.Exception.Message, systemLog.Exception.StackTrace); - _workerChannelLogger.LogError(exception, systemLog.Message); - } - else - { - _workerChannelLogger.LogError(systemLog.Message); - } - } - break; - - default: - _workerChannelLogger.LogInformation(systemLog.Message); - break; - } - } - - internal void HandleWorkerStartStreamError(Exception exc) - { - _workerChannelLogger.LogError(exc, "Starting worker process failed"); - PublishWorkerErrorEvent(exc); - } - - internal void HandleWorkerEnvReloadError(Exception exc) - { - _workerChannelLogger.LogError(exc, "Reloading environment variables failed"); - _reloadTask.SetException(exc); - } - - internal void HandleWorkerInitError(Exception exc) - { - _workerChannelLogger.LogError(exc, "Initializing worker process failed"); - PublishWorkerErrorEvent(exc); - } - - internal void HandleWorkerFunctionLoadError(Exception exc) - { - _workerChannelLogger.LogError(exc, "Loading function failed."); - if (_disposing || _disposed) - { - return; - } - _eventManager.Publish(new WorkerErrorEvent(_runtime, Id, exc)); + EventSubscriptions.Add(EventManager.OfType() + .Where(msg => WorkerConfig.Description.Extensions.Contains(Path.GetExtension(msg.FileChangeArguments.FullPath))) + .Throttle(TimeSpan.FromMilliseconds(300)) // debounce + .Subscribe(msg => EventManager.Publish(new HostRestartEvent($"Worker monitored file changed: '{msg.FileChangeArguments.Name}'.")))); } - private void PublishWorkerErrorEvent(Exception exc) - { - _workerInitTask.TrySetException(exc); - if (_disposing || _disposed) - { - return; - } - _eventManager.Publish(new WorkerErrorEvent(_runtime, Id, exc)); - } + public override IWorkerProcess WorkerProcess => _rpcWorkerProcess; - internal void HandleWorkerMetadataRequestError(Exception exc) - { - _workerChannelLogger.LogError(exc, "Requesting metadata from worker failed."); - if (_disposing || _disposed) - { - return; - } - _eventManager.Publish(new WorkerErrorEvent(_runtime, Id, exc)); - } + protected override int WorkerProcessId => _rpcWorkerProcess.Id; - private void HandleWorkerWarmupError(Exception exc) + public override async Task StartWorkerProcessAsync(CancellationToken cancellationToken) { - _workerChannelLogger.LogError(exc, "Worker warmup failed"); - } + BeginInboundProcessing(WorkerConfig.CountOptions.ProcessStartupTimeout); - private ValueTask SendStreamingMessageAsync(StreamingMessage msg) - { - var evt = new OutboundGrpcEvent(_workerId, msg); - return _outbound.TryWrite(evt) ? default : _outbound.WriteAsync(evt); - } + WorkerChannelLogger.LogDebug("Initiating Worker Process start up"); + await _rpcWorkerProcess.StartProcessAsync(cancellationToken); + State |= RpcWorkerChannelState.Initializing; + Task exited = _rpcWorkerProcess.WaitForExitAsync(cancellationToken); + Task winner = await Task.WhenAny(WorkerInitTask.Task, exited).WaitAsync(cancellationToken); + await winner; - private void SendStreamingMessage(StreamingMessage msg) - { - var evt = new OutboundGrpcEvent(_workerId, msg); - if (!_outbound.TryWrite(evt)) - { - var pending = _outbound.WriteAsync(evt); - if (pending.IsCompleted) - { - try - { - pending.GetAwaiter().GetResult(); // ensure observed to ensure the IValueTaskSource completed/result is consumed - } - catch - { - // suppress failure - } - } - else - { - _ = ObserveEventually(pending); - } - } - static async Task ObserveEventually(ValueTask valueTask) + if (winner == exited) { - try - { - await valueTask; - } - catch + throw new WorkerProcessExitException("Worker process exited before initializing.") { - // no where to log - } - } - } - - internal void ReceiveWorkerStatusResponse(string requestId, WorkerStatusResponse response) - { - if (_workerStatusRequests.TryRemove(requestId, out var workerStatusTask)) - { - workerStatusTask.SetResult(true); + ExitCode = await exited, + }; } } - private void RemoveExecutingInvocation(string invocationId) + protected override void Dispose(bool disposing) { - if (_executingInvocations.TryRemove(invocationId, out var invocation)) + if (disposing) { - invocation.Dispose(); + StopWorkerProcess(); } - } - - protected virtual void Dispose(bool disposing) - { - if (!_disposed) - { - if (disposing) - { - foreach (var id in _executingInvocations.Keys) - { - RemoveExecutingInvocation(id); - } - - _startLatencyMetric?.Dispose(); - _workerInitTask?.TrySetCanceled(); - _timer?.Dispose(); - _scriptHostManager.ActiveHostChanged -= HandleActiveHostChange; - - // unlink function inputs - if (_inputLinks is not null) - { - foreach (var link in _inputLinks) - { - if (link is null) - { - // This log is temporarily added for diagnostic purposes. - _workerChannelLogger.LogDebug("An input link is null. Skipping disposal."); - } - - link?.Dispose(); - } - } - else - { - // This log is temporarily added for diagnostic purposes. - _workerChannelLogger.LogDebug("The input links collection is null. Skipping disposal of any individual input links."); - } - - (_rpcWorkerProcess as IDisposable)?.Dispose(); - - if (_eventSubscriptions is not null) - { - foreach (var sub in _eventSubscriptions) - { - if (sub is null) - { - // This log is temporarily added for diagnostic purposes. - _workerChannelLogger.LogDebug("An event subscription is null. Skipping disposal."); - } - - sub?.Dispose(); - } - } - else - { - // This log is temporarily added for diagnostic purposes. - _workerChannelLogger.LogDebug("The event subscriptions collection is null. Skipping disposal of any individual subscriptions."); - } - // shut down the channels - _eventManager.RemoveGrpcChannels(_workerId); - } - _disposed = true; - } + base.Dispose(disposing); } - public void Dispose() + protected override void DisposeWorkerResources() { - StopWorkerProcess(); - _disposing = true; - Dispose(true); + (_rpcWorkerProcess as IDisposable)?.Dispose(); } private void StopWorkerProcess() { - bool capabilityEnabled = !string.IsNullOrEmpty(_workerCapabilities.GetCapabilityState(RpcWorkerConstants.HandlesWorkerTerminateMessage)); + bool capabilityEnabled = !string.IsNullOrEmpty(WorkerCapabilities.GetCapabilityState(RpcWorkerConstants.HandlesWorkerTerminateMessage)); if (!capabilityEnabled) { return; @@ -1551,7 +116,7 @@ private void StopWorkerProcess() GracePeriod = Duration.FromTimeSpan(TimeSpan.FromSeconds(gracePeriod)) }; - _workerChannelLogger.LogDebug("Sending WorkerTerminate message with grace period of {gracePeriod} seconds.", gracePeriod); + WorkerChannelLogger.LogDebug("Sending WorkerTerminate message with grace period of {gracePeriod} seconds.", gracePeriod); SendStreamingMessage(new StreamingMessage { @@ -1560,301 +125,5 @@ private void StopWorkerProcess() WorkerProcess.WaitForProcessExitInMilliSeconds(gracePeriod * 1000); } - - public async Task DrainInvocationsAsync() - { - _workerChannelLogger.LogDebug("Count of in-buffer invocations waiting to be drained out: {invocationCount}", _executingInvocations.Count); - foreach (var invocation in _executingInvocations.Values) - { - await invocation.Context.ResultSource.Task; - } - } - - public bool IsExecutingInvocation(string invocationId) - { - return _executingInvocations.ContainsKey(invocationId); - } - - public void Shutdown(Exception workerException) - { - var shutdownException = workerException; - - if (workerException is null || workerException is FunctionTimeoutException) - { - string exceptionMessageSuffix = workerException?.Message is not null ? " Reason: " + workerException.Message : string.Empty; - shutdownException = new FunctionTimeoutAbortException("Worker channel is shutting down. Aborting function." + exceptionMessageSuffix, workerException); - } - - foreach (var invocation in _executingInvocations?.Values) - { - string invocationId = invocation.Context?.ExecutionContext?.InvocationId.ToString(); - _workerChannelLogger.LogDebug("Worker '{workerId}' encountered a fatal error. Failing invocation: '{invocationId}'", _workerId, invocationId); - invocation.Context?.SetException(shutdownException); - RemoveExecutingInvocation(invocationId); - } - } - - /// - /// Determine if shared memory transfer is enabled. - /// The following conditions must be met: - /// 1) must be set in environment variable (AppSetting). - /// 2) Worker must have the capability . - /// - /// if shared memory data transfer is enabled, otherwise. - internal static bool ResolveSharedTransferEnablementState(GrpcCapabilities workerCapabilities, IEnvironment environment, ILogger logger) - { - // Check if the environment variable (AppSetting) has this feature enabled - string envVal = environment.GetEnvironmentVariable(RpcWorkerConstants.FunctionsWorkerSharedMemoryDataTransferEnabledSettingName); - if (string.IsNullOrEmpty(envVal)) - { - return false; - } - - bool envValEnabled = false; - if (bool.TryParse(envVal, out bool boolResult)) - { - // Check if value was specified as a bool (true/false) - envValEnabled = boolResult; - } - else if (int.TryParse(envVal, out int intResult) && intResult == 1) - { - // Check if value was specified as an int (1/0) - envValEnabled = true; - } - - if (!envValEnabled) - { - return false; - } - - // Check if the worker supports this feature - bool capabilityEnabled = !string.IsNullOrEmpty(workerCapabilities.GetCapabilityState(RpcWorkerConstants.SharedMemoryDataTransfer)); - logger.LogDebug("IsSharedMemoryDataTransferEnabled: {SharedMemoryDataTransferEnabled}", capabilityEnabled); - - return capabilityEnabled; - } - - internal void EnsureTimerStarted() - { - if (_environment.IsWorkerDynamicConcurrencyEnabled()) - { - lock (_syncLock) - { - if (_timer == null) - { - _timer = new System.Timers.Timer() - { - AutoReset = false, - Interval = _workerConcurrencyOptions.Value.CheckInterval.TotalMilliseconds, - }; - - _timer.Elapsed += OnTimer; - _timer.Start(); - } - } - } - } - - internal IEnumerable GetLatencies() - { - EnsureTimerStarted(); - lock (_syncLock) - { - return _workerStatusLatencyHistory.ToArray(); - } - } - - internal async void OnTimer(object sender, System.Timers.ElapsedEventArgs e) - { - if (_disposed) - { - return; - } - - try - { - WorkerStatus workerStatus = await GetWorkerStatusAsync(); - AddSample(_workerStatusLatencyHistory, workerStatus.Latency); - } - catch - { - // Don't allow background exceptions to escape - // E.g. when a rpc channel is shutting down we can process exceptions - } - try - { - _timer.Start(); - } - catch (ObjectDisposedException) - { - // Specifically ignore this race - we're exiting and that's okay - } - } - - private void AddSample(List samples, T sample) - { - lock (_syncLock) - { - if (samples.Count == _workerConcurrencyOptions.Value.HistorySize) - { - samples.RemoveAt(0); - } - samples.Add(sample); - } - } - - private void AddAdditionalTraceContext(InvocationRequest invocationRequest, ScriptInvocationContext context) - { - MapField attributes = invocationRequest.TraceContext.Attributes; - bool isOtelEnabled = _scriptHostOptions?.Value.TelemetryMode == TelemetryMode.OpenTelemetry; - - if (context.FunctionMetadata.Properties.TryGetValue(ScriptConstants.LogPropertyHostInstanceIdKey, out var hostInstanceIdValue)) - { - string id = Convert.ToString(hostInstanceIdValue); - - if (isOtelEnabled) - { - Activity.Current?.AddTag(ResourceSemanticConventions.FaaSInstance, id); - } - - attributes[ScriptConstants.LogPropertyHostInstanceIdKey] = id; - } - - attributes[ScriptConstants.LogPropertyProcessIdKey] = Convert.ToString(_rpcWorkerProcess.Id); - attributes[ScriptConstants.OperationNameKey] = context.FunctionMetadata.Name; - string sessionId = Activity.Current?.GetBaggageItem(ScriptConstants.LiveLogsSessionAIKey); - if (!string.IsNullOrEmpty(sessionId)) - { - attributes[ScriptConstants.LiveLogsSessionAIKey] = sessionId; - } - - if (context.FunctionMetadata.Properties.TryGetValue(LogConstants.CategoryNameKey, out var categoryNameValue)) - { - attributes[LogConstants.CategoryNameKey] = Convert.ToString(categoryNameValue); - } - - if (isOtelEnabled) - { - Activity.Current?.AddTag(ResourceSemanticConventions.FaaSName, context.FunctionMetadata.Name); - Activity.Current?.AddTag(ResourceSemanticConventions.FaaSInvocationId, invocationRequest.InvocationId); - - foreach (var b in Baggage.Current) - { - invocationRequest.TraceContext.Baggage[b.Key] = b.Value; - } - } - } - - private void AddWorkerTraceAttributes(InvocationResponse invocationResponse, ScriptInvocationContext context) - { - var attributes = invocationResponse.TraceContextAttributes; - if (attributes is null || attributes.Count == 0) - { - return; - } - - if (context.AsyncExecutionContext is null) - { - return; - } - - System.Threading.ExecutionContext.Run(context.AsyncExecutionContext, static state => - { - var attrs = (IDictionary)state; - foreach (var kvp in attrs) - { - // this will override any existing tags with the same key - Activity.Current?.AddTag(kvp.Key, kvp.Value); - } - }, attributes); - } - - private sealed class ExecutingInvocation : IDisposable - { - public ExecutingInvocation(ScriptInvocationContext context, IInvocationMessageDispatcher dispatcher) - { - Context = context; - Dispatcher = dispatcher; - } - - public ScriptInvocationContext Context { get; } - - public IInvocationMessageDispatcher Dispatcher { get; } - - public void Dispose() - { - (Dispatcher as IDisposable)?.Dispose(); - } - } - - private sealed class PendingItem - { - private readonly Action _callback; - private readonly Action _faultHandler; - private CancellationTokenRegistration _ctr; - private int _state; - - public PendingItem(Action callback, Action faultHandler) - { - _callback = callback; - _faultHandler = faultHandler; - } - - public PendingItem(Action callback, Action faultHandler, TimeSpan timeout) - : this(callback, faultHandler) - { - var cts = new CancellationTokenSource(); - cts.CancelAfter(timeout); - _ctr = cts.Token.Register(static state => ((PendingItem)state).OnTimeout(), this); - } - - public bool IsComplete => Volatile.Read(ref _state) != 0; - - private bool MakeComplete() => Interlocked.CompareExchange(ref _state, 1, 0) == 0; - - public void SetResult(InboundGrpcEvent message) - { - _ctr.Dispose(); - _ctr = default; - if (MakeComplete() && _callback != null) - { - try - { - _callback.Invoke(message); - } - catch (Exception fault) - { - try - { - _faultHandler?.Invoke(fault); - } - catch - { - } - } - } - } - - private void OnTimeout() - { - if (MakeComplete() && _faultHandler != null) - { - try - { - throw new TimeoutException(); - } - catch (Exception timeout) - { - try - { - _faultHandler(timeout); - } - catch - { - } - } - } - } - } } } \ No newline at end of file diff --git a/src/WebJobs.Script.Grpc/Channel/WorkerChannel.cs b/src/WebJobs.Script.Grpc/Channel/WorkerChannel.cs new file mode 100644 index 0000000000..66eb99875b --- /dev/null +++ b/src/WebJobs.Script.Grpc/Channel/WorkerChannel.cs @@ -0,0 +1,1842 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Channels; +using System.Threading.Tasks; +using System.Threading.Tasks.Dataflow; +using Google.Protobuf.Collections; +using Microsoft.Azure.WebJobs.Host; +using Microsoft.Azure.WebJobs.Logging; +using Microsoft.Azure.WebJobs.Script.Config; +using Microsoft.Azure.WebJobs.Script.Description; +using Microsoft.Azure.WebJobs.Script.Diagnostics; +using Microsoft.Azure.WebJobs.Script.Diagnostics.OpenTelemetry; +using Microsoft.Azure.WebJobs.Script.Eventing; +using Microsoft.Azure.WebJobs.Script.Extensions; +using Microsoft.Azure.WebJobs.Script.Grpc.Eventing; +using Microsoft.Azure.WebJobs.Script.Grpc.Extensions; +using Microsoft.Azure.WebJobs.Script.Grpc.Messages; +using Microsoft.Azure.WebJobs.Script.Http; +using Microsoft.Azure.WebJobs.Script.ManagedDependencies; +using Microsoft.Azure.WebJobs.Script.Workers; +using Microsoft.Azure.WebJobs.Script.Workers.Rpc; +using Microsoft.Azure.WebJobs.Script.Workers.SharedMemoryDataTransfer; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using OpenTelemetry; +using static Microsoft.Azure.WebJobs.Script.Grpc.Messages.RpcLog.Types; +using FunctionMetadata = Microsoft.Azure.WebJobs.Script.Description.FunctionMetadata; +using MsgType = Microsoft.Azure.WebJobs.Script.Grpc.Messages.StreamingMessage.ContentOneofCase; +using ParameterBindingType = Microsoft.Azure.WebJobs.Script.Grpc.Messages.ParameterBinding.RpcDataOneofCase; + +namespace Microsoft.Azure.WebJobs.Script.Grpc +{ + internal abstract partial class WorkerChannel : IRpcWorkerChannel, IDisposable + { + private readonly IScriptEventManager _eventManager; + private readonly RpcWorkerConfig _workerConfig; + private readonly string _workerId; + private readonly IOptions _hostingConfigOptions; + private readonly IScriptHostManager _scriptHostManager; + private readonly string _runtime; + private readonly IEnvironment _environment; + private readonly IOptionsMonitor _applicationHostOptions; + private readonly ISharedMemoryManager _sharedMemoryManager; + private readonly List _workerStatusLatencyHistory = new List(); + private readonly IOptions _workerConcurrencyOptions; + private readonly WaitCallback _processInbound; + private readonly IInvocationMessageDispatcherFactory _messageDispatcherFactory; + private readonly object _syncLock = new object(); + private readonly object _metadataLock = new object(); + private readonly Dictionary> _pendingActions = new(); + private readonly ChannelWriter _outbound; + private readonly ChannelReader _inbound; + private readonly string _workerInvocationSucccededMetric; + private readonly string _workerInvocationFailedMetric; + private RpcWorkerChannelState _state; + private TaskCompletionSource _workerInitTask = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + private GrpcCapabilities _workerCapabilities; + private ILogger _workerChannelLogger; + private IMetricsLogger _metricsLogger; + private List _eventSubscriptions = new List(); + private ConcurrentBag _inputLinks = new ConcurrentBag(); + private bool _disposing; + private bool _disposed; + private IDisposable _functionLoadRequestResponseEvent; + private WorkerInitResponse _initMessage; + private IDictionary _functionLoadErrors = new ConcurrentDictionary(); + private IDictionary _metadataRequestErrors = new ConcurrentDictionary(); + private ConcurrentDictionary _executingInvocations = new(); + private IDictionary> _functionInputBuffers = new ConcurrentDictionary>(); + private ConcurrentDictionary> _workerStatusRequests = new ConcurrentDictionary>(); + private IDisposable _startLatencyMetric; + private IEnumerable _functions; + private TaskCompletionSource _reloadTask = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + private TaskCompletionSource> _functionsIndexingTask = new TaskCompletionSource>(TaskCreationOptions.RunContinuationsAsynchronously); + private TimeSpan _functionLoadTimeout = TimeSpan.FromMinutes(1); + private bool _isSharedMemoryDataTransferEnabled; + private bool _isHandlesInvocationCancelMessageCapabilityEnabled; + private bool _isWorkerApplicationInsightsLoggingEnabled; + private IHttpProxyService _httpProxyService; + private Uri _httpProxyEndpoint; + private System.Timers.Timer _timer; + private bool _functionMetadataRequestSent = false; + private IOptions _scriptHostOptions; + + internal WorkerChannel( + string workerId, + IScriptEventManager eventManager, + IScriptHostManager hostManager, + RpcWorkerConfig workerConfig, + ILogger logger, + IMetricsLogger metricsLogger, + int attemptCount, + IEnvironment environment, + IOptionsMonitor applicationHostOptions, + ISharedMemoryManager sharedMemoryManager, + IOptions workerConcurrencyOptions, + IOptions hostingConfigOptions, + IHttpProxyService httpProxyService) + { + _workerId = workerId; + _eventManager = eventManager; + _scriptHostManager = hostManager; + _workerConfig = workerConfig; + _runtime = workerConfig.Description.Language; + _workerChannelLogger = logger; + _metricsLogger = metricsLogger; + _environment = environment; + _applicationHostOptions = applicationHostOptions; + _sharedMemoryManager = sharedMemoryManager; + _workerConcurrencyOptions = workerConcurrencyOptions; + _processInbound = state => ProcessItem((InboundGrpcEvent)state); + _hostingConfigOptions = hostingConfigOptions; + + _httpProxyService = httpProxyService; + _workerCapabilities = new GrpcCapabilities(_workerChannelLogger); + + if (!_eventManager.TryGetGrpcChannels(workerId, out var inbound, out var outbound)) + { + throw new InvalidOperationException("Could not get gRPC channels for worker ID: " + workerId); + } + + _outbound = outbound.Writer; + _inbound = inbound.Reader; + // note: we don't start the read loop until StartWorkerProcessAsync is called + + _startLatencyMetric = metricsLogger?.LatencyEvent(string.Format(MetricEventNames.WorkerInitializeLatency, workerConfig.Description.Language, attemptCount)); + + _state = RpcWorkerChannelState.Default; + + // Temporary switch to allow fully testing new algorithm in production + _messageDispatcherFactory = GetProcessorFactory(); + + _scriptHostManager.ActiveHostChanged += HandleActiveHostChange; + + // Reused metrics strings + _workerInvocationSucccededMetric = string.Format(MetricEventNames.WorkerInvokeSucceeded, Id); + _workerInvocationFailedMetric = string.Format(MetricEventNames.WorkerInvokeFailed, Id); + + LoadScriptJobHostOptions(_scriptHostManager as IServiceProvider); + } + + protected virtual int WorkerProcessId => -1; + + public virtual IWorkerProcess WorkerProcess => null; + + private bool IsHttpProxyingWorker => _httpProxyEndpoint is not null; + + public string Id => _workerId; + + public IDictionary> FunctionInputBuffers => _functionInputBuffers; + + public RpcWorkerConfig WorkerConfig => _workerConfig; + + public IOptions JobHostOptions => _scriptHostOptions; + + internal bool IsSharedMemoryDataTransferEnabled => _isSharedMemoryDataTransferEnabled; + + protected IScriptEventManager EventManager => _eventManager; + + protected ILogger WorkerChannelLogger => _workerChannelLogger; + + protected GrpcCapabilities WorkerCapabilities => _workerCapabilities; + + protected TaskCompletionSource WorkerInitTask => _workerInitTask; + + protected List EventSubscriptions => _eventSubscriptions; + + protected RpcWorkerChannelState State + { + get => _state; + set => _state = value; + } + + protected virtual void DisposeWorkerResources() { } + + public abstract Task StartWorkerProcessAsync(CancellationToken cancellationToken); + + protected void BeginInboundProcessing(TimeSpan startStreamTimeout) + { + RegisterCallbackForNextGrpcMessage( + MsgType.StartStream, startStreamTimeout, count: 1, + SendWorkerInitRequest, HandleWorkerStartStreamError); + _ = ProcessInbound(); + } + + private void HandleActiveHostChange(object sender, ActiveHostChangedEventArgs e) + { + if (e.NewHost is null) + { + return; + } + + LoadScriptJobHostOptions(e.NewHost.Services); + } + + private void LoadScriptJobHostOptions(IServiceProvider provider) + { + if (provider?.GetService(typeof(IOptions)) is IOptions scriptHostOptions) + { + _scriptHostOptions = scriptHostOptions; + } + else + { + _workerChannelLogger.LogDebug("Unable to resolve ScriptJobHostOptions"); + } + } + + // Temporary switch that allows us to move between the "old" ThreadPool-only processor + // and a "new" Channel processor (for proper ordering of messages). + private IInvocationMessageDispatcherFactory GetProcessorFactory() + { + // Ordered invocations is the default, but allow explicit disabling + if (!_hostingConfigOptions.Value.EnableOrderedInvocationMessages || + FeatureFlags.IsEnabled(ScriptConstants.FeatureFlagDisableOrderedInvocationMessages, _environment)) + { + _workerChannelLogger.LogDebug($"Using {nameof(ThreadPoolInvocationProcessorFactory)}."); + return new ThreadPoolInvocationProcessorFactory(_processInbound); + } + else + { + _workerChannelLogger.LogDebug($"Using {nameof(OrderedInvocationMessageDispatcherFactory)}."); + return new OrderedInvocationMessageDispatcherFactory(ProcessItem, _workerChannelLogger); + } + } + + private void ProcessItem(InboundGrpcEvent msg) + { + // note this method is a thread-pool (QueueUserWorkItem) entry-point + try + { + switch (msg.MessageType) + { + case MsgType.RpcLog when msg.Message.RpcLog.LogCategory == RpcLogCategory.System: + SystemLog(msg); + break; + case MsgType.RpcLog: + Log(msg); + break; + case MsgType.WorkerStatusResponse: + ReceiveWorkerStatusResponse(msg.Message.RequestId, msg.Message.WorkerStatusResponse); + break; + case MsgType.InvocationResponse: + _ = InvokeResponse(msg.Message.InvocationResponse); + break; + default: + ProcessRegisteredGrpcCallbacks(msg); + break; + } + } + catch (Exception ex) + { + _workerChannelLogger.LogError(ex, "Error processing InboundGrpcEvent: " + ex.Message); + } + } + + private void ProcessRegisteredGrpcCallbacks(InboundGrpcEvent message) + { + Queue queue; + lock (_pendingActions) + { + if (!_pendingActions.TryGetValue(message.MessageType, out queue)) + { + return; // nothing to do + } + } + PendingItem next; + lock (queue) + { + do + { + if (!queue.TryDequeue(out next)) + { + return; // nothing to do + } + } + while (next.IsComplete); + } + next.SetResult(message); + } + + private void RegisterCallbackForNextGrpcMessage(MsgType messageType, TimeSpan timeout, int count, Action callback, Action faultHandler) + { + Queue queue; + lock (_pendingActions) + { + if (!_pendingActions.TryGetValue(messageType, out queue)) + { + queue = new Queue(); + _pendingActions.Add(messageType, queue); + } + } + + lock (queue) + { + // while we have the lock, discard any dead items (to prevent unbounded growth on stall) + while (queue.TryPeek(out var next) && next.IsComplete) + { + queue.Dequeue(); + } + for (int i = 0; i < count; i++) + { + var newItem = (i == count - 1) && (timeout != TimeSpan.Zero) + ? new PendingItem(callback, faultHandler, timeout) + : new PendingItem(callback, faultHandler); + queue.Enqueue(newItem); + } + } + } + + private async Task ProcessInbound() + { + try + { + await Task.Yield(); // free up the caller + bool debug = _workerChannelLogger.IsEnabled(LogLevel.Debug); + if (debug) + { + _workerChannelLogger.LogDebug("[channel] processing reader loop for worker {0}:", _workerId); + } + while (await _inbound.WaitToReadAsync()) + { + while (_inbound.TryRead(out var msg)) + { + if (debug && msg.MessageType != MsgType.RpcLog) + { + Logger.ChannelReceivedMessage(_workerChannelLogger, msg.WorkerId, msg.MessageType); + } + + DispatchMessage(msg); + } + } + } + catch (Exception ex) + { + _workerChannelLogger.LogError(ex, "Error processing inbound messages"); + } + finally + { + // we're not listening any more! shut down the channels + _eventManager.RemoveGrpcChannels(_workerId); + } + } + + private void DispatchMessage(InboundGrpcEvent msg) + { + // RpcLog and InvocationResponse messages are special. They need to be handled by the InvocationMessageDispatcher + switch (msg.MessageType) + { + case MsgType.RpcLog when msg.Message.RpcLog.LogCategory == RpcLogCategory.User || msg.Message.RpcLog.LogCategory == RpcLogCategory.CustomMetric: + if (_executingInvocations.TryGetValue(msg.Message.RpcLog.InvocationId, out var invocation)) + { + invocation.Dispatcher.DispatchRpcLog(msg); + } + else + { + // We received a log outside of a invocation + ThreadPool.QueueUserWorkItem(_processInbound, msg); + } + break; + case MsgType.InvocationResponse: + if (_executingInvocations.TryGetValue(msg.Message.InvocationResponse.InvocationId, out invocation)) + { + invocation.Dispatcher.DispatchInvocationResponse(msg); + } + else + { + // This should never happen, but if it does, just send it to the ThreadPool. + ThreadPool.QueueUserWorkItem(_processInbound, msg); + } + break; + default: + // All other messages can go to the thread pool. + ThreadPool.QueueUserWorkItem(_processInbound, msg); + break; + } + } + + public bool IsChannelReadyForInvocations() + { + return !_disposing && !_disposed + && _state.HasFlag( + RpcWorkerChannelState.InvocationBuffersInitialized | RpcWorkerChannelState.Initialized); + } + + public async Task GetWorkerStatusAsync() + { + var workerStatus = new WorkerStatus(); + + if (!string.IsNullOrEmpty(_workerCapabilities.GetCapabilityState(RpcWorkerConstants.WorkerStatus))) + { + // get the worker's current status + // this will include the OOP worker's channel latency in the request, which can be used upstream + // to make scale decisions + var message = new StreamingMessage + { + RequestId = Guid.NewGuid().ToString(), + WorkerStatusRequest = new WorkerStatusRequest() + }; + + var sw = ValueStopwatch.StartNew(); + var tcs = new TaskCompletionSource(); + if (_workerStatusRequests.TryAdd(message.RequestId, tcs)) + { + await SendStreamingMessageAsync(message); + await tcs.Task; + var elapsed = sw.GetElapsedTime(); + workerStatus.Latency = elapsed; + _workerChannelLogger.LogDebug("[HostMonitor] Worker status request took {totalMs}ms", elapsed.TotalMilliseconds); + } + } + + workerStatus.IsReady = IsChannelReadyForInvocations(); + if (_environment.IsWorkerDynamicConcurrencyEnabled()) + { + workerStatus.LatencyHistory = GetLatencies(); + } + + return workerStatus; + } + + // send capabilities to worker, wait for WorkerInitResponse + internal void SendWorkerInitRequest(GrpcEvent startEvent) + { + _workerChannelLogger.LogDebug("Worker Process started. Received StartStream message"); + RegisterCallbackForNextGrpcMessage(MsgType.WorkerInitResponse, _workerConfig.CountOptions.InitializationTimeout, 1, WorkerInitResponse, HandleWorkerInitError); + + WorkerInitRequest initRequest = GetWorkerInitRequest(); + + // Run as Functions Host V2 compatible + if (_environment.IsV2CompatibilityMode()) + { + _workerChannelLogger.LogDebug("Worker and host running in V2 compatibility mode"); + initRequest.Capabilities.Add(RpcWorkerConstants.V2Compatable, "true"); + } + + if (ScriptHost.IsFunctionDataCacheEnabled) + { + // FunctionDataCache is available from the host side - we send this to the worker. + // As long as the worker replies back with the SharedMemoryDataTransfer capability, the cache + // can be used. + initRequest.Capabilities.Add(RpcWorkerConstants.FunctionDataCache, "true"); + } + + // advertise that we support multiple streams, and hint at a number; with this flag, we allow + // clients to connect multiple back-hauls *with the same workerid*, and rely on the internal + // plumbing to make sure we don't process everything N times + initRequest.Capabilities.Add(RpcWorkerConstants.MultiStream, "10"); // TODO: make this configurable + + SendStreamingMessage(new StreamingMessage + { + WorkerInitRequest = initRequest + }); + } + + internal WorkerInitRequest GetWorkerInitRequest() + { + return new WorkerInitRequest() + { + HostVersion = ScriptHost.Version, + WorkerDirectory = _workerConfig.Description.WorkerDirectory, + FunctionAppDirectory = _applicationHostOptions.CurrentValue.ScriptPath + }; + } + + internal void FunctionEnvironmentReloadResponse(FunctionEnvironmentReloadResponse res, IDisposable latencyEvent) + { + _workerChannelLogger.LogDebug("Received FunctionEnvironmentReloadResponse from WorkerProcess with Pid: '{0}'", WorkerProcessId); + + res.WorkerMetadata?.UpdateWorkerMetadata(_workerConfig); + + Utility.ExecuteAfterColdStartDelay(_environment, () => LogWorkerMetadata(res.WorkerMetadata)); + + _workerConfig.Description.DefaultRuntimeVersion = _workerConfig.Description.DefaultRuntimeVersion ?? res?.WorkerMetadata?.RuntimeVersion; + _workerConfig.Description.DefaultRuntimeName = _workerConfig.Description.DefaultRuntimeName ?? res?.WorkerMetadata?.RuntimeName; + + ApplyCapabilities(res.Capabilities, res.CapabilitiesUpdateStrategy.ToGrpcCapabilitiesUpdateStrategy()); + + if (res.Result.IsFailure(IsUserCodeExceptionCapabilityEnabled(), out var reloadEnvironmentVariablesException)) + { + if (res.Result.Exception is not null && reloadEnvironmentVariablesException is not null) + { + _workerChannelLogger.LogWarning(reloadEnvironmentVariablesException, reloadEnvironmentVariablesException.Message); + } + _reloadTask.SetResult(false); + } + else + { + _reloadTask.SetResult(true); + } + latencyEvent.Dispose(); + } + + internal void WorkerInitResponse(GrpcEvent initEvent) + { + _startLatencyMetric?.Dispose(); + _startLatencyMetric = null; + + _workerChannelLogger.LogDebug("Received WorkerInitResponse. Worker process initialized"); + _initMessage = initEvent.Message.WorkerInitResponse; + _workerChannelLogger.LogDebug("Worker capabilities: {capabilities}", _initMessage.Capabilities); + + _initMessage.WorkerMetadata?.UpdateWorkerMetadata(_workerConfig); + + Utility.ExecuteAfterColdStartDelay(_environment, () => LogWorkerMetadata(_initMessage.WorkerMetadata)); + + _workerConfig.Description.DefaultRuntimeVersion = _workerConfig.Description.DefaultRuntimeVersion ?? _initMessage?.WorkerMetadata?.RuntimeVersion; + _workerConfig.Description.DefaultRuntimeName = _workerConfig.Description.DefaultRuntimeName ?? _initMessage?.WorkerMetadata?.RuntimeName; + + if (_initMessage.Result.IsFailure(out Exception exc)) + { + HandleWorkerInitError(exc); + _workerInitTask.TrySetResult(false); + return; + } + + _state = _state | RpcWorkerChannelState.Initialized; + + ApplyCapabilities(_initMessage.Capabilities); + + _workerInitTask.TrySetResult(true); + } + + private bool IsUserCodeExceptionCapabilityEnabled() + { + var enableUserCodeExceptionCapability = string.Equals( + _workerCapabilities.GetCapabilityState(RpcWorkerConstants.EnableUserCodeException), bool.TrueString, + StringComparison.OrdinalIgnoreCase); + + return enableUserCodeExceptionCapability; + } + + private void LogWorkerMetadata(WorkerMetadata workerMetadata) + { + if (workerMetadata == null) + { + return; + } + + var workerMetadataString = workerMetadata.ToString(); + _metricsLogger.LogEvent(MetricEventNames.WorkerMetadata, functionName: null, Sanitizer.Sanitize(workerMetadataString)); + _workerChannelLogger.LogDebug("Worker metadata: {workerMetadata}", workerMetadataString); + } + + // Allow tests to add capabilities, even if not directly supported by the worker. + internal virtual void UpdateCapabilities(IDictionary fields, GrpcCapabilitiesUpdateStrategy strategy) + { + _workerCapabilities.UpdateCapabilities(fields, strategy); + } + + // Helper method that updates and applies capabilities + // Used at worker initialization and environment reload (placeholder scenarios) + // The default strategy for updating capabilities is merge + internal void ApplyCapabilities(IDictionary capabilities, GrpcCapabilitiesUpdateStrategy strategy = GrpcCapabilitiesUpdateStrategy.Merge) + { + UpdateCapabilities(capabilities, strategy); + + _isSharedMemoryDataTransferEnabled = ResolveSharedTransferEnablementState(_workerCapabilities, _environment, _workerChannelLogger); + _isHandlesInvocationCancelMessageCapabilityEnabled = !string.IsNullOrEmpty(_workerCapabilities.GetCapabilityState(RpcWorkerConstants.HandlesInvocationCancelMessage)); + + if (!_isSharedMemoryDataTransferEnabled) + { + // If the worker does not support using shared memory data transfer, caching must also be disabled + ScriptHost.IsFunctionDataCacheEnabled = false; + } + + if (_environment.IsApplicationInsightsAgentEnabled() || + (bool.TryParse(_workerCapabilities.GetCapabilityState(RpcWorkerConstants.WorkerApplicationInsightsLoggingEnabled), out bool appInsightsWorkerEnabled) && + appInsightsWorkerEnabled)) + { + _isWorkerApplicationInsightsLoggingEnabled = true; + } + + if (bool.TryParse(_workerCapabilities.GetCapabilityState(RpcWorkerConstants.WorkerOpenTelemetryEnabled), out bool otelEnabled) && + otelEnabled) + { + ScriptHost.WorkerOpenTelemetryEnabled = true; + } + + // If http proxying is enabled, we need to get the proxying endpoint of this worker + var httpUri = _workerCapabilities.GetCapabilityState(RpcWorkerConstants.HttpUri); + if (!string.IsNullOrEmpty(httpUri)) + { + try + { + _httpProxyEndpoint = new Uri(httpUri); + } + catch (Exception ex) + { + HandleWorkerInitError(ex); + } + } + } + + public void SetupFunctionInvocationBuffers(IEnumerable functions) + { + _functions = functions; + foreach (FunctionMetadata metadata in functions) + { + string functionId = metadata.GetFunctionId(); + _workerChannelLogger.LogDebug("Setting up FunctionInvocationBuffer for function: '{functionName}' with functionId: '{functionId}'", metadata.Name, functionId); + _functionInputBuffers[functionId] = new BufferBlock(); + } + _state |= RpcWorkerChannelState.InvocationBuffersInitialized; + } + + public void SendFunctionLoadRequests(ManagedDependencyOptions managedDependencyOptions, TimeSpan? functionTimeout) + { + if (_functions != null) + { + // Load Request is also sent for disabled function as it is invocable using the portal and admin endpoints + // Loading disabled functions at the end avoids unnecessary performance issues. Refer PR #5072 and commit #38b57883be28524fa6ee67a457fa47e96663094c + _functions = _functions.OrderBy(metadata => metadata.IsDisabled()); + + // Check if the worker supports this feature + bool capabilityEnabled = !string.IsNullOrEmpty(_workerCapabilities.GetCapabilityState(RpcWorkerConstants.SupportsLoadResponseCollection)); + TimeSpan timeout = TimeSpan.Zero; + if (functionTimeout.HasValue) + { + _functionLoadTimeout = functionTimeout.Value > _functionLoadTimeout ? functionTimeout.Value : _functionLoadTimeout; + timeout = _functionLoadTimeout; + } + + var count = _functions.Count(); + if (capabilityEnabled) + { + RegisterCallbackForNextGrpcMessage(MsgType.FunctionLoadResponseCollection, timeout, count, msg => LoadResponse(msg.Message.FunctionLoadResponseCollection), HandleWorkerFunctionLoadError); + + SendFunctionLoadRequestCollection(_functions, managedDependencyOptions); + } + else + { + RegisterCallbackForNextGrpcMessage(MsgType.FunctionLoadResponse, timeout, count, msg => LoadResponse(msg.Message.FunctionLoadResponse), HandleWorkerFunctionLoadError); + + foreach (FunctionMetadata metadata in _functions) + { + SendFunctionLoadRequest(metadata, managedDependencyOptions); + } + } + } + } + + internal void SendFunctionLoadRequestCollection(IEnumerable functions, ManagedDependencyOptions managedDependencyOptions) + { + _functionLoadRequestResponseEvent = _metricsLogger.LatencyEvent(MetricEventNames.FunctionLoadRequestResponse); + + FunctionLoadRequestCollection functionLoadRequestCollection = GetFunctionLoadRequestCollection(functions, managedDependencyOptions); + + _workerChannelLogger.LogDebug("Sending FunctionLoadRequestCollection with number of functions: '{count}'", functionLoadRequestCollection.FunctionLoadRequests.Count); + + // send load requests for the registered functions + SendStreamingMessage(new StreamingMessage + { + FunctionLoadRequestCollection = functionLoadRequestCollection + }); + } + + internal FunctionLoadRequestCollection GetFunctionLoadRequestCollection(IEnumerable functions, ManagedDependencyOptions managedDependencyOptions) + { + var functionLoadRequestCollection = new FunctionLoadRequestCollection(); + + foreach (FunctionMetadata metadata in functions) + { + var functionLoadRequest = GetFunctionLoadRequest(metadata, managedDependencyOptions); + functionLoadRequestCollection.FunctionLoadRequests.Add(functionLoadRequest); + } + + return functionLoadRequestCollection; + } + + public Task SendFunctionEnvironmentReloadRequest() + { + _functionsIndexingTask = new TaskCompletionSource>(TaskCreationOptions.RunContinuationsAsynchronously); + _functionMetadataRequestSent = false; + + _workerChannelLogger.LogDebug("Sending FunctionEnvironmentReloadRequest to WorkerProcess with Pid: '{0}'", WorkerProcessId); + IDisposable latencyEvent = _metricsLogger.LatencyEvent(MetricEventNames.SpecializationEnvironmentReloadRequestResponse); + + RegisterCallbackForNextGrpcMessage(MsgType.FunctionEnvironmentReloadResponse, _workerConfig.CountOptions.EnvironmentReloadTimeout, 1, + msg => FunctionEnvironmentReloadResponse(msg.Message.FunctionEnvironmentReloadResponse, latencyEvent), HandleWorkerEnvReloadError); + + IDictionary processEnv = Environment.GetEnvironmentVariables(); + + FunctionEnvironmentReloadRequest request = GetFunctionEnvironmentReloadRequest(processEnv); + + SendStreamingMessage(new StreamingMessage + { + FunctionEnvironmentReloadRequest = request + }); + + return _reloadTask.Task; + } + + public void SendWorkerWarmupRequest() + { + bool capabilityEnabled = !string.IsNullOrEmpty(_workerCapabilities.GetCapabilityState(RpcWorkerConstants.HandlesWorkerWarmupMessage)); + if (!capabilityEnabled) + { + _workerChannelLogger.LogDebug("Worker warmup capability not enabled"); + } + else + { + _workerChannelLogger.LogDebug("Sending WorkerWarmupRequest to WorkerProcess with Pid: '{0}'", WorkerProcessId); + + RegisterCallbackForNextGrpcMessage(MsgType.WorkerWarmupResponse, TimeSpan.FromMinutes(1.5), 1, + msg => ProcessWorkerWarmupResponse(msg.Message.WorkerWarmupResponse), HandleWorkerWarmupError); + + var request = new WorkerWarmupRequest() + { + WorkerDirectory = _workerConfig.Description.WorkerDirectory, + }; + + SendStreamingMessage(new StreamingMessage + { + WorkerWarmupRequest = request + }); + } + } + + internal void ProcessWorkerWarmupResponse(WorkerWarmupResponse response) + { + _workerChannelLogger.LogDebug("Received WorkerWarmupResponse from WorkerProcess with Pid: '{0}'", WorkerProcessId); + if (response.Result.IsFailure(out Exception workerWarmupException)) + { + _workerChannelLogger.LogError(workerWarmupException, "Worker warmup failed"); + } + } + + internal FunctionEnvironmentReloadRequest GetFunctionEnvironmentReloadRequest(IDictionary processEnv) + { + foreach (var pair in _hostingConfigOptions.Value.Features) + { + processEnv[pair.Key] = pair.Value; + } + + FunctionEnvironmentReloadRequest request = new FunctionEnvironmentReloadRequest(); + foreach (DictionaryEntry entry in processEnv) + { + // Do not add environment variables with empty or null values (see issue #4488 for context) + if (!string.IsNullOrEmpty(entry.Value?.ToString())) + { + request.EnvironmentVariables.Add(entry.Key.ToString(), entry.Value.ToString()); + } + } + + string scriptRoot = _applicationHostOptions.CurrentValue.ScriptPath; + request.EnvironmentVariables.TryAdd(WorkerConstants.FunctionsWorkerDirectorySettingName, _workerConfig.Description.WorkerDirectory); + request.EnvironmentVariables.TryAdd(WorkerConstants.FunctionsApplicationDirectorySettingName, scriptRoot); + request.FunctionAppDirectory = scriptRoot; + + return request; + } + + internal void SendFunctionLoadRequest(FunctionMetadata metadata, ManagedDependencyOptions managedDependencyOptions) + { + _functionLoadRequestResponseEvent = _metricsLogger.LatencyEvent(MetricEventNames.FunctionLoadRequestResponse); + + if (_workerChannelLogger.IsEnabled(LogLevel.Debug)) + { + _workerChannelLogger.LogDebug("Sending FunctionLoadRequest for function: '{functionName}' with functionId: '{functionId}'", metadata.Name, metadata.GetFunctionId()); + } + + // send a load request for the registered function + SendStreamingMessage(new StreamingMessage + { + FunctionLoadRequest = GetFunctionLoadRequest(metadata, managedDependencyOptions) + }); + } + + internal FunctionLoadRequest GetFunctionLoadRequest(FunctionMetadata metadata, ManagedDependencyOptions managedDependencyOptions) + { + FunctionLoadRequest request = new FunctionLoadRequest() + { + FunctionId = metadata.GetFunctionId(), + Metadata = new RpcFunctionMetadata() + { + Name = metadata.Name, + Directory = metadata.FunctionDirectory ?? string.Empty, + EntryPoint = metadata.EntryPoint ?? string.Empty, + ScriptFile = metadata.ScriptFile ?? string.Empty, + IsProxy = metadata.IsProxy() + } + }; + + if (managedDependencyOptions != null && managedDependencyOptions.Enabled) + { + _workerChannelLogger?.LogDebug("Adding dependency download request to {language} language worker", _workerConfig.Description.Language); + request.ManagedDependencyEnabled = managedDependencyOptions.Enabled; + } + + foreach (var binding in metadata.Bindings) + { + BindingInfo bindingInfo = binding.ToBindingInfo(); + + request.Metadata.Bindings.Add(binding.Name, bindingInfo); + + if (binding.SupportsDeferredBinding() && !binding.SkipDeferredBinding()) + { + _metricsLogger.LogEvent(MetricEventNames.FunctionBindingDeferred, functionName: Sanitizer.Sanitize(metadata.Name)); + } + } + + foreach (var property in metadata.Properties) + { + // worker properties are expected to be string values + request.Metadata.Properties.Add(property.Key, property.Value?.ToString()); + } + + return request; + } + + internal void LoadResponse(FunctionLoadResponse loadResponse) + { + _functionLoadRequestResponseEvent?.Dispose(); + string functionName = _functions.SingleOrDefault(m => m.GetFunctionId().Equals(loadResponse.FunctionId, StringComparison.OrdinalIgnoreCase))?.Name; + _workerChannelLogger.LogDebug("Received FunctionLoadResponse for function: '{functionName}' with functionId: '{functionId}'.", functionName, loadResponse.FunctionId); + if (loadResponse.Result.IsFailure(out Exception functionLoadEx)) + { + if (functionLoadEx == null) + { + _workerChannelLogger?.LogError("Worker failed to to load function: '{functionName}' with functionId: '{functionId}'. Function load exception is not set by the worker.", functionName, loadResponse.FunctionId); + } + else + { + _workerChannelLogger?.LogError(functionLoadEx, "Worker failed to load function: '{functionName}' with functionId: '{functionId}'.", functionName, loadResponse.FunctionId); + } + //Cache function load errors to replay error messages on invoking failed functions + _functionLoadErrors[loadResponse.FunctionId] = functionLoadEx; + } + + if (loadResponse.IsDependencyDownloaded) + { + _workerChannelLogger?.LogDebug("Managed dependency successfully downloaded by the {workerLanguage} language worker", _workerConfig.Description.Language); + } + + // link the invocation inputs to the invoke call + var invokeBlock = new ActionBlock(async ctx => await SendInvocationRequest(ctx)); + // associate the invocation input buffer with the function + var disposableLink = _functionInputBuffers[loadResponse.FunctionId].LinkTo(invokeBlock); + _inputLinks.Add(disposableLink); + } + + internal void LoadResponse(FunctionLoadResponseCollection loadResponseCollection) + { + _workerChannelLogger.LogDebug("Received FunctionLoadResponseCollection with number of functions: '{count}'.", loadResponseCollection.FunctionLoadResponses.Count); + + foreach (FunctionLoadResponse loadResponse in loadResponseCollection.FunctionLoadResponses) + { + LoadResponse(loadResponse); + } + } + + internal async Task SendInvocationRequest(ScriptInvocationContext context) + { + try + { + string invocationId = context.ExecutionContext.InvocationId.ToString(); + string functionId = context.FunctionMetadata.GetFunctionId(); + + // do not send an invocation request for functions that failed to load or could not be indexed by the worker + if (_functionLoadErrors.TryGetValue(functionId, out Exception exception)) + { + _workerChannelLogger.LogDebug("Function {functionName} failed to load", context.FunctionMetadata.Name); + context.SetException(exception); + RemoveExecutingInvocation(invocationId); + return; + } + else if (_metadataRequestErrors.TryGetValue(functionId, out exception)) + { + _workerChannelLogger.LogDebug("Worker failed to load metadata for {functionName}", context.FunctionMetadata.Name); + context.SetException(exception); + RemoveExecutingInvocation(invocationId); + return; + } + + if (context.CancellationToken.IsCancellationRequested) + { + _workerChannelLogger.LogDebug("Cancellation was requested prior to the invocation request ('{invocationId}') being sent to the worker.", invocationId); + + // If the worker does not support handling InvocationCancel grpc messages, or if cancellation is supported and the customer opts-out + // of sending cancelled invocations to the worker, we will cancel the result source and not send the invocation to the worker. + if (!_isHandlesInvocationCancelMessageCapabilityEnabled || !JobHostOptions.Value.SendCanceledInvocationsToWorker) + { + _workerChannelLogger.LogInformation("Cancelling invocation '{invocationId}' due to cancellation token being signaled. " + + "This invocation was not sent to the worker. Read more about this here: https://aka.ms/azure-functions-cancellations", invocationId); + + // This will result in an invocation failure with a "FunctionInvocationCanceled" exception. + context.ResultSource.TrySetCanceled(); + return; + } + } + + var invocationRequest = await context.ToRpcInvocationRequest(_workerChannelLogger, _workerCapabilities, _isSharedMemoryDataTransferEnabled, _sharedMemoryManager); + AddAdditionalTraceContext(invocationRequest, context); + _executingInvocations.TryAdd(invocationRequest.InvocationId, new(context, _messageDispatcherFactory.Create(invocationRequest.InvocationId))); + _metricsLogger.LogEvent(string.Format(MetricEventNames.WorkerInvoked, Id), functionName: Sanitizer.Sanitize(context.FunctionMetadata.Name)); + + // If the worker supports HTTP proxying, ensure this request is forwarded prior + // to sending the invocation request to the worker, as this will ensure the context + // is properly set up. + if (IsHttpProxyingWorker && context.FunctionMetadata.IsHttpTriggerFunction()) + { + _httpProxyService.StartForwarding(context, _httpProxyEndpoint); + } + + await SendStreamingMessageAsync(new StreamingMessage + { + InvocationRequest = invocationRequest + }); + + if (_isHandlesInvocationCancelMessageCapabilityEnabled) + { + var cancellationCtr = context.CancellationToken.Register(() => SendInvocationCancel(invocationRequest.InvocationId)); + context.Properties.Add(ScriptConstants.CancellationTokenRegistration, cancellationCtr); + } + } + catch (Exception invokeEx) + { + context.SetException(invokeEx); + } + } + + internal void SendInvocationCancel(string invocationId) + { + _workerChannelLogger.LogDebug("Sending InvocationCancel request for invocation: '{invocationId}'", invocationId); + + var invocationCancel = new InvocationCancel + { + InvocationId = invocationId + }; + + SendStreamingMessage(new StreamingMessage + { + InvocationCancel = invocationCancel + }); + } + + // gets metadata from worker + public Task> GetFunctionMetadata() + { + return SendFunctionMetadataRequest(); + } + + internal Task> SendFunctionMetadataRequest() + { + _workerChannelLogger.LogDebug("Fetching worker metadata, FunctionMetadataReceived set to: {functionMetadataReceived}", _functionMetadataRequestSent); + if (!_functionMetadataRequestSent) + { + lock (_metadataLock) + { + if (!_functionMetadataRequestSent) + { + RegisterCallbackForNextGrpcMessage(MsgType.FunctionMetadataResponse, _functionLoadTimeout, 1, + msg => ProcessFunctionMetadataResponses(msg.Message.FunctionMetadataResponse), HandleWorkerMetadataRequestError); + + _workerChannelLogger.LogDebug("Sending WorkerMetadataRequest to {language} worker with worker ID {workerID}", _runtime, _workerId); + + // sends the function app directory path to worker for indexing + SendStreamingMessage(new StreamingMessage + { + FunctionsMetadataRequest = new FunctionsMetadataRequest() + { + FunctionAppDirectory = _applicationHostOptions.CurrentValue.ScriptPath + } + }); + + _functionMetadataRequestSent = true; + } + } + } + + return _functionsIndexingTask.Task; + } + + // parse metadata response into RawFunctionMetadata objects for AggregateFunctionMetadataProvider to further parse and validate + internal void ProcessFunctionMetadataResponses(FunctionMetadataResponse functionMetadataResponse) + { + _workerChannelLogger.LogDebug("Received the worker function metadata response from worker {worker_id}", _workerId); + + if (functionMetadataResponse.Result.IsFailure(out Exception metadataResponseEx)) + { + _workerChannelLogger?.LogError(metadataResponseEx, "Worker failed to index functions"); + } + + var functions = new List(); + + if (functionMetadataResponse.UseDefaultMetadataIndexing == false) + { + foreach (var metadata in functionMetadataResponse.FunctionMetadataResults) + { + if (metadata == null) + { + continue; + } + if (metadata.Status != null && metadata.Status.IsFailure(out Exception metadataRequestEx)) + { + _workerChannelLogger.LogError("Worker failed to index function {functionId}", metadata.FunctionId); + _metadataRequestErrors[metadata.FunctionId] = metadataRequestEx; + } + + var functionMetadata = new FunctionMetadata() + { + FunctionDirectory = metadata.Directory, + ScriptFile = metadata.ScriptFile, + EntryPoint = metadata.EntryPoint, + Name = metadata.Name, + Language = metadata.Language + }; + + if (metadata.RetryOptions is not null) + { + functionMetadata.Retry = new RetryOptions + { + MaxRetryCount = metadata.RetryOptions.MaxRetryCount, + Strategy = metadata.RetryOptions.RetryStrategy.ToRetryStrategy() + }; + + if (functionMetadata.Retry.Strategy is RetryStrategy.FixedDelay) + { + functionMetadata.Retry.DelayInterval = metadata.RetryOptions.DelayInterval?.ToTimeSpan(); + } + else + { + functionMetadata.Retry.MinimumInterval = metadata.RetryOptions.MinimumInterval?.ToTimeSpan(); + functionMetadata.Retry.MaximumInterval = metadata.RetryOptions.MaximumInterval?.ToTimeSpan(); + } + } + + functionMetadata.SetFunctionId(metadata.FunctionId); + + foreach (var property in metadata.Properties) + { + if (!functionMetadata.Properties.TryAdd(property.Key, property.Value?.ToString())) + { + _workerChannelLogger?.LogDebug("{metadataPropertyKey} is already a part of metadata properties for {functionId}", property.Key, metadata.FunctionId); + } + } + + var bindings = new List(); + foreach (string binding in metadata.RawBindings) + { + bindings.Add(binding); + } + + functions.Add(new RawFunctionMetadata() + { + Metadata = functionMetadata, + Bindings = bindings, + UseDefaultMetadataIndexing = functionMetadataResponse.UseDefaultMetadataIndexing + }); + } + } + else + { + functions.Add(new RawFunctionMetadata() + { + UseDefaultMetadataIndexing = functionMetadataResponse.UseDefaultMetadataIndexing + }); + } + + // set it as task result because we cannot directly return from SendWorkerMetadataRequest + _functionsIndexingTask.SetResult(functions); + } + + private async Task GetBindingDataAsync(ParameterBinding binding, string invocationId) + { + switch (binding.RpcDataCase) + { + case ParameterBindingType.RpcSharedMemory: + // Data was transferred by the worker using shared memory + return await binding.RpcSharedMemory.ToObjectAsync(_workerChannelLogger, invocationId, _sharedMemoryManager, ScriptHost.IsFunctionDataCacheEnabled); + case ParameterBindingType.Data: + // Data was transferred by the worker using RPC + return binding.Data.ToObject(); + case ParameterBindingType.None: + return null; + default: + throw new InvalidOperationException($"Unknown ParameterBindingType of type {binding.RpcDataCase}"); + } + } + + /// + /// From the output data produced by the worker, get a list of the shared memory maps that were created for this invocation. + /// + /// List of produced by the worker as output. + /// List of names of shared memory maps produced by the worker. + private IList GetOutputMaps(IList bindings) + { + IList outputMaps = new List(); + foreach (ParameterBinding binding in bindings) + { + if (binding.RpcSharedMemory != null) + { + outputMaps.Add(binding.RpcSharedMemory.Name); + } + } + + return outputMaps; + } + + internal async Task InvokeResponse(InvocationResponse invokeResponse) + { + Logger.InvocationResponseReceived(_workerChannelLogger, invokeResponse.InvocationId); + + // Check if the worker supports logging user-code-thrown exceptions to app insights + bool userCodeExceptionHandlingEnabled = !string.IsNullOrEmpty(_workerCapabilities.GetCapabilityState(RpcWorkerConstants.EnableUserCodeException)); + + if (_executingInvocations.TryRemove(invokeResponse.InvocationId, out var invocation)) + { + var context = invocation.Context; + + if (context.Properties.TryGetValue(ScriptConstants.CancellationTokenRegistration, out CancellationTokenRegistration ctr)) + { + await ctr.DisposeAsync(); + context.Properties.Remove(ScriptConstants.CancellationTokenRegistration); + } + + try + { + if (IsHttpProxyingWorker && context.FunctionMetadata.IsHttpTriggerFunction()) + { + await _httpProxyService.EnsureSuccessfulForwardingAsync(context); + } + + LogSharedMemoryUsage(invokeResponse); + AddWorkerTraceAttributes(invokeResponse, context); + + if (invokeResponse.Result.IsInvocationSuccess()) + { + ScriptInvocationResult result = await CreateScriptInvocationResult(invokeResponse); + context.ResultSource.SetResult(result); + + _metricsLogger.LogEvent(_workerInvocationSucccededMetric); + } + else + { + var rpcException = invokeResponse.Result.GetRpcException(userCodeExceptionHandlingEnabled); + context.SetException(rpcException); + + _metricsLogger.LogEvent(_workerInvocationFailedMetric); + } + } + catch (Exception exc) + { + context.SetException(exc); + } + finally + { + TryCloseSharedMemoryResources(invokeResponse); + + invocation.Dispose(); + } + } + } + + private async Task CreateScriptInvocationResult(InvocationResponse invokeResponse) + { + IDictionary bindingsDictionary = await invokeResponse.OutputData + .ToDictionaryAsync(binding => binding.Name, binding => GetBindingDataAsync(binding, invokeResponse.InvocationId)); + + var result = new ScriptInvocationResult() + { + Outputs = bindingsDictionary, + Return = invokeResponse?.ReturnValue?.ToObject() + }; + + return result; + } + + private void TryCloseSharedMemoryResources(InvocationResponse invokeResponse) + { + try + { + if (!_isSharedMemoryDataTransferEnabled) + { + return; + } + + // Free memory allocated by the host (for input bindings) which was needed only for the duration of this invocation + if (!_sharedMemoryManager.TryFreeSharedMemoryMapsForInvocation(invokeResponse.InvocationId)) + { + _workerChannelLogger.LogWarning("Cannot free all shared memory resources for invocation: {invocationId}", invokeResponse.InvocationId); + } + + // List of shared memory maps that were produced by the worker (for output bindings) + IList outputMaps = GetOutputMaps(invokeResponse.OutputData); + if (outputMaps.Count > 0) + { + // If this invocation was using any shared memory maps produced by the worker, close them to free memory + SendCloseSharedMemoryResourcesForInvocationRequest(outputMaps); + } + } + catch (Exception exc) + { + _workerChannelLogger.LogWarning(exc, "Failed to cleanup shared memory resources"); + } + } + + private void LogSharedMemoryUsage(InvocationResponse invokeResponse) + { + // Return early if debug logging is not enabled. + if (!_isSharedMemoryDataTransferEnabled + || !_workerChannelLogger.IsEnabled(LogLevel.Debug)) + { + return; + } + + StringBuilder sharedMemoryLogBuilder = null; + + foreach (ParameterBinding binding in invokeResponse.OutputData) + { + switch (binding.RpcDataCase) + { + case ParameterBindingType.RpcSharedMemory: + sharedMemoryLogBuilder ??= new StringBuilder(); + sharedMemoryLogBuilder.AppendFormat("{0}:{1},", binding.Name, binding.RpcSharedMemory.Count); + break; + default: + break; + } + } + + if (sharedMemoryLogBuilder != null) + { + _workerChannelLogger.LogDebug("Shared memory usage for response of invocation '{invocationId}' is {SharedMemoryUsage}", invokeResponse.InvocationId, sharedMemoryLogBuilder); + } + } + + /// + /// Request to free memory allocated by the worker (for output bindings). + /// + /// List of names of shared memory maps to close from the worker. + internal void SendCloseSharedMemoryResourcesForInvocationRequest(IList outputMaps) + { + // Request the worker to drop its references to any shared memory maps that it had produced. + // This is because the host has read them (or holds a reference to them if caching is enabled.) + // The worker will not delete the resources allocated for the memory maps; it will only drop its reference + // so that the worker process does not prevent the OS from freeing the memory maps when the host attempts + // to free them (either right away after reading them or if caching is enabled, then when the cache decides + // to evict that object based on its eviction policy). + CloseSharedMemoryResourcesRequest closeSharedMemoryResourcesRequest = new CloseSharedMemoryResourcesRequest(); + closeSharedMemoryResourcesRequest.MapNames.AddRange(outputMaps); + + SendStreamingMessage(new StreamingMessage() + { + CloseSharedMemoryResourcesRequest = closeSharedMemoryResourcesRequest + }); + } + + internal void Log(GrpcEvent msg) + { + var rpcLog = msg.Message.RpcLog; + if (_executingInvocations.TryGetValue(rpcLog.InvocationId, out var invocation)) + { + var context = invocation.Context; + + // Restore the execution context from the original invocation. This allows AsyncLocal state to flow to loggers. + System.Threading.ExecutionContext.Run(context.AsyncExecutionContext, static (state) => + { + var stateTuple = ((ScriptInvocationContext Context, RpcLog RpcLog, bool AppInsightsEnabledOnWorker))state; + + var rpcLog = stateTuple.RpcLog; + LogLevel logLevel = (LogLevel)rpcLog.Level; + + var context = stateTuple.Context; + + if (rpcLog.LogCategory == RpcLogCategory.CustomMetric) + { + if (rpcLog.PropertiesMap.TryGetValue(LogConstants.NameKey, out var metricName) + && rpcLog.PropertiesMap.TryGetValue(LogConstants.MetricValueKey, out var metricValue)) + { + // Strip off the name/value entries in the dictionary passed to Log Message and include the rest as the property bag passed to the backing ILogger + var rpcLogProperties = rpcLog.PropertiesMap + .Where(i => i.Key != LogConstants.NameKey && i.Key != LogConstants.MetricValueKey) + .ToDictionary(i => i.Key, i => i.Value.ToObject()); + context.Logger.LogMetric(metricName.String, metricValue.Double, rpcLogProperties); + } + } + else + { + try + { + WorkerTraceFilterTelemetryProcessor.FilterApplicationInsightsFromWorker.Value = stateTuple.AppInsightsEnabledOnWorker; + + if (rpcLog.Exception != null) + { + // TODO fix RpcException catch all https://github.com/Azure/azure-functions-dotnet-worker/issues/370 + var exception = new Workers.Rpc.RpcException(rpcLog.Message, rpcLog.Exception.Message, rpcLog.Exception.StackTrace); + context.Logger.Log(logLevel, new EventId(0, rpcLog.EventId), rpcLog.Message, exception, (state, exc) => state); + } + else + { + context.Logger.Log(logLevel, new EventId(0, rpcLog.EventId), rpcLog.Message, null, (state, exc) => state); + } + } + finally + { + WorkerTraceFilterTelemetryProcessor.FilterApplicationInsightsFromWorker.Value = false; + } + } + }, (context, rpcLog, _isWorkerApplicationInsightsLoggingEnabled)); + } + } + + internal void SystemLog(GrpcEvent msg) + { + RpcLog systemLog = msg.Message.RpcLog; + LogLevel logLevel = (LogLevel)systemLog.Level; + switch (logLevel) + { + case LogLevel.Warning: + _workerChannelLogger.LogWarning(systemLog.Message); + break; + + case LogLevel.Information: + _workerChannelLogger.LogInformation(systemLog.Message); + break; + + case LogLevel.Error: + { + if (systemLog.Exception != null) + { + Workers.Rpc.RpcException exception = new Workers.Rpc.RpcException(systemLog.Message, systemLog.Exception.Message, systemLog.Exception.StackTrace); + _workerChannelLogger.LogError(exception, systemLog.Message); + } + else + { + _workerChannelLogger.LogError(systemLog.Message); + } + } + break; + + default: + _workerChannelLogger.LogInformation(systemLog.Message); + break; + } + } + + internal void HandleWorkerStartStreamError(Exception exc) + { + _workerChannelLogger.LogError(exc, "Starting worker process failed"); + PublishWorkerErrorEvent(exc); + } + + internal void HandleWorkerEnvReloadError(Exception exc) + { + _workerChannelLogger.LogError(exc, "Reloading environment variables failed"); + _reloadTask.SetException(exc); + } + + internal void HandleWorkerInitError(Exception exc) + { + _workerChannelLogger.LogError(exc, "Initializing worker process failed"); + PublishWorkerErrorEvent(exc); + } + + internal void HandleWorkerFunctionLoadError(Exception exc) + { + _workerChannelLogger.LogError(exc, "Loading function failed."); + if (_disposing || _disposed) + { + return; + } + _eventManager.Publish(new WorkerErrorEvent(_runtime, Id, exc)); + } + + private void PublishWorkerErrorEvent(Exception exc) + { + _workerInitTask.TrySetException(exc); + if (_disposing || _disposed) + { + return; + } + _eventManager.Publish(new WorkerErrorEvent(_runtime, Id, exc)); + } + + internal void HandleWorkerMetadataRequestError(Exception exc) + { + _workerChannelLogger.LogError(exc, "Requesting metadata from worker failed."); + if (_disposing || _disposed) + { + return; + } + _eventManager.Publish(new WorkerErrorEvent(_runtime, Id, exc)); + } + + private void HandleWorkerWarmupError(Exception exc) + { + _workerChannelLogger.LogError(exc, "Worker warmup failed"); + } + + protected ValueTask SendStreamingMessageAsync(StreamingMessage msg) + { + var evt = new OutboundGrpcEvent(_workerId, msg); + + return _outbound.TryWrite(evt) ? default : _outbound.WriteAsync(evt); + } + + protected void SendStreamingMessage(StreamingMessage msg) + { + var evt = new OutboundGrpcEvent(_workerId, msg); + if (!_outbound.TryWrite(evt)) + { + var pending = _outbound.WriteAsync(evt); + if (pending.IsCompleted) + { + try + { + pending.GetAwaiter().GetResult(); // ensure observed to ensure the IValueTaskSource completed/result is consumed + } + catch + { + // suppress failure + } + } + else + { + _ = ObserveEventually(pending); + } + } + static async Task ObserveEventually(ValueTask valueTask) + { + try + { + await valueTask; + } + catch + { + // no where to log + } + } + } + + internal void ReceiveWorkerStatusResponse(string requestId, WorkerStatusResponse response) + { + if (_workerStatusRequests.TryRemove(requestId, out var workerStatusTask)) + { + workerStatusTask.SetResult(true); + } + } + + private void RemoveExecutingInvocation(string invocationId) + { + if (_executingInvocations.TryRemove(invocationId, out var invocation)) + { + invocation.Dispose(); + } + } + + protected virtual void Dispose(bool disposing) + { + if (!_disposed) + { + if (disposing) + { + foreach (var id in _executingInvocations.Keys) + { + RemoveExecutingInvocation(id); + } + + _startLatencyMetric?.Dispose(); + _workerInitTask?.TrySetCanceled(); + _timer?.Dispose(); + _scriptHostManager.ActiveHostChanged -= HandleActiveHostChange; + + // unlink function inputs + if (_inputLinks is not null) + { + foreach (var link in _inputLinks) + { + if (link is null) + { + // This log is temporarily added for diagnostic purposes. + _workerChannelLogger.LogDebug("An input link is null. Skipping disposal."); + } + + link?.Dispose(); + } + } + else + { + // This log is temporarily added for diagnostic purposes. + _workerChannelLogger.LogDebug("The input links collection is null. Skipping disposal of any individual input links."); + } + + DisposeWorkerResources(); + + if (_eventSubscriptions is not null) + { + foreach (var sub in _eventSubscriptions) + { + if (sub is null) + { + // This log is temporarily added for diagnostic purposes. + _workerChannelLogger.LogDebug("An event subscription is null. Skipping disposal."); + } + + sub?.Dispose(); + } + } + else + { + // This log is temporarily added for diagnostic purposes. + _workerChannelLogger.LogDebug("The event subscriptions collection is null. Skipping disposal of any individual subscriptions."); + } + + // shut down the channels + _eventManager.RemoveGrpcChannels(_workerId); + } + _disposed = true; + } + } + + public void Dispose() + { + _disposing = true; + Dispose(true); + } + + public async Task DrainInvocationsAsync() + { + _workerChannelLogger.LogDebug("Count of in-buffer invocations waiting to be drained out: {invocationCount}", _executingInvocations.Count); + foreach (var invocation in _executingInvocations.Values) + { + await invocation.Context.ResultSource.Task; + } + } + + public bool IsExecutingInvocation(string invocationId) + { + return _executingInvocations.ContainsKey(invocationId); + } + + public void Shutdown(Exception workerException) + { + var shutdownException = workerException; + + if (workerException is null || workerException is FunctionTimeoutException) + { + string exceptionMessageSuffix = workerException?.Message is not null ? " Reason: " + workerException.Message : string.Empty; + shutdownException = new FunctionTimeoutAbortException("Worker channel is shutting down. Aborting function." + exceptionMessageSuffix, workerException); + } + + foreach (var invocation in _executingInvocations?.Values) + { + string invocationId = invocation.Context?.ExecutionContext?.InvocationId.ToString(); + _workerChannelLogger.LogDebug("Worker '{workerId}' encountered a fatal error. Failing invocation: '{invocationId}'", _workerId, invocationId); + invocation.Context?.SetException(shutdownException); + RemoveExecutingInvocation(invocationId); + } + } + + /// + /// Determine if shared memory transfer is enabled. + /// The following conditions must be met: + /// 1) must be set in environment variable (AppSetting). + /// 2) Worker must have the capability . + /// + /// if shared memory data transfer is enabled, otherwise. + internal static bool ResolveSharedTransferEnablementState(GrpcCapabilities workerCapabilities, IEnvironment environment, ILogger logger) + { + // Check if the environment variable (AppSetting) has this feature enabled + string envVal = environment.GetEnvironmentVariable(RpcWorkerConstants.FunctionsWorkerSharedMemoryDataTransferEnabledSettingName); + if (string.IsNullOrEmpty(envVal)) + { + return false; + } + + bool envValEnabled = false; + if (bool.TryParse(envVal, out bool boolResult)) + { + // Check if value was specified as a bool (true/false) + envValEnabled = boolResult; + } + else if (int.TryParse(envVal, out int intResult) && intResult == 1) + { + // Check if value was specified as an int (1/0) + envValEnabled = true; + } + + if (!envValEnabled) + { + return false; + } + + // Check if the worker supports this feature + bool capabilityEnabled = !string.IsNullOrEmpty(workerCapabilities.GetCapabilityState(RpcWorkerConstants.SharedMemoryDataTransfer)); + logger.LogDebug("IsSharedMemoryDataTransferEnabled: {SharedMemoryDataTransferEnabled}", capabilityEnabled); + + return capabilityEnabled; + } + + internal void EnsureTimerStarted() + { + if (_environment.IsWorkerDynamicConcurrencyEnabled()) + { + lock (_syncLock) + { + if (_timer == null) + { + _timer = new System.Timers.Timer() + { + AutoReset = false, + Interval = _workerConcurrencyOptions.Value.CheckInterval.TotalMilliseconds, + }; + + _timer.Elapsed += OnTimer; + _timer.Start(); + } + } + } + } + + internal IEnumerable GetLatencies() + { + EnsureTimerStarted(); + lock (_syncLock) + { + return _workerStatusLatencyHistory.ToArray(); + } + } + + internal async void OnTimer(object sender, System.Timers.ElapsedEventArgs e) + { + if (_disposed) + { + return; + } + + try + { + WorkerStatus workerStatus = await GetWorkerStatusAsync(); + AddSample(_workerStatusLatencyHistory, workerStatus.Latency); + } + catch + { + // Don't allow background exceptions to escape + // E.g. when a rpc channel is shutting down we can process exceptions + } + try + { + _timer.Start(); + } + catch (ObjectDisposedException) + { + // Specifically ignore this race - we're exiting and that's okay + } + } + + private void AddSample(List samples, T sample) + { + lock (_syncLock) + { + if (samples.Count == _workerConcurrencyOptions.Value.HistorySize) + { + samples.RemoveAt(0); + } + samples.Add(sample); + } + } + + private void AddAdditionalTraceContext(InvocationRequest invocationRequest, ScriptInvocationContext context) + { + MapField attributes = invocationRequest.TraceContext.Attributes; + bool isOtelEnabled = _scriptHostOptions?.Value.TelemetryMode == TelemetryMode.OpenTelemetry; + + if (context.FunctionMetadata.Properties.TryGetValue(ScriptConstants.LogPropertyHostInstanceIdKey, out var hostInstanceIdValue)) + { + string id = Convert.ToString(hostInstanceIdValue); + + if (isOtelEnabled) + { + Activity.Current?.AddTag(ResourceSemanticConventions.FaaSInstance, id); + } + + attributes[ScriptConstants.LogPropertyHostInstanceIdKey] = id; + } + + attributes[ScriptConstants.LogPropertyProcessIdKey] = Convert.ToString(WorkerProcessId); + attributes[ScriptConstants.OperationNameKey] = context.FunctionMetadata.Name; + string sessionId = Activity.Current?.GetBaggageItem(ScriptConstants.LiveLogsSessionAIKey); + if (!string.IsNullOrEmpty(sessionId)) + { + attributes[ScriptConstants.LiveLogsSessionAIKey] = sessionId; + } + + if (context.FunctionMetadata.Properties.TryGetValue(LogConstants.CategoryNameKey, out var categoryNameValue)) + { + attributes[LogConstants.CategoryNameKey] = Convert.ToString(categoryNameValue); + } + + if (isOtelEnabled) + { + Activity.Current?.AddTag(ResourceSemanticConventions.FaaSName, context.FunctionMetadata.Name); + Activity.Current?.AddTag(ResourceSemanticConventions.FaaSInvocationId, invocationRequest.InvocationId); + + foreach (var b in Baggage.Current) + { + invocationRequest.TraceContext.Baggage[b.Key] = b.Value; + } + } + } + + private void AddWorkerTraceAttributes(InvocationResponse invocationResponse, ScriptInvocationContext context) + { + var attributes = invocationResponse.TraceContextAttributes; + if (attributes is null || attributes.Count == 0) + { + return; + } + + if (context.AsyncExecutionContext is null) + { + return; + } + + System.Threading.ExecutionContext.Run(context.AsyncExecutionContext, static state => + { + var attrs = (IDictionary)state; + foreach (var kvp in attrs) + { + // this will override any existing tags with the same key + Activity.Current?.AddTag(kvp.Key, kvp.Value); + } + }, attributes); + } + + private sealed class ExecutingInvocation : IDisposable + { + public ExecutingInvocation(ScriptInvocationContext context, IInvocationMessageDispatcher dispatcher) + { + Context = context; + Dispatcher = dispatcher; + } + + public ScriptInvocationContext Context { get; } + + public IInvocationMessageDispatcher Dispatcher { get; } + + public void Dispose() + { + (Dispatcher as IDisposable)?.Dispose(); + } + } + + private sealed class PendingItem + { + private readonly Action _callback; + private readonly Action _faultHandler; + private CancellationTokenRegistration _ctr; + private int _state; + + public PendingItem(Action callback, Action faultHandler) + { + _callback = callback; + _faultHandler = faultHandler; + } + + public PendingItem(Action callback, Action faultHandler, TimeSpan timeout) + : this(callback, faultHandler) + { + var cts = new CancellationTokenSource(); + cts.CancelAfter(timeout); + _ctr = cts.Token.Register(static state => ((PendingItem)state).OnTimeout(), this); + } + + public bool IsComplete => Volatile.Read(ref _state) != 0; + + private bool MakeComplete() => Interlocked.CompareExchange(ref _state, 1, 0) == 0; + + public void SetResult(InboundGrpcEvent message) + { + _ctr.Dispose(); + _ctr = default; + if (MakeComplete() && _callback != null) + { + try + { + _callback.Invoke(message); + } + catch (Exception fault) + { + try + { + _faultHandler?.Invoke(fault); + } + catch + { + } + } + } + } + + private void OnTimeout() + { + if (MakeComplete() && _faultHandler != null) + { + try + { + throw new TimeoutException(); + } + catch (Exception timeout) + { + try + { + _faultHandler(timeout); + } + catch + { + } + } + } + } + } + + // EventId range is 800-899 + private static class Logger + { + private static readonly Action _channelReceivedMessage = LoggerMessage.Define( + LogLevel.Debug, + new EventId(820, nameof(ChannelReceivedMessage)), + "[channel] received {workerId}: {msgType}"); + + private static readonly Action _invocationResponseReceived = LoggerMessage.Define( + LogLevel.Debug, + new EventId(821, nameof(InvocationResponseReceived)), + "InvocationResponse received for invocation: '{invocationId}'"); + + internal static void ChannelReceivedMessage(ILogger logger, string workerId, MsgType msgType) => _channelReceivedMessage(logger, workerId, msgType, null); + + internal static void InvocationResponseReceived(ILogger logger, string invocationId) => _invocationResponseReceived(logger, invocationId, null); + } + } +}