Skip to content

Commit 2360468

Browse files
Additional attributes for worker AI agent (#8387)
Additional attributes for worker AI agent
1 parent 7beffee commit 2360468

File tree

7 files changed

+155
-3
lines changed

7 files changed

+155
-3
lines changed

src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,15 @@
55
using System.Collections;
66
using System.Collections.Concurrent;
77
using System.Collections.Generic;
8+
using System.Diagnostics;
89
using System.IO;
910
using System.Linq;
1011
using System.Reactive.Linq;
1112
using System.Text;
1213
using System.Threading;
1314
using System.Threading.Tasks;
1415
using System.Threading.Tasks.Dataflow;
16+
using Google.Protobuf.Collections;
1517
using Microsoft.Azure.WebJobs.Logging;
1618
using Microsoft.Azure.WebJobs.Script.Description;
1719
using Microsoft.Azure.WebJobs.Script.Diagnostics;
@@ -487,6 +489,7 @@ internal async Task SendInvocationRequest(ScriptInvocationContext context)
487489
return;
488490
}
489491
var invocationRequest = await context.ToRpcInvocationRequest(_workerChannelLogger, _workerCapabilities, _isSharedMemoryDataTransferEnabled, _sharedMemoryManager);
492+
AddAdditionalTraceContext(invocationRequest.TraceContext.Attributes, context);
490493
_executingInvocations.TryAdd(invocationRequest.InvocationId, context);
491494

492495
SendStreamingMessage(new StreamingMessage
@@ -1002,5 +1005,27 @@ private void AddSample<T>(List<T> samples, T sample)
10021005
samples.Add(sample);
10031006
}
10041007
}
1008+
1009+
private void AddAdditionalTraceContext(MapField<string, string> attributes, ScriptInvocationContext context)
1010+
{
1011+
// This is only applicable for AI agents running along side worker
1012+
if (_environment.IsApplicationInsightsAgentEnabled())
1013+
{
1014+
attributes[ScriptConstants.LogPropertyProcessIdKey] = Convert.ToString(_rpcWorkerProcess.Id);
1015+
if (context.FunctionMetadata.Properties.ContainsKey(ScriptConstants.LogPropertyHostInstanceIdKey))
1016+
{
1017+
attributes[ScriptConstants.LogPropertyHostInstanceIdKey] = Convert.ToString(context.FunctionMetadata.Properties[ScriptConstants.LogPropertyHostInstanceIdKey]);
1018+
}
1019+
if (context.FunctionMetadata.Properties.ContainsKey(LogConstants.CategoryNameKey))
1020+
{
1021+
attributes[LogConstants.CategoryNameKey] = Convert.ToString(context.FunctionMetadata.Properties[LogConstants.CategoryNameKey]);
1022+
}
1023+
string sessionid = Activity.Current?.GetBaggageItem(ScriptConstants.LiveLogsSessionAIKey);
1024+
if (!string.IsNullOrEmpty(sessionid))
1025+
{
1026+
attributes[ScriptConstants.LiveLogsSessionAIKey] = sessionid;
1027+
}
1028+
}
1029+
}
10051030
}
10061031
}

src/WebJobs.Script/Environment/EnvironmentExtensions.cs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@ namespace Microsoft.Azure.WebJobs.Script
1616
{
1717
internal static class EnvironmentExtensions
1818
{
19-
// For testing
19+
private static bool? isApplicationInsightsAgentEnabled;
20+
2021
internal static string BaseDirectory { get; set; }
2122

2223
public static string GetEnvironmentVariableOrDefault(this IEnvironment environment, string name, string defaultValue)
@@ -508,5 +509,21 @@ public static HashSet<string> GetLanguageWorkerListToStartInPlaceholder(this IEn
508509
}
509510
return placeholderRuntimeSet;
510511
}
512+
513+
public static bool IsApplicationInsightsAgentEnabled(this IEnvironment environment)
514+
{
515+
// cache the value of the environment variable
516+
if (isApplicationInsightsAgentEnabled.HasValue)
517+
{
518+
return isApplicationInsightsAgentEnabled.Value;
519+
}
520+
else if (!environment.IsPlaceholderModeEnabled())
521+
{
522+
bool.TryParse(environment.GetEnvironmentVariable(AppInsightsAgent), out bool isEnabled);
523+
isApplicationInsightsAgentEnabled = isEnabled;
524+
return isApplicationInsightsAgentEnabled.Value;
525+
}
526+
return false;
527+
}
511528
}
512529
}

