Skip to content

Commit f25a678

Browse files
Adding ability to allow worker to send Function load responses in batch (#8363)
* Adding FunctionLoadResponses to send single load response in case of multiple functions * Subscribed to LoadResponseCollection * Updated subtree from https://github.com/azure/azure-functions-language-worker-protobuf. Tag: v1.5.4-protofile. Commit: 576c9de * Removing redundant line * Added failure testcase for LoadRepsonseCollection * Moving event subscription under if else block * Send function outside else block * Tests * Renaming variable * Adding timeout * Cleaning up if-else
1 parent 25c56c9 commit f25a678

File tree

4 files changed

+127
-18
lines changed

4 files changed

+127
-18
lines changed

src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs

Lines changed: 32 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ internal class GrpcWorkerChannel : IRpcWorkerChannel, IDisposable
7070
private TaskCompletionSource<bool> _reloadTask = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
7171
private TaskCompletionSource<bool> _workerInitTask = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
7272
private TaskCompletionSource<List<RawFunctionMetadata>> _functionsIndexingTask = new TaskCompletionSource<List<RawFunctionMetadata>>(TaskCreationOptions.RunContinuationsAsynchronously);
73-
private TimeSpan _functionLoadTimeout = TimeSpan.FromMinutes(10);
73+
private TimeSpan _functionLoadTimeout = TimeSpan.FromMinutes(1);
7474
private bool _isSharedMemoryDataTransferEnabled;
7575

7676
private object _syncLock = new object();
@@ -299,32 +299,38 @@ public void SendFunctionLoadRequests(ManagedDependencyOptions managedDependencyO
299299
{
300300
if (_functions != null)
301301
{
302-
if (functionTimeout.HasValue)
303-
{
304-
_functionLoadTimeout = functionTimeout.Value > _functionLoadTimeout ? functionTimeout.Value : _functionLoadTimeout;
305-
_eventSubscriptions.Add(_inboundWorkerEvents.Where(msg => msg.MessageType == MsgType.FunctionLoadResponse)
306-
.Timeout(_functionLoadTimeout)
307-
.Take(_functions.Count())
308-
.Subscribe((msg) => LoadResponse(msg.Message.FunctionLoadResponse), HandleWorkerFunctionLoadError));
309-
}
310-
else
311-
{
312-
_eventSubscriptions.Add(_inboundWorkerEvents.Where(msg => msg.MessageType == MsgType.FunctionLoadResponse)
313-
.Subscribe((msg) => LoadResponse(msg.Message.FunctionLoadResponse), HandleWorkerFunctionLoadError));
314-
}
315-
316302
// Load Request is also sent for disabled function as it is invocable using the portal and admin endpoints
317303
// Loading disabled functions at the end avoids unnecessary performance issues. Refer PR #5072 and commit #38b57883be28524fa6ee67a457fa47e96663094c
318304
_functions = _functions.OrderBy(metadata => metadata.IsDisabled());
319305

320306
// Check if the worker supports this feature
321-
bool capabilityEnabled = !string.IsNullOrEmpty(_workerCapabilities.GetCapabilityState(RpcWorkerConstants.AcceptsListOfFunctionLoadRequests));
307+
bool capabilityEnabled = !string.IsNullOrEmpty(_workerCapabilities.GetCapabilityState(RpcWorkerConstants.SupportsLoadResponseCollection));
322308
if (capabilityEnabled)
323309
{
310+
var loadResponseCollectionObservable = _inboundWorkerEvents.Where(msg => msg.MessageType == MsgType.FunctionLoadResponseCollection);
311+
if (functionTimeout.HasValue)
312+
{
313+
_functionLoadTimeout = functionTimeout.Value > _functionLoadTimeout ? functionTimeout.Value : _functionLoadTimeout;
314+
loadResponseCollectionObservable = loadResponseCollectionObservable.Timeout(_functionLoadTimeout);
315+
}
316+
317+
_eventSubscriptions.Add(loadResponseCollectionObservable
318+
.Subscribe((msg) => LoadResponse(msg.Message.FunctionLoadResponseCollection), HandleWorkerFunctionLoadError));
319+
324320
SendFunctionLoadRequestCollection(_functions, managedDependencyOptions);
325321
}
326322
else
327323
{
324+
var loadResponseObservable = _inboundWorkerEvents.Where(msg => msg.MessageType == MsgType.FunctionLoadResponse);
325+
if (functionTimeout.HasValue)
326+
{
327+
_functionLoadTimeout = functionTimeout.Value > _functionLoadTimeout ? functionTimeout.Value : _functionLoadTimeout;
328+
loadResponseObservable = loadResponseObservable.Timeout(_functionLoadTimeout);
329+
}
330+
331+
_eventSubscriptions.Add(loadResponseObservable.Take(_functions.Count())
332+
.Subscribe((msg) => LoadResponse(msg.Message.FunctionLoadResponse), HandleWorkerFunctionLoadError));
333+
328334
foreach (FunctionMetadata metadata in _functions)
329335
{
330336
SendFunctionLoadRequest(metadata, managedDependencyOptions);
@@ -474,6 +480,16 @@ internal void LoadResponse(FunctionLoadResponse loadResponse)
474480
_inputLinks.Add(disposableLink);
475481
}
476482

483+
internal void LoadResponse(FunctionLoadResponseCollection loadResponseCollection)
484+
{
485+
_workerChannelLogger.LogDebug("Received FunctionLoadResponseCollection with number of functions: '{count}'.", loadResponseCollection.FunctionLoadResponses.Count);
486+
487+
foreach (FunctionLoadResponse loadResponse in loadResponseCollection.FunctionLoadResponses)
488+
{
489+
LoadResponse(loadResponse);
490+
}
491+
}
492+
477493
internal async Task SendInvocationRequest(ScriptInvocationContext context)
478494
{
479495
try

src/WebJobs.Script/Workers/Rpc/RpcWorkerConstants.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public static class RpcWorkerConstants
4848
public const string UseNullableValueDictionaryForHttp = "UseNullableValueDictionaryForHttp";
4949
public const string SharedMemoryDataTransfer = "SharedMemoryDataTransfer";
5050
public const string FunctionDataCache = "FunctionDataCache";
51-
public const string AcceptsListOfFunctionLoadRequests = "AcceptsListOfFunctionLoadRequests";
51+
public const string SupportsLoadResponseCollection = "SupportsLoadResponseCollection";
5252

5353
// Host Capabilities
5454
public const string V2Compatable = "V2Compatable";

test/WebJobs.Script.Tests/Workers/Rpc/GrpcWorkerChannelTests.cs

Lines changed: 72 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -393,7 +393,7 @@ public void SendLoadRequestCollection_PublishesOutboundEvents()
393393
};
394394
GrpcEvent rpcEvent = new GrpcEvent(_workerId, startStreamMessage);
395395
_workerChannel.SendWorkerInitRequest(rpcEvent);
396-
_testFunctionRpcService.PublishWorkerInitResponseEvent(new Dictionary<string, string>() { { RpcWorkerConstants.AcceptsListOfFunctionLoadRequests, "true" } });
396+
_testFunctionRpcService.PublishWorkerInitResponseEvent(new Dictionary<string, string>() { { RpcWorkerConstants.SupportsLoadResponseCollection, "true" } });
397397
_metricsLogger.ClearCollections();
398398
IEnumerable<FunctionMetadata> functionMetadata = GetTestFunctionsList("node");
399399
_workerChannel.SetupFunctionInvocationBuffers(functionMetadata);
@@ -511,6 +511,77 @@ public void ReceivesInboundEvent_FunctionLoadResponse()
511511
Assert.True(traces.Any(m => string.Equals(m.FormattedMessage, "Received FunctionLoadResponse for function: 'js1' with functionId: 'TestFunctionId1'.")));
512512
}
513513

514+
[Fact]
515+
public void ReceivesInboundEvent_Failed_FunctionLoadResponses()
516+
{
517+
IDictionary<string, string> capabilities = new Dictionary<string, string>()
518+
{
519+
{ RpcWorkerConstants.SupportsLoadResponseCollection, "1" }
520+
};
521+
522+
StartStream startStream = new StartStream()
523+
{
524+
WorkerId = _workerId
525+
};
526+
527+
StreamingMessage startStreamMessage = new StreamingMessage()
528+
{
529+
StartStream = startStream
530+
};
531+
532+
GrpcEvent rpcEvent = new GrpcEvent(_workerId, startStreamMessage);
533+
_workerChannel.SendWorkerInitRequest(rpcEvent);
534+
_testFunctionRpcService.PublishWorkerInitResponseEvent(capabilities);
535+
536+
var functionMetadatas = GetTestFunctionsList("node");
537+
_workerChannel.SetupFunctionInvocationBuffers(functionMetadatas);
538+
_workerChannel.SendFunctionLoadRequests(null, TimeSpan.FromMinutes(1));
539+
_testFunctionRpcService.PublishFunctionLoadResponsesEvent(
540+
new List<string>() { "TestFunctionId1", "TestFunctionId2" },
541+
new StatusResult() { Status = StatusResult.Types.Status.Failure });
542+
var traces = _logger.GetLogMessages();
543+
Assert.True(traces.Any(m => string.Equals(m.FormattedMessage, "Setting up FunctionInvocationBuffer for function: 'js1' with functionId: 'TestFunctionId1'")));
544+
Assert.True(traces.Any(m => string.Equals(m.FormattedMessage, "Setting up FunctionInvocationBuffer for function: 'js2' with functionId: 'TestFunctionId2'")));
545+
Assert.True(traces.Any(m => string.Equals(m.FormattedMessage, "Worker failed to load function: 'js1' with function id: 'TestFunctionId1'.")));
546+
Assert.True(traces.Any(m => string.Equals(m.FormattedMessage, "Worker failed to load function: 'js2' with function id: 'TestFunctionId2'.")));
547+
}
548+
549+
[Fact]
550+
public void ReceivesInboundEvent_FunctionLoadResponses()
551+
{
552+
IDictionary<string, string> capabilities = new Dictionary<string, string>()
553+
{
554+
{ RpcWorkerConstants.SupportsLoadResponseCollection, "1" }
555+
};
556+
557+
StartStream startStream = new StartStream()
558+
{
559+
WorkerId = _workerId
560+
};
561+
562+
StreamingMessage startStreamMessage = new StreamingMessage()
563+
{
564+
StartStream = startStream
565+
};
566+
567+
GrpcEvent rpcEvent = new GrpcEvent(_workerId, startStreamMessage);
568+
_workerChannel.SendWorkerInitRequest(rpcEvent);
569+
_testFunctionRpcService.PublishWorkerInitResponseEvent(capabilities);
570+
571+
var functionMetadatas = GetTestFunctionsList("node");
572+
_workerChannel.SetupFunctionInvocationBuffers(functionMetadatas);
573+
_workerChannel.SendFunctionLoadRequests(null, TimeSpan.FromMinutes(1));
574+
_testFunctionRpcService.PublishFunctionLoadResponsesEvent(
575+
new List<string>() { "TestFunctionId1", "TestFunctionId2" },
576+
new StatusResult() { Status = StatusResult.Types.Status.Success });
577+
var traces = _logger.GetLogMessages();
578+
Assert.True(traces.Any(m => string.Equals(m.FormattedMessage, "Setting up FunctionInvocationBuffer for function: 'js1' with functionId: 'TestFunctionId1'")));
579+
Assert.True(traces.Any(m => string.Equals(m.FormattedMessage, "Setting up FunctionInvocationBuffer for function: 'js2' with functionId: 'TestFunctionId2'")));
580+
Assert.True(traces.Any(m => string.Equals(m.FormattedMessage, string.Format("Received FunctionLoadResponseCollection with number of functions: '{0}'.", functionMetadatas.ToList().Count))));
581+
Assert.True(traces.Any(m => string.Equals(m.FormattedMessage, "Received FunctionLoadResponse for function: 'js1' with functionId: 'TestFunctionId1'.")));
582+
Assert.True(traces.Any(m => string.Equals(m.FormattedMessage, "Received FunctionLoadResponse for function: 'js2' with functionId: 'TestFunctionId2'.")));
583+
}
584+
514585
[Fact]
515586
public void ReceivesInboundEvent_Successful_FunctionMetadataResponse()
516587
{

test/WebJobs.Script.Tests/Workers/Rpc/TestFunctionRpcService.cs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,28 @@ public void PublishFunctionLoadResponseEvent(string functionId)
6262
_eventManager.Publish(new InboundGrpcEvent(_workerId, responseMessage));
6363
}
6464

65+
public void PublishFunctionLoadResponsesEvent(List<string> functionIds, StatusResult statusResult)
66+
{
67+
FunctionLoadResponseCollection functionLoadResponseCollection = new FunctionLoadResponseCollection();
68+
69+
foreach (string functionId in functionIds)
70+
{
71+
FunctionLoadResponse functionLoadResponse = new FunctionLoadResponse()
72+
{
73+
FunctionId = functionId,
74+
Result = statusResult
75+
};
76+
77+
functionLoadResponseCollection.FunctionLoadResponses.Add(functionLoadResponse);
78+
}
79+
80+
StreamingMessage responseMessage = new StreamingMessage()
81+
{
82+
FunctionLoadResponseCollection = functionLoadResponseCollection
83+
};
84+
_eventManager.Publish(new InboundGrpcEvent(_workerId, responseMessage));
85+
}
86+
6587
public void PublishFunctionEnvironmentReloadResponseEvent()
6688
{
6789
FunctionEnvironmentReloadResponse relaodEnvResponse = GetTestFunctionEnvReloadResponse();

0 commit comments

Comments
 (0)