Skip to content

Commit 5367f3e

Browse files
authored
Fixes slow start of multiple worker processes (resolves #7136)
1 parent 1927f5f commit 5367f3e

File tree

4 files changed

+98
-34
lines changed

4 files changed

+98
-34
lines changed

release_notes.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
- Re-added dependencies: Microsoft.Extensions.PlatformAbstractions and Microsoft.Extensions.DiagnosticAdapter
1717
- Updated PowerShell Worker (PS7) to [3.0.705](https://github.com/Azure/azure-functions-powershell-worker/releases/tag/v3.0.705)
1818
- Set bundle v2.x as the default version
19+
- Fixed issue where multiple worker processes are started slower than expected (#7136)
1920

2021
**Release sprint:** Sprint 89, 90, 91
2122
[ [bugs](https://github.com/Azure/azure-functions-host/issues?q=is%3Aissue+milestone%3A%22Functions+Sprint+89%22+label%3Abug+is%3Aclosed) | [features](https://github.com/Azure/azure-functions-host/issues?q=is%3Aissue+milestone%3A%22Functions+Sprint+89%22+label%3Afeature+is%3Aclosed) ]

src/WebJobs.Script/Workers/Rpc/FunctionRegistration/RpcFunctionInvocationDispatcher.cs

Lines changed: 30 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ internal class RpcFunctionInvocationDispatcher : IFunctionInvocationDispatcher
5151
private ConcurrentStack<WorkerErrorEvent> _languageWorkerErrors = new ConcurrentStack<WorkerErrorEvent>();
5252
private CancellationTokenSource _processStartCancellationToken = new CancellationTokenSource();
5353
private CancellationTokenSource _disposeToken = new CancellationTokenSource();
54-
private int _debounceMilliSeconds = (int)TimeSpan.FromSeconds(10).TotalMilliseconds;
54+
private int _processStartupInterval;
5555

5656
public RpcFunctionInvocationDispatcher(IOptions<ScriptJobHostOptions> scriptHostOptions,
5757
IMetricsLogger metricsLogger,
@@ -102,23 +102,20 @@ public RpcFunctionInvocationDispatcher(IOptions<ScriptJobHostOptions> scriptHost
102102

103103
internal IWebHostRpcWorkerChannelManager WebHostLanguageWorkerChannelManager => _webHostLanguageWorkerChannelManager;
104104

105-
internal async void InitializeJobhostLanguageWorkerChannelAsync()
105+
internal async Task InitializeJobhostLanguageWorkerChannelAsync()
106106
{
107107
await InitializeJobhostLanguageWorkerChannelAsync(0);
108108
}
109109

110-
internal Task InitializeJobhostLanguageWorkerChannelAsync(int attemptCount)
110+
internal async Task InitializeJobhostLanguageWorkerChannelAsync(int attemptCount)
111111
{
112112
var rpcWorkerChannel = _rpcWorkerChannelFactory.Create(_scriptOptions.RootScriptPath, _workerRuntime, _metricsLogger, attemptCount, _workerConfigs);
113113
rpcWorkerChannel.SetupFunctionInvocationBuffers(_functions);
114114
_jobHostLanguageWorkerChannelManager.AddChannel(rpcWorkerChannel);
115-
rpcWorkerChannel.StartWorkerProcessAsync().ContinueWith(workerInitTask =>
116-
{
117-
_logger.LogDebug("Adding jobhost language worker channel for runtime: {language}. workerId:{id}", _workerRuntime, rpcWorkerChannel.Id);
118-
rpcWorkerChannel.SendFunctionLoadRequests(_managedDependencyOptions.Value, _scriptOptions.FunctionTimeout);
119-
SetFunctionDispatcherStateToInitializedAndLog();
120-
}, TaskContinuationOptions.OnlyOnRanToCompletion);
121-
return Task.CompletedTask;
115+
await rpcWorkerChannel.StartWorkerProcessAsync();
116+
_logger.LogDebug("Adding jobhost language worker channel for runtime: {language}. workerId:{id}", _workerRuntime, rpcWorkerChannel.Id);
117+
rpcWorkerChannel.SendFunctionLoadRequests(_managedDependencyOptions.Value, _scriptOptions.FunctionTimeout);
118+
SetFunctionDispatcherStateToInitializedAndLog();
122119
}
123120

124121
private void SetFunctionDispatcherStateToInitializedAndLog()
@@ -128,7 +125,7 @@ private void SetFunctionDispatcherStateToInitializedAndLog()
128125
_logger.LogInformation("Worker process started and initialized.");
129126
}
130127

131-
internal async void InitializeWebhostLanguageWorkerChannel()
128+
internal async Task InitializeWebhostLanguageWorkerChannel()
132129
{
133130
_logger.LogDebug("Creating new webhost language worker channel for runtime:{workerRuntime}.", _workerRuntime);
134131
IRpcWorkerChannel workerChannel = await _webHostLanguageWorkerChannelManager.InitializeChannelAsync(_workerRuntime);
@@ -142,13 +139,24 @@ internal async void ShutdownWebhostLanguageWorkerChannels()
142139
await _webHostLanguageWorkerChannelManager?.ShutdownChannelsAsync();
143140
}
144141

145-
private void StartWorkerProcesses(int startIndex, Action startAction)
142+
private void StartWorkerProcesses(int startIndex, Func<Task> startAction)
146143
{
147-
for (var count = startIndex; count < _maxProcessCount; count++)
144+
Task.Run(async () =>
148145
{
149-
startAction = startAction.Debounce(_processStartCancellationToken.Token, count * _debounceMilliSeconds);
150-
startAction();
151-
}
146+
for (var count = startIndex; count < _maxProcessCount
147+
&& !_processStartCancellationToken.IsCancellationRequested; count++)
148+
{
149+
try
150+
{
151+
await startAction();
152+
await Task.Delay(_processStartupInterval);
153+
}
154+
catch (Exception ex)
155+
{
156+
_logger.LogError(ex, $"Failed to start a new language worker for runtime: {_workerRuntime}.");
157+
}
158+
}
159+
}, _processStartCancellationToken.Token);
152160
}
153161

154162
public async Task InitializeAsync(IEnumerable<FunctionMetadata> functions, CancellationToken cancellationToken = default)
@@ -185,7 +193,7 @@ public async Task InitializeAsync(IEnumerable<FunctionMetadata> functions, Cance
185193
throw new InvalidOperationException($"WorkerCofig for runtime: {_workerRuntime} not found");
186194
}
187195
_maxProcessCount = workerConfig.CountOptions.ProcessCount;
188-
_debounceMilliSeconds = (int)workerConfig.CountOptions.ProcessStartupInterval.TotalMilliseconds;
196+
_processStartupInterval = (int)workerConfig.CountOptions.ProcessStartupInterval.TotalMilliseconds;
189197
ErrorEventsThreshold = 3 * _maxProcessCount;
190198

191199
if (Utility.IsSupportedRuntime(_workerRuntime, _workerConfigs))
@@ -209,17 +217,18 @@ public async Task InitializeAsync(IEnumerable<FunctionMetadata> functions, Cance
209217
{
210218
_logger.LogWarning(ex, "Removing errored webhost language worker channel for runtime: {workerRuntime} workerId:{workerId}", _workerRuntime, workerId);
211219
await _webHostLanguageWorkerChannelManager.ShutdownChannelIfExistsAsync(_workerRuntime, workerId, ex);
212-
InitializeWebhostLanguageWorkerChannel();
213220
}
214221
}
215222
}
216223
StartWorkerProcesses(webhostLanguageWorkerChannels.Count(), InitializeWebhostLanguageWorkerChannel);
217-
SetFunctionDispatcherStateToInitializedAndLog();
224+
if (webhostLanguageWorkerChannels.Any())
225+
{
226+
SetFunctionDispatcherStateToInitializedAndLog();
227+
}
218228
}
219229
else
220230
{
221-
await InitializeJobhostLanguageWorkerChannelAsync(0);
222-
StartWorkerProcesses(1, InitializeJobhostLanguageWorkerChannelAsync);
231+
StartWorkerProcesses(0, InitializeJobhostLanguageWorkerChannelAsync);
223232
}
224233
}
225234
}

test/WebJobs.Script.Tests.Integration/Rpc/FunctionDispatcherEndToEndTests.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ private async Task WaitForWorkerProcessRestart(int restartCount)
3939
await TestHelpers.Await(() =>
4040
{
4141
var currentChannel = GetCurrentJobHostWorkerChannel();
42-
return currentChannel != null && currentChannel.Id != _nodeWorkerChannel.Id;
42+
return currentChannel != null && currentChannel.Id != _nodeWorkerChannel.Id && currentChannel.IsChannelReadyForInvocations();
4343

4444
}, pollingInterval: 4 * 1000, timeout: 60 * 1000);
4545
_nodeWorkerChannel = GetCurrentJobHostWorkerChannel();
@@ -50,7 +50,7 @@ private async Task WaitForJobHostChannelReady()
5050
await TestHelpers.Await(() =>
5151
{
5252
var currentChannel = GetCurrentJobHostWorkerChannel();
53-
return currentChannel != null;
53+
return currentChannel != null && currentChannel.IsChannelReadyForInvocations();
5454
}, pollingInterval: 4 * 1000, timeout: 60 * 1000);
5555
_nodeWorkerChannel = GetCurrentJobHostWorkerChannel();
5656
}

