Skip to content

Commit f5b03de

Browse files
jviaukshyju
andauthored
[in-proc backport] Enable log system scope during RPC invocation (#10344) (#10358)
* Enable log system scope during RPC invocation (#10344) * Enable log system scope during RPC invocation * Add unit test to verify scope * Verify scope reverts to 'User' * Remove usage of IHostMetrics type (in test) which does not exist in in-proc branch --------- Co-authored-by: Shyju Krishnankutty <[email protected]>
1 parent ce328da commit f5b03de

File tree

3 files changed

+100
-6
lines changed

3 files changed

+100
-6
lines changed

src/WebJobs.Script/Workers/Rpc/FunctionRegistration/RpcFunctionInvocationDispatcher.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
using System.Threading.Tasks;
1111
using System.Threading.Tasks.Dataflow;
1212
using Microsoft.AspNetCore.Hosting;
13+
using Microsoft.Azure.WebJobs.Host.Executors.Internal;
1314
using Microsoft.Azure.WebJobs.Logging;
1415
using Microsoft.Azure.WebJobs.Script.Config;
1516
using Microsoft.Azure.WebJobs.Script.Description;
@@ -399,6 +400,9 @@ public async Task ShutdownAsync()
399400

400401
public async Task InvokeAsync(ScriptInvocationContext invocationContext)
401402
{
403+
// We have entered back into a system scope, ensure our logs are captured as such.
404+
using FunctionInvoker.Scope scope = FunctionInvoker.BeginSystemScope();
405+
402406
// This could throw if no initialized workers are found. Shut down instance and retry.
403407
IEnumerable<IRpcWorkerChannel> workerChannels = await GetInitializedWorkerChannelsAsync(invocationContext.FunctionMetadata.Language ?? _workerRuntime);
404408
var rpcWorkerChannel = _functionDispatcherLoadBalancer.GetLanguageWorkerChannel(workerChannels);

test/WebJobs.Script.Tests/Workers/Rpc/RpcFunctionInvocationDispatcherTests.cs

Lines changed: 95 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@
77
using System.Linq;
88
using System.Threading;
99
using System.Threading.Tasks;
10+
using System.Threading.Tasks.Dataflow;
1011
using Microsoft.AspNetCore.Hosting;
12+
using Microsoft.Azure.WebJobs.Host.Executors.Internal;
1113
using Microsoft.Azure.WebJobs.Script.Config;
1214
using Microsoft.Azure.WebJobs.Script.Description;
1315
using Microsoft.Azure.WebJobs.Script.Diagnostics;
@@ -630,8 +632,92 @@ public async Task FunctionDispatcher_ErroredWebHostChannel()
630632
Assert.True(testLogs.Any(m => m.FormattedMessage.Contains("Removing errored webhost language worker channel for runtime")));
631633
}
632634

633-
private static RpcFunctionInvocationDispatcher GetTestFunctionDispatcher(int maxProcessCountValue = 1, bool addWebhostChannel = false,
634-
Mock<IWebHostRpcWorkerChannelManager> mockwebHostLanguageWorkerChannelManager = null, bool throwOnProcessStartUp = false, TimeSpan? startupIntervals = null, string runtime = null, bool workerIndexing = false, bool placeholder = false)
635+
[Fact]
636+
public async Task FunctionDispatcher_InvokeAsync_SystemScope()
637+
{
638+
FunctionMetadata func1 = new FunctionMetadata()
639+
{
640+
Name = "func1",
641+
Language = "node"
642+
};
643+
var functions = new List<FunctionMetadata>()
644+
{
645+
func1
646+
};
647+
648+
ScriptInvocationContext context = new()
649+
{
650+
FunctionMetadata = func1,
651+
ExecutionContext = new()
652+
{
653+
InvocationId = Guid.NewGuid(),
654+
FunctionName = "func1",
655+
},
656+
ResultSource = new(),
657+
CancellationToken = default,
658+
Logger = _testLogger,
659+
AsyncExecutionContext = System.Threading.ExecutionContext.Capture(),
660+
};
661+
662+
BufferBlock<ScriptInvocationContext> inputBuffer = new();
663+
ActionBlock<ScriptInvocationContext> actionBlock = new(context =>
664+
{
665+
try
666+
{
667+
Assert.Equal(FunctionInvocationScope.System, FunctionInvoker.CurrentScope);
668+
context.ResultSource.TrySetResult(null);
669+
}
670+
catch (Exception ex)
671+
{
672+
context.ResultSource.TrySetException(ex);
673+
}
674+
});
675+
676+
inputBuffer.LinkTo(actionBlock);
677+
678+
Mock<IDictionary<string, BufferBlock<ScriptInvocationContext>>> mockBufferBlocks = new();
679+
mockBufferBlocks.Setup(m => m.TryGetValue(It.IsAny<string>(), out inputBuffer)).Returns(true);
680+
681+
Mock<IRpcWorkerChannel> mockChannel = new();
682+
mockChannel.Setup(m => m.FunctionInputBuffers).Returns(mockBufferBlocks.Object);
683+
684+
IRpcWorkerChannel Create(
685+
string workerRuntime, string language, IMetricsLogger metricsLogger, int attemptCount, IEnumerable<RpcWorkerConfig> workerConfigs)
686+
{
687+
var workerConfig = workerConfigs.SingleOrDefault(p => language.Equals(p.Description.Language, StringComparison.OrdinalIgnoreCase));
688+
return new TestRpcWorkerChannel(
689+
Guid.NewGuid().ToString(), language, null, _testLogger, false, workerConfig: workerConfig)
690+
{
691+
FunctionInputBuffers = mockBufferBlocks.Object,
692+
};
693+
}
694+
695+
Mock<IRpcWorkerChannelFactory> mockChannelFactory = new();
696+
mockChannelFactory.Setup(
697+
m => m.Create(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<IMetricsLogger>(), It.IsAny<int>(), It.IsAny<IEnumerable<RpcWorkerConfig>>()))
698+
.Returns(Create);
699+
700+
RpcFunctionInvocationDispatcher functionDispatcher = GetTestFunctionDispatcher(
701+
runtime: RpcWorkerConstants.NodeLanguageWorkerName, channelFactory: mockChannelFactory.Object);
702+
703+
await functionDispatcher.InitializeAsync(functions);
704+
await WaitForFunctionDispactherStateInitialized(functionDispatcher);
705+
using FunctionInvoker.Scope scope = FunctionInvoker.BeginUserScope();
706+
await functionDispatcher.InvokeAsync(context);
707+
await context.ResultSource.Task;
708+
Assert.Equal(FunctionInvocationScope.User, FunctionInvoker.CurrentScope);
709+
}
710+
711+
private static RpcFunctionInvocationDispatcher GetTestFunctionDispatcher(
712+
int maxProcessCountValue = 1,
713+
bool addWebhostChannel = false,
714+
Mock<IWebHostRpcWorkerChannelManager> mockwebHostLanguageWorkerChannelManager = null,
715+
bool throwOnProcessStartUp = false,
716+
TimeSpan? startupIntervals = null,
717+
string runtime = null,
718+
bool workerIndexing = false,
719+
bool placeholder = false,
720+
IRpcWorkerChannelFactory channelFactory = null)
635721
{
636722
var eventManager = new ScriptEventManager();
637723
var metricsLogger = new Mock<IMetricsLogger>();
@@ -666,8 +752,10 @@ private static RpcFunctionInvocationDispatcher GetTestFunctionDispatcher(int max
666752
WorkerConfigs = TestHelpers.GetTestWorkerConfigs(processCountValue: maxProcessCountValue, processStartupInterval: intervals,
667753
processRestartInterval: intervals, processShutdownTimeout: TimeSpan.FromSeconds(1), workerIndexing: workerIndexing)
668754
};
669-
IRpcWorkerChannelFactory testLanguageWorkerChannelFactory = new TestRpcWorkerChannelFactory(eventManager, _testLogger, scriptOptions.Value.RootScriptPath, throwOnProcessStartUp);
670-
IWebHostRpcWorkerChannelManager testWebHostLanguageWorkerChannelManager = new TestRpcWorkerChannelManager(eventManager, _testLogger, scriptOptions.Value.RootScriptPath, testLanguageWorkerChannelFactory);
755+
756+
channelFactory ??= new TestRpcWorkerChannelFactory(eventManager, _testLogger, scriptOptions.Value.RootScriptPath, throwOnProcessStartUp);
757+
IWebHostRpcWorkerChannelManager testWebHostLanguageWorkerChannelManager = new TestRpcWorkerChannelManager(
758+
eventManager, _testLogger, scriptOptions.Value.RootScriptPath, channelFactory);
671759
IJobHostRpcWorkerChannelManager jobHostLanguageWorkerChannelManager = new JobHostRpcWorkerChannelManager(_testLoggerFactory);
672760

673761
if (addWebhostChannel)
@@ -680,6 +768,8 @@ private static RpcFunctionInvocationDispatcher GetTestFunctionDispatcher(int max
680768
}
681769

682770
var mockFunctionDispatcherLoadBalancer = new Mock<IRpcFunctionInvocationDispatcherLoadBalancer>();
771+
mockFunctionDispatcherLoadBalancer.Setup(m => m.GetLanguageWorkerChannel(It.IsAny<IEnumerable<IRpcWorkerChannel>>()))
772+
.Returns((IEnumerable<IRpcWorkerChannel> channels) => channels.FirstOrDefault());
683773

684774
_javaTestChannel = new TestRpcWorkerChannel(Guid.NewGuid().ToString(), "java", eventManager, _testLogger, false);
685775
var optionsMonitor = TestHelpers.CreateOptionsMonitor(workerConfigOptions);
@@ -691,7 +781,7 @@ private static RpcFunctionInvocationDispatcher GetTestFunctionDispatcher(int max
691781
mockApplicationLifetime.Object,
692782
eventManager,
693783
_testLoggerFactory,
694-
testLanguageWorkerChannelFactory,
784+
channelFactory,
695785
optionsMonitor,
696786
testWebHostLanguageWorkerChannelManager,
697787
jobHostLanguageWorkerChannelManager,

test/WebJobs.Script.Tests/Workers/Rpc/TestRpcWorkerChannel.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public TestRpcWorkerChannel(string workerId, string runtime = null, IScriptEvent
5454

5555
public RpcWorkerConfig WorkerConfig => _workerConfig;
5656

57-
public IDictionary<string, BufferBlock<ScriptInvocationContext>> FunctionInputBuffers => throw new NotImplementedException();
57+
public IDictionary<string, BufferBlock<ScriptInvocationContext>> FunctionInputBuffers { get; set; }
5858

5959
public List<Task> ExecutionContexts => _executionContexts;
6060

0 commit comments

Comments
 (0)