Skip to content

Commit a882201

Browse files
authored
Merge pull request #4 from mtj0928/async-sequence
Add asyncForEach to AsyncSequence
2 parents e57aee2 + 9b5fda5 commit a882201

27 files changed

+83
-9
lines changed
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
extension AsyncSequence where Element: Sendable {
2+
/// An async function of `forEach`.
3+
///
4+
/// This is an example of a behavior.
5+
/// ```swift
6+
/// let asyncSequence = AsyncStream { c in
7+
/// (0..<5).forEach { c.yield($0) }
8+
/// c.finish()
9+
/// }
10+
/// await asyncSequence.asyncForEach(numberOfConcurrentTasks: 3) { @MainActor number in
11+
/// print("Start: \(number)")
12+
/// await Task.yield()
13+
/// print("end: \(number)")
14+
/// }
15+
/// // Start: 0
16+
/// // Start: 1
17+
/// // Start: 2
18+
/// // End: 0
19+
/// // End: 1
20+
/// // Start: 3
21+
/// // End: 2
22+
/// // End: 3
23+
/// // Start: 4
24+
/// // End: 4
25+
/// ```
26+
/// - Parameters:
27+
/// - numberOfConcurrentTasks: A number of concurrent tasks. the given `body` closure run in parallel when the value is 2 or more.
28+
/// - priority: A priority of the giving closure.
29+
/// - body: A similar closure with `forEach`'s one, but it's async.
30+
public func asyncForEach(
31+
numberOfConcurrentTasks: UInt = numberOfConcurrentTasks,
32+
priority: TaskPriority? = nil,
33+
_ body: @escaping @Sendable (Element) async throws -> Void
34+
) async rethrows {
35+
try await withThrowingOrderedTaskGroup(of: Void.self) { group in
36+
var counter = 0
37+
var asyncIterator = self.makeAsyncIterator()
38+
while let element = try await asyncIterator.next() {
39+
if counter < numberOfConcurrentTasks {
40+
group.addTask(priority: priority) {
41+
try await body(element)
42+
}
43+
counter += 1
44+
} else {
45+
try await group.next()
46+
group.addTask(priority: priority) {
47+
try await body(element)
48+
}
49+
}
50+
}
51+
}
52+
}
53+
}
File renamed without changes.

Sources/AsyncOperations/Collections/Operations/InternalForEach.swift renamed to Sources/AsyncOperations/Sequence/InternalForEach.swift

File renamed without changes.

Sources/AsyncOperations/Collections/Operations/AsyncAllSatisfy.swift renamed to Sources/AsyncOperations/Sequence/Sequence+AsyncAllSatisfy.swift

File renamed without changes.

Sources/AsyncOperations/Collections/Operations/AsyncCompactMap.swift renamed to Sources/AsyncOperations/Sequence/Sequence+AsyncCompactMap.swift

File renamed without changes.

Sources/AsyncOperations/Collections/Operations/AsyncContains.swift renamed to Sources/AsyncOperations/Sequence/Sequence+AsyncContains.swift

File renamed without changes.

Sources/AsyncOperations/Collections/Operations/AsyncFilter.swift renamed to Sources/AsyncOperations/Sequence/Sequence+AsyncFilter.swift

File renamed without changes.

Sources/AsyncOperations/Collections/Operations/AsyncFirst.swift renamed to Sources/AsyncOperations/Sequence/Sequence+AsyncFirst.swift

File renamed without changes.

Sources/AsyncOperations/Collections/Operations/AsyncFlatMap.swift renamed to Sources/AsyncOperations/Sequence/Sequence+AsyncFlatMap.swift

File renamed without changes.

Sources/AsyncOperations/Collections/Operations/AsyncForEach.swift renamed to Sources/AsyncOperations/Sequence/Sequence+AsyncForEach.swift

File renamed without changes.

0 commit comments

Comments
 (0)