Skip to content

Commit a747dc6

Browse files
committed
Offering body error must be done while holding lock
1 parent aca284e commit a747dc6

File tree

1 file changed

+52
-51
lines changed

1 file changed

+52
-51
lines changed

stdlib/public/Concurrency/TaskGroup.cpp

Lines changed: 52 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -51,15 +51,24 @@
5151
using namespace swift;
5252

5353
#if 0
54-
#define SWIFT_TASK_GROUP_DEBUG_LOG(group, fmt, ...) \
54+
#define SWIFT_TASK_GROUP_DEBUG_LOG(group, fmt, ...) \
5555
fprintf(stderr, "[%#lx] [%s:%d][group(%p%s)] (%s) " fmt "\n", \
5656
(unsigned long)Thread::current().platformThreadId(), \
5757
__FILE__, __LINE__, \
5858
group, group->isDiscardingResults() ? ",discardResults" : "", \
5959
__FUNCTION__, \
6060
__VA_ARGS__)
61+
62+
#define SWIFT_TASK_GROUP_DEBUG_LOG_0(group, fmt, ...) \
63+
fprintf(stderr, "[%#lx] [%s:%d][group(%p)] (%s) " fmt "\n", \
64+
(unsigned long)Thread::current().platformThreadId(), \
65+
__FILE__, __LINE__, \
66+
group, \
67+
__FUNCTION__, \
68+
__VA_ARGS__)
6169
#else
6270
#define SWIFT_TASK_GROUP_DEBUG_LOG(group, fmt, ...) (void)0
71+
#define SWIFT_TASK_GROUP_DEBUG_LOG_0(group, fmt, ...) (void)0
6372
#endif
6473

