Skip to content

Commit f0e72a6

Browse files
authored
[v3 port] Worker concurrency memory check
1 parent c6516b0 commit f0e72a6

File tree

9 files changed

+75
-6
lines changed

9 files changed

+75
-6
lines changed

release_notes.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
<!-- Please add your release notes in the following format:
33
- My change description (#PR)
44
-->
5+
- Worker concurrency memory check (https://github.com/Azure/azure-functions-host/issues/7499)
56

67
**Release sprint:** Sprint 128
78
[ [bugs](https://github.com/Azure/azure-functions-host/issues?q=is%3Aissue+milestone%3A%22Functions+Sprint+128%22+label%3Abug+is%3Aclosed) | [features](https://github.com/Azure/azure-functions-host/issues?q=is%3Aissue+milestone%3A%22Functions+Sprint+128%22+label%3Afeature+is%3Aclosed) ]

src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ internal GrpcWorkerChannel(
134134

135135
public IDictionary<string, BufferBlock<ScriptInvocationContext>> FunctionInputBuffers => _functionInputBuffers;
136136

137-
internal IWorkerProcess WorkerProcess => _rpcWorkerProcess;
137+
public IWorkerProcess WorkerProcess => _rpcWorkerProcess;
138138

139139
internal RpcWorkerConfig Config => _workerConfig;
140140

src/WebJobs.Script/Workers/Http/HttpWorkerChannel.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ internal HttpWorkerChannel(
4242

4343
public string Id { get; }
4444

45+
public IWorkerProcess WorkerProcess => _workerProcess;
46+
4547
public Task InvokeAsync(ScriptInvocationContext context)
4648
{
4749
return _httpWorkerService.InvokeAsync(context);

src/WebJobs.Script/Workers/IWorkerChannel.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// Copyright (c) .NET Foundation. All rights reserved.
22
// Licensed under the MIT License. See License.txt in the project root for license information.
33

4+
using System.Diagnostics;
45
using System.Threading;
56
using System.Threading.Tasks;
67

@@ -10,6 +11,8 @@ public interface IWorkerChannel
1011
{
1112
string Id { get; }
1213

14+
IWorkerProcess WorkerProcess { get; }
15+
1316
Task<WorkerStatus> GetWorkerStatusAsync();
1417

1518
Task StartWorkerProcessAsync(CancellationToken cancellationToken = default);

src/WebJobs.Script/Workers/ProcessManagement/IWorkerProcess.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// Copyright (c) .NET Foundation. All rights reserved.
22
// Licensed under the MIT License. See License.txt in the project root for license information.
33

4+
using System.Diagnostics;
45
using System.Threading.Tasks;
56
using Microsoft.Azure.WebJobs.Script.Scale;
67

@@ -10,6 +11,8 @@ public interface IWorkerProcess
1011
{
1112
int Id { get; }
1213

14+
Process Process { get; }
15+
1316
Task StartProcessAsync();
1417
}
1518
}

src/WebJobs.Script/Workers/ProcessManagement/WorkerProcess.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,7 @@ internal WorkerProcess(IScriptEventManager eventManager, IProcessRegistry proces
5353

5454
internal Queue<string> ProcessStdErrDataQueue => _processStdErrDataQueue;
5555

56-
// for testing
57-
internal Process Process { get; set; }
56+
public Process Process { get; set; }
5857

5958
internal abstract Process CreateWorkerProcess();
6059

src/WebJobs.Script/Workers/WorkerConcurrencyManager.cs

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
using System.Text;
99
using System.Threading;
1010
using System.Threading.Tasks;
11+
using Microsoft.Azure.WebJobs.Host;
12+
using Microsoft.Azure.WebJobs.Host.Scale;
1113
using Microsoft.Azure.WebJobs.Logging;
1214
using Microsoft.Azure.WebJobs.Script.Config;
1315
using Microsoft.Azure.WebJobs.Script.Diagnostics;
@@ -27,6 +29,7 @@ internal class WorkerConcurrencyManager : IHostedService, IDisposable
2729
private readonly IEnvironment _environment;
2830
private readonly IFunctionsHostingConfiguration _functionsHostingConfigurations;
2931
private readonly IApplicationLifetime _applicationLifetime;
32+
private readonly long _memoryLimit = AppServicesHostingUtility.GetMemoryLimitBytes();
3033

3134
private IOptions<WorkerConcurrencyOptions> _workerConcurrencyOptions;
3235
private IFunctionInvocationDispatcher _functionInvocationDispatcher;
@@ -126,8 +129,18 @@ internal async void OnTimer(object sender, System.Timers.ElapsedEventArgs e)
126129

127130
if (NewWorkerIsRequired(workerStatuses, _addWorkerStopwatch.Elapsed))
128131
{
129-
await _functionInvocationDispatcher.StartWorkerChannel();
130-
_addWorkerStopwatch.Restart();
132+
if (_functionInvocationDispatcher is RpcFunctionInvocationDispatcher rpcDispatcher)
133+
{
134+
// Check memory
135+
var currentChannels = (await rpcDispatcher.GetAllWorkerChannelsAsync()).Select(x => x.WorkerProcess.Process.PrivateMemorySize64);
136+
if (IsEnoughMemoryToScale(Process.GetCurrentProcess().PrivateMemorySize64,
137+
currentChannels,
138+
_memoryLimit))
139+
{
140+
await _functionInvocationDispatcher.StartWorkerChannel();
141+
_addWorkerStopwatch.Restart();
142+
}
143+
}
131144
}
132145
}
133146
catch (Exception ex)
@@ -285,6 +298,26 @@ public void Dispose()
285298
Dispose(true);
286299
}
287300

301+
internal bool IsEnoughMemoryToScale(long hostProcessSize, IEnumerable<long> workerChannelSizes, long memoryLimit)
302+
{
303+
if (memoryLimit <= 0)
304+
{
305+
return true;
306+
}
307+
308+
// Checking memory before adding a new worker
309+
// By adding `maxWorkerSize` to current memeory consumption we are predicting what will be overall memory consumption after adding a new worker.
310+
// We do not want this value to be more then 80%.
311+
long maxWorkerSize = workerChannelSizes.Max();
312+
long currentMemoryConsumption = workerChannelSizes.Sum() + hostProcessSize;
313+
if (currentMemoryConsumption + maxWorkerSize > memoryLimit * 0.8)
314+
{
315+
_logger.LogDebug($"Starting new language worker canceled: TotalMemory={memoryLimit}, MaxWorkerSize={maxWorkerSize}, CurrentMemoryConsumption={currentMemoryConsumption}");
316+
return false;
317+
}
318+
return true;
319+
}
320+
288321
internal class WorkerStatusDetails
289322
{
290323
public string WorkerId { get; set; }

test/WebJobs.Script.Tests/Workers/Rpc/TestRpcWorkerChannel.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
using System;
55
using System.Collections.Generic;
6+
using System.Diagnostics;
67
using System.Threading;
78
using System.Threading.Tasks;
89
using System.Threading.Tasks.Dataflow;
@@ -47,6 +48,8 @@ public TestRpcWorkerChannel(string workerId, string runtime = null, IScriptEvent
4748

4849
public bool IsDisposed => _isDisposed;
4950

51+
public IWorkerProcess WorkerProcess => null;
52+
5053
public IDictionary<string, BufferBlock<ScriptInvocationContext>> FunctionInputBuffers => throw new NotImplementedException();
5154

5255
public List<Task> ExecutionContexts => _executionContexts;

test/WebJobs.Script.Tests/Workers/Rpc/WorkerConcurrencyManagerTest.cs

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@
33

44
using System;
55
using System.Collections.Generic;
6+
using System.Diagnostics;
67
using System.Linq;
78
using System.Threading;
89
using System.Threading.Tasks;
10+
using Microsoft.Azure.WebJobs.Host.Scale;
911
using Microsoft.Azure.WebJobs.Script.Config;
1012
using Microsoft.Azure.WebJobs.Script.Workers;
1113
using Microsoft.Azure.WebJobs.Script.Workers.Rpc;
@@ -354,12 +356,35 @@ public async Task ActivateWorkerConcurency_FunctionsHostingConfiguration_WorkAsE
354356
conf.Setup(x => x.FunctionsWorkerDynamicConcurrencyEnabled).Returns(true);
355357
WorkerConcurrencyOptions options = new WorkerConcurrencyOptions();
356358

357-
WorkerConcurrencyManager concurrancyManager = new WorkerConcurrencyManager(functionInvocationDispatcherFactory.Object, testEnvironment, Options.Create(options), conf.Object, _applicationLifetime, _loggerFactory);
359+
WorkerConcurrencyManager concurrancyManager = new WorkerConcurrencyManager(functionInvocationDispatcherFactory.Object, testEnvironment, Options.Create(options), conf.Object,
360+
_applicationLifetime, _loggerFactory);
358361
concurrancyManager.ActivationTimerInterval = TimeSpan.FromMilliseconds(100);
359362
await concurrancyManager.StartAsync(CancellationToken.None);
360363
await TestHelpers.Await(() => _loggerProvider.GetAllLogMessages().SingleOrDefault(x => x.FormattedMessage.StartsWith("Dynamic worker concurrency monitoring was started by activation timer.")) != null, timeout: 1000, pollingInterval: 100);
361364
conf.Setup(x => x.FunctionsWorkerDynamicConcurrencyEnabled).Returns(false);
362365
await TestHelpers.Await(() => _loggerProvider.GetAllLogMessages().SingleOrDefault(x => x.FormattedMessage.StartsWith("Dynamic worker concurrency monitoring is disabled after activation. Shutting down Functions Host.")) != null, timeout: 1000, pollingInterval: 100);
363366
}
367+
368+
[Theory]
369+
[InlineData(100, 20, new long[] { 20, 20 }, true)]
370+
[InlineData(100, 20, new long[] { 20, 10, 10 }, true)]
371+
[InlineData(100, 21, new long[] { 20, 10, 10 }, false)]
372+
public void IsEnoughMemory_WorkAsExpected(long availableMemory, long hostProcessSize, IEnumerable<long> languageWorkerSizes, bool result)
373+
{
374+
TestEnvironment testEnvironment = new TestEnvironment();
375+
Mock<IFunctionInvocationDispatcher> functionInvocationDispatcher = new Mock<IFunctionInvocationDispatcher>(MockBehavior.Strict);
376+
Mock<IFunctionInvocationDispatcherFactory> functionInvocationDispatcherFactory = new Mock<IFunctionInvocationDispatcherFactory>(MockBehavior.Strict);
377+
Mock<IFunctionsHostingConfiguration> conf = new Mock<IFunctionsHostingConfiguration>();
378+
WorkerConcurrencyOptions options = new WorkerConcurrencyOptions();
379+
380+
WorkerConcurrencyManager concurrancyManager = new WorkerConcurrencyManager(functionInvocationDispatcherFactory.Object, testEnvironment,
381+
Options.Create(options), conf.Object, _applicationLifetime, _loggerFactory);
382+
383+
Assert.True(concurrancyManager.IsEnoughMemoryToScale(hostProcessSize, languageWorkerSizes, availableMemory) == result);
384+
if (!result)
385+
{
386+
Assert.Contains(_loggerProvider.GetAllLogMessages().Select(x => x.FormattedMessage), x => x.StartsWith("Starting new language worker canceled:"));
387+
}
388+
}
364389
}
365390
}

0 commit comments

Comments
 (0)