Skip to content

Commit 405a25d

Browse files
authored
Language workers concurrency
1 parent ecb23de commit 405a25d

33 files changed

+1036
-79
lines changed

src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs

Lines changed: 76 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
using System.Linq;
1111
using System.Reactive.Linq;
1212
using System.Text;
13+
using System.Threading;
1314
using System.Threading.Tasks;
1415
using System.Threading.Tasks.Dataflow;
1516
using Microsoft.Azure.WebJobs.Script.Description;
@@ -40,6 +41,8 @@ internal class GrpcWorkerChannel : IRpcWorkerChannel, IDisposable
4041
private readonly IEnvironment _environment;
4142
private readonly IOptionsMonitor<ScriptApplicationHostOptions> _applicationHostOptions;
4243
private readonly ISharedMemoryManager _sharedMemoryManager;
44+
private readonly List<TimeSpan> _workerStatusLatencyHistory = new List<TimeSpan>();
45+
private readonly IOptions<WorkerConcurrencyOptions> _workerConcurrencyOptions;
4346

4447
private IDisposable _functionLoadRequestResponseEvent;
4548
private bool _disposed;
@@ -68,6 +71,9 @@ internal class GrpcWorkerChannel : IRpcWorkerChannel, IDisposable
6871
private TimeSpan _functionLoadTimeout = TimeSpan.FromMinutes(10);
6972
private bool _isSharedMemoryDataTransferEnabled;
7073

74+
private object _syncLock = new object();
75+
private System.Timers.Timer _timer;
76+
7177
internal GrpcWorkerChannel(
7278
string workerId,
7379
IScriptEventManager eventManager,
@@ -79,7 +85,8 @@ internal GrpcWorkerChannel(
7985
IEnvironment environment,
8086
IOptionsMonitor<ScriptApplicationHostOptions> applicationHostOptions,
8187
ISharedMemoryManager sharedMemoryManager,
82-
IFunctionDataCache functionDataCache)
88+
IFunctionDataCache functionDataCache,
89+
IOptions<WorkerConcurrencyOptions> workerConcurrencyOptions)
8390
{
8491
_workerId = workerId;
8592
_eventManager = eventManager;
@@ -91,6 +98,7 @@ internal GrpcWorkerChannel(
9198
_environment = environment;
9299
_applicationHostOptions = applicationHostOptions;
93100
_sharedMemoryManager = sharedMemoryManager;
101+
_workerConcurrencyOptions = workerConcurrencyOptions;
94102

95103
_workerCapabilities = new GrpcCapabilities(_workerChannelLogger);
96104

@@ -134,7 +142,7 @@ public bool IsChannelReadyForInvocations()
134142
return !_disposing && !_disposed && _state.HasFlag(RpcWorkerChannelState.InvocationBuffersInitialized | RpcWorkerChannelState.Initialized);
135143
}
136144

137-
public async Task StartWorkerProcessAsync()
145+
public async Task StartWorkerProcessAsync(CancellationToken cancellationToken)
138146
{
139147
_startSubscription = _inboundWorkerEvents.Where(msg => msg.MessageType == MsgType.StartStream)
140148
.Timeout(_workerConfig.CountOptions.ProcessStartupTimeout)
@@ -174,6 +182,12 @@ public async Task<WorkerStatus> GetWorkerStatusAsync()
174182
}
175183
}
176184

185+
workerStatus.IsReady = IsChannelReadyForInvocations();
186+
if (_environment.IsWorkerDynamicConcurrencyEnabled())
187+
{
188+
workerStatus.LatencyHistory = GetLatencies();
189+
}
190+
177191
return workerStatus;
178192
}
179193

