Skip to content

Commit 3f19744

Browse files
pragnagopamhoeger
authored andcommitted
Ensure invocation buffers are setup before routing invocations to worker channel (#5807)
1 parent 0a29548 commit 3f19744

File tree

7 files changed

+55
-28
lines changed

7 files changed

+55
-28
lines changed

src/WebJobs.Script/Workers/Rpc/FunctionRegistration/RpcFunctionInvocationDispatcher.cs

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -117,18 +117,18 @@ internal Task InitializeJobhostLanguageWorkerChannelAsync(int attemptCount)
117117
rpcWorkerChannel.SetupFunctionInvocationBuffers(_functions);
118118
_jobHostLanguageWorkerChannelManager.AddChannel(rpcWorkerChannel);
119119
rpcWorkerChannel.StartWorkerProcessAsync().ContinueWith(workerInitTask =>
120-
{
121-
if (workerInitTask.IsCompleted)
122-
{
123-
_logger.LogDebug("Adding jobhost language worker channel for runtime: {language}. workerId:{id}", _workerRuntime, rpcWorkerChannel.Id);
124-
rpcWorkerChannel.SendFunctionLoadRequests(_managedDependencyOptions.Value);
125-
State = FunctionInvocationDispatcherState.Initialized;
126-
}
127-
else
128-
{
129-
_logger.LogWarning("Failed to start language worker process for runtime: {language}. workerId:{id}", _workerRuntime, rpcWorkerChannel.Id);
130-
}
131-
});
120+
{
121+
if (workerInitTask.IsCompleted)
122+
{
123+
_logger.LogDebug("Adding jobhost language worker channel for runtime: {language}. workerId:{id}", _workerRuntime, rpcWorkerChannel.Id);
124+
rpcWorkerChannel.SendFunctionLoadRequests(_managedDependencyOptions.Value);
125+
State = FunctionInvocationDispatcherState.Initialized;
126+
}
127+
else
128+
{
129+
_logger.LogWarning("Failed to start language worker process for runtime: {language}. workerId:{id}", _workerRuntime, rpcWorkerChannel.Id);
130+
}
131+
});
132132
return Task.CompletedTask;
133133
}
134134

@@ -264,7 +264,7 @@ internal async Task<IEnumerable<IRpcWorkerChannel>> GetInitializedWorkerChannels
264264
}
265265
}
266266
IEnumerable<IRpcWorkerChannel> workerChannels = webhostChannels == null ? _jobHostLanguageWorkerChannelManager.GetChannels() : webhostChannels.Union(_jobHostLanguageWorkerChannelManager.GetChannels());
267-
IEnumerable<IRpcWorkerChannel> initializedWorkers = workerChannels.Where(ch => ch.State == RpcWorkerChannelState.Initialized);
267+
IEnumerable<IRpcWorkerChannel> initializedWorkers = workerChannels.Where(ch => ch.IsChannelReadyForInvocations());
268268
if (initializedWorkers.Count() > _maxProcessCount)
269269
{
270270
throw new InvalidOperationException($"Number of initialized language workers exceeded:{initializedWorkers.Count()} exceeded maxProcessCount: {_maxProcessCount}");
@@ -400,4 +400,4 @@ public void Dispose()
400400
Dispose(true);
401401
}
402402
}
403-
}
403+
}

src/WebJobs.Script/Workers/Rpc/IRpcWorkerChannel.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ public interface IRpcWorkerChannel
1616

1717
IDictionary<string, BufferBlock<ScriptInvocationContext>> FunctionInputBuffers { get; }
1818

19-
RpcWorkerChannelState State { get; }
19+
bool IsChannelReadyForInvocations();
2020

2121
void SetupFunctionInvocationBuffers(IEnumerable<FunctionMetadata> functions);
2222

src/WebJobs.Script/Workers/Rpc/RpcWorkerChannel.cs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -110,10 +110,13 @@ internal RpcWorkerChannel(
110110

111111
public IDictionary<string, BufferBlock<ScriptInvocationContext>> FunctionInputBuffers => _functionInputBuffers;
112112

113-
public RpcWorkerChannelState State => _state;
114-
115113
internal IWorkerProcess WorkerProcess => _rpcWorkerProcess;
116114

115+
public bool IsChannelReadyForInvocations()
116+
{
117+
return _state.HasFlag(RpcWorkerChannelState.InvocationBuffersInitialized | RpcWorkerChannelState.Initialized);
118+
}
119+
117120
public async Task StartWorkerProcessAsync()
118121
{
119122
_startSubscription = _inboundWorkerEvents.Where(msg => msg.MessageType == MsgType.StartStream)
@@ -123,7 +126,7 @@ public async Task StartWorkerProcessAsync()
123126

124127
_workerChannelLogger.LogDebug("Initiating Worker Process start up");
125128
await _rpcWorkerProcess.StartProcessAsync();
126-
_state = RpcWorkerChannelState.Initializing;
129+
_state = _state | RpcWorkerChannelState.Initializing;
127130
await _workerInitTask.Task;
128131
}
129132

@@ -180,7 +183,7 @@ internal void WorkerInitResponse(RpcEvent initEvent)
180183
_workerInitTask.SetResult(false);
181184
return;
182185
}
183-
_state = RpcWorkerChannelState.Initialized;
186+
_state = _state | RpcWorkerChannelState.Initialized;
184187
_workerCapabilities.UpdateCapabilities(_initMessage.Capabilities);
185188
_workerInitTask.SetResult(true);
186189
}
@@ -193,6 +196,7 @@ public void SetupFunctionInvocationBuffers(IEnumerable<FunctionMetadata> functio
193196
_workerChannelLogger.LogDebug("Setting up FunctionInvocationBuffer for function:{functionName} with functionId:{id}", metadata.Name, metadata.FunctionId);
194197
_functionInputBuffers[metadata.FunctionId] = new BufferBlock<ScriptInvocationContext>();
195198
}
199+
_state = _state | RpcWorkerChannelState.InvocationBuffersInitialized;
196200
}
197201

