Skip to content

Commit 8bae46f

Browse files
Adding ability to allow worker to send Function load responses in batch (#8107)
* Adding FunctionLoadResponses to send single load response in case of multiple functions * Subscribed to LoadResponseCollection * Updated subtree from https://github.com/azure/azure-functions-language-worker-protobuf. Tag: v1.5.4-protofile. Commit: 576c9de * Removing redundant line * Added failure testcase for LoadRepsonseCollection
1 parent 8e6e393 commit 8bae46f

File tree

3 files changed

+73
-0
lines changed

3 files changed

+73
-0
lines changed

src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,11 +294,19 @@ public void SendFunctionLoadRequests(ManagedDependencyOptions managedDependencyO
294294
.Timeout(_functionLoadTimeout)
295295
.Take(_functions.Count())
296296
.Subscribe((msg) => LoadResponse(msg.Message.FunctionLoadResponse), HandleWorkerFunctionLoadError));
297+
298+
_eventSubscriptions.Add(_inboundWorkerEvents.Where(msg => msg.MessageType == MsgType.FunctionLoadResponseCollection)
299+
.Timeout(_functionLoadTimeout)
300+
.Take(_functions.Count())
301+
.Subscribe((msg) => LoadResponse(msg.Message.FunctionLoadResponseCollection), HandleWorkerFunctionLoadError));
297302
}
298303
else
299304
{
300305
_eventSubscriptions.Add(_inboundWorkerEvents.Where(msg => msg.MessageType == MsgType.FunctionLoadResponse)
301306
.Subscribe((msg) => LoadResponse(msg.Message.FunctionLoadResponse), HandleWorkerFunctionLoadError));
307+
308+
_eventSubscriptions.Add(_inboundWorkerEvents.Where(msg => msg.MessageType == MsgType.FunctionLoadResponseCollection)
309+
.Subscribe((msg) => LoadResponse(msg.Message.FunctionLoadResponseCollection), HandleWorkerFunctionLoadError));
302310
}
303311

304312
// Load Request is also sent for disabled function as it is invocable using the portal and admin endpoints
@@ -462,6 +470,16 @@ internal void LoadResponse(FunctionLoadResponse loadResponse)
462470
_inputLinks.Add(disposableLink);
463471
}
464472

473+
internal void LoadResponse(FunctionLoadResponseCollection loadResponseCollection)
474+
{
475+
_workerChannelLogger.LogDebug("Received FunctionLoadResponseCollection with number of functions: '{count}'.", loadResponseCollection.FunctionLoadResponses.Count);
476+
477+
foreach (FunctionLoadResponse loadResponse in loadResponseCollection.FunctionLoadResponses)
478+
{
479+
LoadResponse(loadResponse);
480+
}
481+
}
482+
465483
internal async Task SendInvocationRequest(ScriptInvocationContext context)
466484
{
467485
try

test/WebJobs.Script.Tests/Workers/Rpc/GrpcWorkerChannelTests.cs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -489,6 +489,39 @@ public void ReceivesInboundEvent_FunctionLoadResponse()
489489
Assert.True(traces.Any(m => string.Equals(m.FormattedMessage, "Received FunctionLoadResponse for function: 'js1' with functionId: 'TestFunctionId1'.")));
490490
}
491491

