Skip to content

Commit 999758c

Browse files
committed
[Concurrency][TaskGroup] allow cancelAll be invoked from child tasks
1 parent 4483ac8 commit 999758c

20 files changed

+136
-116
lines changed

include/swift/ABI/TaskGroup.h

Lines changed: 12 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -51,17 +51,12 @@ namespace swift {
5151
Error = 0b11,
5252
};
5353

54-
/// Describes the status of the waiting task that is suspended on `next()`.
55-
enum class WaitStatus : uintptr_t {
56-
Waiting = 0,
57-
};
58-
5954
enum class GroupPollStatus : uintptr_t {
6055
/// The channel is known to be empty and we can immediately return nil.
6156
Empty = 0,
6257

6358
/// The task has been enqueued to the channels wait queue.
64-
Waiting = 1,
59+
MustWait = 1,
6560

6661
/// The task has completed with result (of type \c resultType).
6762
Success = 2,
@@ -331,12 +326,17 @@ namespace swift {
331326
return s;
332327
}
333328

329+
/// Add a single pending task to the status counter.
330+
/// This is used to implement next() properly, as we need to know if there
331+
/// are pending tasks worth suspending/waiting for or not.
332+
///
333+
/// Note that the group does *not* store child tasks at all, as they are
334+
/// stored in the `TaskGroupTaskStatusRecord` inside the current task, that
335+
/// is currently executing the group. Here we only need the counts of
336+
/// pending/ready tasks.
337+
///
334338
/// Returns *assumed* new status, including the just performed +1.
335-
GroupStatus statusAddPendingTaskRelaxed(
336-
// AsyncTask* pendingTask
337-
) {
338-
// assert(pendingTask->isFuture());
339-
339+
GroupStatus statusAddPendingTaskRelaxed() {
340340
auto old = status.fetch_add(GroupStatus::onePendingTask, std::memory_order_relaxed);
341341
auto s = GroupStatus {old + GroupStatus::onePendingTask };
342342

@@ -346,30 +346,13 @@ namespace swift {
346346
s = GroupStatus {o - GroupStatus::onePendingTask };
347347
}
348348

349-
fprintf(stderr, "[%s:%d] (%s): status %s\n", __FILE__, __LINE__, __FUNCTION__, s.to_string().c_str());
350-
351-
// // FIXME: we won't need the +1 in the status, just the queue?
352-
// pendingQueue.enqueue(PendingQueueItem::get(pendingTask))
353349
return s;
354350
}
355351

356352
GroupStatus statusLoadRelaxed() {
357353
return GroupStatus{status.load(std::memory_order_relaxed)};
358354
}
359355

360-
// /// Returns *assumed* new status, including the just performed +1.
361-
// GroupStatus statusAddWaitingTaskAcquire() {
362-
// auto old = status.fetch_add(GroupStatus::oneWaitingTask, std::memory_order_acquire);
363-
// return GroupStatus { old + GroupStatus::oneWaitingTask };
364-
// }
365-
366-
// /// Remove waiting task, without taking any pending task.
367-
// GroupStatus statusRemoveWaitingTask() {
368-
// return GroupStatus {
369-
// status.fetch_sub(GroupStatus::oneWaitingTask, std::memory_order_relaxed)
370-
// };
371-
// }
372-
373356
/// Compare-and-set old status to a status derived from the old one,
374357
/// by simultaneously decrementing one Pending and one Waiting tasks.
375358
///
@@ -397,7 +380,7 @@ namespace swift {
397380
/// If unable to complete the waiting task immediately (with an readily
398381
/// available completed task), either returns an `GroupPollStatus::Empty`
399382
/// result if it is known that no pending tasks in the group,
400-
/// or a `GroupPollStatus::Waiting` result if there are tasks in flight
383+
/// or a `GroupPollStatus::MustWait` result if there are tasks in flight
401384
/// and the waitingTask eventually be woken up by a completion.
402385
TaskGroup::PollResult poll(AsyncTask *waitingTask);
403386

stdlib/public/Concurrency/Task.cpp

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -111,9 +111,7 @@ void AsyncTask::completeFuture(AsyncContext *context, ExecutorRef executor) {
111111
assert(queueHead.getStatus() == Status::Executing);
112112

113113
// If this is task group child, notify the parent group about the completion.
114-
fprintf(stderr, "[%s:%d] (%s) complete task %d\n", __FILE__, __LINE__, __FUNCTION__, this);
115114
if (hasGroupChildFragment()) {
116-
fprintf(stderr, "[%s:%d] (%s) complete group child task %d\n", __FILE__, __LINE__, __FUNCTION__, this);
117115
// then we must offer into the parent group that we completed,
118116
// so it may `next()` poll completed child tasks in completion order.
119117
auto group = groupChildFragment()->getGroup();
@@ -141,7 +139,6 @@ static void destroyTask(SWIFT_CONTEXT HeapObject *obj) {
141139

142140
// For a future, destroy the result.
143141
if (task->isFuture()) {
144-
fprintf(stderr, "[%s:%d] (%s): task: %d\n", __FILE__, __LINE__, __FUNCTION__, task);
145142
task->futureFragment()->destroy();
146143
}
147144

@@ -313,13 +310,7 @@ AsyncTaskAndContext swift::swift_task_create_group_future_f(
313310

314311
// Perform additional linking between parent and child task.
315312
if (parent) {
316-
// FIXME: we must attach children (`async let` created child tasks to the parent)
317-
// if (!flags.task_isGroupChildTask()) {
318-
// // just a normal child task
319-
// swift_task_attachChild(parent, task); // TODO: this has to be done outside of here (!!!!!!!!!!!!!!!!!!!)
320-
// } // else, group children are recorded outside
321-
322-
// if the parent was already cancelled, we carry this flag forward to the child.
313+
// If the parent was already cancelled, we carry this flag forward to the child.
323314
//
324315
// In a task group we would not have allowed the `add` to create a child anymore,
325316
// however better safe than sorry and `async let` are not expressed as task groups,

stdlib/public/Concurrency/TaskGroup.cpp

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -91,13 +91,6 @@ void TaskGroup::destroy(AsyncTask *task) {
9191
swift_task_removeStatusRecord(task, Record);
9292
swift_task_dealloc(task, Record);
9393

94-
// TODO: need to release all waiters as well
95-
// auto waitHead = waitQueue.load(std::memory_order_acquire);
96-
// switch (waitHead.getStatus()) {
97-
// case TaskGroup::WaitStatus::Waiting:
98-
// assert(false && "destroying a task group that still has waiting tasks");
99-
// }
100-
10194
mutex.lock(); // TODO: remove fragment lock, and use status for synchronization
10295
// Release all ready tasks which are kept retained, the group destroyed,
10396
// so no other task will ever await on them anymore;
@@ -223,7 +216,7 @@ void swift::swift_task_group_wait_next(
223216
TaskGroup::PollResult polled = group->poll(waitingTask);
224217
fprintf(stderr, "[%s:%d] (%s): group polled: %d\n", __FILE__, __LINE__, __FUNCTION__, polled.status);
225218

226-
if (polled.status == TaskGroup::GroupPollStatus::Waiting) {
219+
if (polled.status == TaskGroup::GroupPollStatus::MustWait) {
227220
fprintf(stderr, "[%s:%d] (%s): group polled: WAITING\n", __FILE__, __LINE__, __FUNCTION__);
228221
// The waiting task has been queued on the channel,
229222
// there were pending tasks so it will be woken up eventually.
@@ -280,7 +273,7 @@ TaskGroup::PollResult TaskGroup::poll(AsyncTask *waitingTask) {
280273
ReadyQueueItem item;
281274
bool taskDequeued = readyQueue.dequeue(item);
282275
if (!taskDequeued) {
283-
result.status = TaskGroup::GroupPollStatus::Waiting;
276+
result.status = TaskGroup::GroupPollStatus::MustWait;
284277
result.storage = nullptr;
285278
result.retainedTask = nullptr;
286279
mutex.unlock(); // TODO: remove group lock, and use status for synchronization
@@ -336,7 +329,7 @@ TaskGroup::PollResult TaskGroup::poll(AsyncTask *waitingTask) {
336329
fprintf(stderr, "[%s:%d] (%s): added to wait queue\n", __FILE__, __LINE__, __FUNCTION__);
337330
mutex.unlock(); // TODO: remove fragment lock, and use status for synchronization
338331
// no ready tasks, so we must wait.
339-
result.status = TaskGroup::GroupPollStatus::Waiting;
332+
result.status = TaskGroup::GroupPollStatus::MustWait;
340333
return result;
341334
} // else, try again
342335
}

stdlib/public/Concurrency/TaskGroup.swift

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,6 @@
1313
import Swift
1414
@_implementationOnly import _SwiftConcurrencyShims
1515

16-
//import Darwin // FIXME: remove this
17-
18-
func pprint(_ m: String, file: String = #file, line: UInt = #line) {
19-
// fputs("[\(file):\(line)] \(m)\n", stderr)
20-
}
21-
2216
// ==== Task Group -------------------------------------------------------------
2317

2418
extension Task {
@@ -69,7 +63,6 @@ extension Task {
6963
public static func withGroup<TaskResult, BodyResult>(
7064
resultType: TaskResult.Type,
7165
returning returnType: BodyResult.Type = BodyResult.self,
72-
// startingChildTasksOn executor: ExecutorRef? = nil, // TODO: actually respect it
7366
body: (inout Task.Group<TaskResult>) async throws -> BodyResult
7467
) async throws -> BodyResult {
7568
let task = Builtin.getCurrentAsyncTask()
@@ -184,6 +177,12 @@ extension Task {
184177
/// Awaiting on an empty group results in the immediate return of a `nil`
185178
/// value, without the group task having to suspend.
186179
///
180+
/// It is also possible to use `for await` to collect results of a task groups:
181+
///
182+
/// for await try value in group {
183+
/// collected += value
184+
/// }
185+
///
187186
/// ### Thread-safety
188187
/// Please note that the `group` object MUST NOT escape into another task.
189188
/// The `group.next()` MUST be awaited from the task that had originally
@@ -263,10 +262,13 @@ extension Task {
263262
/// Any results, including errors thrown by tasks affected by this
264263
/// cancellation, are silently discarded.
265264
///
265+
/// This function may be called even from within child (or any other) tasks,
266+
/// and will reliably cause the group to become cancelled.
267+
///
266268
/// - SeeAlso: `Task.addCancellationHandler`
267269
/// - SeeAlso: `Task.checkCancelled`
268270
/// - SeeAlso: `Task.isCancelled`
269-
public mutating func cancelAll() {
271+
public func cancelAll() {
270272
_taskGroupCancelAll(task: _task, group: _group)
271273
}
272274

stdlib/public/Concurrency/TaskPrivate.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,8 +143,8 @@ static void runTaskWithPollResult(
143143
// return a `nil` here (as result of the `group.next()`)
144144
waitingTaskContext->result.storage = nullptr;
145145
break;
146-
case TaskGroup::GroupPollStatus::Waiting:
147-
assert(false && "Must not attempt to run with a Waiting result.");
146+
case TaskGroup::GroupPollStatus::MustWait:
147+
assert(false && "Must not attempt to run with a MustWait result.");
148148
}
149149

150150
// FIXME: removing children on next() resumption is not implemented yet; we keep accumulating them until the group exits

test/Concurrency/Runtime/async_task_cancellation_early.swift

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
// REQUIRES: concurrency
55
// REQUIRES: libdispatch
66

7+
import Dispatch
8+
79
#if canImport(Darwin)
810
import Darwin
911
#elseif canImport(Glibc)
@@ -14,7 +16,7 @@ func test_runDetached_cancel_child_early() async {
1416
print(#function) // CHECK: test_runDetached_cancel_child_early
1517
let h: Task.Handle<Bool, Error> = Task.runDetached {
1618
async let childCancelled: Bool = { () -> Bool in
17-
usleep(1000 * 2)
19+
sleep(2)
1820
return await Task.__unsafeCurrentAsync().isCancelled
1921
}()
2022

test/Concurrency/Runtime/async_task_cancellation_while_running.swift

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
// REQUIRES: concurrency
55
// REQUIRES: libdispatch
66

7+
import Dispatch
8+
79
#if canImport(Darwin)
810
import Darwin
911
#elseif canImport(Glibc)
@@ -13,7 +15,7 @@ import Glibc
1315
func test_runDetached_cancel_while_child_running() async {
1416
let h: Task.Handle<Bool, Error> = Task.runDetached {
1517
async let childCancelled: Bool = { () -> Bool in
16-
usleep(1000 * 3)
18+
sleep(3)
1719
return await Task.__unsafeCurrentAsync().isCancelled
1820
}()
1921

@@ -25,7 +27,7 @@ func test_runDetached_cancel_while_child_running() async {
2527
}
2628

2729
// sleep here, i.e. give the task a moment to start running
28-
usleep(1000 * 2)
30+
sleep(2)
2931

3032
h.cancel()
3133
print("handle cancel")

test/Concurrency/Runtime/async_task_priority_current.swift

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
// RUN: %target-run-simple-swift(-Xfrontend -enable-experimental-concurrency -parse-as-library) | %FileCheck --dump-input=always %s
2+
23
// REQUIRES: executable_test
34
// REQUIRES: concurrency
5+
// REQUIRES: libdispatch
6+
7+
import Dispatch
48

59
#if canImport(Darwin)
610
import Darwin
@@ -30,7 +34,7 @@ func test_multiple_lo_indirectly_escalated() async {
3034
@concurrent
3135
func loopUntil(priority: Task.Priority) async {
3236
while (await Task.__unsafeCurrentAsync().task.priority != priority) {
33-
usleep(1000 * 1)
37+
sleep(1)
3438
}
3539
}
3640

test/Concurrency/Runtime/async_taskgroup_cancelAll_only_specific_group.swift

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
// RUN: %target-run-simple-swift(-Xfrontend -enable-experimental-concurrency -parse-as-library) | %FileCheck %s --dump-input always
2+
23
// REQUIRES: executable_test
34
// REQUIRES: concurrency
5+
// REQUIRES: libdispatch
6+
7+
import Dispatch
48

59
#if canImport(Darwin)
610
import Darwin
@@ -24,7 +28,7 @@ func test_taskGroup_cancelAll_onlySpecificGroup() async {
2428

2529
for i in 1...5 {
2630
await group.add {
27-
usleep(1000 * 1)
31+
sleep(1)
2832
let c = await Task.__unsafeCurrentAsync().isCancelled
2933
pprint("add: \(i) (cancelled: \(c))")
3034
return i
@@ -49,7 +53,7 @@ func test_taskGroup_cancelAll_onlySpecificGroup() async {
4953
let g2: Int = try! await Task.withGroup(resultType: Int.self) { group in
5054
for i in 1...3 {
5155
await group.add {
52-
usleep(1000 * 1)
56+
sleep(1)
5357
let c = await Task.__unsafeCurrentAsync().isCancelled
5458
pprint("g1 task \(i) (cancelled: \(c))")
5559
return i
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
// RUN: %target-run-simple-swift(-Xfrontend -enable-experimental-concurrency -parse-as-library) | %FileCheck %s --dump-input always
2+
3+
// REQUIRES: executable_test
4+
// REQUIRES: concurrency
5+
// REQUIRES: libdispatch
6+
7+
import Dispatch
8+
9+
#if canImport(Darwin)
10+
import Darwin
11+
#elseif canImport(Glibc)
12+
import Glibc
13+
#endif
14+
15+
func test_taskGroup_cancel_from_inside_child() async {
16+
let result: Int = try! await Task.withGroup(resultType: Int.self) { group in
17+
let firstAdded = await group.add { [group] in // must explicitly capture, as the task executes concurrently
18+
group.cancelAll() // allowed
19+
print("first")
20+
return 1
21+
}
22+
print("firstAdded: \(firstAdded)") // CHECK: firstAdded: true
23+
24+
let one = try! await group.next()
25+
26+
let secondAdded = await group.add {
27+
print("second")
28+
return 2
29+
}
30+
print("secondAdded: \(secondAdded)") // CHECK: secondAdded: false
31+
32+
return 1
33+
}
34+
35+
print("result: \(result)") // CHECK: result: 1
36+
}
37+
38+
39+
40+
@main struct Main {
41+
static func main() async {
42+
await test_taskGroup_cancel_from_inside_child()
43+
}
44+
}

0 commit comments

Comments
 (0)