Skip to content

Commit 24770f8

Browse files
authored
preventing worker process startup during host shutdown (#9820)
1 parent d0f3ea8 commit 24770f8

File tree

9 files changed

+166
-12
lines changed

9 files changed

+166
-12
lines changed

src/WebJobs.Script.WebHost/WebJobsScriptHostService.cs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
using System.Collections.ObjectModel;
66
using System.IO;
77
using System.Linq;
8-
using System.Runtime.CompilerServices;
98
using System.Threading;
109
using System.Threading.Tasks;
1110
using Microsoft.ApplicationInsights.AspNetCore;
@@ -20,6 +19,7 @@
2019
using Microsoft.Azure.WebJobs.Script.Eventing;
2120
using Microsoft.Azure.WebJobs.Script.Scale;
2221
using Microsoft.Azure.WebJobs.Script.WebHost.Diagnostics.Extensions;
22+
using Microsoft.Azure.WebJobs.Script.Workers;
2323
using Microsoft.Extensions.Configuration;
2424
using Microsoft.Extensions.DependencyInjection;
2525
using Microsoft.Extensions.Hosting;
@@ -565,6 +565,7 @@ public async Task RestartHostAsync(CancellationToken cancellationToken)
565565
}
566566
else
567567
{
568+
NotifyHostStopping(previousHost);
568569
startTask = UnsynchronizedStartHostAsync(activeOperation);
569570
stopTask = Orphan(previousHost, cancellationToken);
570571
}
@@ -592,6 +593,19 @@ public async Task RestartHostAsync(CancellationToken cancellationToken)
592593
}
593594
}
594595

596+
// Because we fire-and-forget the host disposal, we cannot be guaranteed when it will be stopped
597+
// or disposed. Use this method to explicitly stop any services in the host that may be
598+
// problematic to run side-by-side with the new host that is starting.
599+
private static void NotifyHostStopping(IHost previousHost)
600+
{
601+
// It's important to prevent any new workers from starting on the orphaned host. The
602+
// only way to guarantee this is to signal to the dispatcher that it's done with process
603+
// creation before we begin a new host.
604+
var dispatcherFactory = previousHost?.Services?.GetService<IFunctionInvocationDispatcherFactory>();
605+
IFunctionInvocationDispatcher dispatcher = dispatcherFactory?.GetFunctionDispatcher();
606+
dispatcher?.PreShutdown();
607+
}
608+
595609
internal bool ShouldEnforceSequentialRestart()
596610
{
597611
var sequentialRestartSetting = _config.GetSection(ConfigurationSectionNames.SequentialJobHostRestart);

src/WebJobs.Script/Workers/Http/HttpFunctionInvocationDispatcher.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,5 +212,9 @@ public async Task<bool> RestartWorkerWithInvocationIdAsync(string invocationId)
212212
await DisposeAndRestartWorkerChannel(_httpWorkerChannel.Id); // Since there's only one channel for httpworker
213213
return true;
214214
}
215+
216+
public void PreShutdown()
217+
{
218+
}
215219
}
216220
}

src/WebJobs.Script/Workers/IFunctionInvocationDispatcher.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,5 +26,7 @@ public interface IFunctionInvocationDispatcher : IDisposable
2626
Task<bool> RestartWorkerWithInvocationIdAsync(string invocationId);
2727

2828
Task StartWorkerChannel();
29+
30+
void PreShutdown();
2931
}
3032
}

src/WebJobs.Script/Workers/Rpc/FunctionRegistration/RpcFunctionInvocationDispatcher.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -678,5 +678,11 @@ private void AddLogUserCategory(IEnumerable<FunctionMetadata> functions)
678678
}
679679
}
680680
}
681+
682+
public void PreShutdown()
683+
{
684+
_logger.LogDebug($"Preventing any new worker processes from starting during shutdown.");
685+
_processStartCancellationToken.Cancel();
686+
}
681687
}
682688
}

