Skip to content

Commit 83081e1

Browse files
authored
modifying approach to ignoring app insights coming from workers (#8965)
1 parent fe291a2 commit 83081e1

File tree

11 files changed

+241
-75
lines changed

11 files changed

+241
-75
lines changed

src/WebJobs.Script.Grpc/Channel/GrpcCapabilities.cs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33

44
using System;
55
using System.Collections.Generic;
6-
using Google.Protobuf.Collections;
76
using Microsoft.Extensions.Logging;
87

98
namespace Microsoft.Azure.WebJobs.Script.Grpc
@@ -27,7 +26,7 @@ public string GetCapabilityState(string capability)
2726
return null;
2827
}
2928

30-
public void UpdateCapabilities(MapField<string, string> capabilities)
29+
public void UpdateCapabilities(IDictionary<string, string> capabilities)
3130
{
3231
if (capabilities == null)
3332
{
@@ -47,4 +46,4 @@ private void UpdateCapability(KeyValuePair<string, string> capability)
4746
_capabilities[capability.Key] = capability.Value;
4847
}
4948
}
50-
}
49+
}

src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs

Lines changed: 42 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@ internal class GrpcWorkerChannel : IRpcWorkerChannel, IDisposable
5454
private readonly ChannelWriter<OutboundGrpcEvent> _outbound;
5555
private readonly ChannelReader<InboundGrpcEvent> _inbound;
5656
private readonly IOptions<FunctionsHostingConfigOptions> _hostingConfigOptions;
57-
5857
private IDisposable _functionLoadRequestResponseEvent;
5958
private bool _disposed;
6059
private bool _disposing;
@@ -80,6 +79,7 @@ internal class GrpcWorkerChannel : IRpcWorkerChannel, IDisposable
8079
private TimeSpan _functionLoadTimeout = TimeSpan.FromMinutes(1);
8180
private bool _isSharedMemoryDataTransferEnabled;
8281
private bool _cancelCapabilityEnabled;
82+
private bool _isWorkerApplicationInsightsLoggingEnabled;
8383

8484
private System.Timers.Timer _timer;
8585

@@ -94,7 +94,6 @@ internal GrpcWorkerChannel(
9494
IEnvironment environment,
9595
IOptionsMonitor<ScriptApplicationHostOptions> applicationHostOptions,
9696
ISharedMemoryManager sharedMemoryManager,
97-
IFunctionDataCache functionDataCache,
9897
IOptions<WorkerConcurrencyOptions> workerConcurrencyOptions,
9998
IOptions<FunctionsHostingConfigOptions> hostingConfigOptions)
10099
{
@@ -392,7 +391,9 @@ internal void WorkerInitResponse(GrpcEvent initEvent)
392391
}
393392

394393
_state = _state | RpcWorkerChannelState.Initialized;
395-
_workerCapabilities.UpdateCapabilities(_initMessage.Capabilities);
394+
395+
UpdateCapabilities(_initMessage.Capabilities);
396+
396397
_isSharedMemoryDataTransferEnabled = IsSharedMemoryDataTransferEnabled();
397398
_cancelCapabilityEnabled = !string.IsNullOrEmpty(_workerCapabilities.GetCapabilityState(RpcWorkerConstants.HandlesInvocationCancelMessage));
398399

@@ -402,9 +403,22 @@ internal void WorkerInitResponse(GrpcEvent initEvent)
402403
ScriptHost.IsFunctionDataCacheEnabled = false;
403404
}
404405

406+
if (_environment.IsApplicationInsightsAgentEnabled() ||
407+
(bool.TryParse(_workerCapabilities.GetCapabilityState(RpcWorkerConstants.WorkerApplicationInsightsLoggingEnabled), out bool appInsightsWorkerEnabled) &&
408+
appInsightsWorkerEnabled))
409+
{
410+
_isWorkerApplicationInsightsLoggingEnabled = true;
411+
}
412+
405413
_workerInitTask.TrySetResult(true);
406414
}
407415

416+
// Allow tests to add capabilities, even if not directly supported by the worker.
417+
internal virtual void UpdateCapabilities(IDictionary<string, string> fields)
418+
{
419+
_workerCapabilities.UpdateCapabilities(fields);
420+
}
421+
408422
public void SetupFunctionInvocationBuffers(IEnumerable<FunctionMetadata> functions)
409423
{
410424
_functions = functions;
@@ -882,7 +896,7 @@ internal async Task InvokeResponse(InvocationResponse invokeResponse)
882896
}
883897

