Skip to content

Commit b30f635

Browse files
authored
[Concurrency] Set thread base priority when running escalated Tasks (#84895)
1 parent 86fb7ac commit b30f635

File tree

8 files changed

+134
-28
lines changed

8 files changed

+134
-28
lines changed

include/swift/ABI/Task.h

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,13 @@
2929
#include "bitset"
3030
#include "queue" // TODO: remove and replace with our own mpsc
3131

32+
// Does the runtime integrate with libdispatch?
33+
#if defined(SWIFT_CONCURRENCY_USES_DISPATCH)
34+
#define SWIFT_CONCURRENCY_ENABLE_DISPATCH SWIFT_CONCURRENCY_USES_DISPATCH
35+
#else
36+
#define SWIFT_CONCURRENCY_ENABLE_DISPATCH 0
37+
#endif
38+
3239
// Does the runtime provide priority escalation support?
3340
#ifndef SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
3441
#if SWIFT_CONCURRENCY_ENABLE_DISPATCH && \
@@ -422,7 +429,22 @@ class AsyncTask : public Job {
422429
///
423430
/// Generally this should be done immediately after updating
424431
/// ActiveTask.
425-
void flagAsRunning();
432+
///
433+
/// When Dispatch is used for the default executor:
434+
/// * If the return value is non-zero, it must be passed
435+
/// to swift_dispatch_thread_reset_override_self
436+
/// before returning to the executor.
437+
/// * If the return value is zero, it may be ignored or passed to
438+
/// the aforementioned function (which will ignore values of zero).
439+
/// The current implementation will always return zero
440+
/// if you call flagAsRunning again before calling
441+
/// swift_dispatch_thread_reset_override_self with the
442+
/// initial value. This supports suspending and immediately
443+
/// resuming a Task without returning up the callstack.
444+
///
445+
/// For all other default executors, flagAsRunning
446+
/// will return zero which may be ignored.
447+
uint32_t flagAsRunning();
426448

427449
/// Flag that this task is now suspended with information about what it is
428450
/// waiting on.

include/swift/Runtime/Concurrency.h

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,13 +38,6 @@
3838
#define SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL 0
3939
#endif
4040

41-
// Does the runtime integrate with libdispatch?
42-
#if defined(SWIFT_CONCURRENCY_USES_DISPATCH)
43-
#define SWIFT_CONCURRENCY_ENABLE_DISPATCH SWIFT_CONCURRENCY_USES_DISPATCH
44-
#else
45-
#define SWIFT_CONCURRENCY_ENABLE_DISPATCH 0
46-
#endif
47-
4841
namespace swift {
4942
class DefaultActor;
5043
class TaskOptionRecord;

include/swift/Runtime/DispatchShims.h

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,27 @@ swift_dispatch_thread_override_self(qos_class_t override_qos) {
4848
return 0;
4949
}
5050

51+
static inline uint32_t
52+
swift_dispatch_thread_override_self_with_base(qos_class_t override_qos, qos_class_t base_qos) {
53+
54+
if (__builtin_available(macOS 27.0, iOS 27.0, tvOS 27.0, watchOS 27.0, *)) {
55+
return dispatch_thread_override_self_with_base(override_qos, base_qos);
56+
} else if (__builtin_available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *)) {
57+
// If we don't have the ability to set our base qos correctly, at least set the override
58+
// We want to return 0 here because we have nothing to reset in this case
59+
(void) dispatch_thread_override_self(override_qos);
60+
}
61+
62+
return 0;
63+
}
64+
65+
static inline void
66+
swift_dispatch_thread_reset_override_self(uint32_t opaque) {
67+
if (__builtin_available(macOS 27.0, iOS 27.0, tvOS 27.0, watchOS 27.0, *)) {
68+
dispatch_thread_reset_override_self(opaque);
69+
}
70+
}
71+
5172
static inline int
5273
swift_dispatch_lock_override_start_with_debounce(dispatch_lock_t *lock_addr,
5374
dispatch_tid_t expected_thread, qos_class_t override_to_apply) {

stdlib/public/Concurrency/Actor.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -237,12 +237,17 @@ void swift::runJobInEstablishedExecutorContext(Job *job) {
237237
// current thread. If the task suspends somewhere, it should
238238
// update the task status appropriately; we don't need to update
239239
// it afterwards.
240-
task->flagAsRunning();
240+
[[maybe_unused]]
241+
uint32_t dispatchOpaquePriority = task->flagAsRunning();
241242

242243
auto traceHandle = concurrency::trace::job_run_begin(job);
243244
task->runInFullyEstablishedContext();
244245
concurrency::trace::job_run_end(traceHandle);
245246

247+
#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
248+
swift_dispatch_thread_reset_override_self(dispatchOpaquePriority);
249+
#endif
250+
246251
assert(ActiveTask::get() == nullptr &&
247252
"active task wasn't cleared before suspending?");
248253
if (oldTask) ActiveTask::set(oldTask);

stdlib/public/Concurrency/Task.cpp

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -124,15 +124,22 @@ FutureFragment::Status AsyncTask::waitFuture(AsyncTask *waitingTask,
124124

125125
// NOTE: this acquire synchronizes with `completeFuture`.
126126
auto queueHead = fragment->waitQueue.load(std::memory_order_acquire);
127-
bool contextInitialized = false;
127+
bool suspendedWaiter = false;
128128
while (true) {
129129
switch (queueHead.getStatus()) {
130130
case Status::Error:
131131
case Status::Success:
132132
SWIFT_TASK_DEBUG_LOG("task %p waiting on task %p, completed immediately",
133133
waitingTask, this);
134134
_swift_tsan_acquire(static_cast<Job *>(this));
135-
if (contextInitialized) waitingTask->flagAsRunning();
135+
if (suspendedWaiter) {
136+
// This will always return zero because we were just
137+
// running this Task so its BasePriority (which is
138+
// immutable) should've already been set on the thread.
139+
[[maybe_unused]]
140+
uint32_t opaque = waitingTask->flagAsRunning();
141+
assert(opaque == 0);
142+
}
136143
// The task is done; we don't need to wait.
137144
return queueHead.getStatus();
138145

@@ -146,8 +153,8 @@ FutureFragment::Status AsyncTask::waitFuture(AsyncTask *waitingTask,
146153
break;
147154
}
148155

149-
if (!contextInitialized) {
150-
contextInitialized = true;
156+
if (!suspendedWaiter) {
157+
suspendedWaiter = true;
151158
auto context =
152159
reinterpret_cast<TaskFutureWaitAsyncContext *>(waitingTaskContext);
153160
context->errorResult = nullptr;
@@ -1659,8 +1666,11 @@ static void swift_continuation_awaitImpl(ContinuationAsyncContext *context) {
16591666
// we try to tail-call.
16601667
} while (false);
16611668
#else
1662-
// Restore the running state of the task and resume it.
1663-
task->flagAsRunning();
1669+
// This will always return zero because we were just running this Task so its
1670+
// BasePriority (which is immutable) should've already been set on the thread.
1671+
[[maybe_unused]]
1672+
uint32_t opaque = task->flagAsRunning();
1673+
assert(opaque == 0);
16641674
#endif /* SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL */
16651675

16661676
if (context->isExecutorSwitchForced())

stdlib/public/Concurrency/TaskGroup.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1822,7 +1822,12 @@ reevaluate_if_taskgroup_has_results:;
18221822
// We're going back to running the task, so if we suspended before,
18231823
// we need to flag it as running again.
18241824
if (hasSuspended) {
1825-
waitingTask->flagAsRunning();
1825+
// This will always return zero because we were just
1826+
// running this Task so its BasePriority (which is
1827+
// immutable) should've already been set on the thread.
1828+
[[maybe_unused]]
1829+
uint32_t opaque = waitingTask->flagAsRunning();
1830+
assert(opaque == 0);
18261831
}
18271832

18281833
// Success! We are allowed to poll.

stdlib/public/Concurrency/TaskPrivate.h

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -970,32 +970,40 @@ inline bool AsyncTask::isCancelled() const {
970970
.isCancelled();
971971
}
972972

973-
inline void AsyncTask::flagAsRunning() {
973+
inline uint32_t AsyncTask::flagAsRunning() {
974974

975975
#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
976976
dispatch_thread_override_info_s threadOverrideInfo;
977977
threadOverrideInfo = swift_dispatch_thread_get_current_override_qos_floor();
978978
qos_class_t overrideFloor = threadOverrideInfo.override_qos_floor;
979+
qos_class_t basePriorityCeil = overrideFloor;
980+
qos_class_t taskBasePriority = (qos_class_t) _private().BasePriority;
979981
#endif
980982

981983
auto oldStatus = _private()._status().load(std::memory_order_relaxed);
982984
assert(!oldStatus.isRunning());
983985
assert(!oldStatus.isComplete());
984986

987+
uint32_t dispatchOpaquePriority = 0;
985988
if (!oldStatus.hasTaskDependency()) {
986989
SWIFT_TASK_DEBUG_LOG("%p->flagAsRunning() with no task dependency", this);
987990
assert(_private().dependencyRecord == nullptr);
988991

989992
while (true) {
990993
#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
991-
// Task's priority is greater than the thread's - do a self escalation
994+
// If the base priority is not equal to the current override floor then
995+
// dispqatch may need to apply the base priority to the thread. If the
996+
// current priority is higher than the override floor, then dispatch may
997+
// need to apply a self-override. In either case, call into dispatch to
998+
// do this.
992999
qos_class_t maxTaskPriority = (qos_class_t) oldStatus.getStoredPriority();
993-
if (threadOverrideInfo.can_override && (maxTaskPriority > overrideFloor)) {
994-
SWIFT_TASK_DEBUG_LOG("[Override] Self-override thread with oq_floor %#x to match task %p's max priority %#x",
995-
overrideFloor, this, maxTaskPriority);
1000+
if (threadOverrideInfo.can_override && (taskBasePriority != basePriorityCeil || maxTaskPriority > overrideFloor)) {
1001+
SWIFT_TASK_DEBUG_LOG("[Override] Self-override thread with oq_floor %#x to match task %p's max priority %#x and base priority %#x",
1002+
overrideFloor, this, maxTaskPriority, taskBasePriority);
9961003

997-
(void) swift_dispatch_thread_override_self(maxTaskPriority);
1004+
dispatchOpaquePriority = swift_dispatch_thread_override_self_with_base(maxTaskPriority, taskBasePriority);
9981005
overrideFloor = maxTaskPriority;
1006+
basePriorityCeil = taskBasePriority;
9991007
}
10001008
#endif
10011009
// Set self as executor and remove escalation bit if any - the task's
@@ -1024,14 +1032,19 @@ inline void AsyncTask::flagAsRunning() {
10241032
ActiveTaskStatus& newStatus) {
10251033

10261034
#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
1027-
// Task's priority is greater than the thread's - do a self escalation
1035+
// If the base priority is not equal to the current override floor then
1036+
// dispqatch may need to apply the base priority to the thread. If the
1037+
// current priority is higher than the override floor, then dispatch may
1038+
// need to apply a self-override. In either case, call into dispatch to
1039+
// do this.
10281040
qos_class_t maxTaskPriority = (qos_class_t) oldStatus.getStoredPriority();
1029-
if (threadOverrideInfo.can_override && (maxTaskPriority > overrideFloor)) {
1030-
SWIFT_TASK_DEBUG_LOG("[Override] Self-override thread with oq_floor %#x to match task %p's max priority %#x",
1031-
overrideFloor, this, maxTaskPriority);
1041+
if (threadOverrideInfo.can_override && (taskBasePriority != basePriorityCeil || maxTaskPriority > overrideFloor)) {
1042+
SWIFT_TASK_DEBUG_LOG("[Override] Self-override thread with oq_floor %#x to match task %p's max priority %#x and base priority %#x",
1043+
overrideFloor, this, maxTaskPriority, taskBasePriority);
10321044

1033-
(void) swift_dispatch_thread_override_self(maxTaskPriority);
1045+
dispatchOpaquePriority = swift_dispatch_thread_override_self_with_base(maxTaskPriority, taskBasePriority);
10341046
overrideFloor = maxTaskPriority;
1047+
basePriorityCeil = taskBasePriority;
10351048
}
10361049
#endif
10371050
// Set self as executor and remove escalation bit if any - the task's
@@ -1047,7 +1060,7 @@ inline void AsyncTask::flagAsRunning() {
10471060
swift_task_enterThreadLocalContext(
10481061
(char *)&_private().ExclusivityAccessSet[0]);
10491062
}
1050-
1063+
return dispatchOpaquePriority;
10511064
}
10521065

10531066
/// TODO (rokhinip): We need the handoff of the thread to the next executor to

test/Concurrency/async_task_priority.swift

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,43 @@ actor Test {
322322
await task2.value // Escalate task2 which should be queued behind task1 on the actor
323323
}
324324

325+
// This test will only work properly on 27.0+
326+
if #available(macOS 27.0, iOS 27.0, tvOS 27.0, watchOS 27.0, *) {
327+
tests.test("Task escalation doesn't impact qos_class_self") {
328+
let task = Task(priority: .utility) {
329+
let initialQos = DispatchQoS(
330+
qosClass: DispatchQoS.QoSClass(rawValue: qos_class_self())!,
331+
relativePriority: 0)
332+
expectEqual(initialQos, DispatchQoS.utility)
333+
let childTask = Task {
334+
let qosBeforeEscalate = DispatchQoS(
335+
qosClass: DispatchQoS.QoSClass(rawValue: qos_class_self())!,
336+
relativePriority: 0)
337+
// Unstructured task should inherit utility priority
338+
expectEqual(qosBeforeEscalate, DispatchQoS.utility)
339+
// Escalate priority override, not base QoS
340+
withUnsafeCurrentTask {
341+
$0!.escalatePriority(to: .userInitiated)
342+
}
343+
let qosAfterEscalate = DispatchQoS(
344+
qosClass: DispatchQoS.QoSClass(rawValue: qos_class_self())!,
345+
relativePriority: 0)
346+
// qos_class_self should remain utility after escalation
347+
expectEqual(qosAfterEscalate, DispatchQoS.utility)
348+
await Task.yield()
349+
let qosAfterYield = DispatchQoS(
350+
qosClass: DispatchQoS.QoSClass(rawValue: qos_class_self())!,
351+
relativePriority: 0)
352+
// qos_class_self should remain utility after yield
353+
expectEqual(qosAfterYield, DispatchQoS.utility)
354+
}
355+
356+
await childTask.value
357+
}
358+
359+
await task.value
360+
}
361+
}
325362
}
326363
await runAllTestsAsync()
327364
}

0 commit comments

Comments
 (0)