Skip to content

Commit f47891a

Browse files
authored
Handle worker process start up failures (#4877)
1 parent a3be286 commit f47891a

15 files changed

+277
-78
lines changed

src/WebJobs.Script.WebHost/Standby/StandbyManager.cs

Lines changed: 19 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -6,6 +6,7 @@
66
using System.Reactive.Linq;
77
using System.Threading;
88
using System.Threading.Tasks;
9+
using Microsoft.AspNetCore.Hosting;
910
using Microsoft.Azure.WebJobs.Script.Rpc;
1011
using Microsoft.Azure.WebJobs.Script.WebHost.Properties;
1112
using Microsoft.Extensions.Configuration;
@@ -32,20 +33,21 @@ public class StandbyManager : IStandbyManager, IDisposable
3233
private readonly HostNameProvider _hostNameProvider;
3334
private readonly IDisposable _changeTokenCallbackSubscription;
3435
private readonly TimeSpan _specializationTimerInterval;
36+
private readonly IApplicationLifetime _applicationLifetime;
3537

3638
private Timer _specializationTimer;
3739
private static CancellationTokenSource _standbyCancellationTokenSource = new CancellationTokenSource();
3840
private static IChangeToken _standbyChangeToken = new CancellationChangeToken(_standbyCancellationTokenSource.Token);
3941
private static SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);
4042

4143
public StandbyManager(IScriptHostManager scriptHostManager, IWebHostLanguageWorkerChannelManager languageWorkerChannelManager, IConfiguration configuration, IScriptWebHostEnvironment webHostEnvironment,
42-
IEnvironment environment, IOptionsMonitor<ScriptApplicationHostOptions> options, ILogger<StandbyManager> logger, HostNameProvider hostNameProvider)
43-
: this(scriptHostManager, languageWorkerChannelManager, configuration, webHostEnvironment, environment, options, logger, hostNameProvider, TimeSpan.FromMilliseconds(500))
44+
IEnvironment environment, IOptionsMonitor<ScriptApplicationHostOptions> options, ILogger<StandbyManager> logger, HostNameProvider hostNameProvider, IApplicationLifetime applicationLifetime)
45+
: this(scriptHostManager, languageWorkerChannelManager, configuration, webHostEnvironment, environment, options, logger, hostNameProvider, applicationLifetime, TimeSpan.FromMilliseconds(500))
4446
{
4547
}
4648

4749
public StandbyManager(IScriptHostManager scriptHostManager, IWebHostLanguageWorkerChannelManager languageWorkerChannelManager, IConfiguration configuration, IScriptWebHostEnvironment webHostEnvironment,
48-
IEnvironment environment, IOptionsMonitor<ScriptApplicationHostOptions> options, ILogger<StandbyManager> logger, HostNameProvider hostNameProvider, TimeSpan specializationTimerInterval)
50+
IEnvironment environment, IOptionsMonitor<ScriptApplicationHostOptions> options, ILogger<StandbyManager> logger, HostNameProvider hostNameProvider, IApplicationLifetime applicationLifetime, TimeSpan specializationTimerInterval)
4951
{
5052
_scriptHostManager = scriptHostManager ?? throw new ArgumentNullException(nameof(scriptHostManager));
5153
_options = options ?? throw new ArgumentNullException(nameof(options));
@@ -58,13 +60,23 @@ public StandbyManager(IScriptHostManager scriptHostManager, IWebHostLanguageWork
5860
_hostNameProvider = hostNameProvider ?? throw new ArgumentNullException(nameof(hostNameProvider));
5961
_changeTokenCallbackSubscription = ChangeToken.RegisterChangeCallback(_ => _logger.LogDebug($"{nameof(StandbyManager)}.{nameof(ChangeToken)} callback has fired."), null);
6062
_specializationTimerInterval = specializationTimerInterval;
63+
_applicationLifetime = applicationLifetime;
6164
}
6265

6366
public static IChangeToken ChangeToken => _standbyChangeToken;
6467

6568
/// <summary>
/// Returns a task for host specialization that, when specialization faults, logs the error
/// and requests application shutdown.
/// </summary>
/// <returns>
/// A continuation of the specialization task. The fault is handled inside the continuation,
/// so the returned task does not itself carry the specialization exception.
/// </returns>
public Task SpecializeHostAsync()
{
    // Pass the scheduler explicitly so the continuation never runs on whatever
    // TaskScheduler.Current happens to be at the call site.
    return _specializationTask.Value.ContinueWith(
        t =>
        {
            if (t.IsFaulted)
            {
                // if we fail during specialization for whatever reason
                // this is fatal, so we shutdown
                _logger.LogError(t.Exception, "Specialization failed. Shutting down.");
                _applicationLifetime.StopApplication();
            }
        },
        TaskScheduler.Default);
}
6981

7082
public async Task SpecializeHostCoreAsync()
@@ -86,6 +98,7 @@ public async Task SpecializeHostCoreAsync()
8698
_hostNameProvider.Reset();
8799

88100
await _languageWorkerChannelManager.SpecializeAsync();
101+
89102
NotifyChange();
90103
await _scriptHostManager.RestartHostAsync();
91104
await _scriptHostManager.DelayUntilHostReady();
@@ -165,15 +178,14 @@ private async Task CreateStandbyWarmupFunctions()
165178
_logger.LogInformation($"StandbyMode placeholder function directory created");
166179
}
167180