src/WebJobs.Script/Environment/EnvironmentSettingNames.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ public static class EnvironmentSettingNames
2727
public const string AppInsightsInstrumentationKey = "APPINSIGHTS_INSTRUMENTATIONKEY";
2828
public const string AppInsightsConnectionString = "APPLICATIONINSIGHTS_CONNECTION_STRING";
2929
public const string AppInsightsQuickPulseAuthApiKey = "APPINSIGHTS_QUICKPULSEAUTHAPIKEY";
30+
public const string AppInsightsAgent = "APPLICATIONINSIGHTS_ENABLE_AGENT";
3031
public const string FunctionsExtensionVersion = "FUNCTIONS_EXTENSION_VERSION";
3132
public const string FunctionWorkerRuntime = "FUNCTIONS_WORKER_RUNTIME";
3233
public const string ContainerName = "CONTAINER_NAME";

src/WebJobs.Script/ScriptConstants.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ public static class ScriptConstants
3333
public const string LogPropertyIsSystemLogKey = "MS_IsSystemLog";
3434
public const string LogPropertyFunctionInvocationIdKey = "MS_FunctionInvocationId";
3535
public const string LogPropertyHostInstanceIdKey = "HostInstanceId";
36+
public const string LogPropertyProcessIdKey = "ProcessId";
3637
public const string LogPropertyActivityIdKey = "MS_ActivityId";
3738

3839
public const string TraceSourceSecretManagement = "SecretManagement";

src/WebJobs.Script/Workers/Rpc/FunctionRegistration/RpcFunctionInvocationDispatcher.cs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
using System.Threading.Tasks;
1111
using System.Threading.Tasks.Dataflow;
1212
using Microsoft.AspNetCore.Hosting;
13+
using Microsoft.Azure.WebJobs.Logging;
1314
using Microsoft.Azure.WebJobs.Script.Description;
1415
using Microsoft.Azure.WebJobs.Script.Diagnostics;
1516
using Microsoft.Azure.WebJobs.Script.Eventing;
@@ -314,6 +315,7 @@ public async Task InitializeAsync(IEnumerable<FunctionMetadata> functions, Cance
314315
}
315316
}
316317
}
318+
AddLogUserCategory(functions);
317319
}
318320

319321
// Gets metadata from worker
@@ -353,6 +355,7 @@ public async Task FinishInitialization(IEnumerable<FunctionMetadata> functions,
353355
}
354356
SetFunctionDispatcherStateToInitializedAndLog();
355357
}
358+
AddLogUserCategory(functions);
356359
}
357360

358361
public async Task<IDictionary<string, WorkerStatus>> GetWorkerStatusesAsync()
@@ -655,5 +658,18 @@ public async Task<bool> RestartWorkerWithInvocationIdAsync(string invocationId)
655658
}
656659
return false;
657660
}
661+
662+
private void AddLogUserCategory(IEnumerable<FunctionMetadata> functions)
663+
{
664+
// Add category, this is only needed for workers running AI agent
665+
if (_environment.IsApplicationInsightsAgentEnabled())
666+
{
667+
foreach (FunctionMetadata metadata in functions)
668+
{
669+
metadata.Properties[LogConstants.CategoryNameKey] = LogCategories.CreateFunctionUserCategory(metadata.Name);
670+
metadata.Properties[ScriptConstants.LogPropertyHostInstanceIdKey] = _scriptOptions.InstanceId;
671+
}
672+
}
673+
}
658674
}
659675
}

test/WebJobs.Script.Tests/Workers/Rpc/GrpcWorkerChannelTests.cs

Lines changed: 76 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,13 @@
33

44
using System;
55
using System.Collections.Generic;
6+
using System.Diagnostics;
67
using System.IO;
78
using System.Linq;
89
using System.Runtime.InteropServices;
910
using System.Threading;
1011
using System.Threading.Tasks;
12+
using Microsoft.Azure.WebJobs.Logging;
1113
using Microsoft.Azure.WebJobs.Script.Description;
1214
using Microsoft.Azure.WebJobs.Script.Diagnostics;
1315
using Microsoft.Azure.WebJobs.Script.Eventing;
@@ -36,6 +38,7 @@ public class GrpcWorkerChannelTests : IDisposable
3638
private readonly string _workerId = "testWorkerId";
3739
private readonly string _scriptRootPath = "c:\testdir";
3840
private readonly IScriptEventManager _eventManager = new ScriptEventManager();
41+
private readonly Mock<IScriptEventManager> _eventManagerMock = new Mock<IScriptEventManager>();
3942
private readonly TestMetricsLogger _metricsLogger = new TestMetricsLogger();
4043
private readonly Mock<IWorkerConsoleLogSource> _mockConsoleLogger = new Mock<IWorkerConsoleLogSource>();
4144
private readonly Mock<FunctionRpc.FunctionRpcBase> _mockFunctionRpcService = new Mock<FunctionRpc.FunctionRpcBase>();
@@ -52,6 +55,7 @@ public class GrpcWorkerChannelTests : IDisposable
5255
private readonly IFunctionDataCache _functionDataCache;
5356
private readonly IOptions<WorkerConcurrencyOptions> _workerConcurrencyOptions;
5457
private GrpcWorkerChannel _workerChannel;
58+
private GrpcWorkerChannel _workerChannelwithMockEventManager;
5559