198202
public void SendFunctionLoadRequests(ManagedDependencyOptions managedDependencyOptions)
Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,31 @@
11
// Copyright (c) .NET Foundation. All rights reserved.
22
// Licensed under the MIT License. See License.txt in the project root for license information.
33

4+
using System;
5+
46
namespace Microsoft.Azure.WebJobs.Script.Workers.Rpc
57
{
8+
[Flags]
69
public enum RpcWorkerChannelState
710
{
811
/// <summary>
912
/// The Default state of LanguageWorkerChannel.
1013
/// </summary>
11-
Default,
14+
Default = 0,
15+
16+
/// <summary>
17+
/// LanguageWorkerChannel is created. InvocationBuffers per function are setup
18+
/// </summary>
19+
InvocationBuffersInitialized = 1,
1220

1321
/// <summary>
1422
/// The LanguageWorkerChannel is created.Worker process is starting
1523
/// </summary>
16-
Initializing,
24+
Initializing = 2,
1725

1826
/// <summary>
1927
/// LanguageWorkerChannel is created. Worker process is Initialized. Rpc Channel is established.
2028
/// </summary>
21-
Initialized
29+
Initialized = 4,
2230
}
2331
}

test/WebJobs.Script.Tests/Workers/Rpc/RpcFunctionInvocationDispatcherTests.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -465,7 +465,7 @@ await TestHelpers.Await(() =>
465465
currentChannelCount = functionDispatcher.JobHostLanguageWorkerChannelManager.GetChannels().Count();
466466
if (currentChannelCount == expectedCount)
467467
{
468-
return functionDispatcher.JobHostLanguageWorkerChannelManager.GetChannels().All(ch => ch.State == RpcWorkerChannelState.Initialized);
468+
return functionDispatcher.JobHostLanguageWorkerChannelManager.GetChannels().All(ch => ch.IsChannelReadyForInvocations());
469469
}
470470
return false;
471471
}, pollingInterval: expectedCount * 5 * 1000, timeout: 60 * 1000);
@@ -514,4 +514,4 @@ private IEnumerable<FunctionMetadata> GetTestFunctionsList(string runtime)
514514
};
515515
}
516516
}
517-
}
517+
}

test/WebJobs.Script.Tests/Workers/Rpc/RpcWorkerChannelTests.cs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,13 +74,24 @@ public RpcWorkerChannelTests()
7474
}
7575

7676
[Fact]
77-
public async Task StartWorkerProcessAsync_Invoked()
77+
public async Task StartWorkerProcessAsync_Invoked_SetupFunctionBuffers_Verify_ReadyForInvocation()
7878
{
7979
var initTask = _workerChannel.StartWorkerProcessAsync();
8080
_testFunctionRpcService.PublishStartStreamEvent(_workerId);
8181
_testFunctionRpcService.PublishWorkerInitResponseEvent();
8282
await initTask;
8383
_mockrpcWorkerProcess.Verify(m => m.StartProcessAsync(), Times.Once);
84+
Assert.False(_workerChannel.IsChannelReadyForInvocations());
85+
_workerChannel.SetupFunctionInvocationBuffers(GetTestFunctionsList("node"));
86+
Assert.True(_workerChannel.IsChannelReadyForInvocations());
87+
}
88+
89+
[Fact]
90+
public void SetupFunctionBuffers_Verify_ReadyForInvocation_Returns_False()
91+
{
92+
Assert.False(_workerChannel.IsChannelReadyForInvocations());
93+
_workerChannel.SetupFunctionInvocationBuffers(GetTestFunctionsList("node"));
94+
Assert.False(_workerChannel.IsChannelReadyForInvocations());
8495
}
8596

8697
[Fact]

test/WebJobs.Script.Tests/Workers/Rpc/TestRpcWorkerChannel.cs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,14 +41,13 @@ public TestRpcWorkerChannel(string workerId, string runtime = null, IScriptEvent
4141

4242
public List<Task> ExecutionContexts => _executionContexts;
4343

44-
public RpcWorkerChannelState State => _state;
45-
4644
public void Dispose()
4745
{
4846
}
4947

5048
public void SetupFunctionInvocationBuffers(IEnumerable<FunctionMetadata> functions)
5149
{
50+
_state = _state | RpcWorkerChannelState.InvocationBuffersInitialized;
5251
_testLogger.LogInformation("SetupFunctionInvocationBuffers called");
5352
}
5453

@@ -82,7 +81,7 @@ public async Task StartWorkerProcessAsync()
8281
{
8382
{ "test", "testSupported" }
8483
};
85-
_state = RpcWorkerChannelState.Initialized;
84+
_state = _state | RpcWorkerChannelState.Initialized;
8685
}
8786

8887
public void RaiseWorkerError()
@@ -107,5 +106,10 @@ public async Task DrainInvocationsAsync()
107106
await Task.WhenAll(ExecutionContexts);
108107
ExecutionContexts.Clear();
109108
}
109+
110+
public bool IsChannelReadyForInvocations()
111+
{
112+
return _state.HasFlag(RpcWorkerChannelState.InvocationBuffersInitialized | RpcWorkerChannelState.Initialized);
113+
}
110114
}
111115
}

0 commit comments

Comments
 (0)