@@ -750,6 +764,7 @@ protected virtual void Dispose(bool disposing)
750764
{
751765
_startLatencyMetric?.Dispose();
752766
_startSubscription?.Dispose();
767+
_timer?.Dispose();
753768

754769
// unlink function inputs
755770
foreach (var link in _inputLinks)
@@ -843,5 +858,64 @@ internal bool IsSharedMemoryDataTransferEnabled()
843858
_workerChannelLogger.LogDebug("IsSharedMemoryDataTransferEnabled: {SharedMemoryDataTransferEnabled}", capabilityEnabled);
844859
return capabilityEnabled;
845860
}
861+
862+
/// <summary>
/// Creates and starts the latency sampling timer the first time it is needed.
/// Does nothing when worker dynamic concurrency is disabled or when the timer
/// has already been created.
/// </summary>
internal void EnsureTimerStarted()
{
    if (!_environment.IsWorkerDynamicConcurrencyEnabled())
    {
        return;
    }

    lock (_syncLock)
    {
        if (_timer != null)
        {
            // Already created by an earlier call.
            return;
        }

        // One-shot timer (AutoReset = false): OnTimer re-arms it after each sample.
        var samplingTimer = new System.Timers.Timer();
        samplingTimer.AutoReset = false;
        samplingTimer.Interval = _workerConcurrencyOptions.Value.CheckInterval.TotalMilliseconds;
        samplingTimer.Elapsed += OnTimer;

        _timer = samplingTimer;
        _timer.Start();
    }
}
882+
883+
/// <summary>
/// Ensures the sampling timer is running and returns the latency samples collected so far.
/// </summary>
/// <returns>
/// A point-in-time copy of the latency history. A copy is returned because the
/// underlying list is mutated by <see cref="AddSample{T}"/> from the timer callback
/// while holding the same lock; handing out the live list would let a caller
/// enumerate it while it is being modified.
/// </returns>
internal IEnumerable<TimeSpan> GetLatencies()
{
    EnsureTimerStarted();

    lock (_syncLock)
    {
        return new List<TimeSpan>(_workerStatusLatencyHistory);
    }
}
888+
889+
/// <summary>
/// Timer callback: requests the current worker status, records its latency in the
/// history, and re-arms the one-shot timer for the next sample.
/// </summary>
/// <param name="sender">The timer raising the event (unused).</param>
/// <param name="e">Elapsed event data (unused).</param>
/// <remarks>
/// This is an <c>async void</c> event handler, so any exception that escapes it is
/// unhandled. Every step that can throw is therefore guarded.
/// </remarks>
internal async void OnTimer(object sender, System.Timers.ElapsedEventArgs e)
{
    if (_disposed)
    {
        return;
    }

    try
    {
        WorkerStatus workerStatus = await GetWorkerStatusAsync();
        AddSample(_workerStatusLatencyHistory, workerStatus.Latency);
    }
    catch
    {
        // Don't allow background exceptions to escape.
        // E.g. when an RPC channel is shutting down the status request can fail.
    }

    if (_disposed)
    {
        // Disposed while the status request was in flight: do not re-arm.
        return;
    }

    try
    {
        _timer?.Start();
    }
    catch (ObjectDisposedException)
    {
        // The timer was disposed between the check above and the restart.
        // Nothing left to sample, so ignore it rather than let it escape.
    }
}
908+
909+
/// <summary>
/// Appends a sample to a bounded history, evicting the oldest entries first so the
/// list does not grow beyond the configured history size.
/// </summary>
/// <typeparam name="T">The sample type.</typeparam>
/// <param name="samples">The history list to append to.</param>
/// <param name="sample">The new sample.</param>
private void AddSample<T>(List<T> samples, T sample)
{
    lock (_syncLock)
    {
        int historySize = _workerConcurrencyOptions.Value.HistorySize;

        // Use >= instead of == so the history is trimmed even if it is already at or
        // past the limit, and check for an empty list so a history size of zero (or
        // less) cannot make RemoveAt(0) throw.
        while (samples.Count > 0 && samples.Count >= historySize)
        {
            samples.RemoveAt(0);
        }

        samples.Add(sample);
    }
}
846920
}
847921
}

src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannelFactory.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,10 @@ public class GrpcWorkerChannelFactory : IRpcWorkerChannelFactory
2424
private readonly IOptionsMonitor<ScriptApplicationHostOptions> _applicationHostOptions = null;
2525
private readonly ISharedMemoryManager _sharedMemoryManager = null;
2626
private readonly IFunctionDataCache _functionDataCache = null;
27+
private readonly IOptions<WorkerConcurrencyOptions> _workerConcurrencyOptions;
2728

