Skip to content

Commit 2c05898

Browse files
authored
Delete Channel Ready Events (#4631)
1 parent 748e7c8 commit 2c05898

15 files changed

+47
-174
lines changed

src/WebJobs.Script/Eventing/Rpc/RpcChannelReadyEvent.cs

Lines changed: 0 additions & 29 deletions
This file was deleted.

src/WebJobs.Script/Eventing/Rpc/RpcJobHostChannelReadyEvent.cs

Lines changed: 0 additions & 17 deletions
This file was deleted.

src/WebJobs.Script/Eventing/Rpc/RpcWebhostChannelReadyEvent.cs

Lines changed: 0 additions & 21 deletions
This file was deleted.

src/WebJobs.Script/Rpc/FunctionRegistration/FunctionDispatcher.cs

Lines changed: 17 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ internal class FunctionDispatcher : IFunctionDispatcher
2828
private readonly ILanguageWorkerChannelFactory _languageWorkerChannelFactory;
2929
private readonly IEnvironment _environment;
3030
private readonly IScriptJobHostEnvironment _scriptJobHostEnvironment;
31-
private readonly IDisposable _rpcChannelReadySubscriptions;
3231
private readonly int _debounceSeconds = 10;
3332
private readonly int _maxAllowedProcessCount = 10;
3433
private IScriptEventManager _eventManager;
@@ -84,10 +83,6 @@ public FunctionDispatcher(IOptions<ScriptJobHostOptions> scriptHostOptions,
8483
_workerErrorSubscription = _eventManager.OfType<WorkerErrorEvent>()
8584
.Subscribe(WorkerError);
8685

87-
_rpcChannelReadySubscriptions = _eventManager.OfType<RpcJobHostChannelReadyEvent>()
88-
.ObserveOn(NewThreadScheduler.Default)
89-
.Subscribe(AddOrUpdateWorkerChannels);
90-
9186
_shutdownStandbyWorkerChannels = ShutdownWebhostLanguageWorkerChannels;
9287
_shutdownStandbyWorkerChannels = _shutdownStandbyWorkerChannels.Debounce(milliseconds: 5000);
9388
}
@@ -107,12 +102,26 @@ internal async void InitializeJobhostLanguageWorkerChannelAsync()
107102
await InitializeJobhostLanguageWorkerChannelAsync(0);
108103
}
109104

110-
internal async Task InitializeJobhostLanguageWorkerChannelAsync(int attemptCount)
105+
internal Task InitializeJobhostLanguageWorkerChannelAsync(int attemptCount)
111106
{
112-
var languageWorkerChannel = _languageWorkerChannelFactory.CreateLanguageWorkerChannel(_scriptOptions.RootScriptPath, _workerRuntime, _metricsLogger, attemptCount, false, _managedDependencyOptions);
107+
var languageWorkerChannel = _languageWorkerChannelFactory.CreateLanguageWorkerChannel(_scriptOptions.RootScriptPath, _workerRuntime, _metricsLogger, attemptCount, _managedDependencyOptions);
113108
languageWorkerChannel.SetupFunctionInvocationBuffers(_functions);
114109
_jobHostLanguageWorkerChannelManager.AddChannel(languageWorkerChannel);
115-
await languageWorkerChannel.StartWorkerProcessAsync();
110+
languageWorkerChannel.StartWorkerProcessAsync()
111+
.ContinueWith(workerInitTask =>
112+
{
113+
if (workerInitTask.IsCompleted)
114+
{
115+
_logger.LogDebug("Adding jobhost language worker channel for runtime: {language}. workerId:{id}", _workerRuntime, languageWorkerChannel.Id);
116+
languageWorkerChannel.SendFunctionLoadRequests();
117+
State = FunctionDispatcherState.Initialized;
118+
}
119+
else
120+
{
121+
_logger.LogWarning("Failed to start language worker process jobhost for runtime: {language}. workerId:{id}", _workerRuntime, languageWorkerChannel.Id);
122+
}
123+
});
124+
return Task.CompletedTask;
116125
}
117126

118127
internal async void InitializeWebhostLanguageWorkerChannel()
@@ -266,22 +275,11 @@ private async Task RestartWorkerChannel(string runtime, string workerId)
266275
}
267276
}
268277

