Skip to content

Commit 12b8d83

Browse files
[release/8.0-staging] Transfer ThreadPool local queue to high-pri queue on Task blocking (#109990)
* Transfer ThreadPool local queue to high-pri queue on Task blocking. Added for now under the same config flag that is being used for other work prioritization experimentation. * Update src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs --------- Co-authored-by: Stephen Toub <[email protected]>
1 parent 3989aac commit 12b8d83

File tree

3 files changed

+65
-1
lines changed

3 files changed

+65
-1
lines changed

src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/Task.cs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3059,6 +3059,27 @@ private bool SpinThenBlockingWait(int millisecondsTimeout, CancellationToken can
30593059
bool returnValue = SpinWait(millisecondsTimeout);
30603060
if (!returnValue)
30613061
{
3062+
#if CORECLR
3063+
if (ThreadPoolWorkQueue.s_prioritizationExperiment)
3064+
{
3065+
// We're about to block waiting for the task to complete, which is expensive, and if
3066+
// the task being waited on depends on some other work to run, this thread could end up
3067+
// waiting for some other thread to do work. If the two threads are part of the same scheduler,
3068+
// such as the thread pool, that could lead to a (temporary) deadlock. This is made worse by
3069+
// it also leading to a possible priority inversion on previously queued work. Each thread in
3070+
// the thread pool has a local queue. A key motivator for this local queue is it allows this
3071+
// thread to create work items that it will then prioritize above all other work in the
3072+
// pool. However, while this thread makes its own local queue the top priority, that queue is
3073+
// every other thread's lowest priority. If this thread blocks, all of its created work that's
3074+
// supposed to be high priority becomes low priority, and work that's typically part of a
3075+
// currently in-flight operation gets deprioritized relative to new requests coming into the
3076+
// pool, which can lead to the whole system slowing down or even deadlocking. To address that,
3077+
// just before we block, we move all local work into a global queue, so that it's at least
3078+
// prioritized by other threads more fairly with respect to other work.
3079+
ThreadPoolWorkQueue.TransferAllLocalWorkItemsToHighPriorityGlobalQueue();
3080+
}
3081+
#endif
3082+
30623083
var mres = new SetOnInvokeMres();
30633084
try
30643085
{

src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -691,6 +691,27 @@ public void EnqueueAtHighPriority(object workItem)
691691
EnsureThreadRequested();
692692
}
693693

694+
/// <summary>
/// Moves every work item currently in the calling thread's local work-stealing queue
/// into the global high-priority queue.
/// </summary>
/// <remarks>
/// Returns immediately when the calling thread has no thread-local queue state. The
/// high-priority flag is published and a thread is requested only when at least one
/// item was actually transferred, so a call made with an empty local queue has no
/// effect on the global queue state.
/// </remarks>
internal static void TransferAllLocalWorkItemsToHighPriorityGlobalQueue()
{
    // If there's no local queue, there's nothing to transfer.
    if (ThreadPoolWorkQueueThreadLocals.threadLocals is not ThreadPoolWorkQueueThreadLocals tl)
    {
        return;
    }

    // Pop each work item off the local queue and push it onto the global. This is a
    // bounded loop as no other thread is allowed to push into this thread's queue.
    ThreadPoolWorkQueue queue = ThreadPool.s_workQueue;
    bool transferredAny = false;
    while (tl.workStealingQueue.LocalPop() is object workItem)
    {
        queue.highPriorityWorkItems.Enqueue(workItem);
        transferredAny = true;
    }

    // Nothing was moved: don't raise the high-priority flag or request a thread,
    // since there is no new globally visible work for another thread to pick up.
    if (!transferredAny)
    {
        return;
    }

    // Publish that the high-priority queue may now contain items, and only then make
    // sure a thread has been requested to process them.
    Volatile.Write(ref queue._mayHaveHighPriorityWorkItems, 1);

    queue.EnsureThreadRequested();
}
714+
694715
internal static bool LocalFindAndPop(object callback)
695716
{
696717
ThreadPoolWorkQueueThreadLocals? tl = ThreadPoolWorkQueueThreadLocals.threadLocals;

src/libraries/System.Threading.ThreadPool/tests/ThreadPoolTests.cs

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1172,12 +1172,13 @@ public static void PrioritizationExperimentConfigVarTest()
11721172
RemoteExecutor.Invoke(() =>
11731173
{
11741174
const int WorkItemCountPerKind = 100;
1175+
const int Kinds = 3;
11751176

11761177
int completedWorkItemCount = 0;
11771178
var allWorkItemsCompleted = new AutoResetEvent(false);
11781179
Action<int> workItem = _ =>
11791180
{
1180-
if (Interlocked.Increment(ref completedWorkItemCount) == WorkItemCountPerKind * 3)
1181+
if (Interlocked.Increment(ref completedWorkItemCount) == WorkItemCountPerKind * Kinds)
11811182
{
11821183
allWorkItemsCompleted.Set();
11831184
}
@@ -1214,6 +1215,27 @@ public static void PrioritizationExperimentConfigVarTest()
12141215
0,
12151216
preferLocal: false);
12161217

1218+
ThreadPool.UnsafeQueueUserWorkItem(
1219+
_ =>
1220+
{
1221+
// Enqueue tasks from a thread pool thread into the local queue,
1222+
// then block this thread until a queued task completes.
1223+
1224+
startTest.CheckedWait();
1225+
1226+
Task queued = null;
1227+
for (int i = 0; i < WorkItemCountPerKind; i++)
1228+
{
1229+
queued = Task.Run(() => workItem(0));
1230+
}
1231+
1232+
queued
1233+
.ContinueWith(_ => { }) // prevent wait inlining
1234+
.Wait();
1235+
},
1236+
0,
1237+
preferLocal: false);
1238+
12171239
t = new Thread(() =>
12181240
{
12191241
// Enqueue local work from thread pool worker threads

0 commit comments

Comments
 (0)