6574
using FutureFragment = AsyncTask::FutureFragment;
@@ -354,7 +363,11 @@ class TaskGroupBase : public TaskGroupTaskStatusRecord {
354363
/// There can be only at-most-one waiting task on a group at any given time,
355364
/// and the waiting task is expected to be the parent task in which the group
356365
/// body is running.
357-
PollResult waitAll(AsyncTask *waitingTask);
366+
///
367+
/// \param bodyError error thrown by the body of a with...TaskGroup method
368+
/// \param waitingTask the task waiting on the group
369+
/// \return how the waiting task should be handled, e.g. must wait or can be completed immediately
370+
PollResult waitAll(SwiftError* bodyError, AsyncTask *waitingTask);
358371

359372
// Enqueue the completed task onto ready queue if there are no waiting tasks yet
360373
virtual void enqueueCompletedTask(AsyncTask *completedTask, bool hadErrorResult) = 0;
@@ -411,6 +424,16 @@ class TaskGroupBase : public TaskGroupTaskStatusRecord {
411424
virtual TaskGroupStatus statusAddPendingTaskRelaxed(bool unconditionally) = 0;
412425
};
413426

427+
[[maybe_unused]]
428+
static std::string to_string(TaskGroupBase::PollStatus status) {
429+
switch (status) {
430+
case TaskGroupBase::PollStatus::Empty: return "Empty";
431+
case TaskGroupBase::PollStatus::MustWait: return "MustWait";
432+
case TaskGroupBase::PollStatus::Success: return "Success";
433+
case TaskGroupBase::PollStatus::Error: return "Error";
434+
}
435+
}
436+
414437
/// The status of a task group.
415438
///
416439
/// Its exact structure depends on the type of group, and therefore a group must be passed to operations
@@ -623,7 +646,7 @@ class AccumulatingTaskGroup: public TaskGroupBase {
623646
/// so unconditionally.
624647
///
625648
/// Returns *assumed* new status, including the just performed +1.
626-
TaskGroupStatus statusAddPendingTaskRelaxed(bool unconditionally) {
649+
TaskGroupStatus statusAddPendingTaskRelaxed(bool unconditionally) override {
627650
auto old = status.fetch_add(TaskGroupStatus::onePendingTask,
628651
std::memory_order_relaxed);
629652
auto s = TaskGroupStatus{old + TaskGroupStatus::onePendingTask};
@@ -720,7 +743,7 @@ class DiscardingTaskGroup: public TaskGroupBase {
720743
/// so unconditionally.
721744
///
722745
/// Returns *assumed* new status, including the just performed +1.
723-
TaskGroupStatus statusAddPendingTaskRelaxed(bool unconditionally) {
746+
TaskGroupStatus statusAddPendingTaskRelaxed(bool unconditionally) override {
724747
auto old = status.fetch_add(TaskGroupStatus::onePendingTask,
725748
std::memory_order_relaxed);
726749
auto s = TaskGroupStatus{old + TaskGroupStatus::onePendingTask};
@@ -779,8 +802,6 @@ class DiscardingTaskGroup: public TaskGroupBase {
779802
/// and the waitingTask eventually be woken up by a completion.
780803
PollResult poll(AsyncTask *waitingTask);
781804

782-
bool offerBodyError(SwiftError* _Nonnull bodyError);
783-
784805
private:
785806
/// Resume waiting task with specified error
786807
void resumeWaitingTaskWithError(SwiftError *error, TaskGroupStatus &assumed);
@@ -862,8 +883,8 @@ static void swift_taskGroup_initializeWithFlagsImpl(size_t rawGroupFlags,
862883
TaskGroup *group, const Metadata *T) {
863884

864885
TaskGroupFlags groupFlags(rawGroupFlags);
865-
SWIFT_TASK_DEBUG_LOG("group(%p) create; flags: isDiscardingResults=%d",
866-
group, groupFlags.isDiscardResults());
886+
SWIFT_TASK_GROUP_DEBUG_LOG_0(group, "create group; flags: isDiscardingResults=%d",
887+
groupFlags.isDiscardResults());
867888

868889
TaskGroupBase *impl;
869890
if (groupFlags.isDiscardResults()) {
@@ -1158,7 +1179,7 @@ void DiscardingTaskGroup::offer(AsyncTask *completedTask, AsyncContext *context)
11581179
assert(completedTask->hasChildFragment());
11591180
assert(completedTask->hasGroupChildFragment());
11601181
assert(completedTask->groupChildFragment()->getGroup() == asAbstract(this));
1161-
SWIFT_TASK_GROUP_DEBUG_LOG(this, "offer, completedTask:%p , status:%s", completedTask, statusString().c_str());
1182+
SWIFT_TASK_GROUP_DEBUG_LOG(this, "offer, completedTask:%p, status:%s", completedTask, statusString().c_str());
11621183

11631184
// The current ownership convention is that we are *not* given ownership
11641185
// of a retain on completedTask; we're called from the task completion
@@ -1224,7 +1245,7 @@ void DiscardingTaskGroup::offer(AsyncTask *completedTask, AsyncContext *context)
12241245
}
12251246

12261247
auto afterComplete = statusCompletePendingAssumeRelease();
1227-
(void)afterComplete; // silence "not used" warning
1248+
(void) afterComplete;
12281249
SWIFT_TASK_GROUP_DEBUG_LOG(this, "offer, either more pending tasks, or no waiting task, status:%s",
12291250
afterComplete.to_string(this).c_str());
12301251
}
@@ -1635,7 +1656,7 @@ static void swift_taskGroup_waitAllImpl(
16351656
waitingTask->ResumeContext = rawContext;
16361657

16371658
auto group = asBaseImpl(_group);
1638-
PollResult polled = group->waitAll(waitingTask);
1659+
PollResult polled = group->waitAll(bodyError, waitingTask);
16391660

16401661
auto context = static_cast<TaskFutureWaitAsyncContext *>(rawContext);
16411662
context->ResumeParent =
@@ -1644,23 +1665,13 @@ static void swift_taskGroup_waitAllImpl(
16441665
context->errorResult = nullptr;
16451666
context->successResultPointer = resultPointer;
16461667

1647-
SWIFT_TASK_GROUP_DEBUG_LOG(group, "waitAllImpl, waiting task = %p, bodyError = %p, status:%s",
1648-
waitingTask, bodyError, group->statusString().c_str());
1668+
SWIFT_TASK_GROUP_DEBUG_LOG(group, "waitAllImpl, waiting task = %p, bodyError = %p, status:%s, polled.status = %s",
1669+
waitingTask, bodyError, group->statusString().c_str(), to_string(polled.status).c_str());
16491670

16501671
switch (polled.status) {
16511672
case PollStatus::MustWait:
1652-
SWIFT_TASK_GROUP_DEBUG_LOG(group, "waitAll MustWait, pending tasks exist, waiting task = %p",
1673+
SWIFT_TASK_GROUP_DEBUG_LOG(group, "waitAllImpl MustWait, pending tasks exist, waiting task = %p",
16531674
waitingTask);
1654-
if (bodyError && group->isDiscardingResults()) {
1655-
auto discardingGroup = asDiscardingImpl(_group);
1656-
bool storedBodyError = discardingGroup->offerBodyError(bodyError);
1657-
if (storedBodyError) {
1658-
SWIFT_TASK_GROUP_DEBUG_LOG(
1659-
group, "waitAll, stored error thrown by with...Group body, error = %p",
1660-
bodyError);
1661-
}
1662-
}
1663-
16641675
// The waiting task has been queued on the channel,
16651676
// there were pending tasks so it will be woken up eventually.
16661677
#ifdef __ARM_ARCH_7K__
@@ -1671,7 +1682,7 @@ static void swift_taskGroup_waitAllImpl(
16711682
#endif /* __ARM_ARCH_7K__ */
16721683

16731684
case PollStatus::Error:
1674-
SWIFT_TASK_GROUP_DEBUG_LOG(group, "waitAll found error, waiting task = %p, body error = %p, status:%s",
1685+
SWIFT_TASK_GROUP_DEBUG_LOG(group, "waitAllImpl Error, waiting task = %p, body error = %p, status:%s",
16751686
waitingTask, bodyError, group->statusString().c_str());
16761687
#if SWIFT_TASK_GROUP_BODY_THROWN_ERROR_WINS
16771688
if (bodyError) {
@@ -1693,17 +1704,10 @@ static void swift_taskGroup_waitAllImpl(
16931704
return waitingTask->runInFullyEstablishedContext();
16941705

16951706
case PollStatus::Empty:
1696-
/// Anything else than a "MustWait" can be treated as a successful poll.
1697-
/// Only if there are in flight pending tasks do we need to wait after all.
1698-
SWIFT_TASK_GROUP_DEBUG_LOG(group, "waitAll %s, waiting task = %p, status:%s",
1699-
polled.status == TaskGroupBase::PollStatus::Empty ? "empty" : "success",
1700-
waitingTask, group->statusString().c_str());
1701-
1702-
17031707
case PollStatus::Success:
17041708
/// Anything else than a "MustWait" can be treated as a successful poll.
17051709
/// Only if there are in flight pending tasks do we need to wait after all.
1706-
SWIFT_TASK_GROUP_DEBUG_LOG(group, "waitAll %s, waiting task = %p, status:%s",
1710+
SWIFT_TASK_GROUP_DEBUG_LOG(group, "waitAllImpl %s, waiting task = %p, status:%s",
17071711
polled.status == TaskGroupBase::PollStatus::Empty ? "empty" : "success",
17081712
waitingTask, group->statusString().c_str());
17091713

@@ -1718,26 +1722,13 @@ static void swift_taskGroup_waitAllImpl(
17181722
}
17191723
}
17201724

1721-
bool DiscardingTaskGroup::offerBodyError(SwiftError* _Nonnull bodyError) {
1725+
/// Must be called while holding the `taskGroup.lock`!
1726+
/// This is because the discarding task group still has some follow-up operations that must
1727+
/// be performed atomically after this operation sometimes, so we cannot unlock inside `waitAll` itself.
1728+
PollResult TaskGroupBase::waitAll(SwiftError* bodyError, AsyncTask *waitingTask) {
17221729
lock(); // TODO: remove group lock, and use status for synchronization
17231730

1724-
if (!readyQueue.isEmpty()) {
1725-
// already other error stored, discard this one
1726-
unlock();
1727-
return false;
1728-
}
1729-
1730-
auto readyItem = ReadyQueueItem::getRawError(this, bodyError);
1731-
readyQueue.enqueue(readyItem);
1732-
unlock();
1733-
1734-
return true;
1735-
}
1736-
1737-
PollResult TaskGroupBase::waitAll(AsyncTask *waitingTask) {
1738-
lock(); // TODO: remove group lock, and use status for synchronization
1739-
1740-
SWIFT_TASK_GROUP_DEBUG_LOG(this, "waitAll, status = %s", statusString().c_str());
1731+
SWIFT_TASK_GROUP_DEBUG_LOG(this, "waitAll, bodyError = %p, status = %s", bodyError, statusString().c_str());
17411732
PollResult result = PollResult::getEmpty(this->successType);
17421733
result.status = PollStatus::Empty;
17431734
result.storage = nullptr;
@@ -1781,6 +1772,16 @@ PollResult TaskGroupBase::waitAll(AsyncTask *waitingTask) {
17811772
}
17821773

17831774
// ==== 2) Add to wait queue -------------------------------------------------
1775+
1776+
// ---- 2.1) Discarding task group may need to story the bodyError before we park
1777+
if (bodyError && isDiscardingResults()) {
1778+
auto discardingGroup = asDiscardingImpl(this);
1779+
assert(readyQueue.isEmpty() &&
1780+
"only a single error may be stored in discarding task group, but something was enqueued already");
1781+
auto readyItem = ReadyQueueItem::getRawError(discardingGroup, bodyError);
1782+
readyQueue.enqueue(readyItem);
1783+
}
1784+
17841785
auto waitHead = waitQueue.load(std::memory_order_acquire);
17851786
_swift_tsan_release(static_cast<Job *>(waitingTask));
17861787
while (true) {

0 commit comments

Comments
 (0)