Skip to content

Commit 35acd40

Browse files
DougGregorktoso
authored andcommitted
+concurrency task groups initial work in progress
1 parent a346953 commit 35acd40

File tree

5 files changed

+224
-83
lines changed

5 files changed

+224
-83
lines changed

stdlib/public/Concurrency/Task.swift

Lines changed: 15 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ extension Task {
148148
/// This is a port of the C++ FlagSet.
149149
struct JobFlags {
150150
/// Kinds of schedulable jobs.
151-
enum Kind : Int {
151+
enum Kind: Int {
152152
case task = 0
153153
};
154154

@@ -215,54 +215,6 @@ extension Task {
215215
// ==== Detached Tasks ---------------------------------------------------------
216216

217217
extension Task {
218-
/// Run given `operation` as part of a new top-level task.
219-
///
220-
/// Creating detached tasks should, generally, be avoided in favor of using
221-
/// `async` functions, `async let` declarations and `await` expressions - as
222-
/// those benefit from structured, bounded concurrency which is easier to reason
223-
/// about, as well as automatically inheriting the parent tasks priority,
224-
/// task-local storage, deadlines, as well as being cancelled automatically
225-
/// when their parent task is cancelled. Detached tasks do not get any of those
226-
/// benefits, and thus should only be used when an operation is impossible to
227-
/// be modelled with child tasks.
228-
///
229-
/// ### Cancellation
230-
/// A detached task always runs to completion unless it is explicitly cancelled.
231-
/// Specifically, dropping a detached tasks `Task.Handle` does _not_ automatically
232-
/// cancel given task.
233-
///
234-
/// Canceling a task must be performed explicitly via `handle.cancel()`.
235-
///
236-
/// - Note: it is generally preferable to use child tasks rather than detached
237-
/// tasks. Child tasks automatically carry priorities, task-local state,
238-
/// deadlines and have other benefits resulting from the structured
239-
/// concurrency concepts that they model. Consider using detached tasks only
240-
/// when strictly necessary and impossible to model operations otherwise.
241-
///
242-
/// - Parameters:
243-
/// - priority: priority of the task TODO: reword and define more explicitly once we have priorities well-defined
244-
/// - operation: the operation to execute
245-
/// - Returns: handle to the task, allowing to `await handle.get()` on the
246-
/// tasks result or `cancel` it.
247-
public static func runDetached<T>(
248-
priority: Priority = .default,
249-
operation: @escaping () async -> T
250-
) -> Handle<T> {
251-
// Set up the job flags for a new task.
252-
var flags = JobFlags()
253-
flags.kind = .task
254-
flags.priority = priority
255-
flags.isFuture = true
256-
257-
// Create the asynchronous task future.
258-
let (task, _) = Builtin.createAsyncTaskFuture(flags.bits, nil, operation)
259-
260-
// Enqueue the resulting job.
261-
_enqueueJobGlobal(Builtin.convertTaskToJob(task))
262-
263-
return Handle<T>(task: task)
264-
}
265-
266218
/// Run given throwing `operation` as part of a new top-level task.
267219
///
268220
/// Creating detached tasks should, generally, be avoided in favor of using
@@ -288,7 +240,7 @@ extension Task {
288240
/// when strictly necessary and impossible to model operations otherwise.
289241
///
290242
/// - Parameters:
291-
/// - priority: priority of the task TODO: reword and define more explicitly once we have priorities well-defined
243+
/// - priority: priority of the task
292244
/// - operation: the operation to execute
293245
/// - Returns: handle to the task, allowing to `await handle.get()` on the
294246
/// tasks result or `cancel` it. If the operation fails the handle will
@@ -405,6 +357,19 @@ public func _runChildTask<T>(operation: @escaping () async throws -> T) async
405357
return task
406358
}
407359

360+
//@_silgen_name("swift_task_get_priority") // TODO: not quite this way?
361+
//public func getTaskPriority(_ task: __owned Builtin.NativeObject) -> JobPriority
362+
363+
struct RawTaskFutureWaitResult {
364+
let hadErrorResult: Bool
365+
let storage: UnsafeRawPointer
366+
}
367+
368+
@_silgen_name("swift_task_future_wait")
369+
func taskFutureWait(
370+
on task: Builtin.NativeObject
371+
) async -> RawTaskFutureWaitResult
372+
408373
#if _runtime(_ObjC)
409374

410375
/// Intrinsic used by SILGen to launch a task for bridging a Swift async method

stdlib/public/Concurrency/TaskGroup.swift

Lines changed: 129 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
////===----------------------------------------------------------------------===//
1212

1313
import Swift
14+
import Dispatch
1415
@_implementationOnly import _SwiftConcurrencyShims
1516

1617
// ==== Task Group -------------------------------------------------------------
@@ -32,6 +33,12 @@ extension Task {
3233
/// // some accumulation logic (e.g. sum += result)
3334
/// }
3435
///
36+
/// ### Thrown errors
37+
/// When tasks are added to the group using the `group.add` function, they may
38+
/// immediately begin executing. Even if their results are not collected explicitly
39+
/// and such task throws, and was not yet cancelled, it may result in the `withGroup`
40+
/// throwing.
41+
///
3542
/// ### Cancellation
3643
/// If an error is thrown out of the task group, all of its remaining tasks
3744
/// will be cancelled and the `withGroup` call will rethrow that error.
@@ -57,61 +64,149 @@ extension Task {
5764
public static func withGroup<TaskResult, BodyResult>(
5865
resultType: TaskResult.Type,
5966
returning returnType: BodyResult.Type = BodyResult.self,
60-
body: (inout Task.Group<TaskResult>) async throws -> BodyResult
61-
) async rethrows -> BodyResult {
62-
fatalError("\(#function) not implemented yet.")
67+
cancelOutstandingTasksOnReturn: Bool = false,
68+
body: @escaping ((inout Task.Group<TaskResult>) async throws -> BodyResult)
69+
) async throws -> BodyResult {
70+
let drainPendingTasksOnSuccessfulReturn = !cancelOutstandingTasksOnReturn
71+
let parent = Builtin.getCurrentAsyncTask()
72+
73+
// Set up the job flags for a new task.
74+
var groupFlags = JobFlags()
75+
groupFlags.kind = .task // TODO: .taskGroup?
76+
groupFlags.priority = .default // TODO: parent's priority // await Task.currentPriority()
77+
groupFlags.isFuture = true
78+
79+
// 1. Prepare the Group task
80+
// FIXME: do we have to rather prepare it inside the task we spawn; and yield it back along with the result instead?
81+
var group = Task.Group<TaskResult>(parentTask: parent)
82+
83+
let (groupTask, context) =
84+
Builtin.createAsyncTaskFuture(groupFlags.bits, nil) { () async throws -> BodyResult in
85+
let got = await try body(&group)
86+
return got
87+
}
88+
let groupHandle = Handle<BodyResult>(task: groupTask)
89+
90+
// 2.0) Run the task!
91+
DispatchQueue.global(priority: .default).async {
92+
groupHandle.run() // TODO: this synchronously runs
93+
}
94+
95+
// 2.1) ensure that if we fail and exit by throwing we will cancel all tasks,
96+
// if we succeed, there is nothing to cancel anymore so this is noop
97+
defer { group.cancelAll() }
98+
99+
// 2.2) Await the group completing it's run ("until the withGroup returns")
100+
let result = await try groupHandle.get() // if we throw, so be it -- group tasks will be cancelled
101+
102+
// TODO: do drain before exiting
103+
// if drainPendingTasksOnSuccessfulReturn {
104+
// // drain all outstanding tasks
105+
// while await try group.next() != nil {
106+
// continue // awaiting all remaining tasks
107+
// }
108+
// }
109+
110+
return result
63111
}
64112

65113
/// A task group serves as storage for dynamically started tasks.
66114
///
67-
/// Its intended use is with the
115+
/// Its intended use is with the `Task.withGroup` function.
68116
/* @unmoveable */
69117
public struct Group<TaskResult> {
118+
private let parentTask: Builtin.NativeObject
119+
120+
// TODO: we want groups to be unordered in completion, the counterpart to streams (Series),
121+
// as such it feels like we need to keep them like this, because a next() can complete any of them
122+
// and then we need to remove it from the pending ones
123+
private var pendingTasks: [Int: Handle<TaskResult>] // TODO: make a dict for out of order completions
124+
private var nextTaskID: Int = 0
125+
126+
/// If present, the handle on which the `next()` call is awaiting,
127+
/// it should be resumed by *any* of the in-flight tasks completing.
128+
private var nextHandle: Task.Handle<TaskResult>? = nil
129+
70130
/// No public initializers
71-
private init() {}
131+
init(parentTask: Builtin.NativeObject) {
132+
self.parentTask = parentTask
133+
self.pendingTasks = [:]
134+
}
72135

73136
// Swift will statically prevent this type from being copied or moved.
74137
// For now, that implies that it cannot be used with generics.
75138

76139
/// Add a child task to the group.
77140
///
78141
/// ### Error handling
79-
/// Operations are allowed to throw.
80-
///
81-
/// in which case the `await try next()`
142+
/// Operations are allowed to `throw`, in which case the `await try next()`
82143
/// invocation corresponding to the failed task will re-throw the given task.
83144
///
145+
/// The `add` function will never (re)-throw exceptions from the `operation`,
146+
/// the corresponding `next()` call will throw the error when necessary.
147+
///
84148
/// - Parameters:
85149
/// - overridingPriority: override priority of the operation task
86150
/// - operation: operation to execute and add to the group
151+
@discardableResult
87152
public mutating func add(
88153
overridingPriority: Priority? = nil,
89-
operation: () async throws -> TaskResult
90-
) async {
91-
fatalError("\(#function) not implemented yet.")
92-
}
154+
operation: @escaping () async throws -> TaskResult
155+
) async -> Task.Handle<TaskResult> {
156+
var flags = JobFlags()
157+
flags.kind = .task // TODO: childTask?
158+
flags.priority = .default // TODO: priority getting from parent not implemented yet
159+
// if let overridingPriority = overridingPriority { // TODO: cannot use ?? with async defaultValue
160+
// flags.priority = overridingPriority
161+
// } else {
162+
// flags.priority = await Task.currentPriority() // TODO: self.parent.priority ?
163+
// }
164+
flags.isFuture = true
93165

94-
/// Add a child task and return a `Task.Handle` that can be used to manage it.
95-
///
96-
/// The task's result is accessible either via the returned `handle` or the
97-
/// `group.next()` function (as any other `add`-ed task).
98-
///
99-
/// - Parameters:
100-
/// - overridingPriority: override priority of the operation task
101-
/// - operation: operation to execute and add to the group
102-
public mutating func addWithHandle(
103-
overridingPriority: Priority? = nil,
104-
operation: () async throws -> TaskResult
105-
) async -> Handle<TaskResult> {
106-
fatalError("\(#function) not implemented yet.")
166+
let (childTask, context) =
167+
// TODO: passing the parentTask (instead of nil) here makes the program hang here
168+
Builtin.createAsyncTaskFuture(flags.bits, nil, operation)
169+
170+
let handle = Handle<TaskResult>(task: childTask)
171+
172+
// runTask(childTask)
173+
DispatchQueue.global(priority: .default).async {
174+
handle.run()
175+
}
176+
177+
// _ = DispatchQueue.global(priority: .default).async {
178+
// print("run dispatch INSIDE: \(childTask)")
179+
// await try operation()
180+
// }
181+
182+
// FIXME: need to store? self.pendingTasks[ObjectIdentifier(childTask)] = childTask
183+
184+
defer { nextTaskID += 1 }
185+
self.pendingTasks[nextTaskID] = handle
186+
187+
return handle
107188
}
108189

109190
/// Wait for a child task to complete and return the result it returned,
110191
/// or else return.
111192
///
112-
///
113-
public mutating func next() async throws -> TaskResult? {
114-
fatalError("\(#function) not implemented yet.")
193+
/// Order of completions is *not* guaranteed to be same as submission order,
194+
/// rather the order of `next()` calls completing is by completion order of
195+
/// the tasks. This differentiates task groups from streams (
196+
public mutating func next(file: String = #file, line: UInt = #line) async throws -> TaskResult? {
197+
// FIXME: this implementation is wrong and naive; we instead need to maintain a dict of handles,
198+
// and return them as they complete in that order; so likely a queue of "which one completed"
199+
// this will allow building "collect first N results" APIs easily;
200+
// APIs which need order can implement on top of this, or we provide a different API for it
201+
let handle = self.pendingTasks.removeValue(forKey: 0) ??
202+
self.pendingTasks.removeValue(forKey: 1)
203+
204+
if let handle = handle {
205+
let got = await try handle.get()
206+
return got
207+
} else {
208+
return nil
209+
}
115210
}
116211

117212
/// Query whether the group has any remaining tasks.
@@ -122,7 +217,7 @@ extension Task {
122217
///
123218
/// - Returns: `true` if the group has no pending tasks, `false` otherwise.
124219
public var isEmpty: Bool {
125-
fatalError("\(#function) not implemented yet.")
220+
return self.pendingTasks.isEmpty
126221
}
127222

128223
/// Cancel all the remaining tasks in the group.
@@ -133,8 +228,11 @@ extension Task {
133228
/// cancellation, are silently discarded.
134229
///
135230
/// - SeeAlso: `Task.addCancellationHandler`
136-
public mutating func cancelAll() {
137-
fatalError("\(#function) not implemented yet.")
231+
public mutating func cancelAll(file: String = #file, line: UInt = #line) {
232+
for (id, handle) in self.pendingTasks {
233+
handle.cancel()
234+
}
235+
self.pendingTasks = [:]
138236
}
139237
}
140238
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
// RUN: %target-run-simple-swift(-Xfrontend -enable-experimental-concurrency) | %FileCheck %s --dump-input always
2+
// REQUIRES: executable_test
3+
// REQUIRES: concurrency
4+
// REQUIRES: OS=macosx
5+
6+
import Dispatch
7+
8+
// ==== ------------------------------------------------------------------------
9+
// MARK: "Infrastructure" for the tests
10+
11+
extension DispatchQueue {
12+
func async<R>(operation: @escaping () async -> R) -> Task.Handle<R> {
13+
let handle = Task.runDetached(operation: operation)
14+
15+
// Run the task
16+
_ = { self.async { handle.run() } }() // force invoking the non-async version
17+
18+
return handle
19+
}
20+
}
21+
22+
@available(*, deprecated, message: "This is a temporary hack")
23+
func launch<R>(operation: @escaping () async -> R) -> Task.Handle<R> {
24+
let handle = Task.runDetached(operation: operation)
25+
26+
// Run the task
27+
_ = DispatchQueue.global(priority: .default).async { handle.run() }
28+
29+
return handle
30+
}
31+
32+
// ==== ------------------------------------------------------------------------
33+
// MARK: Tests
34+
35+
func test_taskGroup_01_sum() {
36+
let numbers = [1, 2]
37+
let expected = numbers.reduce(0, +)
38+
39+
let taskHandle = launch { () async -> Int in
40+
return await try! Task.withGroup(resultType: Int.self) { (group) async -> Int in
41+
for n in numbers {
42+
await group.add { () async -> Int in n }
43+
}
44+
45+
var sum = 0
46+
while let r = await try! group.next() {
47+
print("next: \(r)")
48+
sum += r
49+
}
50+
51+
print("task group returning: \(sum)")
52+
return sum
53+
}
54+
}
55+
56+
// CHECK: main task
57+
// CHECK: next: 1
58+
// CHECK: next: 2
59+
// CHECK: task group returning: 3
60+
61+
launch { () async in
62+
let sum = await try! taskHandle.get()
63+
// CHECK: result: 3
64+
print("result: \(sum)")
65+
assert(expected == sum)
66+
exit(0)
67+
}
68+
69+
print("main task")
70+
}
71+
72+
test_taskGroup_01_sum()
73+
74+
dispatchMain()

0 commit comments

Comments
 (0)