Skip to content

Commit 564dcc3

Browse files
yojagad authored and mhoeger committed
Worker restart on function timeout (#5794)
Restart worker process(es) on function timeout for out-of-proc languages instead of shutting down host.
1 parent 3f19744 commit 564dcc3

20 files changed

+303
-26
lines changed
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
{
2+
"bindings": [
3+
{
4+
"type": "httpTrigger",
5+
"name": "req",
6+
"direction": "in",
7+
"methods": [ "get" ]
8+
},
9+
{
10+
"type": "http",
11+
"name": "$return",
12+
"direction": "out"
13+
}
14+
]
15+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
var test = require('../Shared/test');
2+
3+
module.exports = function (context, req) {
4+
context.log('Node.js HTTP trigger function processed a request. Name=%s', req.query.name);
5+
6+
var headerValue = req.headers['test-header'];
7+
if (headerValue) {
8+
context.log('test-header=' + headerValue);
9+
}
10+
11+
var res;
12+
if (typeof req.query.name === 'undefined') {
13+
res = {
14+
status: 400,
15+
body: "Please pass a name on the query string",
16+
headers: {
17+
'Content-Type': 'text/plain'
18+
}
19+
};
20+
}
21+
else {
22+
var e = new Date().getTime() + (30 * 1000);
23+
while (new Date().getTime() <= e) {}
24+
res = {
25+
status: 200,
26+
body: test.greeting(req.query.name),
27+
headers: {
28+
'Content-Type': 'text/plain',
29+
'Shared-Module': test.timestamp
30+
}
31+
};
32+
}
33+
34+
context.done(null, res);
35+
};

sample/Node/host.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
"healthCheckThreshold": 6,
99
"counterThreshold": 0.80
1010
},
11-
"functionTimeout": "00:05:00",
11+
"functionTimeout": "00:00:10",
1212
"logging": {
1313
"fileLoggingMode": "always"
1414
},

src/WebJobs.Script.WebHost/WebScriptHostExceptionHandler.cs

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
using Microsoft.AspNetCore.Hosting;
88
using Microsoft.Azure.WebJobs.Host;
99
using Microsoft.Azure.WebJobs.Host.Timers;
10+
using Microsoft.Azure.WebJobs.Script.Workers;
1011
using Microsoft.Extensions.Logging;
1112

1213
namespace Microsoft.Azure.WebJobs.Script.WebHost
@@ -15,10 +16,12 @@ public class WebScriptHostExceptionHandler : IWebJobsExceptionHandler
1516
{
1617
private readonly IApplicationLifetime _applicationLifetime;
1718
private readonly ILogger _logger;
19+
private readonly IFunctionInvocationDispatcherFactory _functionInvocationDispatcherFactory;
1820

19-
public WebScriptHostExceptionHandler(IApplicationLifetime applicationLifetime, ILogger<WebScriptHostExceptionHandler> logger)
21+
/// <summary>
/// Creates the exception handler and captures the services it needs.
/// </summary>
/// <param name="applicationLifetime">Used to stop the application when a timeout cannot be handled by restarting workers.</param>
/// <param name="logger">Logger for timeout and restart messages.</param>
/// <param name="functionInvocationDispatcherFactory">Factory from which the timeout handler obtains the function invocation dispatcher.</param>
/// <exception cref="ArgumentNullException">Thrown when any argument is <c>null</c>.</exception>
public WebScriptHostExceptionHandler(IApplicationLifetime applicationLifetime, ILogger<WebScriptHostExceptionHandler> logger, IFunctionInvocationDispatcherFactory functionInvocationDispatcherFactory)
{
    _applicationLifetime = applicationLifetime ?? throw new ArgumentNullException(nameof(applicationLifetime));

    // The timeout handler dereferences the factory unconditionally, so reject null here
    // instead of surfacing a NullReferenceException while a timeout is being handled.
    _functionInvocationDispatcherFactory = functionInvocationDispatcherFactory ?? throw new ArgumentNullException(nameof(functionInvocationDispatcherFactory));
    _logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
2427

@@ -39,17 +42,30 @@ public async Task OnTimeoutExceptionAsync(ExceptionDispatchInfo exceptionInfo, T
3942
}
4043
}
4144

42-
LogErrorAndFlush("A function timeout has occurred. Host is shutting down.", exceptionInfo.SourceException);
43-
4445
// We can't wait on this as it may cause a deadlock if the timeout was fired
4546
// by a Listener that cannot stop until it has completed.
4647
// TODO: DI (FACAVAL) The shutdown call will invoke the host stop... but we may need to do this
4748
// explicitly in order to pass the timeout.
4849
// Task ignoreTask = _hostManager.StopAsync();
4950
// Give the manager and all running tasks some time to shut down gracefully.
5051
//await Task.Delay(timeoutGracePeriod);
51-
52-
_applicationLifetime.StopApplication();
52+
IFunctionInvocationDispatcher functionInvocationDispatcher = _functionInvocationDispatcherFactory.GetFunctionDispatcher();
53+
if (functionInvocationDispatcher.State.Equals(FunctionInvocationDispatcherState.Initialized))
54+
{
55+
_logger.LogWarning($"A function timeout has occurred. Restarting worker process executing invocationId '{timeoutException.InstanceId}'.", exceptionInfo.SourceException);
56+
bool result = await functionInvocationDispatcher.RestartWorkerWithInvocationIdAsync(timeoutException.InstanceId.ToString());
57+
if (!result)
58+
{
59+
_logger.LogWarning($"Restarting all language worker processes since invocation Id '{timeoutException.InstanceId}' was not found.", exceptionInfo.SourceException);
60+
await functionInvocationDispatcher.RestartAllWorkersAsync();
61+
}
62+
_logger.LogWarning("Restart of language worker process(es) completed.", exceptionInfo.SourceException);
63+
}
64+
else
65+
{
66+
LogErrorAndFlush("A function timeout has occurred. Host is shutting down.", exceptionInfo.SourceException);
67+
_applicationLifetime.StopApplication();
68+
}
5369
}
5470

5571
public Task OnUnhandledExceptionAsync(ExceptionDispatchInfo exceptionInfo)

src/WebJobs.Script/ScriptHostBuilderExtensions.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,6 @@ public static IHostBuilder AddScriptHostCore(this IHostBuilder builder, ScriptAp
142142

143143
//Worker Function Invocation dispatcher
144144
services.AddSingleton<IFunctionInvocationDispatcherFactory, FunctionInvocationDispatcherFactory>();
145-
146145
services.AddSingleton<IScriptJobHost>(p => p.GetRequiredService<ScriptHost>());
147146
services.AddSingleton<IJobHost>(p => p.GetRequiredService<ScriptHost>());
148147
services.AddSingleton<IFunctionMetadataManager, FunctionMetadataManager>();

src/WebJobs.Script/Workers/Http/HttpFunctionInvocationDispatcher.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,5 +185,16 @@ public Task ShutdownAsync()
185185
{
186186
return Task.CompletedTask;
187187
}
188+
189+
/// <summary>
/// Disposes and restarts the http worker channel.
/// </summary>
/// <param name="invocationId">Invocation id that triggered the restart; not consulted by this implementation.</param>
/// <returns>Always <c>true</c> once the restart call has completed.</returns>
public async Task<bool> RestartWorkerWithInvocationIdAsync(string invocationId)
{
    // Since there's only one channel for httpworker, no lookup by invocation id is needed.
    var channelId = _httpWorkerChannel.Id;
    await DisposeAndRestartWorkerChannel(channelId);
    return true;
}
194+
195+
/// <summary>
/// Disposes and restarts the http worker channel.
/// </summary>
/// <returns>A task that completes when the restart call has completed.</returns>
public async Task RestartAllWorkersAsync()
{
    // Since there's only one channel for httpworker, "all workers" is that single channel.
    var channelId = _httpWorkerChannel.Id;
    await DisposeAndRestartWorkerChannel(channelId);
}
188199
}
189200
}

