Skip to content

Commit f336404

Browse files
committed
[Concurrency] TaskGroup children remove their records as they complete
If we didn't do this (and we didn't), the tasks get released as we perform the next() impl, and move the value from the ready task to the waiting task. Then, the ready task gets destroyed. But as the task group exists, it performs a cancelAll() and that iterates over all records. Those records were not removed previously (!!!) which meant we were pointing at now deallocated tasks. Previously this worked because we didn't deallocate the tasks, so they leaked, but we didn't crash. With the memory leak fixed, this began to crash since we'd attempt to cancel already destroyed tasks. Solution: - Remove task records whenever they complete a waiting task. - This can ONLY be done by the "group owning task" itself, becuause the contract of ONLY this task being allowed to modify records. o It MUST NOT be done by the completing tasks as they complete, as it would race with the owning task modifying this linked list of child tasks in the group record.
1 parent d4ebc58 commit f336404

File tree

8 files changed

+134
-41
lines changed

8 files changed

+134
-41
lines changed

include/swift/ABI/TaskStatus.h

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,6 @@ class TaskGroupTaskStatusRecord : public TaskStatusRecord {
182182

183183
/// Attach the passed in `child` task to this group.
184184
void attachChild(AsyncTask *child) {
185-
assert(child->groupChildFragment());
186185
assert(child->hasGroupChildFragment());
187186
assert(child->groupChildFragment()->getGroup() == getGroup());
188187

@@ -211,6 +210,33 @@ class TaskGroupTaskStatusRecord : public TaskStatusRecord {
211210
cur->childFragment()->setNextChild(child);
212211
}
213212

213+
void detachChild(AsyncTask *child) {
214+
assert(child && "cannot remove a null child from group");
215+
if (FirstChild == child) {
216+
FirstChild = getNextChildTask(child);
217+
return;
218+
}
219+
220+
AsyncTask *prev = FirstChild;
221+
// Remove the child from the linked list, i.e.:
222+
// prev -> afterPrev -> afterChild
223+
// ==
224+
// child -> afterChild
225+
// Becomes:
226+
// prev --------------> afterChild
227+
while (prev) {
228+
auto afterPrev = getNextChildTask(prev);
229+
230+
if (afterPrev == child) {
231+
auto afterChild = getNextChildTask(child);
232+
prev->childFragment()->setNextChild(afterChild);
233+
return;
234+
}
235+
236+
prev = afterPrev;
237+
}
238+
}
239+
214240
static AsyncTask *getNextChildTask(AsyncTask *task) {
215241
return task->childFragment()->getNextChild();
216242
}

stdlib/public/Concurrency/TaskGroup.cpp

Lines changed: 37 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
//
33
// This source file is part of the Swift.org open source project
44
//
5-
// Copyright (c) 2014 - 2020 Apple Inc. and the Swift project authors
5+
// Copyright (c) 2014 - 2021 Apple Inc. and the Swift project authors
66
// Licensed under Apache License v2.0 with Runtime Library Exception
77
//
88
// See https://swift.org/LICENSE.txt for license information
@@ -279,7 +279,7 @@ class TaskGroupImpl: public TaskGroupTaskStatusRecord {
279279

280280
private:
281281

282-
// // TODO: move to lockless via the status atomic
282+
// TODO: move to lockless via the status atomic (make readyQueue an mpsc_queue_t<ReadyQueueItem>)
283283
mutable std::mutex mutex;
284284

285285
/// Used for queue management, counting number of waiting and ready tasks
@@ -290,7 +290,6 @@ class TaskGroupImpl: public TaskGroupTaskStatusRecord {
290290
/// The low bits contain the status, the rest of the pointer is the
291291
/// AsyncTask.
292292
NaiveQueue<ReadyQueueItem> readyQueue;
293-
// mpsc_queue_t<ReadyQueueItem> readyQueue; // TODO: can we get away with an MPSC queue here once actor executors land?
294293

295294
/// Single waiting `AsyncTask` currently waiting on `group.next()`,
296295
/// or `nullptr` if no task is currently waiting.
@@ -305,10 +304,8 @@ class TaskGroupImpl: public TaskGroupTaskStatusRecord {
305304
: TaskGroupTaskStatusRecord(),
306305
status(GroupStatus::initial().status),
307306
readyQueue(),
308-
// readyQueue(ReadyQueueItem::get(ReadyStatus::Empty, nullptr)),
309307
waitQueue(nullptr), successType(T) {}
310308

311-
312309
TaskGroupTaskStatusRecord *getTaskRecord() {
313310
return reinterpret_cast<TaskGroupTaskStatusRecord *>(this);
314311
}
@@ -472,11 +469,18 @@ static void swift_taskGroup_initializeImpl(TaskGroup *group, const Metadata *T)
472469

473470
// =============================================================================
474471
// ==== add / attachChild ------------------------------------------------------
472+
475473
SWIFT_CC(swift)
476474
static void swift_taskGroup_attachChildImpl(TaskGroup *group,
477475
AsyncTask *child) {
476+
SWIFT_TASK_DEBUG_LOG("attach child task = %p to group = %p\n",
477+
child, group);
478+
479+
// The counterpart of this (detachChild) is performed by the group itself,
480+
// when it offers the completed (child) task's value to a waiting task -
481+
// during the implementation of `await group.next()`.
478482
auto groupRecord = asImpl(group)->getTaskRecord();
479-
return groupRecord->attachChild(child);
483+
groupRecord->attachChild(child);
480484
}
481485

482486
// =============================================================================
@@ -506,8 +510,7 @@ bool TaskGroup::isCancelled() {
506510
}
507511

508512
static void fillGroupNextResult(TaskFutureWaitAsyncContext *context,
509-
PollResult result,
510-
bool releaseResultRetainedTask) {
513+
PollResult result) {
511514
/// Fill in the result value
512515
switch (result.status) {
513516
case PollStatus::MustWait:
@@ -516,7 +519,7 @@ static void fillGroupNextResult(TaskFutureWaitAsyncContext *context,
516519

517520
case PollStatus::Error: {
518521
context->fillWithError(reinterpret_cast<SwiftError *>(result.storage));
519-
break;
522+
return;
520523
}
521524

522525
case PollStatus::Success: {
@@ -528,28 +531,17 @@ static void fillGroupNextResult(TaskFutureWaitAsyncContext *context,
528531
// remaining references to it.
529532
successType->vw_initializeWithCopy(destPtr, result.storage);
530533
successType->vw_storeEnumTagSinglePayload(destPtr, 0, 1);
531-
break;
534+
return;
532535
}
533536

534537
case PollStatus::Empty: {
535538
// Initialize the result as a nil Optional<Success>.
536539
const Metadata *successType = result.successType;
537540
OpaqueValue *destPtr = context->successResultPointer;
538541
successType->vw_storeEnumTagSinglePayload(destPtr, 1, 1);
539-
break;
542+
return;
540543
}
541544
}
542-
543-
// We only release if asked to; This is because this function is called in two
544-
// cases "immediately":
545-
// a) when a completed task arrives and a waiting one existed then we don't
546-
// need to retain the completed task at all, thus we also don't release it.
547-
// b) when the task was stored in the readyQueue it was retained. As a
548-
// waitingTask arrives we will fill-in with the value from the retained
549-
// task. In this situation we must release the ready task, to allow it to
550-
// be destroyed.
551-
if (releaseResultRetainedTask)
552-
swift_release(result.retainedTask);
553545
}
554546

555547
void TaskGroupImpl::offer(AsyncTask *completedTask, AsyncContext *context) {
@@ -558,7 +550,7 @@ void TaskGroupImpl::offer(AsyncTask *completedTask, AsyncContext *context) {
558550
assert(completedTask->hasChildFragment());
559551
assert(completedTask->hasGroupChildFragment());
560552
assert(completedTask->groupChildFragment()->getGroup() == asAbstract(this));
561-
SWIFT_TASK_DEBUG_LOG("offer task %p to group %p\n", completedTask, group);
553+
SWIFT_TASK_DEBUG_LOG("offer task %p to group %p", completedTask, this);
562554

563555
mutex.lock(); // TODO: remove fragment lock, and use status for synchronization
564556

@@ -604,7 +596,9 @@ void TaskGroupImpl::offer(AsyncTask *completedTask, AsyncContext *context) {
604596
static_cast<TaskFutureWaitAsyncContext *>(
605597
waitingTask->ResumeContext);
606598

607-
fillGroupNextResult(waitingContext, result, /*release*/false);
599+
fillGroupNextResult(waitingContext, result);
600+
detachChild(result.retainedTask);
601+
608602
_swift_tsan_acquire(static_cast<Job *>(waitingTask));
609603

610604
// TODO: allow the caller to suggest an executor
@@ -622,11 +616,12 @@ void TaskGroupImpl::offer(AsyncTask *completedTask, AsyncContext *context) {
622616
// queue when a task polls during next() it will notice that we have a value
623617
// ready for it, and will process it immediately without suspending.
624618
assert(!waitQueue.load(std::memory_order_relaxed));
625-
SWIFT_TASK_DEBUG_LOG("group has no waiting tasks, store ready task = %p",
619+
SWIFT_TASK_DEBUG_LOG("group has no waiting tasks, RETAIN and store ready task = %p",
626620
completedTask);
627621
// Retain the task while it is in the queue;
628622
// it must remain alive until the task group is alive.
629623
swift_retain(completedTask);
624+
630625
auto readyItem = ReadyQueueItem::get(
631626
hadErrorResult ? ReadyStatus::Error : ReadyStatus::Success,
632627
completedTask
@@ -692,7 +687,7 @@ static void swift_taskGroup_wait_next_throwingImpl(
692687
PollResult polled = group->poll(waitingTask);
693688
switch (polled.status) {
694689
case PollStatus::MustWait:
695-
SWIFT_TASK_DEBUG_LOG("poll group = %p, no ready tasks, waiting task = %p\n",
690+
SWIFT_TASK_DEBUG_LOG("poll group = %p, no ready tasks, waiting task = %p",
696691
group, waitingTask);
697692
// The waiting task has been queued on the channel,
698693
// there were pending tasks so it will be woken up eventually.
@@ -706,15 +701,22 @@ static void swift_taskGroup_wait_next_throwingImpl(
706701
case PollStatus::Empty:
707702
case PollStatus::Error:
708703
case PollStatus::Success:
709-
SWIFT_TASK_DEBUG_LOG("poll group = %p, task = %p, ready task available = %p\n",
704+
SWIFT_TASK_DEBUG_LOG("poll group = %p, task = %p, ready task available = %p",
710705
group, waitingTask, polled.retainedTask);
711-
fillGroupNextResult(context, polled, /*release*/true);
706+
fillGroupNextResult(context, polled);
707+
if (auto completedTask = polled.retainedTask) {
708+
// it would be null for PollStatus::Empty, then we don't need to release
709+
group->detachChild(polled.retainedTask);
710+
swift_release(polled.retainedTask);
711+
}
712+
712713
return waitingTask->runInFullyEstablishedContext();
713714
}
714715
}
715716

716717
PollResult TaskGroupImpl::poll(AsyncTask *waitingTask) {
717718
mutex.lock(); // TODO: remove group lock, and use status for synchronization
719+
SWIFT_TASK_DEBUG_LOG("poll group = %p", this);
718720
auto assumed = statusMarkWaitingAssumeAcquire();
719721

720722
PollResult result;
@@ -724,6 +726,7 @@ PollResult TaskGroupImpl::poll(AsyncTask *waitingTask) {
724726

725727
// ==== 1) bail out early if no tasks are pending ----------------------------
726728
if (assumed.isEmpty()) {
729+
SWIFT_TASK_DEBUG_LOG("poll group = %p, group is empty, no pending tasks", this);
727730
// No tasks in flight, we know no tasks were submitted before this poll
728731
// was issued, and if we parked here we'd potentially never be woken up.
729732
// Bail out and return `nil` from `group.next()`.
@@ -741,6 +744,9 @@ PollResult TaskGroupImpl::poll(AsyncTask *waitingTask) {
741744

742745
// ==== 2) Ready task was polled, return with it immediately -----------------
743746
if (assumed.readyTasks()) {
747+
SWIFT_TASK_DEBUG_LOG("poll group = %p, group has ready tasks = %d",
748+
this, assumed.readyTasks());
749+
744750
auto assumedStatus = assumed.status;
745751
auto newStatus = TaskGroupImpl::GroupStatus{assumedStatus};
746752
if (status.compare_exchange_weak(
@@ -844,13 +850,16 @@ static void swift_taskGroup_cancelAllImpl(TaskGroup *group) {
844850
}
845851

846852
bool TaskGroupImpl::cancelAll() {
853+
SWIFT_TASK_DEBUG_LOG("cancel all tasks in group = %p", this);
854+
847855
// store the cancelled bit
848856
auto old = statusCancel();
849857
if (old.isCancelled()) {
850858
// already was cancelled previously, nothing to do?
851859
return false;
852860
}
853861

862+
// FIXME: must also remove the records!!!!
854863
// cancel all existing tasks within the group
855864
swift_task_cancel_group_child_tasks(asAbstract(this));
856865
return true;

stdlib/public/Concurrency/TaskPrivate.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
namespace swift {
4343

4444
// Set to 1 to enable helpful debug spew to stderr
45+
// If this is enabled, tests with `swift_task_debug_log` requirement can run.
4546
#if 0
4647
#define SWIFT_TASK_DEBUG_LOG(fmt, ...) \
4748
fprintf(stderr, "[%lu] [%s:%d](%s) " fmt "\n", \

stdlib/public/Concurrency/TaskStatus.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -354,6 +354,8 @@ static bool swift_task_tryAddStatusRecordImpl(TaskStatusRecord *newRecord) {
354354
SWIFT_CC(swift)
355355
static bool swift_task_removeStatusRecordImpl(TaskStatusRecord *record) {
356356
auto task = swift_task_getCurrent();
357+
SWIFT_TASK_DEBUG_LOG("remove status record = %p, from current task = %p",
358+
record, task);
357359

358360
// Load the current state.
359361
auto &status = task->_private().Status;
@@ -454,6 +456,8 @@ static ChildTaskStatusRecord*
454456
swift_task_attachChildImpl(AsyncTask *child) {
455457
void *allocation = malloc(sizeof(swift::ChildTaskStatusRecord));
456458
auto record = new (allocation) swift::ChildTaskStatusRecord(child);
459+
SWIFT_TASK_DEBUG_LOG("attach child task = %p, record = %p, to current task = %p",
460+
child, record, swift_task_getCurrent());
457461
swift_task_addStatusRecord(record);
458462
return record;
459463
}
@@ -548,6 +552,7 @@ static void performGroupCancellationAction(TaskStatusRecord *record) {
548552

549553
SWIFT_CC(swift)
550554
static void swift_task_cancelImpl(AsyncTask *task) {
555+
SWIFT_TASK_DEBUG_LOG("cancel task = %p", task);
551556
Optional<StatusRecordLockRecord> recordLockRecord;
552557

553558
// Acquire the status record lock.
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
// RUN: %target-run-simple-swift( -Xfrontend -disable-availability-checking -parse-as-library) 2>&1 | %FileCheck %s --dump-input=always
2+
// REQUIRES: executable_test
3+
// REQUIRES: concurrency
4+
// REQUIRES: swift_task_debug_log
5+
6+
// UNSUPPORTED: use_os_stdlib
7+
// UNSUPPORTED: back_deployment_runtime
8+
9+
#if os(Linux)
10+
import Glibc
11+
#elseif os(Windows)
12+
import MSVCRT
13+
#else
14+
import Darwin
15+
#endif
16+
17+
func test_withUnsafeCurrentTask() async {
18+
// The task we're running in ("main")
19+
// CHECK: creating task [[MAIN_TASK:0x.*]] with parent 0x0
20+
21+
// CHECK: creating task [[TASK:0x.*]] with parent 0x0
22+
let t = Task.detached {
23+
withUnsafeCurrentTask { task in
24+
fputs("OK: \(task!)", stderr)
25+
}
26+
fputs("DONE", stderr)
27+
}
28+
29+
// CHECK: OK: UnsafeCurrentTask(_task: (Opaque Value))
30+
// CHECK: DONE
31+
// CHECK: destroy task [[TASK]]
32+
await t.value
33+
}
34+
35+
@main struct Main {
36+
static func main() async {
37+
await test_withUnsafeCurrentTask()
38+
}
39+
}

test/Concurrency/Runtime/async_taskgroup_dontLeakTasks.swift

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
1-
// RUN: %target-run-simple-swift( -Xfrontend -disable-availability-checking -parse-as-library) | %FileCheck %s
1+
// RUN: %target-run-simple-swift( -Xfrontend -disable-availability-checking -parse-as-library) 2>&1 | %FileCheck %s --dump-input=always
22
// REQUIRES: executable_test
33
// REQUIRES: concurrency
4+
// REQUIRES: swift_task_debug_log
5+
46
// UNSUPPORTED: use_os_stdlib
57
// UNSUPPORTED: back_deployment_runtime
6-
// UNSUPPORTED: linux
78

89
#if os(Linux)
910
import Glibc
@@ -13,10 +14,16 @@ import MSVCRT
1314
import Darwin
1415
#endif
1516

16-
@available(SwiftStdlib 5.5, *)
1717
func test_taskGroup_next() async {
18+
// CHECK: creating task [[MAIN_TASK:0x.*]] with parent 0x0
19+
// CHECK: creating task [[GROUP_TASK_1:0x.*]] with parent [[MAIN_TASK]]
20+
// CHECK: creating task [[GROUP_TASK_2:0x.*]] with parent [[MAIN_TASK]]
21+
// CHECK: creating task [[GROUP_TASK_3:0x.*]] with parent [[MAIN_TASK]]
22+
// CHECK: creating task [[GROUP_TASK_4:0x.*]] with parent [[MAIN_TASK]]
23+
// CHECK: creating task [[GROUP_TASK_5:0x.*]] with parent [[MAIN_TASK]]
24+
1825
_ = await withTaskGroup(of: Int.self, returning: Int.self) { group in
19-
for n in 0..<100 {
26+
for n in 0..<5 {
2027
group.spawn {
2128
return n
2229
}
@@ -30,12 +37,18 @@ func test_taskGroup_next() async {
3037

3138
return sum
3239
}
33-
34-
// CHECK: result with group.next(): 100
35-
print("result with group.next(): \(100)")
40+
// as we exit the group, it must be guaranteed that its child tasks were destroyed
41+
//
42+
// NOTE: there is no great way to express "any of GROUP_TASK_n",
43+
// so we just check that 5 tasks were destroyed
44+
//
45+
// CHECK: destroy task [[DESTROY_GROUP_TASK_1:0x.*]]
46+
// CHECK: destroy task [[DESTROY_GROUP_TASK_2:0x.*]]
47+
// CHECK: destroy task [[DESTROY_GROUP_TASK_3:0x.*]]
48+
// CHECK: destroy task [[DESTROY_GROUP_TASK_4:0x.*]]
49+
// CHECK: destroy task [[DESTROY_GROUP_TASK_5:0x.*]]
3650
}
3751

38-
@available(SwiftStdlib 5.5, *)
3952
@main struct Main {
4053
static func main() async {
4154
await test_taskGroup_next()

test/Concurrency/Runtime/async_taskgroup_throw_recover.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ func test_taskGroup_throws() async {
4545
return third
4646

4747
case .failure(let error):
48-
fatalError("got an erroneous third result")
48+
fatalError("got an erroneous third result: \(error)")
4949

5050
case .none:
5151
print("task group failed to get 3")

0 commit comments

Comments
 (0)