2829
public GrpcWorkerChannelFactory(IScriptEventManager eventManager, IEnvironment environment, IRpcServer rpcServer, ILoggerFactory loggerFactory, IOptionsMonitor<LanguageWorkerOptions> languageWorkerOptions,
29-
IOptionsMonitor<ScriptApplicationHostOptions> applicationHostOptions, IRpcWorkerProcessFactory rpcWorkerProcessManager, ISharedMemoryManager sharedMemoryManager, IFunctionDataCache functionDataCache)
30+
IOptionsMonitor<ScriptApplicationHostOptions> applicationHostOptions, IRpcWorkerProcessFactory rpcWorkerProcessManager, ISharedMemoryManager sharedMemoryManager, IFunctionDataCache functionDataCache, IOptions<WorkerConcurrencyOptions> workerConcurrencyOptions)
3031
{
3132
_eventManager = eventManager;
3233
_loggerFactory = loggerFactory;
@@ -35,6 +36,7 @@ public GrpcWorkerChannelFactory(IScriptEventManager eventManager, IEnvironment e
3536
_applicationHostOptions = applicationHostOptions;
3637
_sharedMemoryManager = sharedMemoryManager;
3738
_functionDataCache = functionDataCache;
39+
_workerConcurrencyOptions = workerConcurrencyOptions;
3840
}
3941

4042
public IRpcWorkerChannel Create(string scriptRootPath, string runtime, IMetricsLogger metricsLogger, int attemptCount, IEnumerable<RpcWorkerConfig> workerConfigs)
@@ -58,7 +60,8 @@ public IRpcWorkerChannel Create(string scriptRootPath, string runtime, IMetricsL
5860
_environment,
5961
_applicationHostOptions,
6062
_sharedMemoryManager,
61-
_functionDataCache);
63+
_functionDataCache,
64+
_workerConcurrencyOptions);
6265
}
6366
}
6467
}

src/WebJobs.Script.WebHost/DependencyInjection/DependencyValidator/DependencyValidator.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
using Microsoft.Azure.WebJobs.Script.Scale;
1313
using Microsoft.Azure.WebJobs.Script.WebHost.Diagnostics;
1414
using Microsoft.Azure.WebJobs.Script.Workers;
15+
using Microsoft.Azure.WebJobs.Script.Workers.Rpc;
1516
using Microsoft.Extensions.DependencyInjection;
1617
using Microsoft.Extensions.Hosting;
1718
using Microsoft.Extensions.Logging;
@@ -46,6 +47,7 @@ private static ExpectedDependencyBuilder CreateExpectedDependencies()
4647
.ExpectFactory<IFileMonitoringService>()
4748
.Expect<WorkerConsoleLogService>()
4849
.Expect<FunctionInvocationDispatcherShutdownManager>()
50+
.Expect<WorkerConcurrencyManager>()
4951
.Optional<FunctionsScaleMonitorService>()
5052
.Optional<FuncAppFileProvisioningService>() // Used by powershell.
5153
.Optional<JobHostService>() // Missing when host is offline.

src/WebJobs.Script/Config/HostJsonFileConfigurationSource.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public class HostJsonFileConfigurationProvider : ConfigurationProvider
5151
{
5252
"version", "functionTimeout", "retry", "functions", "http", "watchDirectories", "watchFiles", "queues", "serviceBus",
5353
"eventHub", "singleton", "logging", "aggregator", "healthMonitor", "extensionBundle", "managedDependencies",
54-
"customHandler", "httpWorker", "extensions"
54+
"customHandler", "httpWorker", "extensions", "concurrency"
5555
};
5656

