Skip to content

Commit 6d5f9a4

Browse files
authored
[Backport] Ensure _maxProcessCount is always initialized (#8359)
* backport maxprocesscount * remove old maxproccount prop
1 parent 45f5ceb commit 6d5f9a4

File tree

2 files changed

+72
-22
lines changed

2 files changed

+72
-22
lines changed

src/WebJobs.Script/Workers/Rpc/FunctionRegistration/RpcFunctionInvocationDispatcher.cs

Lines changed: 33 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -31,15 +31,15 @@ internal class RpcFunctionInvocationDispatcher : IFunctionInvocationDispatcher
3131
private readonly SemaphoreSlim _startWorkerProcessLock = new SemaphoreSlim(1, 1);
3232
private readonly TimeSpan _thresholdBetweenRestarts = TimeSpan.FromMinutes(WorkerConstants.WorkerRestartErrorIntervalThresholdInMinutes);
3333
private readonly IOptions<WorkerConcurrencyOptions> _workerConcurrencyOptions;
34+
private readonly IEnumerable<RpcWorkerConfig> _workerConfigs;
35+
private readonly Lazy<Task<int>> _maxProcessCount;
3436

3537
private IScriptEventManager _eventManager;
36-
private IEnumerable<RpcWorkerConfig> _workerConfigs;
3738
private IWebHostRpcWorkerChannelManager _webHostLanguageWorkerChannelManager;
3839
private IJobHostRpcWorkerChannelManager _jobHostLanguageWorkerChannelManager;
3940
private IDisposable _workerErrorSubscription;
4041
private IDisposable _workerRestartSubscription;
4142
private ScriptJobHostOptions _scriptOptions;
42-
private int _maxProcessCount;
4343
private IRpcFunctionInvocationDispatcherLoadBalancer _functionDispatcherLoadBalancer;
4444
private bool _disposed = false;
4545
private bool _disposing = false;
@@ -71,12 +71,12 @@ public RpcFunctionInvocationDispatcher(IOptions<ScriptJobHostOptions> scriptHost
7171
{
7272
_metricsLogger = metricsLogger;
7373
_scriptOptions = scriptHostOptions.Value;
74-
_environment = environment;
74+
_environment = environment ?? throw new ArgumentNullException(nameof(environment));
7575
_applicationLifetime = applicationLifetime;
7676
_webHostLanguageWorkerChannelManager = webHostLanguageWorkerChannelManager;
7777
_jobHostLanguageWorkerChannelManager = jobHostLanguageWorkerChannelManager;
7878
_eventManager = eventManager;
79-
_workerConfigs = languageWorkerOptions.CurrentValue.WorkerConfigs;
79+
_workerConfigs = languageWorkerOptions?.CurrentValue?.WorkerConfigs ?? throw new ArgumentNullException(nameof(languageWorkerOptions));
8080
_managedDependencyOptions = managedDependencyOptions ?? throw new ArgumentNullException(nameof(managedDependencyOptions));
8181
_logger = loggerFactory.CreateLogger<RpcFunctionInvocationDispatcher>();
8282
_rpcWorkerChannelFactory = rpcWorkerChannelFactory;
@@ -86,13 +86,13 @@ public RpcFunctionInvocationDispatcher(IOptions<ScriptJobHostOptions> scriptHost
8686
_workerIndexing = Utility.CanWorkerIndex(_workerConfigs, _environment);
8787
State = FunctionInvocationDispatcherState.Default;
8888

89-
_workerErrorSubscription = _eventManager.OfType<WorkerErrorEvent>()
90-
.Subscribe(WorkerError);
91-
_workerRestartSubscription = _eventManager.OfType<WorkerRestartEvent>()
92-
.Subscribe(WorkerRestart);
89+
_workerErrorSubscription = _eventManager.OfType<WorkerErrorEvent>().Subscribe(WorkerError);
90+
_workerRestartSubscription = _eventManager.OfType<WorkerRestartEvent>().Subscribe(WorkerRestart);
9391

9492
_shutdownStandbyWorkerChannels = ShutdownWebhostLanguageWorkerChannels;
9593
_shutdownStandbyWorkerChannels = _shutdownStandbyWorkerChannels.Debounce(milliseconds: 5000);
94+
95+
_maxProcessCount = new Lazy<Task<int>>(GetMaxProcessCount);
9696
}
9797

9898
public FunctionInvocationDispatcherState State { get; private set; }
@@ -101,12 +101,27 @@ public RpcFunctionInvocationDispatcher(IOptions<ScriptJobHostOptions> scriptHost
101101

102102
public IJobHostRpcWorkerChannelManager JobHostLanguageWorkerChannelManager => _jobHostLanguageWorkerChannelManager;
103103

104-
internal ConcurrentStack<WorkerErrorEvent> LanguageWorkerErrors => _languageWorkerErrors;
104+
internal Task<int> MaxProcessCount => _maxProcessCount.Value;
105105

106-
internal int MaxProcessCount => _maxProcessCount;
106+
internal ConcurrentStack<WorkerErrorEvent> LanguageWorkerErrors => _languageWorkerErrors;
107107

108108
internal IWebHostRpcWorkerChannelManager WebHostLanguageWorkerChannelManager => _webHostLanguageWorkerChannelManager;
109109

110+
private async Task<int> GetMaxProcessCount()
111+
{
112+
if (_workerConcurrencyOptions != null && !string.IsNullOrEmpty(_workerRuntime))
113+
{
114+
var workerConfig = _workerConfigs.Where(c => c.Description.Language.Equals(_workerRuntime, StringComparison.InvariantCultureIgnoreCase))
115+
.FirstOrDefault();
116+
if (workerConfig != null)
117+
{
118+
return _environment.IsWorkerDynamicConcurrencyEnabled() ? _workerConcurrencyOptions.Value.MaxWorkerCount : workerConfig.CountOptions.ProcessCount;
119+
}
120+
}
121+
122+
return (await GetAllWorkerChannelsAsync()).Count();
123+
}
124+
110125
internal async Task InitializeJobhostLanguageWorkerChannelAsync()
111126
{
112127
await InitializeJobhostLanguageWorkerChannelAsync(0);
@@ -131,7 +146,7 @@ internal async Task InitializeJobhostLanguageWorkerChannelAsync(int attemptCount
131146
private void SetFunctionDispatcherStateToInitializedAndLog()
132147
{
133148
State = FunctionInvocationDispatcherState.Initialized;
134-
// Do not change this log message. Vs Code relies on this to figure out when to attach debuger to the worker process.
149+
// Do not change this log message. Vs Code relies on this to figure out when to attach debugger to the worker process.
135150
_logger.LogInformation("Worker process started and initialized.");
136151
}
137152

@@ -170,7 +185,7 @@ private void StartWorkerProcesses(int startIndex, Func<Task> startAction, bool i
170185
{
171186
Task.Run(async () =>
172187
{
173-
for (var count = startIndex; count < _maxProcessCount
188+
for (var count = startIndex; count < (await _maxProcessCount.Value)
174189
&& !_processStartCancellationToken.IsCancellationRequested; count++)
175190
{
176191
if (_environment.IsWorkerDynamicConcurrencyEnabled() && count > 0)
@@ -229,6 +244,7 @@ public async Task InitializeAsync(IEnumerable<FunctionMetadata> functions, Cance
229244
}
230245

231246
var workerConfig = _workerConfigs.Where(c => c.Description.Language.Equals(_workerRuntime, StringComparison.InvariantCultureIgnoreCase)).FirstOrDefault();
247+
232248
if (workerConfig == null && (functions == null || functions.Count() == 0))
233249
{
234250
// Only throw if workerConfig is null AND some functions have been found.
@@ -239,16 +255,15 @@ public async Task InitializeAsync(IEnumerable<FunctionMetadata> functions, Cance
239255
if ((functions == null || functions.Count() == 0) && !_workerIndexing)
240256
{
241257
// do not initialize function dispatcher if there are no functions, unless the worker is indexing
242-
_logger.LogDebug("RpcFunctionInvocationDispatcher received no functions");
258+
_logger.LogDebug($"{nameof(RpcFunctionInvocationDispatcher)} received no functions");
243259
return;
244260
}
245261

246262
_functions = functions ?? new List<FunctionMetadata>();
247-
_maxProcessCount = workerConfig.CountOptions.ProcessCount;
248263
_processStartupInterval = workerConfig.CountOptions.ProcessStartupInterval;
249264
_restartWait = workerConfig.CountOptions.ProcessRestartInterval;
250265
_shutdownTimeout = workerConfig.CountOptions.ProcessShutdownTimeout;
251-
ErrorEventsThreshold = 3 * _maxProcessCount;
266+
ErrorEventsThreshold = 3 * (await _maxProcessCount.Value);
252267

253268
if (Utility.IsSupportedRuntime(_workerRuntime, _workerConfigs))
254269
{
@@ -419,10 +434,11 @@ internal async Task<IEnumerable<IRpcWorkerChannel>> GetInitializedWorkerChannels
419434
{
420435
IEnumerable<IRpcWorkerChannel> workerChannels = await GetAllWorkerChannelsAsync();
421436
IEnumerable<IRpcWorkerChannel> initializedWorkers = workerChannels.Where(ch => ch.IsChannelReadyForInvocations());
422-
int workerCount = _environment.IsWorkerDynamicConcurrencyEnabled() ? _workerConcurrencyOptions.Value.MaxWorkerCount : _maxProcessCount;
437+
int workerCount = await _maxProcessCount.Value;
438+
423439
if (initializedWorkers.Count() > workerCount)
424440
{
425-
throw new InvalidOperationException($"Number of initialized language workers exceeded:{initializedWorkers.Count()} exceeded maxProcessCount: {_maxProcessCount}");
441+
throw new InvalidOperationException($"Number of initialized language workers exceeded:{initializedWorkers.Count()} exceeded maxProcessCount: {workerCount}");
426442
}
427443

428444
return initializedWorkers;

test/WebJobs.Script.Tests/Workers/Rpc/RpcFunctionInvocationDispatcherTests.cs

Lines changed: 39 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,32 @@ public async Task ShutdownTests_WithInfinitelyRunningTasks_Timesout()
279279
}
280280
}
281281

282+
[Fact]
283+
public async Task FunctionDispatcherState_PlaceholderMode_SingleWebHostChannel()
284+
{
285+
// Arrange
286+
int expectedWebHostProcessCount = 1;
287+
int expectedJobHostProcessCount = 0;
288+
289+
var functionDispatcher = GetTestFunctionDispatcher(runtime: RpcWorkerConstants.JavaLanguageWorkerName, placeholder: true, addWebhostChannel: true);
290+
await WaitForWebhostWorkerChannelsToStartup(functionDispatcher.WebHostLanguageWorkerChannelManager, expectedWebHostProcessCount, RpcWorkerConstants.JavaLanguageWorkerName);
291+
var channels = functionDispatcher.WebHostLanguageWorkerChannelManager.GetChannels(RpcWorkerConstants.JavaLanguageWorkerName).Values;
292+
channels.FirstOrDefault().Task.Result.SetupFunctionInvocationBuffers(null);
293+
294+
// Act
295+
await functionDispatcher.InitializeAsync(GetTestFunctionsList(RpcWorkerConstants.JavaLanguageWorkerName));
296+
await functionDispatcher.ShutdownAsync();
297+
298+
// Assert
299+
var webHostCount = functionDispatcher.WebHostLanguageWorkerChannelManager.GetChannels(RpcWorkerConstants.JavaLanguageWorkerName).Count();
300+
var jobHostCount = functionDispatcher.JobHostLanguageWorkerChannelManager.GetChannels().Count();
301+
var maxProcessCount = await functionDispatcher.MaxProcessCount;
302+
303+
Assert.Equal(expectedWebHostProcessCount, webHostCount);
304+
Assert.Equal(expectedJobHostProcessCount, jobHostCount);
305+
Assert.Equal(expectedWebHostProcessCount, maxProcessCount);
306+
}
307+
282308
[Fact]
283309
public async Task FunctionDispatcherState_Default_DotNetFunctions()
284310
{
@@ -640,7 +666,8 @@ public async Task FunctionDispatcher_ErroredWebHostChannel()
640666
}
641667

642668
private static RpcFunctionInvocationDispatcher GetTestFunctionDispatcher(int maxProcessCountValue = 1, bool addWebhostChannel = false,
643-
Mock<IWebHostRpcWorkerChannelManager> mockwebHostLanguageWorkerChannelManager = null, bool throwOnProcessStartUp = false, TimeSpan? startupIntervals = null, string runtime = null, bool workerIndexing = false)
669+
Mock<IWebHostRpcWorkerChannelManager> mockwebHostLanguageWorkerChannelManager = null, bool throwOnProcessStartUp = false,
670+
TimeSpan? startupIntervals = null, string runtime = null, bool workerIndexing = false, bool placeholder = false)
644671
{
645672
var eventManager = new ScriptEventManager();
646673
var metricsLogger = new Mock<IMetricsLogger>();
@@ -649,6 +676,7 @@ private static RpcFunctionInvocationDispatcher GetTestFunctionDispatcher(int max
649676
TimeSpan intervals = startupIntervals ?? TimeSpan.FromMilliseconds(100);
650677

651678
testEnv.SetEnvironmentVariable(RpcWorkerConstants.FunctionsWorkerProcessCountSettingName, maxProcessCountValue.ToString());
679+
652680
if (runtime != null)
653681
{
654682
testEnv.SetEnvironmentVariable(EnvironmentSettingNames.FunctionWorkerRuntime, runtime);
@@ -657,6 +685,10 @@ private static RpcFunctionInvocationDispatcher GetTestFunctionDispatcher(int max
657685
{
658686
testEnv.SetEnvironmentVariable(EnvironmentSettingNames.AzureWebJobsFeatureFlags, ScriptConstants.FeatureFlagEnableWorkerIndexing);
659687
}
688+
if (placeholder)
689+
{
690+
testEnv.SetEnvironmentVariable(EnvironmentSettingNames.AzureWebsitePlaceholderMode, "1");
691+
}
660692

661693
var options = new ScriptJobHostOptions
662694
{
@@ -673,6 +705,7 @@ private static RpcFunctionInvocationDispatcher GetTestFunctionDispatcher(int max
673705
IRpcWorkerChannelFactory testLanguageWorkerChannelFactory = new TestRpcWorkerChannelFactory(eventManager, _testLogger, scriptOptions.Value.RootScriptPath, throwOnProcessStartUp);
674706
IWebHostRpcWorkerChannelManager testWebHostLanguageWorkerChannelManager = new TestRpcWorkerChannelManager(eventManager, _testLogger, scriptOptions.Value.RootScriptPath, testLanguageWorkerChannelFactory);
675707
IJobHostRpcWorkerChannelManager jobHostLanguageWorkerChannelManager = new JobHostRpcWorkerChannelManager(_testLoggerFactory);
708+
676709
if (addWebhostChannel)
677710
{
678711
testWebHostLanguageWorkerChannelManager.InitializeChannelAsync("java");
@@ -681,6 +714,7 @@ private static RpcFunctionInvocationDispatcher GetTestFunctionDispatcher(int max
681714
{
682715
testWebHostLanguageWorkerChannelManager = mockwebHostLanguageWorkerChannelManager.Object;
683716
}
717+
684718
var mockFunctionDispatcherLoadBalancer = new Mock<IRpcFunctionInvocationDispatcherLoadBalancer>();
685719

686720
_javaTestChannel = new TestRpcWorkerChannel(Guid.NewGuid().ToString(), "java", eventManager, _testLogger, false);
@@ -747,14 +781,14 @@ private IEnumerable<FunctionMetadata> GetTestFunctionsList(string runtime)
747781
{
748782
new FunctionMetadata()
749783
{
750-
Language = runtime,
751-
Name = "js1"
784+
Language = runtime,
785+
Name = "js1"
752786
},
753787

754788
new FunctionMetadata()
755789
{
756-
Language = runtime,
757-
Name = "js2"
790+
Language = runtime,
791+
Name = "js2"
758792
}
759793
};
760794
}

0 commit comments

Comments
 (0)