test/WebJobs.Script.Tests/Workers/Rpc/RpcFunctionInvocationDispatcherTests.cs

Lines changed: 65 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
using System.Collections.Generic;
66
using System.IO;
77
using System.Linq;
8+
using System.Reflection;
89
using System.Threading.Tasks;
910
using Microsoft.AspNetCore.Hosting;
1011
using Microsoft.Azure.WebJobs.Script.Description;
@@ -13,7 +14,9 @@
1314
using Microsoft.Azure.WebJobs.Script.ManagedDependencies;
1415
using Microsoft.Azure.WebJobs.Script.Workers;
1516
using Microsoft.Azure.WebJobs.Script.Workers.Rpc;
17+
using Microsoft.Extensions.Logging;
1618
using Microsoft.Extensions.Options;
19+
using Microsoft.WebJobs.Script.Tests;
1720
using Moq;
1821
using Xunit;
1922
using FunctionMetadata = Microsoft.Azure.WebJobs.Script.Description.FunctionMetadata;
@@ -23,7 +26,18 @@ namespace Microsoft.Azure.WebJobs.Script.Tests.Workers.Rpc
2326
public class RpcFunctionInvocationDispatcherTests
2427
{
2528
private static TestRpcWorkerChannel _javaTestChannel;
26-
private static TestLogger _testLogger = new TestLogger("FunctionDispatcherTests");
29+
private static ILogger _testLogger;
30+
private static TestLoggerProvider _testLoggerProvider;
31+
private static LoggerFactory _testLoggerFactory;
32+
33+
public RpcFunctionInvocationDispatcherTests()
34+
{
35+
_testLoggerProvider = new TestLoggerProvider();
36+
_testLoggerFactory = new LoggerFactory();
37+
_testLoggerFactory.AddProvider(_testLoggerProvider);
38+
39+
_testLogger = _testLoggerProvider.CreateLogger("FunctionDispatcherTests");
40+
}
2741

2842
[Fact]
2943
public async Task GetWorkerStatusesAsync_ReturnsExpectedResult()
@@ -47,18 +61,22 @@ public async Task GetWorkerStatusesAsync_ReturnsExpectedResult()
4761
[Fact]
4862
public async Task Starting_MultipleJobhostChannels_Succeeds()
4963
{
64+
_testLoggerProvider.ClearAllLogMessages();
5065
int expectedProcessCount = 3;
5166
RpcFunctionInvocationDispatcher functionDispatcher = GetTestFunctionDispatcher(expectedProcessCount);
5267
await functionDispatcher.InitializeAsync(GetTestFunctionsList(RpcWorkerConstants.NodeLanguageWorkerName));
5368

5469
var finalChannelCount = await WaitForJobhostWorkerChannelsToStartup(functionDispatcher, expectedProcessCount);
5570
Assert.Equal(expectedProcessCount, finalChannelCount);
71+
72+
VerifyStartIntervals(TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(15));
5673
}
5774

5875
[Fact]
5976
public async Task Starting_MultipleWebhostChannels_Succeeds()
6077
{
61-
int expectedProcessCount = 2;
78+
_testLoggerProvider.ClearAllLogMessages();
79+
int expectedProcessCount = 3;
6280
RpcFunctionInvocationDispatcher functionDispatcher = GetTestFunctionDispatcher(expectedProcessCount, true);
6381
await functionDispatcher.InitializeAsync(GetTestFunctionsList(RpcWorkerConstants.JavaLanguageWorkerName));
6482

@@ -67,6 +85,26 @@ public async Task Starting_MultipleWebhostChannels_Succeeds()
6785

6886
var finalJobhostChannelCount = functionDispatcher.JobHostLanguageWorkerChannelManager.GetChannels().Count();
6987
Assert.Equal(0, finalJobhostChannelCount);
88+
89+
// ignore first start as we added a WebhostChannel on GetTestFunctionDispatcher call
90+
VerifyStartIntervals(TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(15), true);
91+
}
92+
93+
[Fact]
94+
public async Task Starting_MultipleJobhostChannels_Failed()
95+
{
96+
_testLoggerProvider.ClearAllLogMessages();
97+
int expectedProcessCount = 3;
98+
RpcFunctionInvocationDispatcher functionDispatcher = GetTestFunctionDispatcher(expectedProcessCount, throwOnProcessStartUp: true);
99+
await functionDispatcher.InitializeAsync(GetTestFunctionsList(RpcWorkerConstants.NodeLanguageWorkerName));
100+
101+
var finalChannelCount = await WaitForJobhostWorkerChannelsToStartup(functionDispatcher, expectedProcessCount, false);
102+
Assert.Equal(expectedProcessCount, finalChannelCount);
103+
104+
var logMessages = _testLoggerProvider.GetAllLogMessages().ToList();
105+
106+
Assert.Equal(logMessages.Where(x => x.FormattedMessage
107+
.Contains("Failed to start a new language worker")).Count(), 3);
70108
}
71109

72110
[Fact]
@@ -421,7 +459,7 @@ public async Task FunctionDispatcher_DoNot_Restart_ErroredChannels_If_WorkerRunt
421459
await functionDispatcher.InitializeAsync(GetTestFunctionsList(RpcWorkerConstants.NodeLanguageWorkerName));
422460
await WaitForJobhostWorkerChannelsToStartup(functionDispatcher, expectedProcessCount);
423461
_javaTestChannel.RaiseWorkerError();
424-
var testLogs = _testLogger.GetLogMessages();
462+
var testLogs = _testLoggerProvider.GetAllLogMessages();
425463
Assert.False(testLogs.Any(m => m.FormattedMessage.Contains("Restarting worker channel for runtime:java")));
426464
Assert.Equal(expectedProcessCount, functionDispatcher.JobHostLanguageWorkerChannelManager.GetChannels().Count());
427465
}
@@ -449,10 +487,11 @@ public async Task FunctionDispatcher_ShouldRestartChannel_Returns_True(string wo
449487
[Fact]
450488
public async Task FunctionDispatcher_ErroredWebHostChannel()
451489
{
490+
_testLoggerProvider.ClearAllLogMessages();
452491
RpcFunctionInvocationDispatcher functionDispatcher = GetTestFunctionDispatcher(throwOnProcessStartUp: true, addWebhostChannel: true);
453492
await functionDispatcher.InitializeAsync(GetTestFunctionsList(RpcWorkerConstants.JavaLanguageWorkerName));
454-
var testLogs = _testLogger.GetLogMessages();
455-
Assert.False(testLogs.Any(m => m.FormattedMessage.Contains("Removing errored webhost language worker channel for runtime")));
493+
var testLogs = _testLoggerProvider.GetAllLogMessages();
494+
Assert.True(testLogs.Any(m => m.FormattedMessage.Contains("Removing errored webhost language worker channel for runtime")));
456495
}
457496

458497
private static RpcFunctionInvocationDispatcher GetTestFunctionDispatcher(int maxProcessCountValue = 1, bool addWebhostChannel = false, Mock<IWebHostRpcWorkerChannelManager> mockwebHostLanguageWorkerChannelManager = null, bool throwOnProcessStartUp = false)
@@ -464,8 +503,6 @@ private static RpcFunctionInvocationDispatcher GetTestFunctionDispatcher(int max
464503

465504
testEnv.SetEnvironmentVariable(RpcWorkerConstants.FunctionsWorkerProcessCountSettingName, maxProcessCountValue.ToString());
466505

467-
var loggerFactory = MockNullLoggerFactory.CreateLoggerFactory();
468-
469506
var options = new ScriptJobHostOptions
470507
{
471508
RootLogPath = Path.GetTempPath()
@@ -479,7 +516,7 @@ private static RpcFunctionInvocationDispatcher GetTestFunctionDispatcher(int max
479516
};
480517
IRpcWorkerChannelFactory testLanguageWorkerChannelFactory = new TestRpcWorkerChannelFactory(eventManager, _testLogger, scriptOptions.Value.RootScriptPath, throwOnProcessStartUp);
481518
IWebHostRpcWorkerChannelManager testWebHostLanguageWorkerChannelManager = new TestRpcWorkerChannelManager(eventManager, _testLogger, scriptOptions.Value.RootScriptPath, testLanguageWorkerChannelFactory);
482-
IJobHostRpcWorkerChannelManager jobHostLanguageWorkerChannelManager = new JobHostRpcWorkerChannelManager(loggerFactory);
519+
IJobHostRpcWorkerChannelManager jobHostLanguageWorkerChannelManager = new JobHostRpcWorkerChannelManager(_testLoggerFactory);
483520
if (addWebhostChannel)
484521
{
485522
testWebHostLanguageWorkerChannelManager.InitializeChannelAsync("java");
@@ -497,7 +534,7 @@ private static RpcFunctionInvocationDispatcher GetTestFunctionDispatcher(int max
497534
testEnv,
498535
mockApplicationLifetime.Object,
499536
eventManager,
500-
loggerFactory,
537+
_testLoggerFactory,
501538
testLanguageWorkerChannelFactory,
502539
optionsMonitor,
503540
testWebHostLanguageWorkerChannelManager,
@@ -506,15 +543,17 @@ private static RpcFunctionInvocationDispatcher GetTestFunctionDispatcher(int max
506543
mockFunctionDispatcherLoadBalancer.Object);
507544
}
508545

509-
private async Task<int> WaitForJobhostWorkerChannelsToStartup(RpcFunctionInvocationDispatcher functionDispatcher, int expectedCount)
546+
private async Task<int> WaitForJobhostWorkerChannelsToStartup(RpcFunctionInvocationDispatcher functionDispatcher, int expectedCount, bool allReadyForInvocations = true)
510547
{
511548
int currentChannelCount = 0;
512549
await TestHelpers.Await(() =>
513550
{
514551
currentChannelCount = functionDispatcher.JobHostLanguageWorkerChannelManager.GetChannels().Count();
515552
if (currentChannelCount == expectedCount)
516553
{
517-
return functionDispatcher.JobHostLanguageWorkerChannelManager.GetChannels().All(ch => ch.IsChannelReadyForInvocations());
554+
var channels = functionDispatcher.JobHostLanguageWorkerChannelManager.GetChannels();
555+
556+
return allReadyForInvocations ? channels.All(ch => ch.IsChannelReadyForInvocations()) : true;
518557
}
519558
return false;
520559
}, pollingInterval: expectedCount * 5 * 1000, timeout: 60 * 1000);
@@ -562,5 +601,20 @@ private IEnumerable<FunctionMetadata> GetTestFunctionsList(string runtime)
562601
}
563602
};
564603
}
604+
605+
private void VerifyStartIntervals(TimeSpan from, TimeSpan to, bool ignoreFirstStart = false)
606+
{
607+
var startTimestamps = _testLoggerProvider.GetAllLogMessages().Where(x => x.FormattedMessage
608+
.Contains("RegisterFunctions called")).Select(x => x.Timestamp).ToList();
609+
if (ignoreFirstStart)
610+
{
611+
startTimestamps.RemoveAt(0);
612+
}
613+
for (int i = 1; i < startTimestamps.Count(); i++)
614+
{
615+
var diff = startTimestamps[i] - startTimestamps[i - 1];
616+
Assert.True(diff > from && diff < to);
617+
}
618+
}
565619
}
566620
}

0 commit comments

Comments
 (0)