Skip to content

Commit 0771ce5

Browse files
authored
improving worker shutdown race test (#9743)
1 parent f71404d commit 0771ce5

File tree

1 file changed

+35
-19
lines changed

1 file changed

+35
-19
lines changed

test/WebJobs.Script.Tests/Workers/Rpc/WebHostRpcWorkerChannelManagerTests.cs

Lines changed: 35 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// Licensed under the MIT License. See License.txt in the project root for license information.
33

44
using System;
5+
using System.Collections.Concurrent;
56
using System.Collections.Generic;
67
using System.Linq;
78
using System.Threading;
@@ -411,33 +412,48 @@ public async Task ShutdownChannelsIfExist_Succeeds()
411412
[Fact]
412413
public void ShutdownChannelsIfExist_Race_Succeeds()
413414
{
414-
var channel = CreateTestChannel(RpcWorkerConstants.JavaLanguageWorkerName);
415-
string id = channel.Id;
415+
// This test covers an issue that was fixed by https://github.com/Azure/azure-functions-host/pull/9738
416+
// To repro, it requires ShutdownChannelIfExistsAsync to be called by multiple threads simultaneously. Using
417+
// Tasks did not repro, so using Semaphores and Threads for more precise timing. We run this several times
418+
// just to ensure it's not continuing to repro (it did not repro on every invocation).
416419

417-
List<Task<bool>> tasks = new();
418-
List<Thread> threads = new();
419-
for (int i = 0; i < 2; i++)
420+
IEnumerable<bool> RunRaceTest()
420421
{
421-
Thread t = new(static (state) =>
422+
var channel = CreateTestChannel(RpcWorkerConstants.JavaLanguageWorkerName);
423+
424+
int count = 4;
425+
SemaphoreSlim semaphore = new(0, count);
426+
List<Thread> threads = new();
427+
ConcurrentBag<bool> results = new();
428+
429+
for (int i = 0; i < count; i++)
422430
{
423-
var (channelManager, tasks, id) = ((WebHostRpcWorkerChannelManager, List<Task<bool>>, string))state;
424-
tasks.Add(channelManager.ShutdownChannelIfExistsAsync(RpcWorkerConstants.JavaLanguageWorkerName, id));
425-
});
426-
threads.Add(t);
427-
}
431+
Thread t = new(() =>
432+
{
433+
// Pause threads here. They will all be released simultaneously.
434+
semaphore.Wait();
435+
results.Add(_rpcWorkerChannelManager.ShutdownChannelIfExistsAsync(RpcWorkerConstants.JavaLanguageWorkerName, channel.Id).GetAwaiter().GetResult());
436+
});
437+
t.Start();
438+
threads.Add(t);
439+
}
428440

429-
foreach (Thread t in threads)
430-
{
431-
t.Start((_rpcWorkerChannelManager, tasks, id));
441+
// Release all threads.
442+
semaphore.Release(count);
443+
444+
foreach (Thread t in threads)
445+
{
446+
t.Join();
447+
}
448+
449+
return results;
432450
}
433451

434-
foreach (Thread t in threads)
452+
for (int i = 0; i < 50; i++)
435453
{
436-
t.Join();
454+
var results = RunRaceTest();
455+
Assert.Single(results, true);
437456
}
438-
439-
// only one should successfully shut down
440-
Assert.Single(tasks, t => t.Result == true);
441457
}
442458

443459
[Fact]

0 commit comments

Comments
 (0)