Skip to content

Commit c4b8e28

Browse files
authored
Enable log system scope during RPC invocation (#10344)
* Enable log system scope during RPC invocation * Add unit test to verify scope * Verify scope reverts to 'User'
1 parent 257c079 commit c4b8e28

File tree

3 files changed

+100
-6
lines changed

3 files changed

+100
-6
lines changed

src/WebJobs.Script/Workers/Rpc/FunctionRegistration/RpcFunctionInvocationDispatcher.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
using System.Threading.Tasks;
1111
using System.Threading.Tasks.Dataflow;
1212
using Microsoft.AspNetCore.Hosting;
13+
using Microsoft.Azure.WebJobs.Host.Executors.Internal;
1314
using Microsoft.Azure.WebJobs.Logging;
1415
using Microsoft.Azure.WebJobs.Script.Config;
1516
using Microsoft.Azure.WebJobs.Script.Description;
@@ -403,6 +404,9 @@ public async Task ShutdownAsync()
403404

404405
public async Task InvokeAsync(ScriptInvocationContext invocationContext)
405406
{
407+
// We have entered back into a system scope, ensure our logs are captured as such.
408+
using FunctionInvoker.Scope scope = FunctionInvoker.BeginSystemScope();
409+
406410
// This could throw if no initialized workers are found. Shut down instance and retry.
407411
IEnumerable<IRpcWorkerChannel> workerChannels = await GetInitializedWorkerChannelsAsync(invocationContext.FunctionMetadata.Language ?? _workerRuntime);
408412
var rpcWorkerChannel = _functionDispatcherLoadBalancer.GetLanguageWorkerChannel(workerChannels);

test/WebJobs.Script.Tests/Workers/Rpc/RpcFunctionInvocationDispatcherTests.cs

Lines changed: 95 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@
77
using System.Linq;
88
using System.Threading;
99
using System.Threading.Tasks;
10+
using System.Threading.Tasks.Dataflow;
1011
using Microsoft.AspNetCore.Hosting;
12+
using Microsoft.Azure.WebJobs.Host.Executors.Internal;
1113
using Microsoft.Azure.WebJobs.Script.Config;
1214
using Microsoft.Azure.WebJobs.Script.Description;
1315
using Microsoft.Azure.WebJobs.Script.Diagnostics;
@@ -631,8 +633,92 @@ public async Task FunctionDispatcher_ErroredWebHostChannel()
631633
Assert.True(testLogs.Any(m => m.FormattedMessage.Contains("Removing errored webhost language worker channel for runtime")));
632634
}
633635

634-
private static RpcFunctionInvocationDispatcher GetTestFunctionDispatcher(int maxProcessCountValue = 1, bool addWebhostChannel = false,
635-
Mock<IWebHostRpcWorkerChannelManager> mockwebHostLanguageWorkerChannelManager = null, bool throwOnProcessStartUp = false, TimeSpan? startupIntervals = null, string runtime = null, bool workerIndexing = false, bool placeholder = false)
636+
[Fact]
637+
public async Task FunctionDispatcher_InvokeAsync_SystemScope()
638+
{
639+
FunctionMetadata func1 = new FunctionMetadata()
640+
{
641+
Name = "func1",
642+
Language = "node"
643+
};
644+
var functions = new List<FunctionMetadata>()
645+
{
646+
func1
647+
};
648+
649+
ScriptInvocationContext context = new()
650+
{
651+
FunctionMetadata = func1,
652+
ExecutionContext = new()
653+
{
654+
InvocationId = Guid.NewGuid(),
655+
FunctionName = "func1",
656+
},
657+
ResultSource = new(),
658+
CancellationToken = default,
659+
Logger = _testLogger,
660+
AsyncExecutionContext = System.Threading.ExecutionContext.Capture(),
661+
};
662+
663+
BufferBlock<ScriptInvocationContext> inputBuffer = new();
664+
ActionBlock<ScriptInvocationContext> actionBlock = new(context =>
665+
{
666+
try
667+
{
668+
Assert.Equal(FunctionInvocationScope.System, FunctionInvoker.CurrentScope);
669+
context.ResultSource.TrySetResult(null);
670+
}
671+
catch (Exception ex)
672+
{
673+
context.ResultSource.TrySetException(ex);
674+
}
675+
});
676+
677+
inputBuffer.LinkTo(actionBlock);
678+
679+
Mock<IDictionary<string, BufferBlock<ScriptInvocationContext>>> mockBufferBlocks = new();
680+
mockBufferBlocks.Setup(m => m.TryGetValue(It.IsAny<string>(), out inputBuffer)).Returns(true);
681+
682+
Mock<IRpcWorkerChannel> mockChannel = new();
683+
mockChannel.Setup(m => m.FunctionInputBuffers).Returns(mockBufferBlocks.Object);
684+
685+
IRpcWorkerChannel Create(
686+
string workerRuntime, string language, IMetricsLogger metricsLogger, int attemptCount, IEnumerable<RpcWorkerConfig> workerConfigs)
687+
{
688+
var workerConfig = workerConfigs.SingleOrDefault(p => language.Equals(p.Description.Language, StringComparison.OrdinalIgnoreCase));
689+
return new TestRpcWorkerChannel(
690+
Guid.NewGuid().ToString(), language, null, _testLogger, false, workerConfig: workerConfig)
691+
{
692+
FunctionInputBuffers = mockBufferBlocks.Object,
693+
};
694+
}
695+
696+
Mock<IRpcWorkerChannelFactory> mockChannelFactory = new();
697+
mockChannelFactory.Setup(
698+
m => m.Create(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<IMetricsLogger>(), It.IsAny<int>(), It.IsAny<IEnumerable<RpcWorkerConfig>>()))
699+
.Returns(Create);
700+
701+
RpcFunctionInvocationDispatcher functionDispatcher = GetTestFunctionDispatcher(
702+
runtime: RpcWorkerConstants.NodeLanguageWorkerName, channelFactory: mockChannelFactory.Object);
703+
704+
await functionDispatcher.InitializeAsync(functions);
705+
await WaitForFunctionDispactherStateInitialized(functionDispatcher);
706+
using FunctionInvoker.Scope scope = FunctionInvoker.BeginUserScope();
707+
await functionDispatcher.InvokeAsync(context);
708+
await context.ResultSource.Task;
709+
Assert.Equal(FunctionInvocationScope.User, FunctionInvoker.CurrentScope);
710+
}
711+
712+
private static RpcFunctionInvocationDispatcher GetTestFunctionDispatcher(
713+
int maxProcessCountValue = 1,
714+
bool addWebhostChannel = false,
715+
Mock<IWebHostRpcWorkerChannelManager> mockwebHostLanguageWorkerChannelManager = null,
716+
bool throwOnProcessStartUp = false,
717+
TimeSpan? startupIntervals = null,
718+
string runtime = null,
719+
bool workerIndexing = false,
720+
bool placeholder = false,
721+
IRpcWorkerChannelFactory channelFactory = null)
636722
{
637723
var eventManager = new ScriptEventManager();
638724
var metricsLogger = new Mock<IMetricsLogger>();
@@ -667,8 +753,10 @@ private static RpcFunctionInvocationDispatcher GetTestFunctionDispatcher(int max
667753
WorkerConfigs = TestHelpers.GetTestWorkerConfigs(processCountValue: maxProcessCountValue, processStartupInterval: intervals,
668754
processRestartInterval: intervals, processShutdownTimeout: TimeSpan.FromSeconds(1), workerIndexing: workerIndexing)
669755
};
670-
IRpcWorkerChannelFactory testLanguageWorkerChannelFactory = new TestRpcWorkerChannelFactory(eventManager, _testLogger, scriptOptions.Value.RootScriptPath, throwOnProcessStartUp);
671-
IWebHostRpcWorkerChannelManager testWebHostLanguageWorkerChannelManager = new TestRpcWorkerChannelManager(eventManager, _testLogger, scriptOptions.Value.RootScriptPath, testLanguageWorkerChannelFactory);
756+
757+
channelFactory ??= new TestRpcWorkerChannelFactory(eventManager, _testLogger, scriptOptions.Value.RootScriptPath, throwOnProcessStartUp);
758+
IWebHostRpcWorkerChannelManager testWebHostLanguageWorkerChannelManager = new TestRpcWorkerChannelManager(
759+
eventManager, _testLogger, scriptOptions.Value.RootScriptPath, channelFactory);
672760
IJobHostRpcWorkerChannelManager jobHostLanguageWorkerChannelManager = new JobHostRpcWorkerChannelManager(_testLoggerFactory);
673761

674762
if (addWebhostChannel)
@@ -681,6 +769,8 @@ private static RpcFunctionInvocationDispatcher GetTestFunctionDispatcher(int max
681769
}
682770

683771
var mockFunctionDispatcherLoadBalancer = new Mock<IRpcFunctionInvocationDispatcherLoadBalancer>();
772+
mockFunctionDispatcherLoadBalancer.Setup(m => m.GetLanguageWorkerChannel(It.IsAny<IEnumerable<IRpcWorkerChannel>>()))
773+
.Returns((IEnumerable<IRpcWorkerChannel> channels) => channels.FirstOrDefault());
684774
var mockHostMetrics = new Mock<IHostMetrics>();
685775

686776
_javaTestChannel = new TestRpcWorkerChannel(Guid.NewGuid().ToString(), "java", eventManager, _testLogger, false);
@@ -693,7 +783,7 @@ private static RpcFunctionInvocationDispatcher GetTestFunctionDispatcher(int max
693783
mockApplicationLifetime.Object,
694784
eventManager,
695785
_testLoggerFactory,
696-
testLanguageWorkerChannelFactory,
786+
channelFactory,
697787
optionsMonitor,
698788
testWebHostLanguageWorkerChannelManager,
699789
jobHostLanguageWorkerChannelManager,

test/WebJobs.Script.Tests/Workers/Rpc/TestRpcWorkerChannel.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public TestRpcWorkerChannel(string workerId, string runtime = null, IScriptEvent
5454

5555
public RpcWorkerConfig WorkerConfig => _workerConfig;
5656

57-
public IDictionary<string, BufferBlock<ScriptInvocationContext>> FunctionInputBuffers => throw new NotImplementedException();
57+
public IDictionary<string, BufferBlock<ScriptInvocationContext>> FunctionInputBuffers { get; set; }
5858

5959
public List<Task> ExecutionContexts => _executionContexts;
6060

0 commit comments

Comments
 (0)