Skip to content

Commit 521a61a

Browse files
pragnagopayojagad
authored andcommitted
Ensure invocation buffers are setup before routing invocations to worker channel (#5807)
1 parent 70d1c61 commit 521a61a

File tree

7 files changed

+55
-28
lines changed

7 files changed

+55
-28
lines changed

src/WebJobs.Script/Workers/Rpc/FunctionRegistration/RpcFunctionInvocationDispatcher.cs

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -117,18 +117,18 @@ internal Task InitializeJobhostLanguageWorkerChannelAsync(int attemptCount)
117117
rpcWorkerChannel.SetupFunctionInvocationBuffers(_functions);
118118
_jobHostLanguageWorkerChannelManager.AddChannel(rpcWorkerChannel);
119119
rpcWorkerChannel.StartWorkerProcessAsync().ContinueWith(workerInitTask =>
120-
{
121-
if (workerInitTask.IsCompleted)
122-
{
123-
_logger.LogDebug("Adding jobhost language worker channel for runtime: {language}. workerId:{id}", _workerRuntime, rpcWorkerChannel.Id);
124-
rpcWorkerChannel.SendFunctionLoadRequests(_managedDependencyOptions.Value);
125-
State = FunctionInvocationDispatcherState.Initialized;
126-
}
127-
else
128-
{
129-
_logger.LogWarning("Failed to start language worker process for runtime: {language}. workerId:{id}", _workerRuntime, rpcWorkerChannel.Id);
130-
}
131-
});
120+
{
121+
if (workerInitTask.IsCompleted)
122+
{
123+
_logger.LogDebug("Adding jobhost language worker channel for runtime: {language}. workerId:{id}", _workerRuntime, rpcWorkerChannel.Id);
124+
rpcWorkerChannel.SendFunctionLoadRequests(_managedDependencyOptions.Value);
125+
State = FunctionInvocationDispatcherState.Initialized;
126+
}
127+
else
128+
{
129+
_logger.LogWarning("Failed to start language worker process for runtime: {language}. workerId:{id}", _workerRuntime, rpcWorkerChannel.Id);
130+
}
131+
});
132132
return Task.CompletedTask;
133133
}
134134

@@ -264,7 +264,7 @@ internal async Task<IEnumerable<IRpcWorkerChannel>> GetInitializedWorkerChannels
264264
}
265265
}
266266
IEnumerable<IRpcWorkerChannel> workerChannels = webhostChannels == null ? _jobHostLanguageWorkerChannelManager.GetChannels() : webhostChannels.Union(_jobHostLanguageWorkerChannelManager.GetChannels());
267-
IEnumerable<IRpcWorkerChannel> initializedWorkers = workerChannels.Where(ch => ch.State == RpcWorkerChannelState.Initialized);
267+
IEnumerable<IRpcWorkerChannel> initializedWorkers = workerChannels.Where(ch => ch.IsChannelReadyForInvocations());
268268
if (initializedWorkers.Count() > _maxProcessCount)
269269
{
270270
throw new InvalidOperationException($"Number of initialized language workers exceeded:{initializedWorkers.Count()} exceeded maxProcessCount: {_maxProcessCount}");
@@ -400,4 +400,4 @@ public void Dispose()
400400
Dispose(true);
401401
}
402402
}
403-
}
403+
}

src/WebJobs.Script/Workers/Rpc/IRpcWorkerChannel.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ public interface IRpcWorkerChannel
1616

1717
IDictionary<string, BufferBlock<ScriptInvocationContext>> FunctionInputBuffers { get; }
1818

19-
RpcWorkerChannelState State { get; }
19+
bool IsChannelReadyForInvocations();
2020

2121
void SetupFunctionInvocationBuffers(IEnumerable<FunctionMetadata> functions);
2222

src/WebJobs.Script/Workers/Rpc/RpcWorkerChannel.cs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -107,10 +107,13 @@ internal RpcWorkerChannel(
107107

108108
public IDictionary<string, BufferBlock<ScriptInvocationContext>> FunctionInputBuffers => _functionInputBuffers;
109109

110-
public RpcWorkerChannelState State => _state;
111-
112110
internal IWorkerProcess WorkerProcess => _rpcWorkerProcess;
113111

112+
public bool IsChannelReadyForInvocations()
113+
{
114+
return _state.HasFlag(RpcWorkerChannelState.InvocationBuffersInitialized | RpcWorkerChannelState.Initialized);
115+
}
116+
114117
public async Task StartWorkerProcessAsync()
115118
{
116119
_startSubscription = _inboundWorkerEvents.Where(msg => msg.MessageType == MsgType.StartStream)
@@ -120,7 +123,7 @@ public async Task StartWorkerProcessAsync()
120123

121124
_workerChannelLogger.LogDebug("Initiating Worker Process start up");
122125
await _rpcWorkerProcess.StartProcessAsync();
123-
_state = RpcWorkerChannelState.Initializing;
126+
_state = _state | RpcWorkerChannelState.Initializing;
124127
await _workerInitTask.Task;
125128
}
126129

@@ -167,7 +170,7 @@ internal void WorkerInitResponse(RpcEvent initEvent)
167170
_workerInitTask.SetResult(false);
168171
return;
169172
}
170-
_state = RpcWorkerChannelState.Initialized;
173+
_state = _state | RpcWorkerChannelState.Initialized;
171174
_workerCapabilities.UpdateCapabilities(_initMessage.Capabilities);
172175
_workerInitTask.SetResult(true);
173176
}
@@ -180,6 +183,7 @@ public void SetupFunctionInvocationBuffers(IEnumerable<FunctionMetadata> functio
180183
_workerChannelLogger.LogDebug("Setting up FunctionInvocationBuffer for function:{functionName} with functionId:{id}", metadata.Name, metadata.FunctionId);
181184
_functionInputBuffers[metadata.FunctionId] = new BufferBlock<ScriptInvocationContext>();
182185
}
186+
_state = _state | RpcWorkerChannelState.InvocationBuffersInitialized;
183187
}
184188