5757
private readonly HostJsonFileConfigurationSource _configurationSource;
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the MIT License. See License.txt in the project root for license information.
3+
4+
using System;
5+
using System.ComponentModel.DataAnnotations;
6+
using Newtonsoft.Json;
7+
using Newtonsoft.Json.Linq;
8+
9+
namespace Microsoft.Azure.WebJobs.Script
10+
{
11+
/// <summary>
/// Settings that control how language worker channels are monitored and when
/// additional workers are added.
/// </summary>
public class WorkerConcurrencyOptions
{
    /// <summary>
    /// Initializes a new instance with the default values.
    /// </summary>
    public WorkerConcurrencyOptions()
    {
        // Setting default values
        LatencyThreshold = TimeSpan.FromMilliseconds(100);
        AdjustmentPeriod = TimeSpan.FromSeconds(10);
        CheckInterval = TimeSpan.FromSeconds(1);
        HistorySize = 10;
        NewWorkerThreshold = 0.2F;
        MaxWorkerCount = 10;
    }

    /// <summary>
    /// Gets or sets the latency threshold dictating when a worker channel is overloaded.
    /// </summary>
    public TimeSpan LatencyThreshold { get; set; }

    /// <summary>
    /// Gets or sets the time after which worker channels
    /// will start to be monitored again after adding a new worker.
    /// </summary>
    public TimeSpan AdjustmentPeriod { get; set; }

    /// <summary>
    /// Gets or sets the interval at which worker channel state is checked.
    /// </summary>
    public TimeSpan CheckInterval { get; set; }

    /// <summary>
    /// Gets or sets the number of worker channel state samples kept in the history.
    /// </summary>
    public int HistorySize { get; set; }

    /// <summary>
    /// Gets or sets the threshold dictating when a new worker will be added.
    /// Value should be between 0 and 1, indicating the fraction of overloaded channel
    /// latency samples required to trigger the addition of a new worker.
    /// </summary>
    // The range limits are parsed from these strings by the float type converter,
    // which does not understand the C# literal suffix ("0F"); plain "0" and "1"
    // are required for validation to work instead of throwing.
    [Range(typeof(float), "0", "1", ErrorMessage = "Value for {0} must be between {1} and {2}.")]
    public float NewWorkerThreshold { get; set; }

    /// <summary>
    /// Gets or sets the max count of workers.
    /// </summary>
    /// <remarks>
    /// NOTE(review): earlier documentation stated the value is derived from the SKU
    /// when it equals 0, but the validation range below starts at 1 - confirm which
    /// is intended.
    /// </remarks>
    [Range(typeof(int), "1", "100", ErrorMessage = "Value for {0} must be between {1} and {2}.")]
    public int MaxWorkerCount { get; set; }
}
59+
}

src/WebJobs.Script/Environment/EnvironmentExtensions.cs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -472,5 +472,14 @@ public static bool DrainOnApplicationStoppingEnabled(this IEnvironment environme
472472
return !string.IsNullOrEmpty(environment.GetEnvironmentVariable(KubernetesServiceHost)) ||
473473
(bool.TryParse(environment.GetEnvironmentVariable(DrainOnApplicationStopping), out bool v) && v);
474474
}
475+
476+
/// <summary>
/// Determines whether worker dynamic concurrency is enabled: the dynamic concurrency
/// setting must parse as <c>true</c> and the worker process count setting must be
/// null or empty.
/// </summary>
/// <param name="environment">The environment to read the settings from.</param>
/// <returns><c>true</c> if dynamic concurrency is enabled; otherwise <c>false</c>.</returns>
public static bool IsWorkerDynamicConcurrencyEnabled(this IEnvironment environment)
{
    string enabledSetting = environment.GetEnvironmentVariable(RpcWorkerConstants.FunctionsWorkerDynamicConcurrencyEnabled);
    if (!bool.TryParse(enabledSetting, out bool isEnabled) || !isEnabled)
    {
        return false;
    }

    // Only consulted once the feature flag is known to be on.
    string processCountSetting = environment.GetEnvironmentVariable(RpcWorkerConstants.FunctionsWorkerProcessCountSettingName);
    return string.IsNullOrEmpty(processCountSetting);
}
475484
}
476485
}