5660
public GrpcWorkerChannelTests()
5761
{
@@ -104,6 +108,23 @@ public GrpcWorkerChannelTests()
104108
_sharedMemoryManager,
105109
_functionDataCache,
106110
_workerConcurrencyOptions);
111+
112+
_eventManagerMock.Setup(proxy => proxy.Publish(It.IsAny<OutboundGrpcEvent>())).Verifiable();
113+
_workerChannelwithMockEventManager = new GrpcWorkerChannel(
114+
_workerId,
115+
_eventManagerMock.Object,
116+
_testWorkerConfig,
117+
_mockrpcWorkerProcess.Object,
118+
_logger,
119+
_metricsLogger,
120+
0,
121+
_testEnvironment,
122+
_hostOptionsMonitor,
123+
_sharedMemoryManager,
124+
_functionDataCache,
125+
_workerConcurrencyOptions);
126+
127+
_testEnvironment.SetEnvironmentVariable("APPLICATIONINSIGHTS_ENABLE_AGENT", "true");
107128
}
108129

109130
public void Dispose()
@@ -730,6 +751,57 @@ public async Task GetLatencies_DoesNot_StartTimer_WhenDynamicConcurrencyDisabled
730751
Assert.True(latencyHistory.Count() == 0);
731752
}
732753

754+
[Fact]
755+
public async Task SendInvocationRequest_ValidateTraceContext()
756+
{
757+
ScriptInvocationContext scriptInvocationContext = GetTestScriptInvocationContext(Guid.NewGuid(), null);
758+
await _workerChannelwithMockEventManager.SendInvocationRequest(scriptInvocationContext);
759+
if (_testEnvironment.IsApplicationInsightsAgentEnabled())
760+
{
761+
_eventManagerMock.Verify(proxy => proxy.Publish(It.Is<OutboundGrpcEvent>(
762+
grpcEvent => grpcEvent.Message.InvocationRequest.TraceContext.Attributes.ContainsKey(ScriptConstants.LogPropertyProcessIdKey)
763+
&& grpcEvent.Message.InvocationRequest.TraceContext.Attributes.ContainsKey(ScriptConstants.LogPropertyHostInstanceIdKey)
764+
&& grpcEvent.Message.InvocationRequest.TraceContext.Attributes.ContainsKey(LogConstants.CategoryNameKey)
765+
&& grpcEvent.Message.InvocationRequest.TraceContext.Attributes[LogConstants.CategoryNameKey].Equals("testcat1")
766+
&& grpcEvent.Message.InvocationRequest.TraceContext.Attributes.Count == 3)));
767+
}
768+
else
769+
{
770+
_eventManagerMock.Verify(proxy => proxy.Publish(It.Is<OutboundGrpcEvent>(
771+
grpcEvent => !grpcEvent.Message.InvocationRequest.TraceContext.Attributes.ContainsKey(ScriptConstants.LogPropertyProcessIdKey)
772+
&& !grpcEvent.Message.InvocationRequest.TraceContext.Attributes.ContainsKey(ScriptConstants.LogPropertyHostInstanceIdKey)
773+
&& !grpcEvent.Message.InvocationRequest.TraceContext.Attributes.ContainsKey(LogConstants.CategoryNameKey))));
774+
}
775+
}
776+
777+
[Fact]
778+
public async Task SendInvocationRequest_ValidateTraceContext_SessionId()
779+
{
780+
string sessionId = "sessionId1234";
781+
Activity activity = new Activity("testActivity");
782+
activity.AddBaggage(ScriptConstants.LiveLogsSessionAIKey, sessionId);
783+
activity.Start();
784+
ScriptInvocationContext scriptInvocationContext = GetTestScriptInvocationContext(Guid.NewGuid(), null);
785+
await _workerChannelwithMockEventManager.SendInvocationRequest(scriptInvocationContext);
786+
activity.Stop();
787+
_eventManagerMock.Verify(p => p.Publish(It.Is<OutboundGrpcEvent>(grpcEvent => ValidateInvocationRequest(grpcEvent, sessionId))));
788+
}
789+
790+
private bool ValidateInvocationRequest(OutboundGrpcEvent grpcEvent, string sessionId)
791+
{
792+
if (_testEnvironment.IsApplicationInsightsAgentEnabled())
793+
{
794+
return grpcEvent.Message.InvocationRequest.TraceContext.Attributes[ScriptConstants.LiveLogsSessionAIKey].Equals(sessionId)
795+
&& grpcEvent.Message.InvocationRequest.TraceContext.Attributes.ContainsKey(LogConstants.CategoryNameKey)
796+
&& grpcEvent.Message.InvocationRequest.TraceContext.Attributes[LogConstants.CategoryNameKey].Equals("testcat1")
797+
&& grpcEvent.Message.InvocationRequest.TraceContext.Attributes.Count == 4;
798+
}
799+
else
800+
{
801+
return !grpcEvent.Message.InvocationRequest.TraceContext.Attributes.ContainsKey(LogConstants.CategoryNameKey);
802+
}
803+
}
804+
733805
private IEnumerable<FunctionMetadata> GetTestFunctionsList(string runtime)
734806
{
735807
var metadata1 = new FunctionMetadata()
@@ -739,15 +811,17 @@ private IEnumerable<FunctionMetadata> GetTestFunctionsList(string runtime)
739811
};
740812