168-
/// <summary>
/// Specialization timer tick handler. When the web host is no longer in standby mode and the
/// environment reports the container as ready, disposes the timer and awaits host specialization.
/// </summary>
/// <param name="state">Timer state object; not used.</param>
/// <remarks>
/// This method is <c>async void</c>, so an exception that escapes it cannot be observed by any
/// caller. The await is therefore wrapped in a try/catch that logs the failure instead of
/// letting it propagate.
/// </remarks>
private async void OnSpecializationTimerTick(object state)
{
    if (_webHostEnvironment.InStandbyMode || !_environment.IsContainerReady())
    {
        return;
    }

    // Dispose and clear the timer before starting specialization.
    _specializationTimer?.Dispose();
    _specializationTimer = null;

    try
    {
        await SpecializeHostAsync();
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Error specializing host.");
    }
}
179191

src/WebJobs.Script/Rpc/FunctionRegistration/FunctionDispatcher.cs

Lines changed: 14 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -111,8 +111,7 @@ internal Task InitializeJobhostLanguageWorkerChannelAsync(int attemptCount)
111111
var languageWorkerChannel = _languageWorkerChannelFactory.CreateLanguageWorkerChannel(_scriptOptions.RootScriptPath, _workerRuntime, _metricsLogger, attemptCount, _managedDependencyOptions);
112112
languageWorkerChannel.SetupFunctionInvocationBuffers(_functions);
113113
_jobHostLanguageWorkerChannelManager.AddChannel(languageWorkerChannel);
114-
languageWorkerChannel.StartWorkerProcessAsync()
115-
.ContinueWith(workerInitTask =>
114+
languageWorkerChannel.StartWorkerProcessAsync().ContinueWith(workerInitTask =>
116115
{
117116
if (workerInitTask.IsCompleted)
118117
{
@@ -194,14 +193,23 @@ public async Task InitializeAsync(IEnumerable<FunctionMetadata> functions)
194193
Dictionary<string, TaskCompletionSource<ILanguageWorkerChannel>> webhostLanguageWorkerChannels = _webHostLanguageWorkerChannelManager.GetChannels(_workerRuntime);
195194
if (webhostLanguageWorkerChannels != null)
196195
{
197-
foreach (string workerId in webhostLanguageWorkerChannels.Keys)
196+
foreach (string workerId in webhostLanguageWorkerChannels.Keys.ToList())
198197
{
199198
if (webhostLanguageWorkerChannels.TryGetValue(workerId, out TaskCompletionSource<ILanguageWorkerChannel> initializedLanguageWorkerChannelTask))
200199
{
201200
_logger.LogDebug("Found initialized language worker channel for runtime: {workerRuntime} workerId:{workerId}", _workerRuntime, workerId);
202-
ILanguageWorkerChannel initializedLanguageWorkerChannel = await initializedLanguageWorkerChannelTask.Task;
203-
initializedLanguageWorkerChannel.SetupFunctionInvocationBuffers(_functions);
204-
initializedLanguageWorkerChannel.SendFunctionLoadRequests();
201+
try
202+
{
203+
ILanguageWorkerChannel initializedLanguageWorkerChannel = await initializedLanguageWorkerChannelTask.Task;
204+
initializedLanguageWorkerChannel.SetupFunctionInvocationBuffers(_functions);
205+
initializedLanguageWorkerChannel.SendFunctionLoadRequests();
206+
}
207+
catch (Exception ex)
208+
{
209+
_logger.LogWarning(ex, "Removing errored webhost language worker channel for runtime: {workerRuntime} workerId:{workerId}", _workerRuntime, workerId);
210+
await _webHostLanguageWorkerChannelManager.ShutdownChannelIfExistsAsync(_workerRuntime, workerId);
211+
InitializeWebhostLanguageWorkerChannel();
212+
}
205213
}
206214
}
207215
StartWorkerProcesses(webhostLanguageWorkerChannels.Count(), InitializeWebhostLanguageWorkerChannel);
Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1,12 +1,14 @@
11
// Copyright (c) .NET Foundation. All rights reserved.
22
// Licensed under the MIT License. See License.txt in the project root for license information.
33

4+
using System.Threading.Tasks;
5+
46
namespace Microsoft.Azure.WebJobs.Script.Rpc
{
    /// <summary>
    /// Abstraction over a language worker process.
    /// </summary>
    public interface ILanguageWorkerProcess
    {
        /// <summary>
        /// Gets the identifier of the worker process.
        /// NOTE(review): presumably the operating system process id; confirm against implementers.
        /// </summary>
        int Id { get; }

        /// <summary>
        /// Starts the worker process.
        /// </summary>
        /// <returns>A task representing the start operation.</returns>
        Task StartProcessAsync();
    }
}

src/WebJobs.Script/Rpc/LanguageWorkerChannel.cs

Lines changed: 31 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -8,6 +8,7 @@
88
using System.IO;
99
using System.Linq;
1010
using System.Reactive.Linq;
11+
using System.Threading;
1112
using System.Threading.Tasks;
1213
using System.Threading.Tasks.Dataflow;
1314
using Microsoft.AspNetCore.Http;
@@ -108,27 +109,27 @@ internal LanguageWorkerChannel(
108109

109110
internal ILanguageWorkerProcess WorkerProcess => _languageWorkerProcess;
110111

111-
/// <summary>
/// Starts the language worker process and completes when the worker init task completes.
/// </summary>
/// <returns>A task that finishes once the process has been started and the init task is done.</returns>
public async Task StartWorkerProcessAsync()
{
    // Subscribe before starting the process: handle the first StartStream message, or route
    // the error (including a timeout) to the start-stream error handler.
    var startStreamEvents = _inboundWorkerEvents
        .Where(msg => msg.MessageType == MsgType.StartStream)
        .Timeout(TimeSpan.FromSeconds(LanguageWorkerConstants.ProcessStartTimeoutSeconds))
        .Take(1);
    _startSubscription = startStreamEvents.Subscribe(SendWorkerInitRequest, HandleWorkerStartStreamError);

    _workerChannelLogger.LogDebug("Initiating Worker Process start up");
    await _languageWorkerProcess.StartProcessAsync();

    // The state changes only after the process start call has completed.
    _state = LanguageWorkerChannelState.Initializing;

    await _workerInitTask.Task;
}
124124

125125
// send capabilities to worker, wait for WorkerInitResponse
126126
internal void SendWorkerInitRequest(RpcEvent startEvent)
127127
{
128+
_workerChannelLogger.LogDebug("Worker Process started. Received StartStream message");
128129
_inboundWorkerEvents.Where(msg => msg.MessageType == MsgType.WorkerInitResponse)
129130
.Timeout(workerInitTimeout)
130131
.Take(1)
131-
.Subscribe(WorkerInitResponse, HandleWorkerChannelError);
132+
.Subscribe(WorkerInitResponse, HandleWorkerInitError);
132133

133134
SendStreamingMessage(new StreamingMessage
134135
{
@@ -155,11 +156,11 @@ internal void WorkerInitResponse(RpcEvent initEvent)
155156
_startLatencyMetric?.Dispose();
156157
_startLatencyMetric = null;
157158

158-
_workerChannelLogger.LogDebug("Received WorkerInitResponse");
159+
_workerChannelLogger.LogDebug("Received WorkerInitResponse. Worker process initialized");
159160
_initMessage = initEvent.Message.WorkerInitResponse;
160161
if (_initMessage.Result.IsFailure(out Exception exc))
161162
{
162-
HandleWorkerChannelError(exc);
163+
HandleWorkerInitError(exc);
163164
_workerInitTask.SetResult(false);
164165
return;
165166
}
@@ -196,7 +197,7 @@ public Task SendFunctionEnvironmentReloadRequest()
196197
.Add(_inboundWorkerEvents.Where(msg => msg.MessageType == MsgType.FunctionEnvironmentReloadResponse)
197198
.Timeout(workerInitTimeout)
198199
.Take(1)
199-
.Subscribe((msg) => FunctionEnvironmentReloadResponse(msg.Message.FunctionEnvironmentReloadResponse)));
200+
.Subscribe((msg) => FunctionEnvironmentReloadResponse(msg.Message.FunctionEnvironmentReloadResponse), HandleWorkerEnvReloadError));
200201

201202
IDictionary processEnv = Environment.GetEnvironmentVariables();
202203

@@ -388,8 +389,27 @@ internal void Log(RpcEvent msg)
388389
}
389390
}
390391

391-
internal void HandleWorkerChannelError(Exception exc)
392+
/// <summary>
/// Error callback for the StartStream subscription: logs the failure and passes the
/// exception on to <c>PublishWorkerErrorEvent</c>.
/// </summary>
/// <param name="exc">The error reported by the subscription.</param>
internal void HandleWorkerStartStreamError(Exception exc)
{
    _workerChannelLogger.LogError(exc, "Starting worker process failed");
    PublishWorkerErrorEvent(exc);
}

/// <summary>
/// Error callback for the environment reload response subscription: logs the failure and
/// faults the reload task with the exception.
/// </summary>
/// <param name="exc">The error reported by the subscription.</param>
internal void HandleWorkerEnvReloadError(Exception exc)
{
    _workerChannelLogger.LogError(exc, "Reloading environment variables failed");
    _reloadTask.SetException(exc);
}

/// <summary>
/// Error callback for worker initialization: logs the failure and passes the exception on
/// to <c>PublishWorkerErrorEvent</c>.
/// </summary>
/// <param name="exc">The initialization error.</param>
internal void HandleWorkerInitError(Exception exc)
{
    _workerChannelLogger.LogError(exc, "Initializing worker process failed");
    PublishWorkerErrorEvent(exc);
}
409+
410+
private void PublishWorkerErrorEvent(Exception exc)
392411
{
412+
_workerInitTask.SetException(exc);
393413
if (_disposing)
394414
{
395415
return;

src/WebJobs.Script/Rpc/LanguageWorkerProcess.cs

Lines changed: 5 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -5,6 +5,7 @@
55
using System.Collections.Generic;
66
using System.Diagnostics;
77
using System.Linq;
8+
using System.Threading.Tasks;
89
using Microsoft.Azure.WebJobs.Logging;
910
using Microsoft.Azure.WebJobs.Script.Abstractions;
1011
using Microsoft.Azure.WebJobs.Script.Eventing;
@@ -61,7 +62,7 @@ internal LanguageWorkerProcess(string runtime,
6162

6263
internal Queue<string> ProcessStdErrDataQueue => _processStdErrDataQueue;
6364

64-
public void StartProcess()
65+
public Task StartProcessAsync()
6566
{
6667
try
6768
{
@@ -79,10 +80,12 @@ public void StartProcess()
7980

8081
// Register process only after it starts
8182
_processRegistry?.Register(_process);
83+
return Task.CompletedTask;
8284
}
8385
catch (Exception ex)
8486
{
85-
throw new HostInitializationException($"Failed to start Language Worker Channel for language :{_runtime}", ex);
87+
_workerProcessLogger.LogError(ex, "Failed to start Language Worker Channel for language :{_runtime}", _runtime);
88+
return Task.FromException(ex);
8689
}
8790
}
8891

0 commit comments

Comments
 (0)