Skip to content

Commit 287a270

Browse files
authored
Fix Language Worker Channels disposing (#4102)
1 parent b99edf7 commit 287a270

17 files changed

+109
-54
lines changed

src/WebJobs.Script/Eventing/Rpc/RpcChannelReadyEvent.cs renamed to src/WebJobs.Script/Eventing/Rpc/RpcWebhostChannelReadyEvent.cs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,13 @@
66

77
namespace Microsoft.Azure.WebJobs.Script.Eventing
88
{
9-
internal class RpcChannelReadyEvent : RpcChannelEvent
9+
/// <summary>
10+
/// RpcWebHostChannelReadyEvent is published when a language worker channel is started at the
11+
/// Webhost/Application level. LanguageWorkerChannelManager keeps track that channel created
12+
/// </summary>
13+
internal class RpcWebHostChannelReadyEvent : RpcChannelEvent
1014
{
11-
internal RpcChannelReadyEvent(string id, string language, ILanguageWorkerChannel languageWorkerChannel,
15+
internal RpcWebHostChannelReadyEvent(string id, string language, ILanguageWorkerChannel languageWorkerChannel,
1216
string version, IDictionary<string, string> capabilities)
1317
: base(id)
1418
{

src/WebJobs.Script/Host/ScriptHost.cs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,6 @@ public class ScriptHost : JobHost, IScriptJobHost
6868

6969
private IList<IDisposable> _eventSubscriptions = new List<IDisposable>();
7070
private IFunctionDispatcher _functionDispatcher;
71-
private IProcessRegistry _processRegistry = new EmptyProcessRegistry();
7271

7372
// Specify the "builtin binding types". These are types that are directly accesible without needing an explicit load gesture.
7473
// This is the set of bindings we shipped prior to binding extensibility.
@@ -899,7 +898,6 @@ protected override void Dispose(bool disposing)
899898
}
900899

901900
_functionDispatcher?.Dispose();
902-
(_processRegistry as IDisposable)?.Dispose();
903901

904902
foreach (var function in Functions)
905903
{

src/WebJobs.Script/Rpc/FunctionRegistration/FunctionDispatcher.cs

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ namespace Microsoft.Azure.WebJobs.Script.Rpc
2020
internal class FunctionDispatcher : IFunctionDispatcher
2121
{
2222
private readonly IMetricsLogger _metricsLogger;
23-
private readonly ILoggerFactory _loggerFactory;
23+
private readonly ILogger _logger;
2424
private IScriptEventManager _eventManager;
2525
private IEnumerable<WorkerConfig> _workerConfigs;
2626
private CreateChannel _channelFactory;
@@ -42,8 +42,8 @@ public FunctionDispatcher(IOptions<ScriptJobHostOptions> scriptHostOptions,
4242
_scriptOptions = scriptHostOptions.Value;
4343
_languageWorkerChannelManager = languageWorkerChannelManager;
4444
_eventManager = eventManager;
45-
_loggerFactory = loggerFactory;
4645
_workerConfigs = languageWorkerOptions.Value.WorkerConfigs;
46+
_logger = loggerFactory.CreateLogger(ScriptConstants.LogCategoryFunctionDispatcher);
4747

4848
_workerErrorSubscription = _eventManager.OfType<WorkerErrorEvent>()
4949
.Subscribe(WorkerError);
@@ -59,7 +59,7 @@ internal CreateChannel ChannelFactory
5959
{
6060
_channelFactory = (language, registrations, attemptCount) =>
6161
{
62-
var languageWorkerChannel = _languageWorkerChannelManager.CreateLanguageWorkerChannel(Guid.NewGuid().ToString(), _scriptOptions.RootScriptPath, language, registrations, _metricsLogger, attemptCount);
62+
var languageWorkerChannel = _languageWorkerChannelManager.CreateLanguageWorkerChannel(Guid.NewGuid().ToString(), _scriptOptions.RootScriptPath, language, registrations, _metricsLogger, attemptCount, false);
6363
languageWorkerChannel.StartWorkerProcess();
6464
return languageWorkerChannel;
6565
};
@@ -94,18 +94,19 @@ public LanguageWorkerState CreateWorkerStateWithExistingChannel(string language,
9494
public void Initialize(string workerRuntime, IEnumerable<FunctionMetadata> functions)
9595
{
9696
_languageWorkerChannelManager.ShutdownStandbyChannels(functions);
97-
9897
workerRuntime = workerRuntime ?? Utility.GetWorkerRuntime(functions);
9998

10099
if (Utility.IsSupportedRuntime(workerRuntime, _workerConfigs))
101100
{
102101
ILanguageWorkerChannel initializedChannel = _languageWorkerChannelManager.GetChannel(workerRuntime);
103102
if (initializedChannel != null)
104103
{
104+
_logger.LogDebug($"Found initialized language worker channel for runtime: {0}", workerRuntime);
105105
CreateWorkerStateWithExistingChannel(workerRuntime, initializedChannel);
106106
}
107107
else
108108
{
109+
_logger.LogDebug($"Creating new language worker channel for runtime:{0}", workerRuntime);
109110
CreateWorkerState(workerRuntime);
110111
}
111112
}
@@ -129,26 +130,31 @@ public void WorkerError(WorkerErrorEvent workerError)
129130
{
130131
if (_workerStates.TryGetValue(workerError.Language, out LanguageWorkerState erroredWorkerState))
131132
{
133+
_logger.LogDebug($"Handling WorkerErrorEvent for runtime:{workerError.Language}");
132134
erroredWorkerState.Errors.Add(workerError.Exception);
133135
bool isPreInitializedChannel = _languageWorkerChannelManager.ShutdownChannelIfExists(workerError.Language);
134136
if (!isPreInitializedChannel)
135137
{
138+
_logger.LogDebug($"Disposing errored channel for workerId: {0}, for runtime:{1}", erroredWorkerState.Channel.Id, workerError.Language);
136139
erroredWorkerState.Channel.Dispose();
137140
}
141+
_logger.LogDebug($"Restarting worker channel for runtime:{0}", workerError.Language);
138142
RestartWorkerChannel(workerError.Language, erroredWorkerState);
139143
}
140144
}
141145

142-
private void RestartWorkerChannel(string language, LanguageWorkerState erroredWorkerState)
146+
private void RestartWorkerChannel(string runtime, LanguageWorkerState erroredWorkerState)
143147
{
144148
if (erroredWorkerState.Errors.Count < 3)
145149
{
146-
erroredWorkerState.Channel = CreateNewChannelWithExistingWorkerState(language, erroredWorkerState);
147-
_workerStates[language] = erroredWorkerState;
150+
_logger.LogDebug("retrying process start");
151+
erroredWorkerState.Channel = CreateNewChannelWithExistingWorkerState(runtime, erroredWorkerState);
152+
_workerStates[runtime] = erroredWorkerState;
148153
}
149154
else
150155
{
151-
PublishWorkerProcessErrorEvent(language, erroredWorkerState);
156+
_logger.LogDebug($"Exceeded language worker restart retry count for runtime:{0}", runtime);
157+
PublishWorkerProcessErrorEvent(runtime, erroredWorkerState);
152158
}
153159
}
154160

@@ -190,7 +196,11 @@ protected virtual void Dispose(bool disposing)
190196
foreach (var pair in _workerStates)
191197
{
192198
// TODO #3296 - send WorkerTerminate message to shut down language worker process gracefully (instead of just a killing)
193-
pair.Value.Channel.Dispose();
199+
// WebhostLanguageWorkerChannels life time is managed by LanguageWorkerChannelManager
200+
if (!pair.Value.Channel.IsWebhostChannel)
201+
{
202+
pair.Value.Channel.Dispose();
203+
}
194204
pair.Value.Functions.Dispose();
195205
}
196206
}

src/WebJobs.Script/Rpc/ILanguageWorkerChannel.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ public interface ILanguageWorkerChannel : IDisposable
1111
{
1212
string Id { get; }
1313

14+
bool IsWebhostChannel { get; }
15+
1416
WorkerConfig Config { get; }
1517

1618
void RegisterFunctions(IObservable<FunctionRegistrationContext> functionRegistrations);

src/WebJobs.Script/Rpc/ILanguageWorkerChannelManager.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,6 @@ public interface ILanguageWorkerChannelManager
2525

2626
void ShutdownChannels();
2727

28-
ILanguageWorkerChannel CreateLanguageWorkerChannel(string workerId, string scriptRootPath, string language, IObservable<FunctionRegistrationContext> functionRegistrations, IMetricsLogger metricsLogger, int attemptCount);
28+
ILanguageWorkerChannel CreateLanguageWorkerChannel(string workerId, string scriptRootPath, string language, IObservable<FunctionRegistrationContext> functionRegistrations, IMetricsLogger metricsLogger, int attemptCount, bool isWebhostChannel);
2929
}
3030
}

src/WebJobs.Script/Rpc/LanguageWorkerChannel.cs

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ internal class LanguageWorkerChannel : ILanguageWorkerChannel
3636
private readonly ILanguageWorkerConsoleLogSource _consoleLogSource;
3737

3838
private bool _disposed;
39+
private bool _disposing;
40+
private bool _isWebHostChannel;
3941
private IObservable<FunctionRegistrationContext> _functionRegistrations;
4042
private WorkerInitResponse _initMessage;
4143
private string _workerId;
@@ -68,7 +70,8 @@ internal LanguageWorkerChannel(
6870
ILoggerFactory loggerFactory,
6971
IMetricsLogger metricsLogger,
7072
int attemptCount,
71-
ILanguageWorkerConsoleLogSource consoleLogSource)
73+
ILanguageWorkerConsoleLogSource consoleLogSource,
74+
bool isWebHostChannel = false)
7275
{
7376
_workerId = workerId;
7477
_functionRegistrations = functionRegistrations;
@@ -78,6 +81,7 @@ internal LanguageWorkerChannel(
7881
_processRegistry = processRegistry;
7982
_workerConfig = workerConfig;
8083
_serverUri = serverUri;
84+
_isWebHostChannel = isWebHostChannel;
8185
_workerChannelLogger = loggerFactory.CreateLogger($"Worker.{workerConfig.Language}.{_workerId}");
8286
_consoleLogSource = consoleLogSource;
8387

@@ -110,6 +114,8 @@ internal LanguageWorkerChannel(
110114

111115
internal Process WorkerProcess => _process;
112116

117+
public bool IsWebhostChannel => _isWebHostChannel;
118+
113119
internal void StartProcess()
114120
{
115121
try
@@ -151,6 +157,11 @@ private void OnOutputDataReceived(object sender, DataReceivedEventArgs e)
151157

152158
private void OnProcessExited(object sender, EventArgs e)
153159
{
160+
if (_disposing)
161+
{
162+
// No action needed
163+
return;
164+
}
154165
string exceptionMessage = string.Join(",", _processStdErrDataQueue.Where(s => !string.IsNullOrEmpty(s)));
155166
try
156167
{
@@ -248,7 +259,7 @@ internal void SendWorkerInitRequest(RpcEvent startEvent)
248259
_inboundWorkerEvents.Where(msg => msg.MessageType == MsgType.WorkerInitResponse)
249260
.Timeout(workerInitTimeout)
250261
.Take(1)
251-
.Subscribe(PublishRpcChannelReadyEvent, HandleWorkerError);
262+
.Subscribe(PublishWebhostRpcChannelReadyEvent, HandleWorkerError);
252263

253264
SendStreamingMessage(new StreamingMessage
254265
{
@@ -265,7 +276,7 @@ internal void PublishWorkerProcessReadyEvent(FunctionEnvironmentReloadResponse r
265276
_eventManager.Publish(wpEvent);
266277
}
267278

268-
internal void PublishRpcChannelReadyEvent(RpcEvent initEvent)
279+
internal void PublishWebhostRpcChannelReadyEvent(RpcEvent initEvent)
269280
{
270281
_startLatencyMetric?.Dispose();
271282
_startLatencyMetric = null;
@@ -276,11 +287,12 @@ internal void PublishRpcChannelReadyEvent(RpcEvent initEvent)
276287
HandleWorkerError(exc);
277288
return;
278289
}
279-
280-
RpcChannelReadyEvent readyEvent = new RpcChannelReadyEvent(_workerId, _workerConfig.Language, this, _initMessage.WorkerVersion, _initMessage.Capabilities);
281-
_eventManager.Publish(readyEvent);
282-
283-
if (_functionRegistrations != null)
290+
if (_functionRegistrations == null)
291+
{
292+
RpcWebHostChannelReadyEvent readyEvent = new RpcWebHostChannelReadyEvent(_workerId, _workerConfig.Language, this, _initMessage.WorkerVersion, _initMessage.Capabilities);
293+
_eventManager.Publish(readyEvent);
294+
}
295+
else
284296
{
285297
RegisterFunctions(_functionRegistrations);
286298
}
@@ -516,6 +528,7 @@ protected virtual void Dispose(bool disposing)
516528

517529
public void Dispose()
518530
{
531+
_disposing = true;
519532
Dispose(true);
520533
}
521534
}

src/WebJobs.Script/Rpc/LanguageWorkerChannelManager.cs

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public LanguageWorkerChannelManager(IScriptEventManager eventManager, IEnvironme
4343
_environment = environment ?? throw new ArgumentNullException(nameof(environment));
4444
_eventManager = eventManager;
4545
_loggerFactory = loggerFactory;
46-
_logger = loggerFactory.CreateLogger(ScriptConstants.LanguageWorkerChannelManager);
46+
_logger = loggerFactory.CreateLogger(ScriptConstants.LogCategoryLanguageWorkerChannelManager);
4747
_workerConfigs = languageWorkerOptions.Value.WorkerConfigs;
4848
_applicationHostOptions = applicationHostOptions;
4949
_consoleLogSource = consoleLogSource;
@@ -60,11 +60,11 @@ public LanguageWorkerChannelManager(IScriptEventManager eventManager, IEnvironme
6060

6161
_shutdownStandbyWorkerChannels = ScheduleShutdownStandbyChannels;
6262
_shutdownStandbyWorkerChannels = _shutdownStandbyWorkerChannels.Debounce(5000);
63-
_rpcChannelReadySubscriptions = _eventManager.OfType<RpcChannelReadyEvent>()
63+
_rpcChannelReadySubscriptions = _eventManager.OfType<RpcWebHostChannelReadyEvent>()
6464
.Subscribe(AddOrUpdateWorkerChannels);
6565
}
6666

67-
public ILanguageWorkerChannel CreateLanguageWorkerChannel(string workerId, string scriptRootPath, string language, IObservable<FunctionRegistrationContext> functionRegistrations, IMetricsLogger metricsLogger, int attemptCount)
67+
public ILanguageWorkerChannel CreateLanguageWorkerChannel(string workerId, string scriptRootPath, string language, IObservable<FunctionRegistrationContext> functionRegistrations, IMetricsLogger metricsLogger, int attemptCount, bool isWebhostChannel = false)
6868
{
6969
var languageWorkerConfig = _workerConfigs.Where(c => c.Language.Equals(language, StringComparison.OrdinalIgnoreCase)).FirstOrDefault();
7070
if (languageWorkerConfig == null)
@@ -83,7 +83,8 @@ public ILanguageWorkerChannel CreateLanguageWorkerChannel(string workerId, strin
8383
_loggerFactory,
8484
metricsLogger,
8585
attemptCount,
86-
_consoleLogSource);
86+
_consoleLogSource,
87+
isWebhostChannel);
8788
}
8889

8990
public async Task InitializeChannelAsync(string runtime)
@@ -98,12 +99,12 @@ private async Task InitializeLanguageWorkerChannel(string language, string scrip
9899
{
99100
string workerId = Guid.NewGuid().ToString();
100101
_logger.LogInformation("Creating language worker channel for runtime:{runtime}", language);
101-
ILanguageWorkerChannel languageWorkerChannel = CreateLanguageWorkerChannel(workerId, scriptRootPath, language, null, null, 0);
102+
ILanguageWorkerChannel languageWorkerChannel = CreateLanguageWorkerChannel(workerId, scriptRootPath, language, null, null, 0, true);
102103
languageWorkerChannel.StartWorkerProcess();
103-
IObservable<RpcChannelReadyEvent> rpcChannelReadyEvent = _eventManager.OfType<RpcChannelReadyEvent>()
104+
IObservable<RpcWebHostChannelReadyEvent> rpcChannelReadyEvent = _eventManager.OfType<RpcWebHostChannelReadyEvent>()
104105
.Where(msg => msg.Language == language).Timeout(workerInitTimeout);
105106
// Wait for response from language worker process
106-
RpcChannelReadyEvent readyEvent = await rpcChannelReadyEvent.FirstAsync();
107+
RpcWebHostChannelReadyEvent readyEvent = await rpcChannelReadyEvent.FirstAsync();
107108
}
108109
catch (Exception ex)
109110
{
@@ -198,9 +199,10 @@ public void ShutdownChannels()
198199
_workerChannels[runtime]?.Dispose();
199200
}
200201
_workerChannels.Clear();
202+
(_processRegistry as IDisposable)?.Dispose();
201203
}
202204

203-
private void AddOrUpdateWorkerChannels(RpcChannelReadyEvent rpcChannelReadyEvent)
205+
private void AddOrUpdateWorkerChannels(RpcWebHostChannelReadyEvent rpcChannelReadyEvent)
204206
{
205207
_logger.LogInformation("Adding language worker channel for runtime: {language}.", rpcChannelReadyEvent.Language);
206208
_workerChannels.Add(rpcChannelReadyEvent.Language, rpcChannelReadyEvent.LanguageWorkerChannel);

src/WebJobs.Script/Rpc/ProcessManagement/IProcessRegistry.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// Copyright (c) .NET Foundation. All rights reserved.
22
// Licensed under the MIT License. See License.txt in the project root for license information.
33

4+
using System;
45
using System.Diagnostics;
56

67
namespace Microsoft.Azure.WebJobs.Script.Rpc

src/WebJobs.Script/Rpc/ProcessManagement/JobObjectRegistry.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
namespace Microsoft.Azure.WebJobs.Script.Rpc
99
{
1010
// Registers processes on windows with a job object to ensure disposal after parent exit
11-
internal class JobObjectRegistry : IDisposable, IProcessRegistry
11+
internal class JobObjectRegistry : IProcessRegistry, IDisposable
1212
{
1313
private IntPtr _handle;
1414
private bool _disposed = false;

src/WebJobs.Script/ScriptConstants.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,9 @@ public static class ScriptConstants
4848
public const string LogCategoryFunction = "Function";
4949
public const string LogCategoryWorker = "Worker";
5050
public const string LogCategoryRpcInitializationService = "Host.RpcInitializationService";
51-
public const string LanguageWorkerChannelManager = "Host.LanguageWorkerChannelManager";
51+
public const string LogCategoryLanguageWorkerChannelManager = "Host.LanguageWorkerChannelManager";
5252
public const string LogCategoryFunctionRpcService = "Host.FunctionRpcService";
53+
public const string LogCategoryFunctionDispatcher = "Host.FunctionDispatcher";
5354

5455
public const string SkipHostJsonConfigurationKey = "MS_SkipHostJsonConfiguration";
5556
public const string SkipHostInitializationKey = "MS_SkipHostInitialization";

0 commit comments

Comments
 (0)