185189
public void SendFunctionLoadRequests(ManagedDependencyOptions managedDependencyOptions)
Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,31 @@
11
// Copyright (c) .NET Foundation. All rights reserved.
22
// Licensed under the MIT License. See License.txt in the project root for license information.
33

4+
using System;
5+
46
namespace Microsoft.Azure.WebJobs.Script.Workers.Rpc
57
{
8+
[Flags]
69
public enum RpcWorkerChannelState
710
{
811
/// <summary>
912
/// The Default state of LanguageWorkerChannel.
1013
/// </summary>
11-
Default,
14+
Default = 0,
15+
16+
/// <summary>
17+
/// LanguageWorkerChannel is created. InvocationBuffers per function are setup
18+
/// </summary>
19+
InvocationBuffersInitialized = 1,
1220

1321
/// <summary>
1422
/// The LanguageWorkerChannel is created.Worker process is starting
1523
/// </summary>
16-
Initializing,
24+
Initializing = 2,
1725

1826
/// <summary>
1927
/// LanguageWorkerChannel is created. Worker process is Initialized. Rpc Channel is established.
2028
/// </summary>
21-
Initialized
29+
Initialized = 4,
2230
}
2331
}

test/WebJobs.Script.Tests/Workers/Rpc/RpcFunctionInvocationDispatcherTests.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -465,7 +465,7 @@ await TestHelpers.Await(() =>
465465
currentChannelCount = functionDispatcher.JobHostLanguageWorkerChannelManager.GetChannels().Count();
466466
if (currentChannelCount == expectedCount)
467467
{
468-
return functionDispatcher.JobHostLanguageWorkerChannelManager.GetChannels().All(ch => ch.State == RpcWorkerChannelState.Initialized);
468+
return functionDispatcher.JobHostLanguageWorkerChannelManager.GetChannels().All(ch => ch.IsChannelReadyForInvocations());
469469
}
470470
return false;
471471
}, pollingInterval: expectedCount * 5 * 1000, timeout: 60 * 1000);
@@ -514,4 +514,4 @@ private IEnumerable<FunctionMetadata> GetTestFunctionsList(string runtime)
514514
};
515515
}
516516
}
517-
}
517+
}

test/WebJobs.Script.Tests/Workers/Rpc/RpcWorkerChannelTests.cs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,13 +71,24 @@ public RpcWorkerChannelTests()
7171
}
7272

7373
[Fact]
74-
public async Task StartWorkerProcessAsync_Invoked()
74+
public async Task StartWorkerProcessAsync_Invoked_SetupFunctionBuffers_Verify_ReadyForInvocation()
7575
{
7676
var initTask = _workerChannel.StartWorkerProcessAsync();
7777
_testFunctionRpcService.PublishStartStreamEvent(_workerId);
7878
_testFunctionRpcService.PublishWorkerInitResponseEvent();
7979
await initTask;
8080
_mockrpcWorkerProcess.Verify(m => m.StartProcessAsync(), Times.Once);
81+
Assert.False(_workerChannel.IsChannelReadyForInvocations());
82+
_workerChannel.SetupFunctionInvocationBuffers(GetTestFunctionsList("node"));
83+
Assert.True(_workerChannel.IsChannelReadyForInvocations());
84+
}
85+
86+
[Fact]
87+
public void SetupFunctionBuffers_Verify_ReadyForInvocation_Returns_False()
88+
{
89+
Assert.False(_workerChannel.IsChannelReadyForInvocations());
90+
_workerChannel.SetupFunctionInvocationBuffers(GetTestFunctionsList("node"));
91+
Assert.False(_workerChannel.IsChannelReadyForInvocations());
8192
}
8293

8394
[Fact]

test/WebJobs.Script.Tests/Workers/Rpc/TestRpcWorkerChannel.cs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,14 +41,13 @@ public TestRpcWorkerChannel(string workerId, string runtime = null, IScriptEvent
4141

4242
public List<Task> ExecutionContexts => _executionContexts;
4343

44-
public RpcWorkerChannelState State => _state;
45-
4644
public void Dispose()
4745
{
4846
}
4947

5048
public void SetupFunctionInvocationBuffers(IEnumerable<FunctionMetadata> functions)
5149
{
50+
_state = _state | RpcWorkerChannelState.InvocationBuffersInitialized;
5251
_testLogger.LogInformation("SetupFunctionInvocationBuffers called");
5352
}
5453

@@ -82,7 +81,7 @@ public async Task StartWorkerProcessAsync()
8281
{
8382
{ "test", "testSupported" }
8483
};
85-
_state = RpcWorkerChannelState.Initialized;
84+
_state = _state | RpcWorkerChannelState.Initialized;
8685
}
8786

8887
public void RaiseWorkerError()
@@ -107,5 +106,10 @@ public async Task DrainInvocationsAsync()
107106
await Task.WhenAll(ExecutionContexts);
108107
ExecutionContexts.Clear();
109108
}
109+
110+
public bool IsChannelReadyForInvocations()
111+
{
112+
return _state.HasFlag(RpcWorkerChannelState.InvocationBuffersInitialized | RpcWorkerChannelState.Initialized);
113+
}
110114
}
111115
}

0 commit comments

Comments
 (0)