Skip to content

Commit 9b5fda5

Browse files
committed
Add asyncForEach to AsyncSequence
1 parent 44b33db commit 9b5fda5

File tree

2 files changed

+74
-0
lines changed

2 files changed

+74
-0
lines changed
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
extension AsyncSequence where Element: Sendable {
2+
/// An async function of `forEach`.
3+
///
4+
/// This is an example of a behavior.
5+
/// ```swift
6+
/// let asyncSequence = AsyncStream { c in
7+
/// (0..<5).forEach { c.yield($0) }
8+
/// c.finish()
9+
/// }
10+
/// await asyncSequence.asyncForEach(numberOfConcurrentTasks: 3) { @MainActor number in
11+
/// print("Start: \(number)")
12+
/// await Task.yield()
13+
/// print("end: \(number)")
14+
/// }
15+
/// // Start: 0
16+
/// // Start: 1
17+
/// // Start: 2
18+
/// // End: 0
19+
/// // End: 1
20+
/// // Start: 3
21+
/// // End: 2
22+
/// // End: 3
23+
/// // Start: 4
24+
/// // End: 4
25+
/// ```
26+
/// - Parameters:
27+
/// - numberOfConcurrentTasks: A number of concurrent tasks. the given `body` closure run in parallel when the value is 2 or more.
28+
/// - priority: A priority of the giving closure.
29+
/// - body: A similar closure with `forEach`'s one, but it's async.
30+
public func asyncForEach(
31+
numberOfConcurrentTasks: UInt = numberOfConcurrentTasks,
32+
priority: TaskPriority? = nil,
33+
_ body: @escaping @Sendable (Element) async throws -> Void
34+
) async rethrows {
35+
try await withThrowingOrderedTaskGroup(of: Void.self) { group in
36+
var counter = 0
37+
var asyncIterator = self.makeAsyncIterator()
38+
while let element = try await asyncIterator.next() {
39+
if counter < numberOfConcurrentTasks {
40+
group.addTask(priority: priority) {
41+
try await body(element)
42+
}
43+
counter += 1
44+
} else {
45+
try await group.next()
46+
group.addTask(priority: priority) {
47+
try await body(element)
48+
}
49+
}
50+
}
51+
}
52+
}
53+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
import AsyncOperations
2+
import XCTest
3+
4+
final class AsyncSequenceAsyncForEachTests: XCTestCase {
5+
@MainActor
6+
func testAsyncForEach() async throws {
7+
var results: [Int] = []
8+
9+
let asyncSequence = AsyncStream { c in
10+
(0..<5).forEach { c.yield($0) }
11+
c.finish()
12+
}
13+
14+
await asyncSequence.asyncForEach(numberOfConcurrentTasks: 3) { @MainActor number in
15+
await Task.yield()
16+
results.append(number)
17+
}
18+
XCTAssertEqual(results.count, 5)
19+
XCTAssertEqual(Set(results), [0, 1, 2, 3, 4])
20+
}
21+
}

0 commit comments

Comments
 (0)