Skip to content

Commit a346953

Browse files
committed
[Concurrency][TaskGroup] example "map async" using task groups
1 parent 47da04f commit a346953

File tree

1 file changed

+52
-0
lines changed

1 file changed

+52
-0
lines changed

test/Concurrency/async_task_groups.swift

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,3 +172,55 @@ func test_taskGroup_quorum_thenCancel() async {
172172
_ = await gatherQuorum(followers: [Follower("A"), Follower("B"), Follower("C")])
173173
}
174174

175+
extension Collection {
176+
177+
// DEMO-05-03: structured spawning of much work
178+
// func map <T>( _ transform: (Element) throws -> T) rethrows -> [T] {
179+
// func mapAsync <T>(parallelism: Int = 0/*system default*/, _ transform: (Element) async throws -> T) async rethrows -> [T] {
180+
func mapAsync<T>(
181+
parallelism requestedParallelism: Int? = nil/*system default*/,
182+
// ordered: Bool = true, /
183+
_ transform: (Element) async throws -> T
184+
) async rethrows -> [T] {
185+
let defaultParallelism = 2
186+
let parallelism = requestedParallelism ?? defaultParallelism
187+
188+
let n = self.count
189+
if n == 0 {
190+
return []
191+
}
192+
193+
return await try Task.withGroup(resultType: (Int, T).self) { group in
194+
var result = ContiguousArray<T>()
195+
result.reserveCapacity(n)
196+
197+
var i = self.startIndex
198+
var submitted = 0
199+
200+
func submitNext() async throws {
201+
await group.add {
202+
let value = await try transform(self[i])
203+
return (submitted, value)
204+
}
205+
submitted += 1
206+
formIndex(after: &i)
207+
}
208+
209+
// submit first initial tasks
210+
for _ in 0..<parallelism {
211+
await try submitNext()
212+
}
213+
214+
while let (index, taskResult) = await try group.next() {
215+
result[index] = taskResult
216+
217+
try await Task.checkCancellation()
218+
await try submitNext()
219+
}
220+
221+
assert(result.count == n)
222+
return Array(result)
223+
}
224+
}
225+
}
226+

0 commit comments

Comments
 (0)