Skip to content

Commit 4a9aa46

Browse files
yojagadpragnagopa
authored andcommitted
Add 5 second grace period for langauge worker channels before JobHost shutsdown (#4986)
1 parent 0b82956 commit 4a9aa46

File tree

12 files changed

+194
-13
lines changed

12 files changed

+194
-13
lines changed

src/WebJobs.Script.WebHost/DependencyInjection/DependencyValidator/DependencyValidator.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ private static ExpectedDependencyBuilder CreateExpectedDependencies()
4444
.Expect<PrimaryHostCoordinator>()
4545
.Expect<FileMonitoringService>()
4646
.Expect<LanguageWorkerConsoleLogService>()
47+
.Expect<FunctionDispatcherShutdownManager>()
4748
.Optional<FunctionsScaleMonitorService>()
4849
.Optional<FuncAppFileProvisioningService>() // Used by powershell.
4950
.Optional<JobHostService>() // Missing when host is offline.
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the MIT License. See License.txt in the project root for license information.
3+
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
using Microsoft.Extensions.Hosting;
7+
8+
namespace Microsoft.Azure.WebJobs.Script.Rpc
9+
{
10+
internal sealed class FunctionDispatcherShutdownManager : IHostedService
11+
{
12+
private readonly IFunctionDispatcher _functionDispatcher;
13+
14+
public FunctionDispatcherShutdownManager(IFunctionDispatcher functionDispatcher)
15+
{
16+
_functionDispatcher = functionDispatcher;
17+
}
18+
19+
public Task StartAsync(CancellationToken cancellationToken)
20+
{
21+
return Task.CompletedTask;
22+
}
23+
24+
public async Task StopAsync(CancellationToken cancellationToken)
25+
{
26+
await _functionDispatcher.ShutdownAsync();
27+
}
28+
}
29+
}

src/WebJobs.Script/OutOfProc/Http/HttpFunctionInvocationDispatcher.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,5 +171,10 @@ public void Dispose()
171171
_disposing = true;
172172
Dispose(true);
173173
}
174+
175+
public Task ShutdownAsync()
176+
{
177+
return Task.CompletedTask;
178+
}
174179
}
175180
}

src/WebJobs.Script/OutOfProc/IFunctionDispatcher.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,5 +15,7 @@ public interface IFunctionDispatcher : IDisposable
1515
Task InvokeAsync(ScriptInvocationContext invocationContext);
1616

1717
Task InitializeAsync(IEnumerable<FunctionMetadata> functions);
18+
19+
Task ShutdownAsync();
1820
}
1921
}

src/WebJobs.Script/OutOfProc/Rpc/FunctionRegistration/RpcFunctionInvocationDispatcher.cs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ internal class RpcFunctionInvocationDispatcher : IFunctionDispatcher
3030
private readonly IScriptJobHostEnvironment _scriptJobHostEnvironment;
3131
private readonly int _debounceSeconds = 10;
3232
private readonly int _maxAllowedProcessCount = 10;
33+
private readonly TimeSpan _shutdownTimeout = TimeSpan.FromSeconds(10);
3334
private readonly TimeSpan thresholdBetweenRestarts = TimeSpan.FromMinutes(OutOfProcConstants.WorkerRestartErrorIntervalThresholdInMinutes);
3435

3536
private IScriptEventManager _eventManager;
@@ -211,6 +212,23 @@ public async Task InitializeAsync(IEnumerable<FunctionMetadata> functions)
211212
}
212213
}
213214

215+
public async Task ShutdownAsync()
216+
{
217+
_logger.LogDebug($"Waiting for {nameof(RpcFunctionInvocationDispatcher)} to shutdown");
218+
Task timeoutTask = Task.Delay(_shutdownTimeout);
219+
IList<Task> workerChannelTasks = (await GetInitializedWorkerChannelsAsync()).Select(a => a.DrainInvocationsAsync()).ToList();
220+
Task completedTask = await Task.WhenAny(timeoutTask, Task.WhenAll(workerChannelTasks));
221+
222+
if (completedTask.Equals(timeoutTask))
223+
{
224+
_logger.LogDebug($"Draining invocations from language worker channel timed out. Shutting down '{nameof(RpcFunctionInvocationDispatcher)}'");
225+
}
226+
else
227+
{
228+
_logger.LogDebug($"Draining invocations from language worker channel completed. Shutting down '{nameof(RpcFunctionInvocationDispatcher)}'");
229+
}
230+
}
231+
214232
public async Task InvokeAsync(ScriptInvocationContext invocationContext)
215233
{
216234
try

src/WebJobs.Script/OutOfProc/Rpc/ILanguageWorkerChannel.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,5 +24,7 @@ public interface ILanguageWorkerChannel
2424
Task SendFunctionEnvironmentReloadRequest();
2525

2626
Task StartWorkerProcessAsync();
27+
28+
Task DrainInvocationsAsync();
2729
}
2830
}

