@@ -57,114 +57,38 @@ extension DispatchFD {
57
57
}
58
58
}
59
59
}
60
- }
61
60
62
- extension AsyncThrowingStream where Element == UInt8 , Failure == any Error {
63
61
/// Returns an async stream which reads bytes from the specified file descriptor. Unlike `FileHandle.bytes`, it does not block the caller.
64
62
@available ( macOS, deprecated: 15.0 , message: " Use the AsyncSequence-returning overload. " )
65
63
@available ( iOS, deprecated: 18.0 , message: " Use the AsyncSequence-returning overload. " )
66
64
@available ( tvOS, deprecated: 18.0 , message: " Use the AsyncSequence-returning overload. " )
67
65
@available ( watchOS, deprecated: 11.0 , message: " Use the AsyncSequence-returning overload. " )
68
66
@available ( visionOS, deprecated: 2.0 , message: " Use the AsyncSequence-returning overload. " )
69
- public static func _dataStream( reading fileDescriptor: DispatchFD , on queue: SWBQueue ) -> AsyncThrowingStream < Element , any Error > {
70
- AsyncThrowingStream { continuation in
71
- let newFD : DispatchFD
72
- do {
73
- newFD = try fileDescriptor. _duplicate ( )
74
- } catch {
75
- continuation. finish ( throwing: error)
76
- return
77
- }
78
-
79
- let io = SWBDispatchIO . stream ( fileDescriptor: newFD, queue: queue) { error in
80
- do {
81
- try newFD. _close ( )
82
- if error != 0 {
83
- continuation. finish ( throwing: POSIXError ( error, context: " dataStream(reading: \( fileDescriptor) )#1 " ) )
84
- }
85
- } catch {
86
- continuation. finish ( throwing: error)
87
- }
88
- }
89
- io. setLimit ( lowWater: 0 )
90
- io. setLimit ( highWater: 4096 )
91
-
92
- continuation. onTermination = { termination in
93
- if case . cancelled = termination {
94
- io. close ( flags: . stop)
95
- } else {
96
- io. close ( )
97
- }
98
- }
99
-
100
- io. read ( offset: 0 , length: . max, queue: queue) { done, data, error in
101
- guard error == 0 else {
102
- continuation. finish ( throwing: POSIXError ( error, context: " dataStream(reading: \( fileDescriptor) )#2 " ) )
103
- return
104
- }
105
-
106
- let data = data ?? . empty
107
- for element in data {
108
- continuation. yield ( element)
109
- }
110
-
111
- if done {
112
- continuation. finish ( )
67
+ public func _dataStream( ) -> AsyncThrowingStream < SWBDispatchData , any Error > {
68
+ AsyncThrowingStream < SWBDispatchData , any Error > {
69
+ while !Task. isCancelled {
70
+ let chunk = try await readChunk ( upToLength: 4096 )
71
+ if chunk. isEmpty {
72
+ return nil
113
73
}
74
+ return chunk
114
75
}
76
+ throw CancellationError ( )
115
77
}
116
78
}
117
- }
118
79
119
- @available ( macOS 15 . 0 , iOS 18 . 0 , tvOS 18 . 0 , watchOS 11 . 0 , visionOS 2 . 0 , * )
120
- extension AsyncSequence where Element == UInt8 , Failure == any Error {
121
80
/// Returns an async stream which reads bytes from the specified file descriptor. Unlike `FileHandle.bytes`, it does not block the caller.
122
- public static func dataStream( reading fileDescriptor: DispatchFD , on queue: SWBQueue ) -> any AsyncSequence < Element , any Error > {
123
- AsyncThrowingStream < SWBDispatchData , any Error > { continuation in
124
- let newFD : DispatchFD
125
- do {
126
- newFD = try fileDescriptor. _duplicate ( )
127
- } catch {
128
- continuation. finish ( throwing: error)
129
- return
130
- }
131
-
132
- let io = SWBDispatchIO . stream ( fileDescriptor: newFD, queue: queue) { error in
133
- do {
134
- try newFD. _close ( )
135
- if error != 0 {
136
- let context = " dataStream(reading: \( fileDescriptor) \" \( Result { try fileDescriptor. _filePath ( ) } ) \" )#1 "
137
- continuation. finish ( throwing: POSIXError ( error, context: context) )
138
- }
139
- } catch {
140
- continuation. finish ( throwing: error)
141
- }
142
- }
143
- io. setLimit ( lowWater: 0 )
144
- io. setLimit ( highWater: 4096 )
145
-
146
- continuation. onTermination = { termination in
147
- if case . cancelled = termination {
148
- io. close ( flags: . stop)
149
- } else {
150
- io. close ( )
151
- }
152
- }
153
-
154
- io. read ( offset: 0 , length: . max, queue: queue) { done, data, error in
155
- guard error == 0 else {
156
- let context = " dataStream(reading: \( fileDescriptor) \" \( Result { try fileDescriptor. _filePath ( ) } ) \" )#2 "
157
- continuation. finish ( throwing: POSIXError ( error, context: context) )
158
- return
159
- }
160
-
161
- let data = data ?? . empty
162
- continuation. yield ( data)
163
-
164
- if done {
165
- continuation. finish ( )
81
+ @available ( macOS 15 . 0 , iOS 18 . 0 , tvOS 18 . 0 , watchOS 11 . 0 , visionOS 2 . 0 , * )
82
+ public func dataStream( ) -> some AsyncSequence < SWBDispatchData , any Error > {
83
+ AsyncThrowingStream < SWBDispatchData , any Error > {
84
+ while !Task. isCancelled {
85
+ let chunk = try await readChunk ( upToLength: 4096 )
86
+ if chunk. isEmpty {
87
+ return nil
166
88
}
89
+ return chunk
167
90
}
168
- } . flattened
91
+ throw CancellationError ( )
92
+ }
169
93
}
170
94
}
0 commit comments