src/WebJobs.Script/Workers/Rpc/WebHostRpcWorkerChannelManager.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -389,7 +389,7 @@ internal void AddOrUpdateWorkerChannels(string initializedRuntime, IRpcWorkerCha
389389

390390
internal void SetInitializedWorkerChannel(string initializedRuntime, IRpcWorkerChannel initializedLanguageWorkerChannel)
391391
{
392-
_logger.LogDebug("Adding webhost language worker channel for runtime: {language}. workerId:{id}", initializedRuntime, initializedLanguageWorkerChannel.Id);
392+
_logger.LogDebug("Initializing webhost language worker channel for runtime: {language}. workerId:{id}", initializedRuntime, initializedLanguageWorkerChannel.Id);
393393
if (_workerChannels.TryGetValue(initializedRuntime, out ConcurrentDictionary<string, TaskCompletionSource<IRpcWorkerChannel>> channel))
394394
{
395395
if (channel.TryGetValue(initializedLanguageWorkerChannel.Id, out TaskCompletionSource<IRpcWorkerChannel> value))

test/WebJobs.Script.Tests.Integration/WebHostEndToEnd/NodeEndToEndTests.cs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,18 +12,16 @@
1212
using System.Text;
1313
using System.Threading.Tasks;
1414
using System.Web.Http;
15+
using Microsoft.Azure.Storage.Blob;
16+
using Microsoft.Azure.Storage.Queue;
1517
using Microsoft.Azure.WebJobs.Logging;
16-
using Microsoft.Azure.WebJobs.Script.Diagnostics;
17-
using Microsoft.Azure.WebJobs.Script.WebHost.Diagnostics;
1818
using Microsoft.Azure.WebJobs.Script.WebHost.Models;
1919
using Microsoft.Azure.WebJobs.Script.Workers;
2020
using Microsoft.Azure.WebJobs.Script.Workers.Rpc;
2121
using Microsoft.Extensions.DependencyInjection;
2222
using Microsoft.Extensions.Hosting;
2323
using Microsoft.Extensions.Logging;
2424
using Microsoft.WebJobs.Script.Tests;
25-
using Microsoft.Azure.Storage.Blob;
26-
using Microsoft.Azure.Storage.Queue;
2725
using Newtonsoft.Json;
2826
using Newtonsoft.Json.Linq;
2927
using Xunit;
@@ -205,11 +203,11 @@ await TestHelpers.Await(() =>
205203

206204
// verify log levels in traces
207205
LogMessage[] traces = Fixture.Host.GetScriptHostLogMessages(userCategory).Where(t => t.FormattedMessage != null && t.FormattedMessage.Contains("loglevel")).ToArray();
208-
Assert.True(traces.Any(t => t.Level == LogLevel.Information && t.FormattedMessage == "loglevel default" ));
209-
Assert.True(traces.Any(t => t.Level == LogLevel.Information && t.FormattedMessage == "loglevel info" ));
210-
Assert.True(traces.Any(t => t.Level == LogLevel.Trace && t.FormattedMessage == "loglevel verbose" ));
211-
Assert.True(traces.Any(t => t.Level == LogLevel.Warning && t.FormattedMessage == "loglevel warn" ));
212-
Assert.True(traces.Any(t => t.Level == LogLevel.Error && t.FormattedMessage == "loglevel error" ));
206+
Assert.True(traces.Any(t => t.Level == LogLevel.Information && t.FormattedMessage == "loglevel default"));
207+
Assert.True(traces.Any(t => t.Level == LogLevel.Information && t.FormattedMessage == "loglevel info"));
208+
Assert.True(traces.Any(t => t.Level == LogLevel.Trace && t.FormattedMessage == "loglevel verbose"));
209+
Assert.True(traces.Any(t => t.Level == LogLevel.Warning && t.FormattedMessage == "loglevel warn"));
210+
Assert.True(traces.Any(t => t.Level == LogLevel.Error && t.FormattedMessage == "loglevel error"));
213211

214212
// verify most of the logs look correct
215213
Assert.True(userLogs.Contains("Mathew Charles"));
@@ -447,6 +445,7 @@ public async Task HttpTrigger_Get_Succeeds()
447445

448446
HttpResponseMessage response = await Fixture.Host.HttpClient.SendAsync(request);
449447

448+
Assert.True(response.StatusCode == HttpStatusCode.OK, Fixture.Host.GetLog());
450449
Assert.Equal(HttpStatusCode.OK, response.StatusCode);
451450

452451
Assert.Equal("Test Response Header", response.Headers.GetValues("test-header").SingleOrDefault());
@@ -885,6 +884,7 @@ await ApiHubTestHelper.AssertTextUpdatedAsync(
885884
}
886885

887886
#endif
887+
888888
public class TestFixture : EndToEndTestFixture
889889
{
890890
private static string rootPath = Path.Combine("TestScripts", "Node");
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the MIT License. See License.txt in the project root for license information.
3+
4+
using System;
5+
using System.IO;
6+
using System.Linq;
7+
using System.Net;
8+
using System.Threading;
9+
using System.Threading.Tasks;
10+
using Microsoft.Azure.WebJobs.Script.Workers.Rpc;
11+
using Microsoft.Extensions.DependencyInjection;
12+
using Microsoft.Extensions.Hosting;
13+
using Microsoft.Extensions.Logging;
14+
using Xunit;
15+
16+
namespace Microsoft.Azure.WebJobs.Script.Tests.Integration.WebHostEndToEnd;
17+
18+
public class NodeHostRestartEndToEndTests
19+
{
20+
[Fact]
21+
// Confirms that a background timer to create new worker processes does not
22+
// continue to fire after we've initiated a restart. This could lead to issues
23+
// where we'd create too many processes and throw an exception.
24+
// See https://github.com/Azure/azure-functions-host/pull/9820 for details.
25+
public static async Task JobHostRestart_StopsCreatingNewWorkers()
26+
{
27+
CancellationTokenRegistration registration = default;
28+
var fixture = new WorkerProcessRestartTestFixture();
29+
30+
try
31+
{
32+
await fixture.InitializeAsync();
33+
var channelManager = fixture.Host.WebHostServices.GetService<IWebHostRpcWorkerChannelManager>();
34+
var scriptHostManager = fixture.Host.WebHostServices.GetService<IScriptHostManager>();
35+
var appHostLifecycle = fixture.Host.JobHostServices.GetService<IApplicationLifetime>();
36+
var semaphore = new SemaphoreSlim(0, 1);
37+
registration = appHostLifecycle.ApplicationStopping.Register(() =>
38+
{
39+
// pause here to prevent the original host from shutting down fully
40+
// this emulates scenarios in production where the disposal of an old host
41+
// can take a very long time
42+
semaphore.Wait();
43+
});
44+
45+
await TestHelpers.Await(() =>
46+
{
47+
var channels = channelManager.GetChannels("node");
48+
int? currentChannelCount = channels?.Count;
49+
return currentChannelCount == 2;
50+
});
51+
52+
// Once we've hit 2, we have a couple seconds to trigger a restart.
53+
_ = Task.Run(() => scriptHostManager.RestartHostAsync());
54+
55+
DateTime start = DateTime.UtcNow;
56+
await TestHelpers.Await(() =>
57+
{
58+
var channels = channelManager.GetChannels("node");
59+
if (channels == null)
60+
{
61+
return false;
62+
}
63+
64+
// If it hasn't started by now, we should be good.
65+
var waitTime = WorkerProcessRestartTestFixture.ProcessStartupInterval.Add(TimeSpan.FromSeconds(2));
66+
if (DateTime.UtcNow.AddSeconds(-waitTime.TotalSeconds) > start)
67+
{
68+
return true;
69+
}
70+
71+
return channels.Count(c => c.Value.Task.Status == TaskStatus.RanToCompletion) == 4;
72+
});
73+
74+
// let the original shutdown continue
75+
semaphore.Release();
76+
77+
string key = await fixture.Host.GetFunctionSecretAsync("HttpTrigger");
78+
var result = await fixture.Host.HttpClient.GetAsync($"/api/HttpTrigger?code={key}");
79+
80+
var errors = fixture.Host.GetScriptHostLogMessages()
81+
.Where(m => m.Level == LogLevel.Error)
82+
.Select(m => m.Exception);
83+
Assert.Empty(errors);
84+
Assert.Equal(HttpStatusCode.OK, result.StatusCode);
85+
}
86+
finally
87+
{
88+
await fixture.DisposeAsync();
89+
(registration as IDisposable)?.Dispose();
90+
}
91+
}
92+
private class WorkerProcessRestartTestFixture : EndToEndTestFixture
93+
{
94+
private static readonly string rootPath = Path.Combine("TestScripts", "Node");
95+
96+
public WorkerProcessRestartTestFixture()
97+
: base(rootPath, "nodeWorkerRestart", RpcWorkerConstants.NodeLanguageWorkerName, workerProcessesCount: 3)
98+
{
99+
}
100+
101+
public static readonly TimeSpan ProcessStartupInterval = TimeSpan.FromSeconds(3);
102+
103+
protected override Task CreateTestStorageEntities() => Task.CompletedTask;
104+
105+
public override void ConfigureScriptHost(IWebJobsBuilder webJobsBuilder)
106+
{
107+
base.ConfigureScriptHost(webJobsBuilder);
108+
109+
webJobsBuilder.AddAzureStorage()
110+
.Services.Configure<ScriptJobHostOptions>(o =>
111+
{
112+
o.Functions = new[]
113+
{
114+
"HttpTrigger"
115+
};
116+
});
117+
118+
webJobsBuilder.Services.AddOptions<LanguageWorkerOptions>()
119+
.PostConfigure(o =>
120+
{
121+
var nodeConfig = o.WorkerConfigs.Single(c => c.Description.Language == "node");
122+
nodeConfig.CountOptions.ProcessStartupInterval = ProcessStartupInterval;
123+
});
124+
}
125+
}
126+
}

test/WebJobs.Script.Tests.Shared/TestLogger.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ public class LogMessage
105105

106106
public override string ToString()
107107
{
108-
return $"[{Timestamp.ToString("HH:mm:ss.fff")}] [{Category}] {FormattedMessage}";
108+
return $"[{Timestamp.ToString("HH:mm:ss.fff")}] [{Category}] {FormattedMessage} {Exception}";
109109
}
110110
}
111111
}

test/WebJobs.Script.Tests/WebJobsScriptHostServiceTests.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,8 @@ public async Task HostRestart_Specialization_Succeeds()
152152
var metricsLogger = new TestMetricsLogger();
153153
_host.Setup(h => h.StartAsync(It.IsAny<CancellationToken>()))
154154
.Returns(Task.CompletedTask);
155+
_host.SetupGet(h => h.Services)
156+
.Returns((IServiceProvider)null);
155157

156158
var hostBuilder = new Mock<IScriptHostBuilder>();
157159
hostBuilder.Setup(b => b.BuildHost(It.IsAny<bool>(), It.IsAny<bool>()))

0 commit comments

Comments
 (0)