Skip to content

Commit 4b3ea80

Browse files
soninarenbrettsam
andauthored
Sending metadata request once per scriptHost instance (#9248)
Co-authored-by: Brett Samblanet <[email protected]>
1 parent f9c7827 commit 4b3ea80

File tree

2 files changed

+74
-12
lines changed

2 files changed

+74
-12
lines changed

src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ internal class GrpcWorkerChannel : IRpcWorkerChannel, IDisposable
5252
private readonly IOptions<WorkerConcurrencyOptions> _workerConcurrencyOptions;
5353
private readonly WaitCallback _processInbound;
5454
private readonly object _syncLock = new object();
55+
private readonly object _metadataLock = new object();
5556
private readonly Dictionary<MsgType, Queue<PendingItem>> _pendingActions = new();
5657
private readonly ChannelWriter<OutboundGrpcEvent> _outbound;
5758
private readonly ChannelReader<InboundGrpcEvent> _inbound;
@@ -85,6 +86,7 @@ internal class GrpcWorkerChannel : IRpcWorkerChannel, IDisposable
8586
private IHttpProxyService _httpProxyService;
8687
private Uri _httpProxyEndpoint;
8788
private System.Timers.Timer _timer;
89+
private bool _functionMetadataRequestSent = false;
8890

8991
internal GrpcWorkerChannel(
9092
string workerId,
@@ -540,6 +542,8 @@ internal FunctionLoadRequestCollection GetFunctionLoadRequestCollection(IEnumera
540542
public Task SendFunctionEnvironmentReloadRequest()
541543
{
542544
_functionsIndexingTask = new TaskCompletionSource<List<RawFunctionMetadata>>(TaskCreationOptions.RunContinuationsAsynchronously);
545+
_functionMetadataRequestSent = false;
546+
543547
_workerChannelLogger.LogDebug("Sending FunctionEnvironmentReloadRequest to WorkerProcess with Pid: '{0}'", _rpcWorkerProcess.Id);
544548
IDisposable latencyEvent = _metricsLogger.LatencyEvent(MetricEventNames.SpecializationEnvironmentReloadRequestResponse);
545549

@@ -795,22 +799,32 @@ public Task<List<RawFunctionMetadata>> GetFunctionMetadata()
795799

796800
internal Task<List<RawFunctionMetadata>> SendFunctionMetadataRequest()
797801
{
798-
// reset indexing task when in case we need to send another request
799-
_functionsIndexingTask = new TaskCompletionSource<List<RawFunctionMetadata>>(TaskCreationOptions.RunContinuationsAsynchronously);
802+
_workerChannelLogger.LogDebug("Fetching worker metadata, FunctionMetadataReceived set to: {functionMetadataReceived}", _functionMetadataRequestSent);
803+
if (!_functionMetadataRequestSent)
804+
{
805+
lock (_metadataLock)
806+
{
807+
if (!_functionMetadataRequestSent)
808+
{
809+
RegisterCallbackForNextGrpcMessage(MsgType.FunctionMetadataResponse, _functionLoadTimeout, 1,
810+
msg => ProcessFunctionMetadataResponses(msg.Message.FunctionMetadataResponse), HandleWorkerMetadataRequestError);
800811

801-
RegisterCallbackForNextGrpcMessage(MsgType.FunctionMetadataResponse, _functionLoadTimeout, 1,
802-
msg => ProcessFunctionMetadataResponses(msg.Message.FunctionMetadataResponse), HandleWorkerMetadataRequestError);
812+
_workerChannelLogger.LogDebug("Sending WorkerMetadataRequest to {language} worker with worker ID {workerID}", _runtime, _workerId);
803813

804-
_workerChannelLogger.LogDebug("Sending WorkerMetadataRequest to {language} worker with worker ID {workerID}", _runtime, _workerId);
814+
// sends the function app directory path to worker for indexing
815+
SendStreamingMessage(new StreamingMessage
816+
{
817+
FunctionsMetadataRequest = new FunctionsMetadataRequest()
818+
{
819+
FunctionAppDirectory = _applicationHostOptions.CurrentValue.ScriptPath
820+
}
821+
});
805822

806-
// sends the function app directory path to worker for indexing
807-
SendStreamingMessage(new StreamingMessage
808-
{
809-
FunctionsMetadataRequest = new FunctionsMetadataRequest()
810-
{
811-
FunctionAppDirectory = _applicationHostOptions.CurrentValue.ScriptPath
823+
_functionMetadataRequestSent = true;
824+
}
812825
}
813-
});
826+
}
827+
814828
return _functionsIndexingTask.Task;
815829
}
816830

test/WebJobs.Script.Tests/Workers/Rpc/GrpcWorkerChannelTests.cs

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1303,6 +1303,54 @@ public async Task SendInvocationRequest_ValidateTraceContext_SessionId()
13031303
}
13041304
}
13051305

1306+
[Fact]
1307+
public async Task GetFunctionMetadata_MultipleCalls_ReturnSameTask()
1308+
{
1309+
using var block1 = new SemaphoreSlim(0, 1);
1310+
using var block2 = new SemaphoreSlim(0, 1);
1311+
int count = 0;
1312+
1313+
await CreateDefaultWorkerChannel();
1314+
1315+
_testFunctionRpcService.OnMessage(StreamingMessage.ContentOneofCase.FunctionsMetadataRequest,
1316+
async _ =>
1317+
{
1318+
if (Interlocked.Increment(ref count) == 1)
1319+
{
1320+
// notify the second request it can start
1321+
block2.Release();
1322+
1323+
// make the first call sit and wait until we know we've issued the second
1324+
await block1.WaitAsync();
1325+
}
1326+
1327+
_testFunctionRpcService.PublishWorkerMetadataResponse("TestFunctionId1", null, null, false, false, false);
1328+
});
1329+
1330+
var functionsTask1 = _workerChannel.GetFunctionMetadata();
1331+
await Task.Yield();
1332+
1333+
// wait until the first request has made it to the callback before issuing the second
1334+
await block2.WaitAsync();
1335+
1336+
var functionsTask2 = _workerChannel.GetFunctionMetadata();
1337+
await Task.Yield();
1338+
1339+
// now that both requests have been made, let the first return
1340+
block1.Release();
1341+
1342+
//Assert.Same(functionsTask1, functionsTask2);
1343+
1344+
var allTask = Task.WhenAll(functionsTask1, functionsTask2);
1345+
var timeoutTask = Task.Delay(5000);
1346+
1347+
// the timeout should never fire
1348+
var completedTask = await Task.WhenAny(allTask, timeoutTask);
1349+
1350+
Assert.True(completedTask == allTask, "Timed out waiting for tasks to complete");
1351+
Assert.Same(functionsTask1, functionsTask2);
1352+
}
1353+
13061354
private IEnumerable<FunctionMetadata> GetTestFunctionsList(string runtime, bool addWorkerProperties = false)
13071355
{
13081356
var metadata1 = new FunctionMetadata()

0 commit comments

Comments
 (0)