Skip to content

Commit 66731fa

Browse files
authored
Fixing WebHost worker process shutdown race (#9310)
1 parent dbb335a commit 66731fa

File tree

6 files changed

+133
-24
lines changed

6 files changed

+133
-24
lines changed

src/WebJobs.Script/Workers/Rpc/RpcInitializationService.cs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,14 +60,16 @@ public async Task StartAsync(CancellationToken cancellationToken)
6060
}
6161
}
6262

63-
public async Task StopAsync(CancellationToken cancellationToken)
63+
public Task StopAsync(CancellationToken cancellationToken)
6464
{
65-
_logger.LogDebug("Shuttingdown Rpc Channels Manager");
66-
await _webHostRpcWorkerChannelManager.ShutdownChannelsAsync();
65+
// Intentionally a no-op after this change: the worker channel shutdown
// (ShutdownChannelsAsync) that used to run here is now performed at the start of
// OuterStopAsync, immediately before the RPC server is shut down.
return Task.CompletedTask;
6766
}
6867

6968
public async Task OuterStopAsync(CancellationToken cancellationToken)
7069
{
70+
_logger.LogDebug("Shutting down Rpc Channels Manager");
71+
await _webHostRpcWorkerChannelManager.ShutdownChannelsAsync();
72+
7173
_logger.LogDebug("Shutting down RPC server");
7274

7375
try

src/WebJobs.Script/Workers/Rpc/WebHostRpcWorkerChannelManager.cs

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
using System.Linq;
88
using System.Reactive.Linq;
99
using System.Threading.Tasks;
10+
using Microsoft.Azure.AppService.Proxy.Common.Infra;
1011
using Microsoft.Azure.WebJobs.Script.Diagnostics;
1112
using Microsoft.Azure.WebJobs.Script.Eventing;
1213
using Microsoft.Azure.WebJobs.Script.Workers.Profiles;
@@ -285,7 +286,7 @@ internal void ScheduleShutdownStandbyChannels()
285286
}
286287
}
287288

288-
public Task ShutdownChannelsAsync()
289+
public async Task ShutdownChannelsAsync()
289290
{
290291
foreach (string runtime in _workerChannels.Keys)
291292
{
@@ -294,25 +295,34 @@ public Task ShutdownChannelsAsync()
294295
{
295296
foreach (string workerId in standbyChannels.Keys)
296297
{
297-
standbyChannels[workerId]?.Task.ContinueWith(channelTask =>
298+
if (standbyChannels.TryGetValue(workerId, out TaskCompletionSource<IRpcWorkerChannel> channelTask))
298299
{
299-
if (channelTask.Status == TaskStatus.Faulted)
300+
IRpcWorkerChannel workerChannel = null;
301+
302+
try
303+
{
304+
workerChannel = await channelTask.Task;
305+
}
306+
catch (Exception ex)
300307
{
301-
_logger.LogDebug(channelTask.Exception, "Removing errored worker channel");
308+
_logger.LogDebug(ex, "Removing errored worker channel");
302309
}
303-
else
310+
311+
if (workerChannel is IDisposable disposableWorkerChannel)
304312
{
305-
IRpcWorkerChannel workerChannel = channelTask.Result;
306-
if (workerChannel != null)
313+
try
307314
{
308-
(channelTask.Result as IDisposable)?.Dispose();
315+
disposableWorkerChannel.Dispose();
316+
}
317+
catch (Exception ex)
318+
{
319+
_logger.LogDebug(ex, "Error disposing worker channel");
309320
}
310321
}
311-
});
322+
}
312323
}
313324
}
314325
}
315-
return Task.CompletedTask;
316326
}
317327

318328
internal void AddOrUpdateWorkerChannels(string initializedRuntime, IRpcWorkerChannel initializedLanguageWorkerChannel)

test/DotNetIsolated60/DotNetIsolated60.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
<ItemGroup>
1111
<PackageReference Include="Microsoft.Azure.Functions.Worker" Version="1.14.0" />
1212
<PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.Http" Version="3.0.13" />
13+
<PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.Storage" Version="4.0.4" />
1314
<PackageReference Include="Microsoft.Azure.Functions.Worker.Sdk" Version="1.10.0" />
1415
</ItemGroup>
1516
<ItemGroup>

