Skip to content

Commit 2f9acbd

Browse files
authored
Fixing thread safety issue in the GrpcWorkerChannel.LoadResponse method (#10363) (#10400)
* Making `GrpcWorkerChannel.LoadResponse` thread-safe * Replace `Task.Delay(500)` with `TestHelpers.Await` in the test.
1 parent 569f2a5 commit 2f9acbd

File tree

3 files changed

+72
-36
lines changed

3 files changed

+72
-36
lines changed

release_notes.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,3 +32,4 @@
3232
- Update PowerShell 7.2 worker to [4.0.4020](https://github.com/Azure/azure-functions-powershell-worker/releases/tag/v4.0.4020)
3333
- Update PowerShell 7.4 worker to [4.0.4021](https://github.com/Azure/azure-functions-powershell-worker/releases/tag/v4.0.4021)
3434
- Trim FunctionsNetHost artifacts (#10364)
35+
- Resolved thread safety issue in the `GrpcWorkerChannel.LoadResponse` method. (#10352)

src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ internal partial class GrpcWorkerChannel : IRpcWorkerChannel, IDisposable
7070
private ConcurrentDictionary<string, ExecutingInvocation> _executingInvocations = new();
7171
private IDictionary<string, BufferBlock<ScriptInvocationContext>> _functionInputBuffers = new ConcurrentDictionary<string, BufferBlock<ScriptInvocationContext>>();
7272
private ConcurrentDictionary<string, TaskCompletionSource<bool>> _workerStatusRequests = new ConcurrentDictionary<string, TaskCompletionSource<bool>>();
73-
private List<IDisposable> _inputLinks = new List<IDisposable>();
73+
private ConcurrentBag<IDisposable> _inputLinks = new ConcurrentBag<IDisposable>();
7474
private List<IDisposable> _eventSubscriptions = new List<IDisposable>();
7575
private IDisposable _startLatencyMetric;
7676
private IEnumerable<FunctionMetadata> _functions;

test/WebJobs.Script.Tests/Workers/Rpc/GrpcWorkerChannelTests.cs

Lines changed: 70 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -939,6 +939,48 @@ public async Task ReceivesInboundEvent_FunctionLoadResponse()
939939
Assert.True(traces.Any(m => string.Equals(m.FormattedMessage, "Received FunctionLoadResponse for function: 'js1' with functionId: 'TestFunctionId1'.")), "FunctionLoadResponse TestFunctionId1");
940940
}
941941

942+
[Fact]
943+
public async Task Receives_Individual_FunctionLoadResponses_Parallel()
944+
{
945+
await CreateDefaultWorkerChannel();
946+
947+
var startStreamMessage = new StreamingMessage()
948+
{
949+
StartStream = new StartStream()
950+
{
951+
WorkerId = _workerId
952+
}
953+
};
954+
955+
var rpcEvent = new GrpcEvent(_workerId, startStreamMessage);
956+
_workerChannel.SendWorkerInitRequest(rpcEvent);
957+
958+
var functionMetadataList = GetTestFunctionsList("node", numberOfFunctions: 250);
959+
_workerChannel.SetupFunctionInvocationBuffers(functionMetadataList);
960+
_workerChannel.SendFunctionLoadRequests(managedDependencyOptions: null, TimeSpan.FromSeconds(1));
961+
962+
var allFunctionIdsAndNames = functionMetadataList.Select(f => new { Id = f.Properties["FunctionId"].ToString(), f.Name }).ToList();
963+
964+
// Send function load responses for each function, not necessarily in the order the load requests were sent.
965+
var publishFunctionLoadResponseTasks = allFunctionIdsAndNames.Select(function =>
966+
Task.Run(() => _testFunctionRpcService.PublishFunctionLoadResponseEvent(function.Id)));
967+
968+
await Task.WhenAll(publishFunctionLoadResponseTasks);
969+
970+
await TestHelpers.Await(() =>
971+
{
972+
return _logger.GetLogMessages().Count(m => m.FormattedMessage.StartsWith("Received FunctionLoadResponse")) == allFunctionIdsAndNames.Count;
973+
});
974+
975+
var traces = _logger.GetLogMessages();
976+
977+
foreach (var function in allFunctionIdsAndNames)
978+
{
979+
Assert.True(traces.Any(m => string.Equals(m.FormattedMessage, $"Setting up FunctionInvocationBuffer for function: '{function.Name}' with functionId: '{function.Id}'")), $"setup {function.Id}");
980+
Assert.True(traces.Any(m => string.Equals(m.FormattedMessage, $"Received FunctionLoadResponse for function: '{function.Name}' with functionId: '{function.Id}'.")), $"FunctionLoadResponse {function.Id}");
981+
}
982+
}
983+
942984
[Fact]
943985
public async Task ReceivesInboundEvent_Failed_FunctionLoadResponses()
944986
{
@@ -1489,41 +1531,7 @@ await TestHelpers.Await(() => GetInvocationLogs().Length == logLoop,
14891531

14901532
private static IEnumerable<FunctionMetadata> GetTestFunctionsList(string runtime, bool addWorkerProperties = false)
14911533
{
1492-
var metadata1 = new FunctionMetadata()
1493-
{
1494-
Language = runtime,
1495-
Name = "js1"
1496-
};
1497-
1498-
metadata1.SetFunctionId("TestFunctionId1");
1499-
metadata1.Properties.Add(LogConstants.CategoryNameKey, "testcat1");
1500-
metadata1.Properties.Add(ScriptConstants.LogPropertyHostInstanceIdKey, "testhostId1");
1501-
1502-
if (addWorkerProperties)
1503-
{
1504-
metadata1.Properties.Add("worker.functionId", "fn1");
1505-
}
1506-
1507-
var metadata2 = new FunctionMetadata()
1508-
{
1509-
Language = runtime,
1510-
Name = "js2",
1511-
};
1512-
1513-
metadata2.SetFunctionId("TestFunctionId2");
1514-
metadata2.Properties.Add(LogConstants.CategoryNameKey, "testcat2");
1515-
metadata2.Properties.Add(ScriptConstants.LogPropertyHostInstanceIdKey, "testhostId2");
1516-
1517-
if (addWorkerProperties)
1518-
{
1519-
metadata2.Properties.Add("WORKER.functionId", "fn2");
1520-
}
1521-
1522-
return new List<FunctionMetadata>()
1523-
{
1524-
metadata1,
1525-
metadata2
1526-
};
1534+
return GetTestFunctionsList(runtime, numberOfFunctions: 2, addWorkerProperties);
15271535
}
15281536

15291537
public static ScriptInvocationContext GetTestScriptInvocationContext(Guid invocationId, TaskCompletionSource<ScriptInvocationResult> resultSource,
@@ -1548,6 +1556,33 @@ public static ScriptInvocationContext GetTestScriptInvocationContext(Guid invoca
15481556
};
15491557
}
15501558

1559+
private static List<FunctionMetadata> GetTestFunctionsList(string runtime, int numberOfFunctions, bool addWorkerProperties = false)
1560+
{
1561+
var functions = new List<FunctionMetadata>();
1562+
1563+
for (int i = 1; i <= numberOfFunctions; i++)
1564+
{
1565+
var metadata = new FunctionMetadata()
1566+
{
1567+
Language = runtime,
1568+
Name = $"js{i}"
1569+
};
1570+
1571+
metadata.SetFunctionId($"TestFunctionId{i}");
1572+
metadata.Properties.Add(LogConstants.CategoryNameKey, $"testcat1");
1573+
metadata.Properties.Add(ScriptConstants.LogPropertyHostInstanceIdKey, $"testhostId1");
1574+
1575+
if (addWorkerProperties)
1576+
{
1577+
metadata.Properties.Add("worker.functionId", $"fn{i}");
1578+
}
1579+
1580+
functions.Add(metadata);
1581+
}
1582+
1583+
return functions;
1584+
}
1585+
15511586
/// <summary>
15521587
/// The <see cref="ScriptInvocationContext"/> would contain inputs that can be transferred over shared memory.
15531588
/// </summary>

0 commit comments

Comments
 (0)