File tree Expand file tree Collapse file tree 1 file changed +26
-8
lines changed
Sources/GraphQL/Subscription Expand file tree Collapse file tree 1 file changed +26
-8
lines changed Original file line number Diff line number Diff line change @@ -31,24 +31,42 @@ public class ConcurrentEventStream<Element>: EventStream<Element> {
31
31
extension AsyncThrowingStream {
32
32
func mapStream< To> ( _ closure: @escaping ( Element ) throws -> To ) -> AsyncThrowingStream < To , Error > {
33
33
return AsyncThrowingStream < To , Error > { continuation in
34
- Task {
35
- for try await event in self {
36
- let newEvent = try closure ( event)
37
- continuation. yield ( newEvent)
34
+ let task = Task {
35
+ do {
36
+ for try await event in self {
37
+ let newEvent = try closure ( event)
38
+ continuation. yield ( newEvent)
39
+ }
40
+ continuation. finish ( )
41
+ } catch {
42
+ continuation. finish ( throwing: error)
38
43
}
39
44
}
45
+
46
+ continuation. onTermination = { @Sendable _ in
47
+ task. cancel ( )
48
+ }
40
49
}
41
50
}
42
51
43
52
func filterStream( _ isIncluded: @escaping ( Element ) throws -> Bool ) -> AsyncThrowingStream < Element , Error > {
44
53
return AsyncThrowingStream < Element , Error > { continuation in
45
- Task {
46
- for try await event in self {
47
- if try isIncluded ( event) {
48
- continuation. yield ( event)
54
+ let task = Task {
55
+ do {
56
+ for try await event in self {
57
+ if try isIncluded ( event) {
58
+ continuation. yield ( event)
59
+ }
49
60
}
61
+ continuation. finish ( )
62
+ } catch {
63
+ continuation. finish ( throwing: error)
50
64
}
51
65
}
66
+
67
+ continuation. onTermination = { @Sendable _ in
68
+ task. cancel ( )
69
+ }
52
70
}
53
71
}
54
72
}
You can’t perform that action at this time.
0 commit comments