269-
private void AddOrUpdateWorkerChannels(RpcJobHostChannelReadyEvent rpcChannelReadyEvent)
270-
{
271-
if (!_disposing)
272-
{
273-
_logger.LogDebug("Adding jobhost language worker channel for runtime: {language}. workerId:{id}", rpcChannelReadyEvent.Language, rpcChannelReadyEvent.LanguageWorkerChannel.Id);
274-
rpcChannelReadyEvent.LanguageWorkerChannel.SendFunctionLoadRequests();
275-
State = FunctionDispatcherState.Initialized;
276-
}
277-
}
278-
279278
protected virtual void Dispose(bool disposing)
280279
{
281280
if (!_disposed && disposing)
282281
{
283282
_workerErrorSubscription.Dispose();
284-
_rpcChannelReadySubscriptions.Dispose();
285283
_processStartCancellationToken.Cancel();
286284
_processStartCancellationToken.Dispose();
287285
_jobHostLanguageWorkerChannelManager.DisposeAndRemoveChannels();

src/WebJobs.Script/Rpc/ILanguageWorkerChannelFactory.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,6 @@ namespace Microsoft.Azure.WebJobs.Script.Rpc
99
{
1010
public interface ILanguageWorkerChannelFactory
1111
{
12-
ILanguageWorkerChannel CreateLanguageWorkerChannel(string scriptRootPath, string language, IMetricsLogger metricsLogger, int attemptCount, bool isWebhostChannel = false, IOptions<ManagedDependencyOptions> managedDependencyOptions = null);
12+
ILanguageWorkerChannel CreateLanguageWorkerChannel(string scriptRootPath, string language, IMetricsLogger metricsLogger, int attemptCount, IOptions<ManagedDependencyOptions> managedDependencyOptions = null);
1313
}
1414
}
Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,12 @@
11
// Copyright (c) .NET Foundation. All rights reserved.
22
// Licensed under the MIT License. See License.txt in the project root for license information.
33

4-
using System.Diagnostics;
5-
64
namespace Microsoft.Azure.WebJobs.Script.Rpc
75
{
86
public interface ILanguageWorkerProcess
97
{
108
int Id { get; }
119

12-
Process StartProcess();
10+
void StartProcess();
1311
}
1412
}

src/WebJobs.Script/Rpc/LanguageWorkerChannel.cs

Lines changed: 10 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
using System.Collections.Generic;
88
using System.IO;
99
using System.Linq;
10+
using System.Reactive.Concurrency;
1011
using System.Reactive.Linq;
1112
using System.Threading.Tasks;
1213
using System.Threading.Tasks.Dataflow;
@@ -34,7 +35,6 @@ internal class LanguageWorkerChannel : ILanguageWorkerChannel, IDisposable
3435

3536
private bool _disposed;
3637
private bool _disposing;
37-
private bool _isWebHostChannel;
3838
private WorkerInitResponse _initMessage;
3939
private string _workerId;
4040
private LanguageWorkerChannelState _state;
@@ -53,6 +53,7 @@ internal class LanguageWorkerChannel : ILanguageWorkerChannel, IDisposable
5353
private ILogger _workerChannelLogger;
5454
private ILanguageWorkerProcess _languageWorkerProcess;
5555
private TaskCompletionSource<bool> _reloadTask = new TaskCompletionSource<bool>();
56+
private TaskCompletionSource<bool> _workerInitTask = new TaskCompletionSource<bool>();
5657

5758
internal LanguageWorkerChannel(
5859
string workerId,
@@ -63,15 +64,13 @@ internal LanguageWorkerChannel(
6364
ILogger logger,
6465
IMetricsLogger metricsLogger,
6566
int attemptCount,
66-
bool isWebHostChannel = false,
6767
IOptions<ManagedDependencyOptions> managedDependencyOptions = null)
6868
{
6969
_workerId = workerId;
7070
_rootScriptPath = rootScriptPath;
7171
_eventManager = eventManager;
7272
_workerConfig = workerConfig;
7373
_runtime = workerConfig.Language;
74-
_isWebHostChannel = isWebHostChannel;
7574
_languageWorkerProcess = languageWorkerProcess;
7675
_workerChannelLogger = logger;
7776

@@ -120,7 +119,7 @@ public Task StartWorkerProcessAsync()
120119

121120
_state = LanguageWorkerChannelState.Initializing;
122121

123-
return Task.CompletedTask;
122+
return _workerInitTask.Task;
124123
}
125124

126125
// send capabilities to worker, wait for WorkerInitResponse
@@ -129,7 +128,7 @@ internal void SendWorkerInitRequest(RpcEvent startEvent)
129128
_inboundWorkerEvents.Where(msg => msg.MessageType == MsgType.WorkerInitResponse)
130129
.Timeout(workerInitTimeout)
131130
.Take(1)
132-
.Subscribe(PublishRpcChannelReadyEvent, HandleWorkerChannelError);
131+
.Subscribe(WorkerInitResponse, HandleWorkerChannelError);
133132

134133
SendStreamingMessage(new StreamingMessage
135134
{
@@ -143,51 +142,30 @@ internal void SendWorkerInitRequest(RpcEvent startEvent)
143142
internal void FunctionEnvironmentReloadResponse(FunctionEnvironmentReloadResponse res)
144143
{
145144
_workerChannelLogger.LogDebug("Received FunctionEnvironmentReloadResponse");
146-
if (_reloadTask.Task.IsCompleted)
145+
if (res.Result.IsFailure(out Exception reloadEnvironmentVariablesException))
147146
{
148-
throw new InvalidOperationException("FunctionEnvironmentReloadResponse received more than once");
149-
}
150-
if (res.Result.IsFailure(out Exception relaodEnvironmentVariablesException))
151-
{
152-
_workerChannelLogger.LogError(relaodEnvironmentVariablesException, "Failed to reload environment variables");
147+
_workerChannelLogger.LogError(reloadEnvironmentVariablesException, "Failed to reload environment variables");
153148
_reloadTask.SetResult(false);
154149
}
155150
_reloadTask.SetResult(true);
156151
}
157152

158-
internal void PublishRpcChannelReadyEvent(RpcEvent initEvent)
153+
internal void WorkerInitResponse(RpcEvent initEvent)
159154
{
160155
_startLatencyMetric?.Dispose();
161156
_startLatencyMetric = null;
162157

163-
if (_disposing)
164-
{
165-
// do not publish ready events when disposing
166-
return;
167-
}
158+
_workerChannelLogger.LogDebug("Received WorkerInitResponse");
168159
_initMessage = initEvent.Message.WorkerInitResponse;
169160
if (_initMessage.Result.IsFailure(out Exception exc))
170161
{
171162
HandleWorkerChannelError(exc);
163+
_workerInitTask.SetResult(false);
172164
return;
173165
}
174-
175166
_state = LanguageWorkerChannelState.Initialized;
176-
177167
_workerCapabilities.UpdateCapabilities(_initMessage.Capabilities);
178-
179-
if (_isWebHostChannel)
180-
{
181-
_workerChannelLogger.LogDebug("Publishing RpcWebHostChannelReadyEvent for runtime:{language}, workerId:{id}", _workerConfig.Language, _workerId);
182-
RpcWebHostChannelReadyEvent readyEvent = new RpcWebHostChannelReadyEvent(_workerId, _runtime, this, _initMessage.WorkerVersion, _initMessage.Capabilities);
183-
_eventManager.Publish(readyEvent);
184-
}
185-
else
186-
{
187-
_workerChannelLogger.LogDebug("Publishing RpcJobHostChannelReadyEvent for runtime:{language}, workerId:{id}", _workerConfig.Language, _workerId);
188-
RpcJobHostChannelReadyEvent readyEvent = new RpcJobHostChannelReadyEvent(_workerId, _runtime, this, _initMessage.WorkerVersion, _initMessage.Capabilities);
189-
_eventManager.Publish(readyEvent);
190-
}
168+
_workerInitTask.SetResult(true);
191169
}
192170

193171
public void SetupFunctionInvocationBuffers(IEnumerable<FunctionMetadata> functions)

src/WebJobs.Script/Rpc/LanguageWorkerChannelFactory.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ public LanguageWorkerChannelFactory(IScriptEventManager eventManager, IEnvironme
3030
_languageWorkerProcessManager = languageWorkerProcessManager;
3131
}
3232

33-
public ILanguageWorkerChannel CreateLanguageWorkerChannel(string scriptRootPath, string runtime, IMetricsLogger metricsLogger, int attemptCount, bool isWebhostChannel = false, IOptions<ManagedDependencyOptions> managedDependencyOptions = null)
33+
public ILanguageWorkerChannel CreateLanguageWorkerChannel(string scriptRootPath, string runtime, IMetricsLogger metricsLogger, int attemptCount, IOptions<ManagedDependencyOptions> managedDependencyOptions = null)
3434
{
3535
var languageWorkerConfig = _workerConfigs.Where(c => c.Language.Equals(runtime, StringComparison.OrdinalIgnoreCase)).FirstOrDefault();
3636
if (languageWorkerConfig == null)
@@ -49,7 +49,6 @@ public ILanguageWorkerChannel CreateLanguageWorkerChannel(string scriptRootPath,
4949
workerLogger,
5050
metricsLogger,
5151
attemptCount,
52-
isWebhostChannel,
5352
managedDependencyOptions);
5453
}
5554
}

src/WebJobs.Script/Rpc/LanguageWorkerProcess.cs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ internal LanguageWorkerProcess(string runtime,
6161

6262
internal Queue<string> ProcessStdErrDataQueue => _processStdErrDataQueue;
6363

64-
public Process StartProcess()
64+
public void StartProcess()
6565
{
6666
try
6767
{
@@ -84,8 +84,6 @@ public Process StartProcess()
8484
{
8585
throw new HostInitializationException($"Failed to start Language Worker Channel for language :{_runtime}", ex);
8686
}
87-
88-
return _process;
8987
}
9088

9189
private void OnErrorDataReceived(object sender, DataReceivedEventArgs e)

src/WebJobs.Script/Rpc/WebHostLanguageWorkerChannelManager.cs

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -54,13 +54,9 @@ private async Task<ILanguageWorkerChannel> InitializeLanguageWorkerChannel(strin
5454
_logger.LogDebug("Creating language worker channel for runtime:{runtime}", runtime);
5555
try
5656
{
57-
languageWorkerChannel = _languageWorkerChannelFactory.CreateLanguageWorkerChannel(scriptRootPath, runtime, null, 0, true);
57+
languageWorkerChannel = _languageWorkerChannelFactory.CreateLanguageWorkerChannel(scriptRootPath, runtime, null, 0);
5858
await languageWorkerChannel.StartWorkerProcessAsync();
59-
IObservable<RpcWebHostChannelReadyEvent> rpcChannelReadyEvent = _eventManager.OfType<RpcWebHostChannelReadyEvent>()
60-
.Where(msg => msg.Language == runtime).Timeout(workerInitTimeout);
61-
// Wait for response from language worker process
62-
RpcWebHostChannelReadyEvent readyEvent = await rpcChannelReadyEvent.FirstAsync();
63-
AddOrUpdateWorkerChannels(readyEvent);
59+
AddOrUpdateWorkerChannels(runtime, languageWorkerChannel);
6460
}
6561
catch (Exception ex)
6662
{
@@ -161,19 +157,19 @@ public void ShutdownChannels()
161157
}
162158
}
163159

164-
internal void AddOrUpdateWorkerChannels(RpcWebHostChannelReadyEvent rpcChannelReadyEvent)
160+
internal void AddOrUpdateWorkerChannels(string initializedRuntime, ILanguageWorkerChannel initializedLanguageWorkerChannel)
165161
{
166-
_logger.LogDebug("Adding webhost language worker channel for runtime: {language}. workerId:{id}", rpcChannelReadyEvent.Language, rpcChannelReadyEvent.LanguageWorkerChannel.Id);
167-
_workerChannels.AddOrUpdate(rpcChannelReadyEvent.Language,
162+
_logger.LogDebug("Adding webhost language worker channel for runtime: {language}. workerId:{id}", initializedRuntime, initializedLanguageWorkerChannel.Id);
163+
_workerChannels.AddOrUpdate(initializedRuntime,
168164
(runtime) =>
169165
{
170166
List<ILanguageWorkerChannel> newLanguageWorkerChannels = new List<ILanguageWorkerChannel>();
171-
newLanguageWorkerChannels.Add(rpcChannelReadyEvent.LanguageWorkerChannel);
167+
newLanguageWorkerChannels.Add(initializedLanguageWorkerChannel);
172168
return newLanguageWorkerChannels;
173169
},
174170
(runtime, existingLanguageWorkerChannels) =>
175171
{
176-
existingLanguageWorkerChannels.Add(rpcChannelReadyEvent.LanguageWorkerChannel);
172+
existingLanguageWorkerChannels.Add(initializedLanguageWorkerChannel);
177173
return existingLanguageWorkerChannels;
178174
});
179175
}

0 commit comments

Comments
 (0)