Skip to content

Commit 61a1835

Browse files
committed
wip on trying to fix by changing the offer
1 parent 009f64b commit 61a1835

File tree

1 file changed

+90
-135
lines changed

1 file changed

+90
-135
lines changed

stdlib/public/Concurrency/TaskGroup.cpp

Lines changed: 90 additions & 135 deletions
Original file line numberDiff line numberDiff line change
@@ -374,7 +374,7 @@ class TaskGroupBase : public TaskGroupTaskStatusRecord {
374374
virtual void enqueueCompletedTask(AsyncTask *completedTask, bool hadErrorResult) = 0;
375375

376376
/// Resume waiting task with result from `completedTask`
377-
void resumeWaitingTask(AsyncTask *completedTask, TaskGroupStatus &assumed, bool hadErrorResult);
377+
void resumeWaitingTask(AsyncTask *completedTask, TaskGroupStatus &assumed, bool hadErrorResult, bool alreadyDecremented = false);
378378

379379
// ==== Status manipulation -------------------------------------------------
380380

@@ -415,6 +415,8 @@ class TaskGroupBase : public TaskGroupTaskStatusRecord {
415415
/// A waiting task MUST have been already enqueued in the `waitQueue`.
416416
TaskGroupStatus statusMarkWaitingAssumeRelease();
417417

418+
TaskGroupStatus statusAddPendingTaskRelaxed(bool unconditionally);
419+
418420
/// Cancels the group and returns true if was already cancelled before.
419421
/// After this function returns, the group is guaranteed to be cancelled.
420422
///
@@ -427,7 +429,6 @@ class TaskGroupBase : public TaskGroupTaskStatusRecord {
427429
/// This also sets
428430
bool cancelAll();
429431

430-
virtual TaskGroupStatus statusAddPendingTaskRelaxed(bool unconditionally) = 0;
431432
};
432433

433434
[[maybe_unused]]
@@ -494,13 +495,14 @@ struct TaskGroupStatus {
494495

495496
/// Status value decrementing the Ready, Pending and Waiting counters by one.
496497
TaskGroupStatus completingPendingReadyWaiting(const TaskGroupBase* _Nonnull group) {
497-
assert(pendingTasks(group) &&
498-
"can only complete waiting task when pending tasks available");
498+
// assert(pendingTasks(group) &&
499+
// "can only complete waiting task when pending tasks available");
499500
assert(group->isDiscardingResults() || readyTasks(group) &&
500501
"can only complete waiting task when ready tasks available");
501502
assert(hasWaitingTask() &&
502503
"can only complete waiting task when waiting task available");
503-
uint64_t change = waiting + onePendingTask;
504+
uint64_t change = waiting;
505+
change += pendingTasks(group) ? onePendingTask : 0;
504506
// only while accumulating results does the status contain "ready" bits;
505507
// so if we're in "discard results" mode, we must not decrement the ready count,
506508
// as there is no ready count in the status.
@@ -509,11 +511,12 @@ struct TaskGroupStatus {
509511
}
510512

511513
TaskGroupStatus completingPendingReady(const TaskGroupBase* _Nonnull group) {
512-
assert(pendingTasks(group) &&
513-
"can only complete waiting task when pending tasks available");
514+
// assert(pendingTasks(group) &&
515+
// "can only complete waiting task when pending tasks available");
514516
assert(group->isDiscardingResults() || readyTasks(group) &&
515517
"can only complete waiting task when ready tasks available");
516-
auto change = onePendingTask;
518+
auto change = 0;
519+
change += pendingTasks(group) ? onePendingTask : 0;
517520
change += group->isAccumulatingResults() ? oneReadyTask : 0;
518521
return TaskGroupStatus{status - change};
519522
}
@@ -596,6 +599,39 @@ TaskGroupStatus TaskGroupBase::statusMarkWaitingAssumeRelease() {
596599
return TaskGroupStatus{old | TaskGroupStatus::waiting};
597600
}
598601

602+
/// Add a single pending task to the status counter.
603+
/// This is used to implement next() properly, as we need to know if there
604+
/// are pending tasks worth suspending/waiting for or not.
605+
///
606+
/// Note that the group does *not* store child tasks at all, as they are
607+
/// stored in the `TaskGroupTaskStatusRecord` inside the current task, that
608+
/// is currently executing the group. Here we only need the counts of
609+
/// pending/ready tasks.
610+
///
611+
/// If the `unconditionally` parameter is `true` the operation always successfully
612+
/// adds a pending task, even if the group is cancelled. If the unconditionally
613+
/// flag is `false`, the added pending count will be *reverted* before returning.
614+
/// This is because we will NOT add a task to a cancelled group, unless doing
615+
/// so unconditionally.
616+
///
617+
/// Returns *assumed* new status, including the just performed +1.
618+
TaskGroupStatus TaskGroupBase::statusAddPendingTaskRelaxed(bool unconditionally) {
619+
620+
auto old = status.fetch_add(TaskGroupStatus::onePendingTask,
621+
std::memory_order_relaxed);
622+
auto s = TaskGroupStatus{old + TaskGroupStatus::onePendingTask};
623+
624+
if (!unconditionally && s.isCancelled()) {
625+
// revert that add, it was meaningless
626+
auto o = status.fetch_sub(TaskGroupStatus::onePendingTask,
627+
std::memory_order_relaxed);
628+
s = TaskGroupStatus{o - TaskGroupStatus::onePendingTask};
629+
}
630+
631+
SWIFT_TASK_GROUP_DEBUG_LOG(this, "addPending, after = %s", s.to_string(this).c_str());
632+
return s;
633+
}
634+
599635
TaskGroupStatus TaskGroupBase::statusRemoveWaitingRelease() {
600636
auto old = status.fetch_and(~TaskGroupStatus::waiting,
601637
std::memory_order_release);
@@ -646,49 +682,6 @@ class AccumulatingTaskGroup: public TaskGroupBase {
646682
return s;
647683
}
648684

649-
/// Add a single pending task to the status counter.
650-
/// This is used to implement next() properly, as we need to know if there
651-
/// are pending tasks worth suspending/waiting for or not.
652-
///
653-
/// Note that the group does *not* store child tasks at all, as they are
654-
/// stored in the `TaskGroupTaskStatusRecord` inside the current task, that
655-
/// is currently executing the group. Here we only need the counts of
656-
/// pending/ready tasks.
657-
///
658-
/// If the `unconditionally` parameter is `true` the operation always successfully
659-
/// adds a pending task, even if the group is cancelled. If the unconditionally
660-
/// flag is `false`, the added pending count will be *reverted* before returning.
661-
/// This is because we will NOT add a task to a cancelled group, unless doing
662-
/// so unconditionally.
663-
///
664-
/// Returns *assumed* new status, including the just performed +1.
665-
TaskGroupStatus statusAddPendingTaskRelaxed(bool unconditionally) override {
666-
auto old = status.fetch_add(TaskGroupStatus::onePendingTask,
667-
std::memory_order_relaxed);
668-
auto s = TaskGroupStatus{old + TaskGroupStatus::onePendingTask};
669-
670-
if (!unconditionally && s.isCancelled()) {
671-
// revert that add, it was meaningless
672-
auto o = status.fetch_sub(TaskGroupStatus::onePendingTask,
673-
std::memory_order_relaxed);
674-
s = TaskGroupStatus{o - TaskGroupStatus::onePendingTask};
675-
}
676-
677-
return s;
678-
}
679-
680-
/// Decrement the pending status count.
681-
/// Returns the *assumed* new status, including the just performed -1.
682-
TaskGroupStatus statusCompletePendingAssumeRelease() {
683-
assert(this->isDiscardingResults()
684-
&& "only a discardResults TaskGroup may use completePending, "
685-
"since it avoids updating the ready count, which other groups need.");
686-
auto old = status.fetch_sub(TaskGroupStatus::onePendingTask,
687-
std::memory_order_release);
688-
assert(TaskGroupStatus{old}.pendingTasks(this) > 0 && "attempted to decrement pending count when it was 0 already");
689-
return TaskGroupStatus{old - TaskGroupStatus::onePendingTask};
690-
}
691-
692685
virtual void offer(AsyncTask *completed, AsyncContext *context) override;
693686

694687
virtual void enqueueCompletedTask(AsyncTask *completedTask, bool hadErrorResult) override;
@@ -731,37 +724,6 @@ class DiscardingTaskGroup: public TaskGroupBase {
731724
return TaskGroupStatus{status.load(std::memory_order_acquire)};
732725
}
733726

734-
/// Add a single pending task to the status counter.
735-
/// This is used to implement next() properly, as we need to know if there
736-
/// are pending tasks worth suspending/waiting for or not.
737-
///
738-
/// Note that the group does *not* store child tasks at all, as they are
739-
/// stored in the `TaskGroupTaskStatusRecord` inside the current task, that
740-
/// is currently executing the group. Here we only need the counts of
741-
/// pending/ready tasks.
742-
///
743-
/// If the `unconditionally` parameter is `true` the operation always successfully
744-
/// adds a pending task, even if the group is cancelled. If the unconditionally
745-
/// flag is `false`, the added pending count will be *reverted* before returning.
746-
/// This is because we will NOT add a task to a cancelled group, unless doing
747-
/// so unconditionally.
748-
///
749-
/// Returns *assumed* new status, including the just performed +1.
750-
TaskGroupStatus statusAddPendingTaskRelaxed(bool unconditionally) override {
751-
auto old = status.fetch_add(TaskGroupStatus::onePendingTask,
752-
std::memory_order_relaxed);
753-
auto s = TaskGroupStatus{old + TaskGroupStatus::onePendingTask};
754-
755-
if (!unconditionally && s.isCancelled()) {
756-
// revert that add, it was meaningless
757-
auto o = status.fetch_sub(TaskGroupStatus::onePendingTask,
758-
std::memory_order_relaxed);
759-
s = TaskGroupStatus{o - TaskGroupStatus::onePendingTask};
760-
}
761-
762-
return s;
763-
}
764-
765727
TaskGroupStatus statusLoadRelaxed() {
766728
return TaskGroupStatus{status.load(std::memory_order_relaxed)};
767729
}
@@ -784,9 +746,6 @@ class DiscardingTaskGroup: public TaskGroupBase {
784746
/// Decrement the pending status count.
785747
/// Returns the *assumed* new status, including the just performed -1.
786748
TaskGroupStatus statusCompletePendingAssumeRelease() {
787-
assert(this->isDiscardingResults()
788-
&& "only a discardResults TaskGroup may use completePending, "
789-
"since it avoids updating the ready count, which other groups need.");
790749
auto old = status.fetch_sub(TaskGroupStatus::onePendingTask,
791750
std::memory_order_release);
792751
assert(TaskGroupStatus{old}.pendingTasks(this) > 0 && "attempted to decrement pending count when it was 0 already");
@@ -1123,7 +1082,6 @@ void AccumulatingTaskGroup::offer(AsyncTask *completedTask, AsyncContext *contex
11231082
assert(completedTask->hasChildFragment());
11241083
assert(completedTask->hasGroupChildFragment());
11251084
assert(completedTask->groupChildFragment()->getGroup() == asAbstract(this));
1126-
SWIFT_TASK_GROUP_DEBUG_LOG(this, "offer, completedTask:%p , status:%s", completedTask, statusString().c_str());
11271085

11281086
// The current ownership convention is that we are *not* given ownership
11291087
// of a retain on completedTask; we're called from the task completion
@@ -1135,6 +1093,8 @@ void AccumulatingTaskGroup::offer(AsyncTask *completedTask, AsyncContext *contex
11351093
// will need to release in the other path.
11361094
lock(); // TODO: remove fragment lock, and use status for synchronization
11371095

1096+
SWIFT_TASK_GROUP_DEBUG_LOG(this, "offer, completedTask:%p , status:%s", completedTask, statusString().c_str());
1097+
11381098
// Immediately increment ready count and acquire the status
11391099
//
11401100
// NOTE: If the group is `discardResults` this becomes a plain load(),
@@ -1183,16 +1143,7 @@ void DiscardingTaskGroup::offer(AsyncTask *completedTask, AsyncContext *context)
11831143
assert(completedTask->hasChildFragment());
11841144
assert(completedTask->hasGroupChildFragment());
11851145
assert(completedTask->groupChildFragment()->getGroup() == asAbstract(this));
1186-
SWIFT_TASK_GROUP_DEBUG_LOG(this, "offer, completedTask:%p, status:%s", completedTask, statusString().c_str());
11871146

1188-
// The current ownership convention is that we are *not* given ownership
1189-
// of a retain on completedTask; we're called from the task completion
1190-
// handler, and the task will release itself. So if we need the task
1191-
// to survive this call (e.g. because there isn't an immediate waiting
1192-
// task), we will need to retain it, which we do in enqueueCompletedTask.
1193-
// This is wasteful, and the task completion function should be fixed to
1194-
// transfer ownership of a retain into this function, in which case we
1195-
// will need to release in the other path.
11961147
lock(); // TODO: remove fragment lock, and use status for synchronization
11971148

11981149
// Since we don't maintain ready counts in a discarding group, only load the status.
@@ -1207,53 +1158,29 @@ void DiscardingTaskGroup::offer(AsyncTask *completedTask, AsyncContext *context)
12071158
hadErrorResult = true;
12081159
}
12091160

1210-
/// If we're the last task we've been waiting for, and there is a waiting task on the group
1211-
bool lastPendingTaskAndWaitingTask =
1212-
assumed.pendingTasks(this) == 1 &&
1213-
assumed.hasWaitingTask();
1161+
SWIFT_TASK_GROUP_DEBUG_LOG(this, "offer, completedTask:%p, error:%d, status:%s",
1162+
completedTask, hadErrorResult, assumed.to_string(this).c_str());
12141163

12151164
// Immediately decrement the pending count.
12161165
// We can do this, since in this mode there is no ready count to keep track of,
12171166
// and we immediately discard the result.
12181167
SWIFT_TASK_GROUP_DEBUG_LOG(this, "discard result, hadError:%d, was pending:%llu, status = %s",
12191168
hadErrorResult, assumed.pendingTasks(this), assumed.to_string(this).c_str());
1220-
// If this was the last pending task, and there is a waiting task (from waitAll),
1221-
// we must resume the task; but not otherwise. There cannot be any waiters on next()
1222-
// while we're discarding results.
1223-
if (lastPendingTaskAndWaitingTask) {
1224-
ReadyQueueItem item;
1225-
SWIFT_TASK_GROUP_DEBUG_LOG(this, "offer, offered last pending task, resume waiting task:%p",
1226-
waitQueue.load(std::memory_order_relaxed));
1227-
if (readyQueue.dequeue(item)) {
1228-
switch (item.getStatus()) {
1229-
case ReadyStatus::RawError:
1230-
resumeWaitingTaskWithError(item.getRawError(this), assumed);
1231-
break;
1232-
case ReadyStatus::Error:
1233-
resumeWaitingTask(item.getTask(), assumed, /*hadErrorResult=*/true);
1234-
break;
1235-
default:
1236-
swift_Concurrency_fatalError(0, "only errors can be stored by a discarding task group, yet it wasn't an error!");
1237-
}
1238-
} else {
1239-
resumeWaitingTask(completedTask, assumed, /*hadErrorResult=*/hadErrorResult);
1240-
}
1241-
} else {
1242-
assert(!lastPendingTaskAndWaitingTask);
1169+
12431170
if (hadErrorResult && readyQueue.isEmpty()) {
12441171
// a discardResults throwing task group must retain the FIRST error it encounters.
1245-
SWIFT_TASK_GROUP_DEBUG_LOG(this, "offer error, completedTask:%p", completedTask);
1172+
SWIFT_TASK_GROUP_DEBUG_LOG(this, "offer error, error:%d, completedTask:%p", hadErrorResult, completedTask);
12461173
enqueueCompletedTask(completedTask, /*hadErrorResult=*/hadErrorResult);
12471174
} else {
12481175
// we just are going to discard it.
1176+
SWIFT_TASK_GROUP_DEBUG_LOG(this, "offer discard the completedTask:%p, status=%s", completedTask, statusString().c_str());
12491177
_swift_taskGroup_detachChild(asAbstract(this), completedTask);
12501178
}
12511179

12521180
auto afterComplete = statusCompletePendingAssumeRelease();
12531181
(void) afterComplete;
1254-
SWIFT_TASK_GROUP_DEBUG_LOG(this, "offer, either more pending tasks, or no waiting task, status:%s",
1182+
SWIFT_TASK_GROUP_DEBUG_LOG(this, "offer, complete, status:%s",
12551183
afterComplete.to_string(this).c_str());
1256-
}
12571184

12581185
// Discarding results mode, immediately treats a child failure as group cancellation.
12591186
// "All for one, one for all!" - any task failing must cause the group and all sibling tasks to be cancelled,
@@ -1262,28 +1189,52 @@ void DiscardingTaskGroup::offer(AsyncTask *completedTask, AsyncContext *context)
12621189
cancelAll();
12631190
}
12641191

1192+
if (afterComplete.hasWaitingTask() && afterComplete.pendingTasks(this) == 0) {
1193+
ReadyQueueItem priorErrorItem;
1194+
if (readyQueue.dequeue(priorErrorItem)) {
1195+
SWIFT_TASK_GROUP_DEBUG_LOG(this, "offer, last pending task, prior error found, fail waitingTask:%p",
1196+
waitQueue.load(std::memory_order_relaxed));
1197+
switch (priorErrorItem.getStatus()) {
1198+
case ReadyStatus::RawError:
1199+
resumeWaitingTaskWithError(priorErrorItem.getRawError(this), assumed);
1200+
break;
1201+
case ReadyStatus::Error:
1202+
resumeWaitingTask(priorErrorItem.getTask(), assumed, /*hadErrorResult=*/true, /*alreadyDecremented=*/true);
1203+
break;
1204+
default:
1205+
swift_Concurrency_fatalError(0, "only errors can be stored by a discarding task group, yet it wasn't an error!");
1206+
}
1207+
} else {
1208+
SWIFT_TASK_GROUP_DEBUG_LOG(this, "offer, last pending task, completing with completedTask:%p, completedTask.error:%d, waitingTask:%p",
1209+
completedTask, hadErrorResult, waitQueue.load(std::memory_order_relaxed));
1210+
resumeWaitingTask(completedTask, assumed, /*hadErrorResult=*/hadErrorResult);
1211+
}
1212+
}
1213+
12651214
unlock();
12661215
}
12671216

12681217
/// Must be called while holding the TaskGroup lock.
12691218
void TaskGroupBase::resumeWaitingTask(
12701219
AsyncTask *completedTask,
12711220
TaskGroupStatus &assumed,
1272-
bool hadErrorResult) {
1221+
bool hadErrorResult,
1222+
bool alreadyDecremented) {
1223+
auto backup = waitQueue.load(std::memory_order_acquire);
12731224
auto waitingTask = waitQueue.load(std::memory_order_acquire);
12741225
assert(waitingTask && "waitingTask must not be null when attempting to resume it");
1275-
SWIFT_TASK_GROUP_DEBUG_LOG(this, "resume waiting task = %p, complete with = %p",
1276-
waitingTask, completedTask);
1226+
assert(assumed.hasWaitingTask());
1227+
SWIFT_TASK_GROUP_DEBUG_LOG(this, "resume waiting task = %p, error:%d, complete with = %p",
1228+
waitingTask, hadErrorResult, completedTask);
12771229
while (true) {
12781230
// ==== a) run waiting task directly -------------------------------------
1279-
assert(assumed.hasWaitingTask());
12801231
// assert(assumed.pendingTasks(this) && "offered to group with no pending tasks!");
12811232
// We are the "first" completed task to arrive,
12821233
// and since there is a task waiting we immediately claim and complete it.
12831234
if (waitQueue.compare_exchange_strong(
12841235
waitingTask, nullptr,
1285-
/*success*/ std::memory_order_release,
1286-
/*failure*/ std::memory_order_acquire)) {
1236+
/*success*/ std::memory_order_seq_cst,
1237+
/*failure*/ std::memory_order_seq_cst)) {
12871238

12881239
#if SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL
12891240
// In the task-to-thread model, child tasks are always actually
@@ -1303,10 +1254,13 @@ void TaskGroupBase::resumeWaitingTask(
13031254
return;
13041255

13051256
#else /* SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL */
1306-
if (statusCompletePendingReadyWaiting(assumed)) {
1257+
fprintf(stderr, "[%s:%d](%s) assumed:%s\n", __FILE_NAME__, __LINE__, __FUNCTION__, assumed.to_string(this).c_str());
1258+
fprintf(stderr, "[%s:%d](%s) had:%s\n", __FILE_NAME__, __LINE__, __FUNCTION__, this->statusString().c_str());
1259+
1260+
if (alreadyDecremented || statusCompletePendingReadyWaiting(assumed)) {
13071261
// Run the task.
13081262
auto result = PollResult::get(completedTask, hadErrorResult);
1309-
SWIFT_TASK_GROUP_DEBUG_LOG(this, "resume waiting DONE, task = %p, complete with = %p, status = %s",
1263+
SWIFT_TASK_GROUP_DEBUG_LOG(this, "resume waiting DONE, task = %p, complete with = %p, status = %s",
13101264
waitingTask, completedTask, statusString().c_str());
13111265

13121266
// Remove the child from the task group's running tasks list.
@@ -1670,8 +1624,8 @@ static void swift_taskGroup_waitAllImpl(
16701624
context->errorResult = nullptr;
16711625
context->successResultPointer = resultPointer;
16721626

1673-
SWIFT_TASK_GROUP_DEBUG_LOG(group, "waitAllImpl, waiting task = %p, bodyError = %p, status:%s, polled.status = %s",
1674-
waitingTask, bodyError, group->statusString().c_str(), to_string(polled.status).c_str());
1627+
SWIFT_TASK_GROUP_DEBUG_LOG(group, "waitAllImpl, waiting task = %p, bodyError = %p, status:%s, polled.status = %s, status.NOW=%s",
1628+
waitingTask, bodyError, group->statusString().c_str(), to_string(polled.status).c_str(), group->statusString().c_str());
16751629

16761630
switch (polled.status) {
16771631
case PollStatus::MustWait: {
@@ -1804,6 +1758,7 @@ PollResult TaskGroupBase::waitAll(SwiftError* bodyError, AsyncTask *waitingTask,
18041758
/*success*/ std::memory_order_release,
18051759
/*failure*/ std::memory_order_acquire)) {
18061760
statusMarkWaitingAssumeRelease();
1761+
SWIFT_TASK_GROUP_DEBUG_LOG(this, "waitAll, marked waiting status = %s", statusString().c_str());
18071762
unlock();
18081763

18091764
#if SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL

0 commit comments

Comments
 (0)