741813
metadata1.SetFunctionId("TestFunctionId1");
742-
814+
metadata1.Properties.Add(LogConstants.CategoryNameKey, "testcat1");
815+
metadata1.Properties.Add(ScriptConstants.LogPropertyHostInstanceIdKey, "testhostId1");
743816
var metadata2 = new FunctionMetadata()
744817
{
745818
Language = runtime,
746819
Name = "js2",
747820
};
748821

749822
metadata2.SetFunctionId("TestFunctionId2");
750-
823+
metadata2.Properties.Add(LogConstants.CategoryNameKey, "testcat2");
824+
metadata2.Properties.Add(ScriptConstants.LogPropertyHostInstanceIdKey, "testhostId2");
751825
return new List<FunctionMetadata>()
752826
{
753827
metadata1,

test/WebJobs.Script.Tests/Workers/Rpc/RpcFunctionInvocationDispatcherTests.cs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
using System.Threading;
99
using System.Threading.Tasks;
1010
using Microsoft.AspNetCore.Hosting;
11+
using Microsoft.Azure.WebJobs.Logging;
1112
using Microsoft.Azure.WebJobs.Script.Description;
1213
using Microsoft.Azure.WebJobs.Script.Diagnostics;
1314
using Microsoft.Azure.WebJobs.Script.Eventing;
@@ -635,6 +636,22 @@ public async Task FunctionDispatcher_DoNot_Restart_ErroredChannels_If_WorkerRunt
635636
Assert.Equal(expectedProcessCount, functionDispatcher.JobHostLanguageWorkerChannelManager.GetChannels().Count());
636637
}
637638

639+
[Fact]
640+
public async Task FunctionDispatcher_AdditionalAttributes()
641+
{
642+
int expectedProcessCount = 3;
643+
RpcFunctionInvocationDispatcher functionDispatcher = GetTestFunctionDispatcher(expectedProcessCount, false, runtime: RpcWorkerConstants.JavaLanguageWorkerName, workerIndexing: true);
644+
645+
var functions = GetTestFunctionsList(RpcWorkerConstants.JavaLanguageWorkerName);
646+
await functionDispatcher.FinishInitialization(functions);
647+
foreach (var item in functions)
648+
{
649+
Assert.Equal(item.Properties.Count, 2);
650+
Assert.True(item.Properties.ContainsKey(LogConstants.CategoryNameKey) &&
651+
item.Properties.ContainsKey(ScriptConstants.LogPropertyHostInstanceIdKey));
652+
}
653+
}
654+
638655
[Theory]
639656
[InlineData("node", "node", false, true, true)]
640657
[InlineData("node", "node", true, false, true)]
@@ -719,6 +736,7 @@ private static RpcFunctionInvocationDispatcher GetTestFunctionDispatcher(int max
719736
_javaTestChannel = new TestRpcWorkerChannel(Guid.NewGuid().ToString(), "java", eventManager, _testLogger, false);
720737
var optionsMonitor = TestHelpers.CreateOptionsMonitor(workerConfigOptions);
721738

739+
testEnv.SetEnvironmentVariable("APPLICATIONINSIGHTS_ENABLE_AGENT", "true");
722740
return new RpcFunctionInvocationDispatcher(scriptOptions,
723741
metricsLogger.Object,
724742
testEnv,

0 commit comments

Comments
 (0)