884898
/// <summary>
885-
/// Request to free memory allocated by the worker (for output bindings)
899+
/// Request to free memory allocated by the worker (for output bindings).
886900
/// </summary>
887901
/// <param name="outputMaps">List of names of shared memory maps to close from the worker.</param>
888902
internal void SendCloseSharedMemoryResourcesForInvocationRequest(IList<string> outputMaps)
@@ -905,12 +919,18 @@ internal void SendCloseSharedMemoryResourcesForInvocationRequest(IList<string> o
905919
internal void Log(GrpcEvent msg)
906920
{
907921
var rpcLog = msg.Message.RpcLog;
908-
LogLevel logLevel = (LogLevel)rpcLog.Level;
909922
if (_executingInvocations.TryGetValue(rpcLog.InvocationId, out ScriptInvocationContext context))
910923
{
911924
// Restore the execution context from the original invocation. This allows AsyncLocal state to flow to loggers.
912-
System.Threading.ExecutionContext.Run(context.AsyncExecutionContext, (s) =>
925+
System.Threading.ExecutionContext.Run(context.AsyncExecutionContext, static (state) =>
913926
{
927+
var stateTuple = ((ScriptInvocationContext Context, RpcLog RpcLog, bool AppInsightsEnabledOnWorker))state;
928+
929+
var rpcLog = stateTuple.RpcLog;
930+
LogLevel logLevel = (LogLevel)rpcLog.Level;
931+
932+
var context = stateTuple.Context;
933+
914934
if (rpcLog.LogCategory == RpcLogCategory.CustomMetric)
915935
{
916936
if (rpcLog.PropertiesMap.TryGetValue(LogConstants.NameKey, out var metricName)
@@ -925,18 +945,27 @@ internal void Log(GrpcEvent msg)
925945
}
926946
else
927947
{
928-
if (rpcLog.Exception != null)
948+
try
929949
{
930-
// TODO fix RpcException catch all https://github.com/Azure/azure-functions-dotnet-worker/issues/370
931-
var exception = new Workers.Rpc.RpcException(rpcLog.Message, rpcLog.Exception.Message, rpcLog.Exception.StackTrace);
932-
context.Logger.Log(logLevel, new EventId(0, rpcLog.EventId), rpcLog.Message, exception, (state, exc) => state);
950+
WorkerTraceFilterTelemetryProcessor.FilterApplicationInsightsFromWorker.Value = stateTuple.AppInsightsEnabledOnWorker;
951+
952+
if (rpcLog.Exception != null)
953+
{
954+
// TODO fix RpcException catch all https://github.com/Azure/azure-functions-dotnet-worker/issues/370
955+
var exception = new Workers.Rpc.RpcException(rpcLog.Message, rpcLog.Exception.Message, rpcLog.Exception.StackTrace);
956+
context.Logger.Log(logLevel, new EventId(0, rpcLog.EventId), rpcLog.Message, exception, (state, exc) => state);
957+
}
958+
else
959+
{
960+
context.Logger.Log(logLevel, new EventId(0, rpcLog.EventId), rpcLog.Message, null, (state, exc) => state);
961+
}
933962
}
934-
else
963+
finally
935964
{
936-
context.Logger.Log(logLevel, new EventId(0, rpcLog.EventId), rpcLog.Message, null, (state, exc) => state);
965+
WorkerTraceFilterTelemetryProcessor.FilterApplicationInsightsFromWorker.Value = false;
937966
}
938967
}
939-
}, null);
968+
}, (context, rpcLog, _isWorkerApplicationInsightsLoggingEnabled));
940969
}
941970
}
942971

src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannelFactory.cs

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,11 @@ public class GrpcWorkerChannelFactory : IRpcWorkerChannelFactory
2525
private readonly IEnvironment _environment = null;
2626
private readonly IOptionsMonitor<ScriptApplicationHostOptions> _applicationHostOptions = null;
2727
private readonly ISharedMemoryManager _sharedMemoryManager = null;
28-
private readonly IFunctionDataCache _functionDataCache = null;
2928
private readonly IOptions<WorkerConcurrencyOptions> _workerConcurrencyOptions;
3029
private readonly IOptions<FunctionsHostingConfigOptions> _hostingConfigOptions;
3130

32-
public GrpcWorkerChannelFactory(IScriptEventManager eventManager, IEnvironment environment, IRpcServer rpcServer, ILoggerFactory loggerFactory, IOptionsMonitor<LanguageWorkerOptions> languageWorkerOptions,
33-
IOptionsMonitor<ScriptApplicationHostOptions> applicationHostOptions, IRpcWorkerProcessFactory rpcWorkerProcessManager, ISharedMemoryManager sharedMemoryManager, IFunctionDataCache functionDataCache,
31+
public GrpcWorkerChannelFactory(IScriptEventManager eventManager, IEnvironment environment, ILoggerFactory loggerFactory,
32+
IOptionsMonitor<ScriptApplicationHostOptions> applicationHostOptions, IRpcWorkerProcessFactory rpcWorkerProcessManager, ISharedMemoryManager sharedMemoryManager,
3433
IOptions<WorkerConcurrencyOptions> workerConcurrencyOptions, IOptions<FunctionsHostingConfigOptions> hostingConfigOptions)
3534
{
3635
_eventManager = eventManager;
@@ -39,7 +38,6 @@ public GrpcWorkerChannelFactory(IScriptEventManager eventManager, IEnvironment e
3938
_environment = environment;
4039
_applicationHostOptions = applicationHostOptions;
4140
_sharedMemoryManager = sharedMemoryManager;
42-
_functionDataCache = functionDataCache;
4341
_workerConcurrencyOptions = workerConcurrencyOptions;
4442
_hostingConfigOptions = hostingConfigOptions;
4543
}
@@ -55,20 +53,28 @@ public IRpcWorkerChannel Create(string scriptRootPath, string runtime, IMetricsL
5553
_eventManager.AddGrpcChannels(workerId); // prepare the inbound/outbound dedicated channels
5654
ILogger workerLogger = _loggerFactory.CreateLogger($"Worker.LanguageWorkerChannel.{runtime}.{workerId}");
5755
IWorkerProcess rpcWorkerProcess = _rpcWorkerProcessFactory.Create(workerId, runtime, scriptRootPath, languageWorkerConfig);
56+
57+
return CreateInternal(workerId, _eventManager, languageWorkerConfig, rpcWorkerProcess, workerLogger, metricsLogger, attemptCount,
58+
_environment, _applicationHostOptions, _sharedMemoryManager, _workerConcurrencyOptions, _hostingConfigOptions);
59+
}
60+
61+
internal virtual IRpcWorkerChannel CreateInternal(string workerId, IScriptEventManager eventManager, RpcWorkerConfig languageWorkerConfig, IWorkerProcess rpcWorkerProcess,
62+
ILogger workerLogger, IMetricsLogger metricsLogger, int attemptCount, IEnvironment environment, IOptionsMonitor<ScriptApplicationHostOptions> applicationHostOptions,
63+
ISharedMemoryManager sharedMemoryManager, IOptions<WorkerConcurrencyOptions> workerConcurrencyOptions, IOptions<FunctionsHostingConfigOptions> hostingConfigOptions)
64+
{
5865
return new GrpcWorkerChannel(
5966
workerId,
60-
_eventManager,
67+
eventManager,
6168
languageWorkerConfig,
6269
rpcWorkerProcess,
6370
workerLogger,
6471
metricsLogger,
6572
attemptCount,
66-
_environment,
67-
_applicationHostOptions,
68-
_sharedMemoryManager,
69-
_functionDataCache,
70-
_workerConcurrencyOptions,
71-
_hostingConfigOptions);
73+
environment,
74+
applicationHostOptions,
75+
sharedMemoryManager,
76+
workerConcurrencyOptions,
77+
hostingConfigOptions);
7278
}
7379
}
7480
}