src/WebJobs.Script/OutOfProc/Rpc/LanguageWorkerChannel.cs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -480,5 +480,14 @@ private bool IsTriggerMetadataPopulatedByWorker()
480480
{
481481
return !string.IsNullOrEmpty(_workerCapabilities.GetCapabilityState(LanguageWorkerConstants.RpcHttpTriggerMetadataRemoved));
482482
}
483+
484+
public async Task DrainInvocationsAsync()
485+
{
486+
_workerChannelLogger.LogDebug($"Count of in-buffer invocations waiting to be drained out: {_executingInvocations.Count}");
487+
foreach (ScriptInvocationContext currContext in _executingInvocations.Values)
488+
{
489+
await currContext.ResultSource.Task;
490+
}
491+
}
483492
}
484493
}

src/WebJobs.Script/ScriptHostBuilderExtensions.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,7 @@ public static IHostBuilder AddScriptHostCore(this IHostBuilder builder, ScriptAp
178178

179179
services.AddSingleton<IHostedService, LanguageWorkerConsoleLogService>();
180180
services.TryAddEnumerable(ServiceDescriptor.Singleton<IHostedService, PrimaryHostCoordinator>());
181+
services.TryAddEnumerable(ServiceDescriptor.Singleton<IHostedService, FunctionDispatcherShutdownManager>());
181182

182183
if (SystemEnvironment.Instance.IsRuntimeScaleMonitoringEnabled())
183184
{
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the MIT License. See License.txt in the project root for license information.
3+
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
using Microsoft.Azure.WebJobs.Script.Rpc;
7+
using Microsoft.Extensions.Logging;
8+
using Moq;
9+
using Xunit;
10+
11+
namespace Microsoft.Azure.WebJobs.Script.Tests
12+
{
13+
public class FunctionDispatcherShutdownManagerTests
14+
{
15+
[Fact]
16+
public async Task Test_StopAsync()
17+
{
18+
Mock<IFunctionDispatcher> functionDispatcher = new Mock<IFunctionDispatcher>();
19+
functionDispatcher.Setup(a => a.ShutdownAsync()).Returns(Task.CompletedTask);
20+
var functionDispatcherShutdownManager = new FunctionDispatcherShutdownManager(functionDispatcher.Object);
21+
await functionDispatcherShutdownManager.StopAsync(CancellationToken.None);
22+
functionDispatcher.Verify(a => a.ShutdownAsync(), Times.Once);
23+
}
24+
}
25+
}

test/WebJobs.Script.Tests/Rpc/FunctionDispatcherTests.cs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,48 @@ public async void Starting_MultipleWebhostChannels_Succeeds()
4646
Assert.Equal(0, finalJobhostChannelCount);
4747
}
4848

49+
[Fact]
50+
public async Task ShutdownTests()
51+
{
52+
int expectedProcessCount = 2;
53+
RpcFunctionInvocationDispatcher functionDispatcher = GetTestFunctionDispatcher(expectedProcessCount.ToString());
54+
await functionDispatcher.InitializeAsync(GetTestFunctionsList(LanguageWorkerConstants.NodeLanguageWorkerName));
55+
await WaitForJobhostWorkerChannelsToStartup(functionDispatcher, expectedProcessCount);
56+
57+
foreach (var currChannel in functionDispatcher.JobHostLanguageWorkerChannelManager.GetChannels())
58+
{
59+
var initializedChannel = (TestLanguageWorkerChannel)currChannel;
60+
initializedChannel.ExecutionContexts.Add(Task.Factory.StartNew(() => { }));
61+
}
62+
63+
await functionDispatcher.ShutdownAsync();
64+
foreach (var currChannel in functionDispatcher.JobHostLanguageWorkerChannelManager.GetChannels())
65+
{
66+
Assert.True(((TestLanguageWorkerChannel)currChannel).ExecutionContexts.Count == 0);
67+
}
68+
}
69+
70+
[Fact]
71+
public async Task ShutdownTests_WithInfinitelyRunningTasks_Timesout()
72+
{
73+
int expectedProcessCount = 2;
74+
RpcFunctionInvocationDispatcher functionDispatcher = GetTestFunctionDispatcher(expectedProcessCount.ToString());
75+
await functionDispatcher.InitializeAsync(GetTestFunctionsList(LanguageWorkerConstants.NodeLanguageWorkerName));
76+
await WaitForJobhostWorkerChannelsToStartup(functionDispatcher, expectedProcessCount);
77+
78+
foreach (var currChannel in functionDispatcher.JobHostLanguageWorkerChannelManager.GetChannels())
79+
{
80+
var initializedChannel = (TestLanguageWorkerChannel)currChannel;
81+
initializedChannel.ExecutionContexts.Add(new Task<bool>(() => true)); // A task that never starts and therefore never runs to completion
82+
}
83+
84+
await functionDispatcher.ShutdownAsync();
85+
foreach (var currChannel in functionDispatcher.JobHostLanguageWorkerChannelManager.GetChannels())
86+
{
87+
Assert.True(((TestLanguageWorkerChannel)currChannel).ExecutionContexts.Count > 0);
88+
}
89+
}
90+
4991
[Fact]
5092
public void MaxProcessCount_Returns_Default()
5193
{

0 commit comments

Comments
 (0)