492+
[Fact]
493+
public void ReceivesInboundEvent_Failed_FunctionLoadResponses()
494+
{
495+
var functionMetadatas = GetTestFunctionsList("node");
496+
_workerChannel.SetupFunctionInvocationBuffers(functionMetadatas);
497+
_workerChannel.SendFunctionLoadRequests(null, TimeSpan.FromMinutes(1));
498+
_testFunctionRpcService.PublishFunctionLoadResponsesEvent(
499+
new List<string>() { "TestFunctionId1", "TestFunctionId2" },
500+
new StatusResult() { Status = StatusResult.Types.Status.Failure });
501+
var traces = _logger.GetLogMessages();
502+
Assert.True(traces.Any(m => string.Equals(m.FormattedMessage, "Setting up FunctionInvocationBuffer for function: 'js1' with functionId: 'TestFunctionId1'")));
503+
Assert.True(traces.Any(m => string.Equals(m.FormattedMessage, "Setting up FunctionInvocationBuffer for function: 'js2' with functionId: 'TestFunctionId2'")));
504+
Assert.True(traces.Any(m => string.Equals(m.FormattedMessage, "Worker failed to load function: 'js1' with function id: 'TestFunctionId1'.")));
505+
Assert.True(traces.Any(m => string.Equals(m.FormattedMessage, "Worker failed to load function: 'js2' with function id: 'TestFunctionId2'.")));
506+
}
507+
508+
[Fact]
509+
public void ReceivesInboundEvent_FunctionLoadResponses()
510+
{
511+
var functionMetadatas = GetTestFunctionsList("node");
512+
_workerChannel.SetupFunctionInvocationBuffers(functionMetadatas);
513+
_workerChannel.SendFunctionLoadRequests(null, TimeSpan.FromMinutes(1));
514+
_testFunctionRpcService.PublishFunctionLoadResponsesEvent(
515+
new List<string>() { "TestFunctionId1", "TestFunctionId2" },
516+
new StatusResult() { Status = StatusResult.Types.Status.Success });
517+
var traces = _logger.GetLogMessages();
518+
Assert.True(traces.Any(m => string.Equals(m.FormattedMessage, "Setting up FunctionInvocationBuffer for function: 'js1' with functionId: 'TestFunctionId1'")));
519+
Assert.True(traces.Any(m => string.Equals(m.FormattedMessage, "Setting up FunctionInvocationBuffer for function: 'js2' with functionId: 'TestFunctionId2'")));
520+
Assert.True(traces.Any(m => string.Equals(m.FormattedMessage, string.Format("Received FunctionLoadResponseCollection with number of functions: '{0}'.", functionMetadatas.ToList().Count))));
521+
Assert.True(traces.Any(m => string.Equals(m.FormattedMessage, "Received FunctionLoadResponse for function: 'js1' with functionId: 'TestFunctionId1'.")));
522+
Assert.True(traces.Any(m => string.Equals(m.FormattedMessage, "Received FunctionLoadResponse for function: 'js2' with functionId: 'TestFunctionId2'.")));
523+
}
524+
492525
[Fact]
493526
public void ReceivesInboundEvent_Successful_FunctionMetadataResponse()
494527
{

test/WebJobs.Script.Tests/Workers/Rpc/TestFunctionRpcService.cs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,28 @@ public void PublishFunctionLoadResponseEvent(string functionId)
6262
_eventManager.Publish(new InboundGrpcEvent(_workerId, responseMessage));
6363
}
6464

65+
public void PublishFunctionLoadResponsesEvent(List<string> functionIds, StatusResult statusResult)
66+
{
67+
FunctionLoadResponseCollection functionLoadResponseCollection = new FunctionLoadResponseCollection();
68+
69+
foreach (string functionId in functionIds)
70+
{
71+
FunctionLoadResponse functionLoadResponse = new FunctionLoadResponse()
72+
{
73+
FunctionId = functionId,
74+
Result = statusResult
75+
};
76+
77+
functionLoadResponseCollection.FunctionLoadResponses.Add(functionLoadResponse);
78+
}
79+
80+
StreamingMessage responseMessage = new StreamingMessage()
81+
{
82+
FunctionLoadResponseCollection = functionLoadResponseCollection
83+
};
84+
_eventManager.Publish(new InboundGrpcEvent(_workerId, responseMessage));
85+
}
86+
6587
public void PublishFunctionEnvironmentReloadResponseEvent()
6688
{
6789
FunctionEnvironmentReloadResponse relaodEnvResponse = GetTestFunctionEnvReloadResponse();

0 commit comments

Comments
 (0)