Skip to content

Commit 5d4e651

Browse files
authored
Worker concurrency memory check (#8585)
1 parent b3c42f0 commit 5d4e651

File tree

8 files changed

+75
-4
lines changed

8 files changed

+75
-4
lines changed

release_notes.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
- Updated Java Worker Version to [2.4.0](https://github.com/Azure/azure-functions-java-worker/releases/tag/2.4.0)
1010
- Update PowerShell Worker 7.2 to 4.0.2258 [Release Note](https://github.com/Azure/azure-functions-powershell-worker/releases/tag/v4.0.2258)
1111
- Adding support to conditionally include empty entries from trigger payload when sending to OOP workers. ([#8499](https://github.com/Azure/azure-functions-host/issues/8499))
12+
- Worker concurrency memory check ([#7499](https://github.com/Azure/azure-functions-host/issues/7499))
1213

1314
**Release sprint:** Sprint 128
1415
[ [bugs](https://github.com/Azure/azure-functions-host/issues?q=is%3Aissue+milestone%3A%22Functions+Sprint+128%22+label%3Abug+is%3Aclosed) | [features](https://github.com/Azure/azure-functions-host/issues?q=is%3Aissue+milestone%3A%22Functions+Sprint+128%22+label%3Afeature+is%3Aclosed) ]

src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ internal GrpcWorkerChannel(
137137

138138
public IDictionary<string, BufferBlock<ScriptInvocationContext>> FunctionInputBuffers => _functionInputBuffers;
139139

140-
internal IWorkerProcess WorkerProcess => _rpcWorkerProcess;
140+
public IWorkerProcess WorkerProcess => _rpcWorkerProcess;
141141

142142
internal RpcWorkerConfig Config => _workerConfig;
143143

src/WebJobs.Script/Workers/Http/HttpWorkerChannel.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ internal HttpWorkerChannel(
4141

4242
public string Id { get; }
4343

44+
public IWorkerProcess WorkerProcess => _workerProcess;
45+
4446
public Task InvokeAsync(ScriptInvocationContext context)
4547
{
4648
return _httpWorkerService.InvokeAsync(context);

src/WebJobs.Script/Workers/IWorkerChannel.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// Copyright (c) .NET Foundation. All rights reserved.
22
// Licensed under the MIT License. See License.txt in the project root for license information.
33

4+
using System.Diagnostics;
45
using System.Threading;
56
using System.Threading.Tasks;
67

@@ -10,6 +11,8 @@ public interface IWorkerChannel
1011
{
1112
string Id { get; }
1213

14+
IWorkerProcess WorkerProcess { get; }
15+
1316
Task<WorkerStatus> GetWorkerStatusAsync();
1417

1518
Task StartWorkerProcessAsync(CancellationToken cancellationToken = default);

src/WebJobs.Script/Workers/ProcessManagement/IWorkerProcess.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// Copyright (c) .NET Foundation. All rights reserved.
22
// Licensed under the MIT License. See License.txt in the project root for license information.
33

4+
using System.Diagnostics;
45
using System.Threading.Tasks;
56
using Microsoft.Azure.WebJobs.Script.Scale;
67

@@ -10,6 +11,8 @@ public interface IWorkerProcess
1011
{
1112
int Id { get; }
1213

14+
Process Process { get; }
15+
1316
Task StartProcessAsync();
1417

1518
void WaitForProcessExitInMilliSeconds(int waitTime);

src/WebJobs.Script/Workers/WorkerConcurrencyManager.cs

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,13 @@
33

44
using System;
55
using System.Collections.Generic;
6+
using System.Diagnostics;
67
using System.Linq;
78
using System.Text;
89
using System.Threading;
910
using System.Threading.Tasks;
11+
using Microsoft.Azure.WebJobs.Host;
12+
using Microsoft.Azure.WebJobs.Host.Scale;
1013
using Microsoft.Azure.WebJobs.Logging;
1114
using Microsoft.Azure.WebJobs.Script.Config;
1215
using Microsoft.Azure.WebJobs.Script.Diagnostics;
@@ -26,6 +29,7 @@ internal class WorkerConcurrencyManager : IHostedService, IDisposable
2629
private readonly IEnvironment _environment;
2730
private readonly IFunctionsHostingConfiguration _functionsHostingConfigurations;
2831
private readonly IApplicationLifetime _applicationLifetime;
32+
private readonly long _memoryLimit = AppServicesHostingUtility.GetMemoryLimitBytes();
2933

3034
private IOptions<WorkerConcurrencyOptions> _workerConcurrencyOptions;
3135
private IFunctionInvocationDispatcher _functionInvocationDispatcher;
@@ -125,8 +129,18 @@ internal async void OnTimer(object sender, System.Timers.ElapsedEventArgs e)
125129

126130
if (NewWorkerIsRequired(workerStatuses, _addWorkerStopwatch.GetElapsedTime()))
127131
{
128-
await _functionInvocationDispatcher.StartWorkerChannel();
129-
_addWorkerStopwatch = ValueStopwatch.StartNew();
132+
if (_functionInvocationDispatcher is RpcFunctionInvocationDispatcher rpcDispatcher)
133+
{
134+
// Check memory
135+
var currentChannels = (await rpcDispatcher.GetAllWorkerChannelsAsync()).Select(x => x.WorkerProcess.Process.PrivateMemorySize64);
136+
if (IsEnoughMemoryToScale(Process.GetCurrentProcess().PrivateMemorySize64,
137+
currentChannels,
138+
_memoryLimit))
139+
{
140+
await _functionInvocationDispatcher.StartWorkerChannel();
141+
_addWorkerStopwatch = ValueStopwatch.StartNew();
142+
}
143+
}
130144
}
131145
}
132146
catch (Exception ex)
@@ -284,6 +298,26 @@ public void Dispose()
284298
Dispose(true);
285299
}
286300

301+
/// <summary>
/// Predicts whether one more language worker can be started without pushing the combined
/// memory consumption of the host and its workers above 80% of <paramref name="memoryLimit"/>.
/// </summary>
/// <param name="hostProcessSize">Memory consumed by the host process, in the same unit as <paramref name="memoryLimit"/>.</param>
/// <param name="workerChannelSizes">Memory consumed by each running worker. May be null or empty, in which case only the host size is considered.</param>
/// <param name="memoryLimit">Total memory limit. A value of zero or less disables the check.</param>
/// <returns>
/// <c>true</c> when the check is disabled or the predicted consumption stays within 80% of the limit;
/// otherwise <c>false</c>, after logging a debug message.
/// </returns>
internal bool IsEnoughMemoryToScale(long hostProcessSize, IEnumerable<long> workerChannelSizes, long memoryLimit)
{
    if (memoryLimit <= 0)
    {
        return true;
    }

    // Take a single snapshot: the sequence may be a deferred query, and enumerating it once for Max
    // and again for Sum could observe different values.
    long[] workerSizes = workerChannelSizes?.ToArray() ?? Array.Empty<long>();

    // Checking memory before adding a new worker.
    // By adding `maxWorkerSize` to the current memory consumption we are predicting what the overall
    // memory consumption will be after adding a new worker.
    // We do not want this value to be more than 80% of the limit.
    // Max() throws on an empty sequence, so with no workers the predicted worker size is 0.
    long maxWorkerSize = workerSizes.Length > 0 ? workerSizes.Max() : 0;
    long currentMemoryConsumption = workerSizes.Sum() + hostProcessSize;
    if (currentMemoryConsumption + maxWorkerSize > memoryLimit * 0.8)
    {
        _logger.LogDebug($"Starting new language worker canceled: TotalMemory={memoryLimit}, MaxWorkerSize={maxWorkerSize}, CurrentMemoryConsumption={currentMemoryConsumption}");
        return false;
    }

    return true;
}
320+
287321
internal class WorkerStatusDetails
288322
{
289323
public string WorkerId { get; set; }

test/WebJobs.Script.Tests/Workers/Rpc/TestRpcWorkerChannel.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
using System;
55
using System.Collections.Generic;
6+
using System.Diagnostics;
67
using System.Threading;
78
using System.Threading.Tasks;
89
using System.Threading.Tasks.Dataflow;
@@ -47,6 +48,8 @@ public TestRpcWorkerChannel(string workerId, string runtime = null, IScriptEvent
4748

4849
public bool IsDisposed => _isDisposed;
4950

51+
public IWorkerProcess WorkerProcess => null;
52+
5053
public IDictionary<string, BufferBlock<ScriptInvocationContext>> FunctionInputBuffers => throw new NotImplementedException();
5154

5255
public List<Task> ExecutionContexts => _executionContexts;

test/WebJobs.Script.Tests/Workers/Rpc/WorkerConcurrencyManagerTest.cs

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@
33

44
using System;
55
using System.Collections.Generic;
6+
using System.Diagnostics;
67
using System.Linq;
78
using System.Threading;
89
using System.Threading.Tasks;
10+
using Microsoft.Azure.WebJobs.Host.Scale;
911
using Microsoft.Azure.WebJobs.Script.Config;
1012
using Microsoft.Azure.WebJobs.Script.Workers;
1113
using Microsoft.Azure.WebJobs.Script.Workers.Rpc;
@@ -354,12 +356,35 @@ public async Task ActivateWorkerConcurency_FunctionsHostingConfiguration_WorkAsE
354356
conf.Setup(x => x.FunctionsWorkerDynamicConcurrencyEnabled).Returns(true);
355357
WorkerConcurrencyOptions options = new WorkerConcurrencyOptions();
356358

357-
WorkerConcurrencyManager concurrancyManager = new WorkerConcurrencyManager(functionInvocationDispatcherFactory.Object, testEnvironment, Options.Create(options), conf.Object, _applicationLifetime, _loggerFactory);
359+
WorkerConcurrencyManager concurrancyManager = new WorkerConcurrencyManager(functionInvocationDispatcherFactory.Object, testEnvironment, Options.Create(options), conf.Object,
360+
_applicationLifetime, _loggerFactory);
358361
concurrancyManager.ActivationTimerInterval = TimeSpan.FromMilliseconds(100);
359362
await concurrancyManager.StartAsync(CancellationToken.None);
360363
await TestHelpers.Await(() => _loggerProvider.GetAllLogMessages().SingleOrDefault(x => x.FormattedMessage.StartsWith("Dynamic worker concurrency monitoring was started by activation timer.")) != null, timeout: 1000, pollingInterval: 100);
361364
conf.Setup(x => x.FunctionsWorkerDynamicConcurrencyEnabled).Returns(false);
362365
await TestHelpers.Await(() => _loggerProvider.GetAllLogMessages().SingleOrDefault(x => x.FormattedMessage.StartsWith("Dynamic worker concurrency monitoring is disabled after activation. Shutting down Functions Host.")) != null, timeout: 1000, pollingInterval: 100);
363366
}
367+
368+
// Verifies the 80% memory threshold used before starting a new language worker:
// the predicted consumption is host size + sum of worker sizes + the largest worker size.
[Theory]
[InlineData(100, 20, new long[] { 20, 20 }, true)]
[InlineData(100, 20, new long[] { 20, 10, 10 }, true)]
[InlineData(100, 21, new long[] { 20, 10, 10 }, false)]
public void IsEnoughMemory_WorkAsExpected(long availableMemory, long hostProcessSize, IEnumerable<long> languageWorkerSizes, bool result)
{
    // Arrange
    TestEnvironment testEnvironment = new TestEnvironment();
    Mock<IFunctionInvocationDispatcherFactory> functionInvocationDispatcherFactory = new Mock<IFunctionInvocationDispatcherFactory>(MockBehavior.Strict);
    Mock<IFunctionsHostingConfiguration> conf = new Mock<IFunctionsHostingConfiguration>();
    WorkerConcurrencyOptions options = new WorkerConcurrencyOptions();

    WorkerConcurrencyManager concurrencyManager = new WorkerConcurrencyManager(functionInvocationDispatcherFactory.Object, testEnvironment,
        Options.Create(options), conf.Object, _applicationLifetime, _loggerFactory);

    // Act
    bool isEnoughMemory = concurrencyManager.IsEnoughMemoryToScale(hostProcessSize, languageWorkerSizes, availableMemory);

    // Assert
    Assert.Equal(result, isEnoughMemory);
    if (!result)
    {
        // A refused scale-out must be logged.
        Assert.Contains(_loggerProvider.GetAllLogMessages().Select(x => x.FormattedMessage), x => x.StartsWith("Starting new language worker canceled:"));
    }
}
364389
}
365390
}

0 commit comments

Comments
 (0)