Skip to content

Commit a625f30

Browse files
authored
Intelligent worker concurrency can start more than 10 workers (#8854).
1 parent 2fd0897 commit a625f30

File tree

2 files changed

+86
-18
lines changed

2 files changed

+86
-18
lines changed

src/WebJobs.Script/Workers/WorkerConcurrencyManager.cs

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ public Task StopAsync(CancellationToken cancellationToken)
118118

119119
internal async void OnTimer(object sender, System.Timers.ElapsedEventArgs e)
120120
{
121-
if (_disposed)
121+
if (_environment.IsPlaceholderModeEnabled() || _disposed)
122122
{
123123
return;
124124
}
@@ -131,10 +131,9 @@ internal async void OnTimer(object sender, System.Timers.ElapsedEventArgs e)
131131
{
132132
if (_functionInvocationDispatcher is RpcFunctionInvocationDispatcher rpcDispatcher)
133133
{
134-
// Check memory
135-
var currentChannels = (await rpcDispatcher.GetAllWorkerChannelsAsync()).Select(x => x.WorkerProcess.Process.PrivateMemorySize64);
136-
if (IsEnoughMemoryToScale(Process.GetCurrentProcess().PrivateMemorySize64,
137-
currentChannels,
134+
var allWorkerChannels = await rpcDispatcher.GetAllWorkerChannelsAsync();
135+
if (CanScale(allWorkerChannels) && IsEnoughMemoryToScale(Process.GetCurrentProcess().PrivateMemorySize64,
136+
allWorkerChannels.Select(x => x.WorkerProcess.Process.PrivateMemorySize64),
138137
_memoryLimit))
139138
{
140139
await _functionInvocationDispatcher.StartWorkerChannel();
@@ -176,10 +175,11 @@ private void OnActivationTimer(object sender, System.Timers.ElapsedEventArgs e)
176175
return;
177176
}
178177
}
179-
catch (Exception)
178+
catch (Exception ex)
180179
{
181180
// Best effort
182181
// return and do not start the activation timmer again
182+
_logger.LogError(ex, "Error activating worker concurrency");
183183
return;
184184
}
185185

@@ -318,6 +318,33 @@ internal bool IsEnoughMemoryToScale(long hostProcessSize, IEnumerable<long> work
318318
return true;
319319
}
320320

321+
/// <summary>
322+
/// Checks if a new worker can be added.
323+
/// </summary>
324+
/// <param name="workerChannels">All current worker channels.</param>
325+
/// <returns>True if a new worker can be started.</returns>
326+
internal bool CanScale(IEnumerable<IRpcWorkerChannel> workerChannels)
327+
{
328+
// Cancel if there is any "non-ready" channel.
329+
// A "ready" channel means it's ready for invocations.
330+
var nonReadyWorkerChannels = workerChannels.Where(x => x.IsChannelReadyForInvocations() == false);
331+
if (nonReadyWorkerChannels.Any())
332+
{
333+
_logger.LogDebug($"Starting new language worker canceled as there is atleast one non ready channel: TotalChannels={workerChannels.Count()}, NonReadyChannels={nonReadyWorkerChannels.Count()}");
334+
return false;
335+
}
336+
337+
// Cancel if MaxWorkerCount is reached.
338+
int workersCount = workerChannels.Count();
339+
if (workersCount >= _workerConcurrencyOptions.Value.MaxWorkerCount)
340+
{
341+
_logger.LogDebug($"Starting new language worker canceled as the count of total channels reaches the maximum limit: TotalChannels={workersCount}, MaxWorkerCount={_workerConcurrencyOptions.Value.MaxWorkerCount}");
342+
return false;
343+
}
344+
345+
return true;
346+
}
347+
321348
internal class WorkerStatusDetails
322349
{
323350
public string WorkerId { get; set; }

test/WebJobs.Script.Tests/Workers/Rpc/WorkerConcurrencyManagerTest.cs

Lines changed: 53 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -308,10 +308,10 @@ public async Task AddWorkerIfNeeded_Returns_Expected(WorkerConcurrencyOptions op
308308
LatencyHistory = latencies2.Select(x => TimeSpan.FromMilliseconds(x))
309309
});
310310

311-
WorkerConcurrencyManager concurrancyManager = new WorkerConcurrencyManager(functionInvocationDispatcherFactory.Object, _testEnvironment, Options.Create(options),
311+
WorkerConcurrencyManager concurrencyManager = new WorkerConcurrencyManager(functionInvocationDispatcherFactory.Object, _testEnvironment, Options.Create(options),
312312
_functionsHostingConfigurations, _applicationLifetime, _loggerFactory);
313-
await concurrancyManager.StartAsync(CancellationToken.None);
314-
bool value = concurrancyManager.NewWorkerIsRequired(workerStatuses, elapsedFromLastAdding);
313+
await concurrencyManager.StartAsync(CancellationToken.None);
314+
bool value = concurrencyManager.NewWorkerIsRequired(workerStatuses, elapsedFromLastAdding);
315315

316316
Assert.Equal(value, expected);
317317
}
@@ -323,9 +323,9 @@ public async Task StartAsync_DoesNotGetDispatcher(string workerRuntime)
323323
{
324324
_testEnvironment.SetEnvironmentVariable(EnvironmentSettingNames.FunctionWorkerRuntime, workerRuntime);
325325
Mock<IFunctionInvocationDispatcherFactory> functionInvocationDispatcherFactory = new Mock<IFunctionInvocationDispatcherFactory>(MockBehavior.Strict);
326-
WorkerConcurrencyManager concurrancyManager = new WorkerConcurrencyManager(functionInvocationDispatcherFactory.Object, _testEnvironment, Options.Create(new WorkerConcurrencyOptions()),
326+
WorkerConcurrencyManager concurrencyManager = new WorkerConcurrencyManager(functionInvocationDispatcherFactory.Object, _testEnvironment, Options.Create(new WorkerConcurrencyOptions()),
327327
_functionsHostingConfigurations, _applicationLifetime, _loggerFactory);
328-
await concurrancyManager.StartAsync(CancellationToken.None);
328+
await concurrencyManager.StartAsync(CancellationToken.None);
329329
}
330330

331331
[Theory]
@@ -339,9 +339,9 @@ public async Task StartAsync_GetsDispatcher(string workerRuntime)
339339
Mock<IFunctionInvocationDispatcherFactory> functionInvocationDispatcherFactory = new Mock<IFunctionInvocationDispatcherFactory>(MockBehavior.Strict);
340340
functionInvocationDispatcherFactory.Setup(x => x.GetFunctionDispatcher()).Returns(functionInvocationDispatcher.Object);
341341

342-
WorkerConcurrencyManager concurrancyManager = new WorkerConcurrencyManager(functionInvocationDispatcherFactory.Object, _testEnvironment, Options.Create(new WorkerConcurrencyOptions()),
342+
WorkerConcurrencyManager concurrencyManager = new WorkerConcurrencyManager(functionInvocationDispatcherFactory.Object, _testEnvironment, Options.Create(new WorkerConcurrencyOptions()),
343343
_functionsHostingConfigurations, _applicationLifetime, _loggerFactory);
344-
await concurrancyManager.StartAsync(CancellationToken.None);
344+
await concurrencyManager.StartAsync(CancellationToken.None);
345345
}
346346

347347
[Fact]
@@ -356,10 +356,10 @@ public async Task ActivateWorkerConcurency_FunctionsHostingConfiguration_WorkAsE
356356
conf.Setup(x => x.FunctionsWorkerDynamicConcurrencyEnabled).Returns(true);
357357
WorkerConcurrencyOptions options = new WorkerConcurrencyOptions();
358358

359-
WorkerConcurrencyManager concurrancyManager = new WorkerConcurrencyManager(functionInvocationDispatcherFactory.Object, testEnvironment, Options.Create(options), conf.Object,
359+
WorkerConcurrencyManager concurrencyManager = new WorkerConcurrencyManager(functionInvocationDispatcherFactory.Object, testEnvironment, Options.Create(options), conf.Object,
360360
_applicationLifetime, _loggerFactory);
361-
concurrancyManager.ActivationTimerInterval = TimeSpan.FromMilliseconds(100);
362-
await concurrancyManager.StartAsync(CancellationToken.None);
361+
concurrencyManager.ActivationTimerInterval = TimeSpan.FromMilliseconds(100);
362+
await concurrencyManager.StartAsync(CancellationToken.None);
363363
await TestHelpers.Await(() => _loggerProvider.GetAllLogMessages().SingleOrDefault(x => x.FormattedMessage.StartsWith("Dynamic worker concurrency monitoring was started by activation timer.")) != null, timeout: 1000, pollingInterval: 100);
364364
conf.Setup(x => x.FunctionsWorkerDynamicConcurrencyEnabled).Returns(false);
365365
await TestHelpers.Await(() => _loggerProvider.GetAllLogMessages().SingleOrDefault(x => x.FormattedMessage.StartsWith("Dynamic worker concurrency monitoring is disabled after activation. Shutting down Functions Host.")) != null, timeout: 1000, pollingInterval: 100);
@@ -377,14 +377,55 @@ public void IsEnoughMemory_WorkAsExpected(long availableMemory, long hostProcess
377377
Mock<IFunctionsHostingConfiguration> conf = new Mock<IFunctionsHostingConfiguration>();
378378
WorkerConcurrencyOptions options = new WorkerConcurrencyOptions();
379379

380-
WorkerConcurrencyManager concurrancyManager = new WorkerConcurrencyManager(functionInvocationDispatcherFactory.Object, testEnvironment,
380+
WorkerConcurrencyManager concurrencyManager = new WorkerConcurrencyManager(functionInvocationDispatcherFactory.Object, testEnvironment,
381381
Options.Create(options), conf.Object, _applicationLifetime, _loggerFactory);
382382

383-
Assert.True(concurrancyManager.IsEnoughMemoryToScale(hostProcessSize, languageWorkerSizes, availableMemory) == result);
383+
Assert.True(concurrencyManager.IsEnoughMemoryToScale(hostProcessSize, languageWorkerSizes, availableMemory) == result);
384384
if (!result)
385385
{
386386
Assert.Contains(_loggerProvider.GetAllLogMessages().Select(x => x.FormattedMessage), x => x.StartsWith("Starting new language worker canceled:"));
387387
}
388388
}
389+
390+
[Theory]
391+
[InlineData(4, new bool[] { true, true, true }, true)]
392+
[InlineData(3, new bool[] { true, true, true }, false)]
393+
[InlineData(4, new bool[] { true, false, true }, false)]
394+
public void CanScale_ReturnsExpected(int maxWorkerCount, bool[] isReadyArray, bool result)
395+
{
396+
TestEnvironment testEnvironment = new TestEnvironment();
397+
Mock<IFunctionInvocationDispatcherFactory> functionInvocationDispatcherFactory = new Mock<IFunctionInvocationDispatcherFactory>(MockBehavior.Strict);
398+
Mock<IFunctionsHostingConfiguration> conf = new Mock<IFunctionsHostingConfiguration>();
399+
WorkerConcurrencyOptions options = new WorkerConcurrencyOptions();
400+
options.MaxWorkerCount = maxWorkerCount;
401+
402+
WorkerConcurrencyManager concurrencyManager = new WorkerConcurrencyManager(functionInvocationDispatcherFactory.Object, testEnvironment,
403+
Options.Create(options), conf.Object, _applicationLifetime, _loggerFactory);
404+
405+
List<IRpcWorkerChannel> workerChannels = new List<IRpcWorkerChannel>();
406+
foreach (bool isReady in isReadyArray)
407+
{
408+
Mock<IRpcWorkerChannel> mock = new Mock<IRpcWorkerChannel>();
409+
mock.Setup(x => x.IsChannelReadyForInvocations()).Returns(isReady);
410+
workerChannels.Add(mock.Object);
411+
}
412+
Assert.Equal(concurrencyManager.CanScale(workerChannels), result);
413+
}
414+
415+
[Fact]
416+
public void OnTimer_WorksAsExpected_IfPlaceholderMode_Enabled()
417+
{
418+
TestEnvironment testEnvironment = new TestEnvironment();
419+
Mock<IFunctionInvocationDispatcherFactory> functionInvocationDispatcherFactory = new Mock<IFunctionInvocationDispatcherFactory>(MockBehavior.Strict);
420+
Mock<IFunctionsHostingConfiguration> conf = new Mock<IFunctionsHostingConfiguration>();
421+
WorkerConcurrencyOptions options = new WorkerConcurrencyOptions();
422+
423+
WorkerConcurrencyManager concurrencyManager = new WorkerConcurrencyManager(functionInvocationDispatcherFactory.Object, testEnvironment,
424+
Options.Create(options), conf.Object, _applicationLifetime, _loggerFactory);
425+
426+
testEnvironment.SetEnvironmentVariable(EnvironmentSettingNames.AzureWebsitePlaceholderMode, "1");
427+
concurrencyManager.OnTimer(null, null);
428+
Assert.Empty(_loggerProvider.GetAllLogMessages().Where(x => x.FormattedMessage == "Error monitoring worker concurrency"));
429+
}
389430
}
390431
}

0 commit comments

Comments
 (0)