test/DotNetIsolated60/DotNetIsolated60.sln

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ Microsoft Visual Studio Solution File, Format Version 12.00
33
# Visual Studio Version 17
44
VisualStudioVersion = 17.5.33627.172
55
MinimumVisualStudioVersion = 10.0.40219.1
6-
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DotNetIsolated60", "DotNetIsolated60.csproj", "{1DA92227-F28E-408D-96B1-20C72571E4AE}"
6+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetIsolated60", "DotNetIsolated60.csproj", "{1DA92227-F28E-408D-96B1-20C72571E4AE}"
77
EndProject
88
Global
99
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
using System;
2+
using Microsoft.Azure.Functions.Worker;
3+
using Microsoft.Extensions.Logging;
4+
5+
namespace DotNetIsolated60
6+
{
7+
/// <summary>
/// Queue-triggered function for the DotNetIsolated60 test app. It is bound to the
/// "myqueue-items" queue and writes one information-level log entry per message.
/// </summary>
public class QueueFunction
8+
{
9+
private readonly ILogger _logger;
10+
11+
/// <summary>
/// Creates the function instance and obtains a logger categorized by <see cref="QueueFunction"/>.
/// </summary>
/// <param name="loggerFactory">Factory used to create the logger for this class.</param>
public QueueFunction(ILoggerFactory loggerFactory)
12+
{
13+
_logger = loggerFactory.CreateLogger<QueueFunction>();
14+
}
15+
16+
/// <summary>
/// Handles a single queue message by logging its content.
/// </summary>
/// <param name="myQueueItem">
/// Message text delivered by the queue trigger on "myqueue-items"; the trigger uses the
/// "AzureWebJobsStorage" connection setting.
/// </param>
[Function("QueueFunction")]
17+
public void Run([QueueTrigger("myqueue-items", Connection = "AzureWebJobsStorage")] string myQueueItem)
18+
{
19+
_logger.LogInformation($"C# Queue trigger function processed: {myQueueItem}");
20+
}
21+
}
22+
}

test/WebJobs.Script.Tests.Integration/WebHostEndToEnd/SpecializationE2ETests.cs

Lines changed: 84 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
using Microsoft.ApplicationInsights.DataContracts;
1818
using Microsoft.AspNetCore.Hosting;
1919
using Microsoft.AspNetCore.TestHost;
20+
using Microsoft.Azure.Storage;
21+
using Microsoft.Azure.Storage.Queue;
2022
using Microsoft.Azure.WebJobs.Host.Config;
2123
using Microsoft.Azure.WebJobs.Host.Storage;
2224
using Microsoft.Azure.WebJobs.Logging;
@@ -32,10 +34,10 @@
3234
using Microsoft.Extensions.Logging;
3335
using Microsoft.Extensions.Options;
3436
using Microsoft.WebJobs.Script.Tests;
35-
using TestFunctions;
3637
using Xunit;
3738
using Xunit.Abstractions;
3839
using IApplicationLifetime = Microsoft.AspNetCore.Hosting.IApplicationLifetime;
40+
using LogLevel = Microsoft.Extensions.Logging.LogLevel;
3941

