Skip to content

Commit 3699fc1

Browse files
authored
Initialize invoke BufferBlock during function load (#4492)
1 parent ce30a6e commit 3699fc1

File tree

5 files changed

+22
-25
lines changed

5 files changed

+22
-25
lines changed

src/WebJobs.Script/Description/Rpc/WorkerLanguageInvoker.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ protected override async Task<object> InvokeCore(object[] parameters, FunctionIn
7575

7676
ScriptInvocationResult result;
7777
_logger.LogDebug($"Sending invocation id:{invocationId}");
78-
await _functionDispatcher.InvokeAsync(invocationContext);
78+
_functionDispatcher.Invoke(invocationContext);
7979
result = await invocationContext.ResultSource.Task;
8080

8181
await BindOutputsAsync(triggerValue, context.Binder, result);
@@ -87,7 +87,7 @@ private async Task DelayUntilFunctionDispatcherInitialized()
8787
if (_functionDispatcher != null && _functionDispatcher.State == FunctionDispatcherState.Initializing)
8888
{
8989
_logger.LogDebug($"functionDispatcher state: {_functionDispatcher.State}");
90-
await Utility.DelayAsync(ScriptConstants.HostTimeoutSeconds, ScriptConstants.HostPollingIntervalMilliseconds, () =>
90+
await Utility.DelayAsync(LanguageWorkerConstants.ProcessStartTimeoutSeconds, 25, () =>
9191
{
9292
return _functionDispatcher.State != FunctionDispatcherState.Initialized;
9393
});

src/WebJobs.Script/Rpc/FunctionRegistration/FunctionDispatcher.cs

Lines changed: 10 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -179,35 +179,28 @@ public async Task InitializeAsync(IEnumerable<FunctionMetadata> functions)
179179
}
180180
}
181181

182-
public async Task InvokeAsync(ScriptInvocationContext invocationContext)
182+
public void Invoke(ScriptInvocationContext invocationContext)
183183
{
184184
try
185185
{
186186
IEnumerable<ILanguageWorkerChannel> workerChannels = GetInitializedWorkerChannels();
187187
var languageWorkerChannel = _functionDispatcherLoadBalancer.GetLanguageWorkerChannel(workerChannels, _maxProcessCount);
188-
BufferBlock<ScriptInvocationContext> bufferBlock = await GetFunctionInvocationBufferBlock(languageWorkerChannel, invocationContext.FunctionMetadata.FunctionId);
189-
_logger.LogDebug("Posting invocation id:{InvocationId} on workerId:{workerChannelId}", invocationContext.ExecutionContext.InvocationId, languageWorkerChannel.Id);
190-
languageWorkerChannel.FunctionInputBuffers[invocationContext.FunctionMetadata.FunctionId].Post(invocationContext);
188+
if (languageWorkerChannel.FunctionInputBuffers.TryGetValue(invocationContext.FunctionMetadata.FunctionId, out BufferBlock<ScriptInvocationContext> bufferBlock))
189+
{
190+
_logger.LogDebug("Posting invocation id:{InvocationId} on workerId:{workerChannelId}", invocationContext.ExecutionContext.InvocationId, languageWorkerChannel.Id);
191+
languageWorkerChannel.FunctionInputBuffers[invocationContext.FunctionMetadata.FunctionId].Post(invocationContext);
192+
}
193+
else
194+
{
195+
throw new InvalidOperationException($"Function:{invocationContext.FunctionMetadata.Name} is not loaded by the language worker: {languageWorkerChannel.Id}");
196+
}
191197
}
192198
catch (Exception invokeEx)
193199
{
194200
invocationContext.ResultSource.TrySetException(invokeEx);
195201
}
196202
}
197203

198-
private async Task<BufferBlock<ScriptInvocationContext>> GetFunctionInvocationBufferBlock(ILanguageWorkerChannel languageWorkerChannel, string functionId)
199-
{
200-
BufferBlock<ScriptInvocationContext> bufferBlock = null;
201-
if (languageWorkerChannel != null)
202-
{
203-
await Utility.DelayAsync(ScriptConstants.HostTimeoutSeconds, ScriptConstants.HostPollingIntervalMilliseconds, () =>
204-
{
205-
return !languageWorkerChannel.FunctionInputBuffers.TryGetValue(functionId, out bufferBlock);
206-
});
207-
}
208-
return bufferBlock;
209-
}
210-
211204
internal IEnumerable<ILanguageWorkerChannel> GetInitializedWorkerChannels()
212205
{
213206
IEnumerable<ILanguageWorkerChannel> webhostChannels = _languageWorkerChannelManager.GetChannels(_workerRuntime);

src/WebJobs.Script/Rpc/FunctionRegistration/IFunctionDispatcher.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ public interface IFunctionDispatcher : IDisposable
1717
// Tests if the function metadata is supported by a known language worker
1818
bool IsSupported(FunctionMetadata metadata, string language);
1919

20-
Task InvokeAsync(ScriptInvocationContext invocationContext);
20+
void Invoke(ScriptInvocationContext invocationContext);
2121

2222
Task InitializeAsync(IEnumerable<FunctionMetadata> functions);
2323
}

src/WebJobs.Script/Rpc/LanguageWorkerChannel.cs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ namespace Microsoft.Azure.WebJobs.Script.Rpc
2828
{
2929
internal class LanguageWorkerChannel : ILanguageWorkerChannel
3030
{
31-
private readonly TimeSpan processStartTimeout = TimeSpan.FromSeconds(40);
3231
private readonly TimeSpan workerInitTimeout = TimeSpan.FromSeconds(30);
3332
private readonly string _rootScriptPath;
3433
private readonly IScriptEventManager _eventManager;
@@ -242,7 +241,7 @@ private void OnErrorDataReceived(object sender, DataReceivedEventArgs e)
242241
public Task StartWorkerProcessAsync()
243242
{
244243
_startSubscription = _inboundWorkerEvents.Where(msg => msg.MessageType == MsgType.StartStream)
245-
.Timeout(processStartTimeout)
244+
.Timeout(TimeSpan.FromSeconds(LanguageWorkerConstants.ProcessStartTimeoutSeconds))
246245
.Take(1)
247246
.Subscribe(SendWorkerInitRequest, HandleWorkerError);
248247

@@ -353,6 +352,9 @@ public void SendFunctionEnvironmentReloadRequest()
353352

354353
internal void SendFunctionLoadRequest(FunctionMetadata metadata)
355354
{
355+
_workerChannelLogger.LogDebug("Sending FunctionLoadRequest for function:{functionName} with functionId:{id}", metadata.Name, metadata.FunctionId);
356+
_functionInputBuffers[metadata.FunctionId] = new BufferBlock<ScriptInvocationContext>();
357+
356358
// send a load request for the registered function
357359
FunctionLoadRequest request = new FunctionLoadRequest()
358360
{
@@ -387,9 +389,7 @@ internal void SendFunctionLoadRequest(FunctionMetadata metadata)
387389

388390
internal void LoadResponse(FunctionLoadResponse loadResponse)
389391
{
390-
// associate the invocation input buffer with the function
391-
_functionInputBuffers[loadResponse.FunctionId] = new BufferBlock<ScriptInvocationContext>();
392-
392+
_workerChannelLogger.LogDebug("Received FunctionLoadResponse for functionId:{functionId}", loadResponse.FunctionId);
393393
if (loadResponse.Result.IsFailure(out Exception ex))
394394
{
395395
//Cache function load errors to replay error messages on invoking failed functions
@@ -403,6 +403,7 @@ internal void LoadResponse(FunctionLoadResponse loadResponse)
403403

404404
// link the invocation inputs to the invoke call
405405
var invokeBlock = new ActionBlock<ScriptInvocationContext>(ctx => SendInvocationRequest(ctx));
406+
// associate the invocation input buffer with the function
406407
var disposableLink = _functionInputBuffers[loadResponse.FunctionId].LinkTo(invokeBlock);
407408
_inputLinks.Add(disposableLink);
408409
}

src/WebJobs.Script/Rpc/LanguageWorkerConstants.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
// Copyright (c) .NET Foundation. All rights reserved.
22
// Licensed under the MIT License. See License.txt in the project root for license information.
33

4+
using System;
5+
46
namespace Microsoft.Azure.WebJobs.Script.Rpc
57
{
68
public static class LanguageWorkerConstants
79
{
10+
public const int ProcessStartTimeoutSeconds = 60;
811
public const string FunctionWorkerRuntimeSettingName = "FUNCTIONS_WORKER_RUNTIME";
912
public const string FunctionsWorkerProcessCountSettingName = "FUNCTIONS_WORKER_PROCESS_COUNT";
1013
public const string DotNetLanguageWorkerName = "dotnet";

0 commit comments

Comments
 (0)