src/WebJobs.Script.Grpc/GrpcServiceCollectionsExtensions.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
// Copyright (c) .NET Foundation. All rights reserved.
22
// Licensed under the MIT License. See License.txt in the project root for license information.
33

4-
using Microsoft.Azure.WebJobs.Script.Config;
54
using Microsoft.Azure.WebJobs.Script.Grpc.Messages;
65
using Microsoft.Azure.WebJobs.Script.Workers.Rpc;
76
using Microsoft.Extensions.DependencyInjection;
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the MIT License. See License.txt in the project root for license information.
3+
4+
using System.Threading;
5+
using Microsoft.ApplicationInsights.Channel;
6+
using Microsoft.ApplicationInsights.Extensibility;
7+
8+
namespace Microsoft.Azure.WebJobs.Script.Config
9+
{
10+
internal class WorkerTraceFilterTelemetryProcessor : ITelemetryProcessor
11+
{
12+
private readonly ITelemetryProcessor _next;
13+
14+
internal static readonly AsyncLocal<bool> FilterApplicationInsightsFromWorker = new();
15+
16+
public WorkerTraceFilterTelemetryProcessor(ITelemetryProcessor next)
17+
{
18+
_next = next;
19+
}
20+
21+
public void Process(ITelemetry item)
22+
{
23+
if (FilterApplicationInsightsFromWorker.Value)
24+
{
25+
return;
26+
}
27+
28+
_next.Process(item);
29+
}
30+
}
31+
}

0 commit comments

Comments
 (0)