4042
namespace Microsoft.Azure.WebJobs.Script.Tests
4143
{
@@ -797,7 +799,7 @@ public async Task Specialization_JobHostInternalStorageOptionsUpdatesWithActiveH
797799
[Fact]
798800
public async Task DotNetIsolated_PlaceholderHit()
799801
{
800-
var builder = InitializeDotNetIsolatedPlaceholderBuilder();
802+
var builder = InitializeDotNetIsolatedPlaceholderBuilder("Function1");
801803

802804
using var testServer = new TestServer(builder);
803805

@@ -860,14 +862,88 @@ await DotNetIsolatedPlaceholderMiss(() =>
860862
Assert.Contains("Shutting down placeholder worker. Worker is not compatible for runtime: dotnet-isolated", log);
861863
}
862864

863-
private async Task DotNetIsolatedPlaceholderMiss(Action additionalSpecializedSetup = null)
865+
[Fact]
866+
// Fix for https://github.com/Azure/azure-functions-host/issues/9288
867+
// Regression test: starts the dotnet-isolated placeholder app with "Function1" and "QueueFunction",
// leaves placeholder mode, keeps enqueueing messages on "myqueue-items" while the web host is
// stopped, and then asserts that the QueueFunction "FunctionCompleted" logs contain
// Information-level entries and no Error-level entries.
public async Task SpecializedSite_StopsHostBeforeWorker()
864868
{
865-
var builder = InitializeDotNetIsolatedPlaceholderBuilder(() =>
869+
// this app has a QueueTrigger reading from "myqueue-items"
870+
// add a few messages there before stopping the host
871+
var storageValue = TestHelpers.GetTestConfiguration().GetWebJobsConnectionString("AzureWebJobsStorage");
872+
CloudStorageAccount storageAccount = CloudStorageAccount.Parse(storageValue);
873+
CloudQueueClient queueClient = storageAccount.CreateCloudQueueClient();
874+
CloudQueue queue = queueClient.GetQueueReference("myqueue-items");
875+
await queue.CreateIfNotExistsAsync();
876+
// start from an empty queue so only messages added by this test are processed
await queue.ClearAsync();
877+
878+
var builder = InitializeDotNetIsolatedPlaceholderBuilder("Function1", "QueueFunction");
879+
880+
using var testServer = new TestServer(builder);
881+
882+
var client = testServer.CreateClient();
883+
var response = await client.GetAsync("api/warmup");
884+
response.EnsureSuccessStatusCode();
885+
886+
// mark the container ready and turn placeholder mode off before the next request
// NOTE(review): presumably the following request is what triggers specialization -- confirm
_environment.SetEnvironmentVariable("AzureWebJobsStorage", storageValue);
887+
_environment.SetEnvironmentVariable(EnvironmentSettingNames.AzureWebsiteContainerReady, "1");
888+
_environment.SetEnvironmentVariable(EnvironmentSettingNames.AzureWebsitePlaceholderMode, "0");
889+
890+
response = await client.GetAsync("api/function1");
891+
response.EnsureSuccessStatusCode();
892+
893+
var scriptHostManager = testServer.Services.GetService<IScriptHostManager>();
894+
895+
scriptHostManager.ActiveHostChanged += (object sender, ActiveHostChangedEventArgs e) =>
866896
{
867-
// remove WEBSITE_USE_PLACEHOLDER_DOTNETISOLATED
868-
_environment.SetEnvironmentVariable(EnvironmentSettingNames.AzureWebsiteUsePlaceholderDotNetIsolated, null);
897+
// for this test, this signals the host is about to shut down, so introduce an
898+
// intentional delay to simulate a race condition
899+
//
900+
// there was a bug where we'd stop the worker channel and process before the host, resulting in
901+
// a lot of "Did not find initialized language worker" errors due to a race between the process
902+
// and listeners shutting down
903+
if (e.NewHost == null)
904+
{
905+
Thread.Sleep(1000);
906+
}
907+
};
908+
909+
// flag polled by the producer loop below; it is cleared only after the host has stopped
bool keepRunning = true;
910+
911+
// background producer: keeps adding "test" messages so invocations are in flight during shutdown
Task messageTask = Task.Run(async () =>
912+
{
913+
while (keepRunning)
914+
{
915+
await queue.AddMessageAsync(new CloudQueueMessage("test"));
916+
}
869917
});
870918

919+
// make sure the invocations are flowing before we stop the host
920+
await TestHelpers.Await(() =>
921+
{
922+
int completed = _loggerProvider.GetAllLogMessages().Count(p => p.Category == "Function.QueueFunction" && p.EventId.Name == "FunctionCompleted");
923+
return completed > 10;
924+
});
925+
926+
await testServer.Host.StopAsync();
927+
928+
// stop the producer, wait for it to finish, then remove any messages left in the queue
keepRunning = false;
929+
await messageTask;
930+
await queue.ClearAsync();
931+
932+
var completedLogs = _loggerProvider.GetAllLogMessages()
933+
.Where(p => p.Category == "Function.QueueFunction")
934+
.Where(p => p.EventId.Name == "FunctionCompleted");
935+
936+
// expect Information-level completions and no Error-level completions
// NOTE(review): an Error-level "FunctionCompleted" entry presumably marks a failed invocation -- confirm
Assert.NotEmpty(completedLogs.Where(p => p.Level == LogLevel.Information));
937+
Assert.Empty(completedLogs.Where(p => p.Level == LogLevel.Error));
938+
}
939+
940+
private async Task DotNetIsolatedPlaceholderMiss(Action additionalSpecializedSetup = null)
941+
{
942+
var builder = InitializeDotNetIsolatedPlaceholderBuilder("Function1");
943+
944+
// remove WEBSITE_USE_PLACEHOLDER_DOTNETISOLATED
945+
_environment.SetEnvironmentVariable(EnvironmentSettingNames.AzureWebsiteUsePlaceholderDotNetIsolated, null);
946+
871947
using var testServer = new TestServer(builder);
872948

873949
var client = testServer.CreateClient();
@@ -909,7 +985,7 @@ private static void BuildDotnetIsolated60()
909985
p.WaitForExit();
910986
}
911987

912-
private IWebHostBuilder InitializeDotNetIsolatedPlaceholderBuilder(Action additionalSetup = null)
988+
private IWebHostBuilder InitializeDotNetIsolatedPlaceholderBuilder(params string[] functions)
913989
{
914990
BuildDotnetIsolated60();
915991

@@ -918,9 +994,7 @@ private IWebHostBuilder InitializeDotNetIsolatedPlaceholderBuilder(Action additi
918994
_environment.SetEnvironmentVariable(EnvironmentSettingNames.AzureWebJobsFeatureFlags, ScriptConstants.FeatureFlagEnableWorkerIndexing);
919995
_environment.SetEnvironmentVariable(RpcWorkerConstants.FunctionWorkerRuntimeVersionSettingName, "6.0");
920996

921-
additionalSetup?.Invoke();
922-
923-
var builder = CreateStandbyHostBuilder("Function1");
997+
var builder = CreateStandbyHostBuilder(functions);
924998

925999
builder.ConfigureAppConfiguration(config =>
9261000
{

0 commit comments

Comments
 (0)