@@ -6,3 +6,51 @@ open class EventStream<Element> {
6
6
fatalError ( " This function should be overridden by implementing classes " )
7
7
}
8
8
}
9
+
10
+ #if compiler(>=5.5) && canImport(_Concurrency)
11
+
12
+ @available ( macOS 12 , iOS 15 , watchOS 8 , tvOS 15 , * )
13
+ /// Event stream that wraps an `AsyncThrowingStream` from Swift's standard concurrency system.
14
+ public class ConcurrentEventStream < Element> : EventStream < Element > {
15
+ public let stream : AsyncThrowingStream < Element , Error >
16
+
17
+ public init ( _ stream: AsyncThrowingStream < Element , Error > ) {
18
+ self . stream = stream
19
+ }
20
+
21
+ /// Performs the closure on each event in the current stream and returns a stream of the results.
22
+ /// - Parameter closure: The closure to apply to each event in the stream
23
+ /// - Returns: A stream of the results
24
+ override open func map< To> ( _ closure: @escaping ( Element ) throws -> To ) -> ConcurrentEventStream < To > {
25
+ let newStream = self . stream. mapStream ( closure)
26
+ return ConcurrentEventStream< To> . init( newStream)
27
+ }
28
+ }
29
+
30
+ @available ( macOS 12 , iOS 15 , watchOS 8 , tvOS 15 , * )
31
+ extension AsyncThrowingStream {
32
+ func mapStream< To> ( _ closure: @escaping ( Element ) throws -> To ) -> AsyncThrowingStream < To , Error > {
33
+ return AsyncThrowingStream < To , Error > { continuation in
34
+ Task {
35
+ for try await event in self {
36
+ let newEvent = try closure ( event)
37
+ continuation. yield ( newEvent)
38
+ }
39
+ }
40
+ }
41
+ }
42
+
43
+ func filterStream( _ isIncluded: @escaping ( Element ) throws -> Bool ) -> AsyncThrowingStream < Element , Error > {
44
+ return AsyncThrowingStream < Element , Error > { continuation in
45
+ Task {
46
+ for try await event in self {
47
+ if try isIncluded ( event) {
48
+ continuation. yield ( event)
49
+ }
50
+ }
51
+ }
52
+ }
53
+ }
54
+ }
55
+
56
+ #endif
0 commit comments