src/WebJobs.Script/Workers/IFunctionInvocationDispatcher.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,5 +20,9 @@ public interface IFunctionInvocationDispatcher : IDisposable
2020
Task InitializeAsync(IEnumerable<FunctionMetadata> functions, CancellationToken cancellationToken = default);
2121

2222
Task ShutdownAsync();
23+
24+
Task<bool> RestartWorkerWithInvocationIdAsync(string invocationId);
25+
26+
Task RestartAllWorkersAsync();
2327
}
2428
}

src/WebJobs.Script/Workers/Rpc/FunctionRegistration/RpcFunctionInvocationDispatcher.cs

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,7 @@ public async Task InvokeAsync(ScriptInvocationContext invocationContext)
248248
}
249249
}
250250

251-
internal async Task<IEnumerable<IRpcWorkerChannel>> GetInitializedWorkerChannelsAsync()
251+
internal async Task<IEnumerable<IRpcWorkerChannel>> GetAllWorkerChannelsAsync()
252252
{
253253
Dictionary<string, TaskCompletionSource<IRpcWorkerChannel>> webhostChannelDictionary = _webHostLanguageWorkerChannelManager.GetChannels(_workerRuntime);
254254
List<IRpcWorkerChannel> webhostChannels = null;
@@ -264,11 +264,18 @@ internal async Task<IEnumerable<IRpcWorkerChannel>> GetInitializedWorkerChannels
264264
}
265265
}
266266
IEnumerable<IRpcWorkerChannel> workerChannels = webhostChannels == null ? _jobHostLanguageWorkerChannelManager.GetChannels() : webhostChannels.Union(_jobHostLanguageWorkerChannelManager.GetChannels());
267+
return workerChannels;
268+
}
269+
270+
/// <summary>
/// Returns the worker channels that report being ready for invocations.
/// </summary>
/// <returns>The channels from <c>GetAllWorkerChannelsAsync</c> for which <c>IsChannelReadyForInvocations()</c> is true.</returns>
/// <exception cref="InvalidOperationException">Thrown when the number of ready channels exceeds <c>_maxProcessCount</c>.</exception>
internal async Task<IEnumerable<IRpcWorkerChannel>> GetInitializedWorkerChannelsAsync()
{
    IEnumerable<IRpcWorkerChannel> workerChannels = await GetAllWorkerChannelsAsync();

    // Materialize once. A deferred query would re-run the readiness check for every Count()
    // call and again for the caller, so the limit check and the returned set could disagree.
    List<IRpcWorkerChannel> initializedWorkers = workerChannels.Where(ch => ch.IsChannelReadyForInvocations()).ToList();
    if (initializedWorkers.Count > _maxProcessCount)
    {
        throw new InvalidOperationException($"Number of initialized language workers exceeded:{initializedWorkers.Count} exceeded maxProcessCount: {_maxProcessCount}");
    }

    return initializedWorkers;
}
274281