src/WebJobs.Script/ScriptHostBuilderExtensions.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,7 @@ public static IHostBuilder AddScriptHostCore(this IHostBuilder builder, ScriptAp
251251
services.AddSingleton<IScriptJobHost>(p => p.GetRequiredService<ScriptHost>());
252252
services.AddSingleton<IJobHost>(p => p.GetRequiredService<ScriptHost>());
253253
services.AddSingleton<IFunctionProvider, ProxyFunctionProvider>();
254+
services.AddSingleton<IHostedService, WorkerConcurrencyManager>();
254255

255256
services.AddSingleton<ITypeLocator, ScriptTypeLocator>();
256257
services.AddSingleton<ScriptSettingsManager>();
@@ -271,6 +272,7 @@ public static IHostBuilder AddScriptHostCore(this IHostBuilder builder, ScriptAp
271272
services.ConfigureOptions<JobHostFunctionTimeoutOptionsSetup>();
272273
// LanguageWorkerOptionsSetup should be registered in WebHostServiceCollection as well to enable starting worker processing in placeholder mode.
273274
services.ConfigureOptions<LanguageWorkerOptionsSetup>();
275+
services.AddOptions<WorkerConcurrencyOptions>();
274276
services.ConfigureOptions<HttpWorkerOptionsSetup>();
275277
services.ConfigureOptions<ManagedDependencyOptionsSetup>();
276278
services.AddOptions<FunctionResultAggregatorOptions>()
@@ -343,6 +345,7 @@ public static void AddCommonServices(IServiceCollection services)
343345
services.TryAddSingleton<IEnvironment>(SystemEnvironment.Instance);
344346
services.TryAddSingleton<HostPerformanceManager>();
345347
services.ConfigureOptions<HostHealthMonitorOptionsSetup>();
348+
346349
AddProcessRegistry(services);
347350
}
348351

src/WebJobs.Script/Utility.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using System;
55
using System.Collections.Generic;
66
using System.Collections.ObjectModel;
7+
using System.Collections.Specialized;
78
using System.Dynamic;
89
using System.Globalization;
910
using System.IO;
@@ -261,6 +262,11 @@ public static IReadOnlyDictionary<string, string> ToStringValues(this IReadOnlyD
261262
return data.ToDictionary(p => p.Key, p => p.Value != null ? p.Value.ToString() : null, StringComparer.OrdinalIgnoreCase);
262263
}
263264

265+
/// <summary>
/// Returns the value stored under <paramref name="key"/>, or <c>null</c> when the
/// dictionary has no such key.
/// </summary>
/// <param name="dictionary">The dictionary to read from.</param>
/// <param name="key">The key to look up.</param>
/// <returns>The stored value, or <c>null</c> if the key is absent.</returns>
public static string GetValueOrNull(this StringDictionary dictionary, string key)
{
    // The StringDictionary indexer already yields null for a missing key, so one
    // lookup gives the same result as a ContainsKey check followed by the indexer.
    string value = dictionary[key];
    return value;
}
269+
264270
// "Namespace.Class.Method" --> "Method"
265271
public static string GetFunctionShortName(string functionName)
266272
{

src/WebJobs.Script/Workers/FunctionInvocationDispatcherFactory.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ public FunctionInvocationDispatcherFactory(IOptions<ScriptJobHostOptions> script
3030
IWebHostRpcWorkerChannelManager webHostLanguageWorkerChannelManager,
3131
IJobHostRpcWorkerChannelManager jobHostLanguageWorkerChannelManager,
3232
IOptions<ManagedDependencyOptions> managedDependencyOptions,
33-
IRpcFunctionInvocationDispatcherLoadBalancer functionDispatcherLoadBalancer)
33+
IRpcFunctionInvocationDispatcherLoadBalancer functionDispatcherLoadBalancer,
34+
IOptions<WorkerConcurrencyOptions> workerConcurrencyOptions)
3435
{
3536
if (httpWorkerOptions.Value == null)
3637
{
@@ -53,7 +54,8 @@ public FunctionInvocationDispatcherFactory(IOptions<ScriptJobHostOptions> script
5354
webHostLanguageWorkerChannelManager,
5455
jobHostLanguageWorkerChannelManager,
5556
managedDependencyOptions,
56-
functionDispatcherLoadBalancer);
57+
functionDispatcherLoadBalancer,
58+
workerConcurrencyOptions);
5759
}
5860

5961
public IFunctionInvocationDispatcher GetFunctionDispatcher() => _functionDispatcher;

src/WebJobs.Script/Workers/Http/HttpFunctionInvocationDispatcher.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,11 @@ public HttpFunctionInvocationDispatcher(IOptions<ScriptJobHostOptions> scriptHos
5959
.Subscribe(WorkerRestart);
6060
}
6161

62+
// For tests: parameterless constructor with an empty body. It assigns none of the
// dispatcher's fields, so anything they would normally hold is left at its default.
63+
internal HttpFunctionInvocationDispatcher()
64+
{
65+
}
66+
6267
public FunctionInvocationDispatcherState State { get; private set; }
6368

6469
public int ErrorEventsThreshold { get; private set; }
@@ -119,6 +124,12 @@ public async void WorkerRestart(HttpWorkerRestartEvent workerRestart)
119124
}
120125
}
121126

127+
/// <summary>
/// No-op for the HTTP dispatcher: there is currently only one worker, so there is
/// no additional channel to start.
/// </summary>
/// <returns>An already completed task.</returns>
public Task StartWorkerChannel() => Task.CompletedTask;
132+
122133
private async Task DisposeAndRestartWorkerChannel(string workerId)
123134
{
124135
// Since we only have one HTTP worker process, as soon as we dispose it, InvokeAsync will fail. Set state to

0 commit comments

Comments
 (0)