Skip to content

Commit ce328da

Browse files
kshyjufabiocav
andauthored
Perf optimizations with function ID resolution (#9959) (#10447)
* Perf optimizations with function ID resolution * Minor perf and code improvements to RpcFunctionInvocationDispatcher Co-authored-by: Fabio Cavalcante <[email protected]>
1 parent b7a8d0c commit ce328da

File tree

3 files changed

+37
-21
lines changed

3 files changed

+37
-21
lines changed

src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -586,10 +586,11 @@ public void SetupFunctionInvocationBuffers(IEnumerable<FunctionMetadata> functio
586586
_functions = functions;
587587
foreach (FunctionMetadata metadata in functions)
588588
{
589-
_workerChannelLogger.LogDebug("Setting up FunctionInvocationBuffer for function: '{functionName}' with functionId: '{functionId}'", metadata.Name, metadata.GetFunctionId());
590-
_functionInputBuffers[metadata.GetFunctionId()] = new BufferBlock<ScriptInvocationContext>();
589+
string functionId = metadata.GetFunctionId();
590+
_workerChannelLogger.LogDebug("Setting up FunctionInvocationBuffer for function: '{functionName}' with functionId: '{functionId}'", metadata.Name, functionId);
591+
_functionInputBuffers[functionId] = new BufferBlock<ScriptInvocationContext>();
591592
}
592-
_state = _state | RpcWorkerChannelState.InvocationBuffersInitialized;
593+
_state |= RpcWorkerChannelState.InvocationBuffersInitialized;
593594
}
594595

595596
public void SendFunctionLoadRequests(ManagedDependencyOptions managedDependencyOptions, TimeSpan? functionTimeout)
@@ -742,7 +743,11 @@ internal FunctionEnvironmentReloadRequest GetFunctionEnvironmentReloadRequest(ID
742743
internal void SendFunctionLoadRequest(FunctionMetadata metadata, ManagedDependencyOptions managedDependencyOptions)
743744
{
744745
_functionLoadRequestResponseEvent = _metricsLogger.LatencyEvent(MetricEventNames.FunctionLoadRequestResponse);
745-
_workerChannelLogger.LogDebug("Sending FunctionLoadRequest for function: '{functionName}' with functionId: '{functionId}'", metadata.Name, metadata.GetFunctionId());
746+
747+
if (_workerChannelLogger.IsEnabled(LogLevel.Debug))
748+
{
749+
_workerChannelLogger.LogDebug("Sending FunctionLoadRequest for function: '{functionName}' with functionId: '{functionId}'", metadata.Name, metadata.GetFunctionId());
750+
}
746751

747752
// send a load request for the registered function
748753
SendStreamingMessage(new StreamingMessage
@@ -838,20 +843,21 @@ internal async Task SendInvocationRequest(ScriptInvocationContext context)
838843
{
839844
try
840845
{
841-
var invocationId = context.ExecutionContext.InvocationId.ToString();
846+
string invocationId = context.ExecutionContext.InvocationId.ToString();
847+
string functionId = context.FunctionMetadata.GetFunctionId();
842848

843849
// do not send an invocation request for functions that failed to load or could not be indexed by the worker
844-
if (_functionLoadErrors.ContainsKey(context.FunctionMetadata.GetFunctionId()))
850+
if (_functionLoadErrors.TryGetValue(functionId, out Exception exception))
845851
{
846852
_workerChannelLogger.LogDebug("Function {functionName} failed to load", context.FunctionMetadata.Name);
847-
context.ResultSource.TrySetException(_functionLoadErrors[context.FunctionMetadata.GetFunctionId()]);
853+
context.ResultSource.TrySetException(exception);
848854
RemoveExecutingInvocation(invocationId);
849855
return;
850856
}
851-
else if (_metadataRequestErrors.ContainsKey(context.FunctionMetadata.GetFunctionId()))
857+
else if (_metadataRequestErrors.TryGetValue(functionId, out exception))
852858
{
853859
_workerChannelLogger.LogDebug("Worker failed to load metadata for {functionName}", context.FunctionMetadata.Name);
854-
context.ResultSource.TrySetException(_metadataRequestErrors[context.FunctionMetadata.GetFunctionId()]);
860+
context.ResultSource.TrySetException(exception);
855861
RemoveExecutingInvocation(invocationId);
856862
return;
857863
}

src/WebJobs.Script/Extensions/FunctionMetadataExtensions.cs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// Licensed under the MIT License. See License.txt in the project root for license information.
33

44
using System;
5+
using System.Collections.Generic;
56
using System.Linq;
67
using Microsoft.Azure.WebJobs.Script.Description;
78
using Microsoft.Azure.WebJobs.Script.Extensions;
@@ -92,13 +93,16 @@ public static string GetFunctionGroup(this FunctionMetadata metadata)
9293

9394
public static string GetFunctionId(this FunctionMetadata metadata)
9495
{
95-
if (!metadata.Properties.TryGetValue(FunctionIdKey, out object idObj)
96-
|| !(idObj is string))
96+
if (metadata.Properties.TryGetValue(FunctionIdKey, out object idObj)
97+
&& idObj is string id)
9798
{
98-
metadata.Properties[FunctionIdKey] = Guid.NewGuid().ToString();
99+
return id;
99100
}
100101

101-
return metadata.Properties[FunctionIdKey] as string;
102+
id = Guid.NewGuid().ToString();
103+
return metadata.Properties.TryAdd(FunctionIdKey, id)
104+
? id
105+
: metadata.Properties[FunctionIdKey] as string;
102106
}
103107

104108
internal static void SetFunctionId(this FunctionMetadata metadata, string functionId)

src/WebJobs.Script/Workers/Rpc/FunctionRegistration/RpcFunctionInvocationDispatcher.cs

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
using Microsoft.Azure.WebJobs.Script.ManagedDependencies;
1919
using Microsoft.Extensions.Logging;
2020
using Microsoft.Extensions.Options;
21-
2221
using FunctionMetadata = Microsoft.Azure.WebJobs.Script.Description.FunctionMetadata;
2322

2423
namespace Microsoft.Azure.WebJobs.Script.Workers.Rpc
@@ -196,7 +195,7 @@ private void SetDispatcherStateToInitialized(IDictionary<string, TaskCompletionS
196195
// RanToCompletion indicates successful process startup
197196
if (State != FunctionInvocationDispatcherState.Initialized
198197
&& webhostLanguageWorkerChannel != null
199-
&& webhostLanguageWorkerChannel.Where(a => a.Value.Task.Status == TaskStatus.RanToCompletion).Any())
198+
&& webhostLanguageWorkerChannel.Any(a => a.Value.Task.Status == TaskStatus.RanToCompletion))
200199
{
201200
SetFunctionDispatcherStateToInitializedAndLog();
202201
}
@@ -253,7 +252,7 @@ public async Task InitializeAsync(IEnumerable<FunctionMetadata> functions, Cance
253252
return;
254253
}
255254

256-
_workerRuntime = _workerRuntime ?? Utility.GetWorkerRuntime(functions, _environment);
255+
_workerRuntime ??= Utility.GetWorkerRuntime(functions, _environment);
257256

258257
// In case of multi language runtime, _workerRuntime has no significance, thus skipping this check for multi language runtime environment
259258
if ((string.IsNullOrEmpty(_workerRuntime) || _workerRuntime.Equals(RpcWorkerConstants.DotNetLanguageWorkerName, StringComparison.InvariantCultureIgnoreCase)) && !_environment.IsMultiLanguageRuntimeEnvironment())
@@ -294,7 +293,7 @@ public async Task InitializeAsync(IEnumerable<FunctionMetadata> functions, Cance
294293
return;
295294
}
296295

297-
_functions = functions ?? new List<FunctionMetadata>();
296+
_functions = functions;
298297

299298
if (_environment.IsMultiLanguageRuntimeEnvironment())
300299
{
@@ -351,6 +350,7 @@ public async Task InitializeAsync(IEnumerable<FunctionMetadata> functions, Cance
351350
}
352351
}
353352
}
353+
354354
AddLogUserCategory(functions);
355355
}
356356

@@ -402,10 +402,15 @@ public async Task InvokeAsync(ScriptInvocationContext invocationContext)
402402
// This could throw if no initialized workers are found. Shut down instance and retry.
403403
IEnumerable<IRpcWorkerChannel> workerChannels = await GetInitializedWorkerChannelsAsync(invocationContext.FunctionMetadata.Language ?? _workerRuntime);
404404
var rpcWorkerChannel = _functionDispatcherLoadBalancer.GetLanguageWorkerChannel(workerChannels);
405-
if (rpcWorkerChannel.FunctionInputBuffers.TryGetValue(invocationContext.FunctionMetadata.GetFunctionId(), out BufferBlock<ScriptInvocationContext> bufferBlock))
405+
string functionId = invocationContext.FunctionMetadata.GetFunctionId();
406+
if (rpcWorkerChannel.FunctionInputBuffers.TryGetValue(functionId, out BufferBlock<ScriptInvocationContext> bufferBlock))
406407
{
407-
_logger.LogTrace("Posting invocation id:{InvocationId} on workerId:{workerChannelId}", invocationContext.ExecutionContext.InvocationId, rpcWorkerChannel.Id);
408-
rpcWorkerChannel.FunctionInputBuffers[invocationContext.FunctionMetadata.GetFunctionId()].Post(invocationContext);
408+
if (_logger.IsEnabled(LogLevel.Trace))
409+
{
410+
_logger.LogTrace("Posting invocation id:{InvocationId} on workerId:{workerChannelId}", invocationContext.ExecutionContext.InvocationId, rpcWorkerChannel.Id);
411+
}
412+
413+
bufferBlock.Post(invocationContext);
409414
}
410415
else
411416
{
@@ -539,11 +544,12 @@ private async Task DisposeAndRestartWorkerChannel(string runtime, string workerI
539544
if (ShouldRestartWorkerChannel(runtime, isWebHostChannelDisposed, isJobHostChannelDisposed))
540545
{
541546
// Set state to "WorkerProcessRestarting" if there are no other workers to handle work
542-
if ((await GetInitializedWorkerChannelsAsync()).Count() == 0)
547+
if (!(await GetInitializedWorkerChannelsAsync()).Any())
543548
{
544549
State = FunctionInvocationDispatcherState.WorkerProcessRestarting;
545550
_logger.LogDebug("No initialized worker channels for runtime '{runtime}'. Delaying future invocations", runtime);
546551
}
552+
547553
// Restart worker channel
548554
_logger.LogDebug("Restarting worker channel for runtime: '{runtime}'", runtime);
549555
await StartWorkerChannel(runtime);

0 commit comments

Comments
 (0)