@@ -399,5 +406,33 @@ public void Dispose()
399406
State = FunctionInvocationDispatcherState.Disposing;
400407
Dispose(true);
401408
}
409+
410+
/// <summary>
/// Disposes and restarts the first initialized channel that is executing the given invocation.
/// </summary>
/// <param name="invocationId">Id of the invocation whose worker channel should be restarted.</param>
/// <returns>
/// <c>true</c> if a channel executing the invocation was found and restarted; otherwise <c>false</c>.
/// </returns>
public async Task<bool> RestartWorkerWithInvocationIdAsync(string invocationId)
{
    // The state is switched before the lookup and is not switched back by this method.
    State = FunctionInvocationDispatcherState.WorkerProcessRestarting;

    IEnumerable<IRpcWorkerChannel> readyChannels = await GetInitializedWorkerChannelsAsync();
    IRpcWorkerChannel channel = readyChannels.FirstOrDefault(ch => ch.IsExecutingInvocation(invocationId));
    if (channel == null)
    {
        // No initialized channel owns this invocation; the caller decides what to do next.
        return false;
    }

    _logger.LogInformation($"Restarting channel '{channel.Id}' that is executing invocation '{invocationId}' and timed out.");
    await DisposeAndRestartWorkerChannel(_workerRuntime, channel.Id);
    return true;
}
426+
427+
/// <summary>
/// Disposes and restarts every worker channel returned by <c>GetAllWorkerChannelsAsync</c>,
/// one after another.
/// </summary>
/// <returns>A task that completes once every restart call has been awaited.</returns>
public async Task RestartAllWorkersAsync()
{
    // The state is switched before the restarts and is not switched back by this method.
    State = FunctionInvocationDispatcherState.WorkerProcessRestarting;

    // Iterate over a snapshot so the loop does not depend on a lazily evaluated sequence while
    // channels are being disposed and restarted.
    // NOTE(review): whether the channel managers mutate their collections during a restart is
    // not visible here; the snapshot makes the loop safe either way.
    List<IRpcWorkerChannel> channels = (await GetAllWorkerChannelsAsync()).ToList();
    foreach (IRpcWorkerChannel channel in channels)
    {
        _logger.LogInformation($"Restarting channel '{channel.Id}' as part of restarting all channels.");
        await DisposeAndRestartWorkerChannel(_workerRuntime, channel.Id);
    }
}
402437
}
403438
}

src/WebJobs.Script/Workers/Rpc/IRpcWorkerChannel.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,5 +27,7 @@ public interface IRpcWorkerChannel
2727
Task StartWorkerProcessAsync();
2828

2929
Task DrainInvocationsAsync();
30+
31+
bool IsExecutingInvocation(string invocationId);
3032
}
3133
}

src/WebJobs.Script/Workers/Rpc/RpcWorkerChannel.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ internal RpcWorkerChannel(
114114

115115
public bool IsChannelReadyForInvocations()
116116
{
117-
return _state.HasFlag(RpcWorkerChannelState.InvocationBuffersInitialized | RpcWorkerChannelState.Initialized);
117+
return !_disposing && !_disposed && _state.HasFlag(RpcWorkerChannelState.InvocationBuffersInitialized | RpcWorkerChannelState.Initialized);
118118
}
119119

120120
public async Task StartWorkerProcessAsync()
@@ -513,5 +513,10 @@ public async Task DrainInvocationsAsync()
513513
await currContext.ResultSource.Task;
514514
}
515515
}
516+
517+
/// <summary>
/// Reports whether the given invocation id is currently a key in <c>_executingInvocations</c>.
/// </summary>
/// <param name="invocationId">Invocation id to look up.</param>
/// <returns><c>true</c> if the id is present; otherwise <c>false</c>.</returns>
public bool IsExecutingInvocation(string invocationId) => _executingInvocations.ContainsKey(invocationId);
516521
}
517522
}

0 commit comments

Comments
 (0)