@@ -68,22 +68,9 @@ extension AsyncThrowingStream where Element == UInt8, Failure == any Error {
6868 @available ( visionOS, deprecated: 2.0 , message: " Use the AsyncSequence-returning overload. " )
6969 public static func _dataStream( reading fileDescriptor: DispatchFD , on queue: SWBQueue ) -> AsyncThrowingStream < Element , any Error > {
7070 AsyncThrowingStream { continuation in
71- let newFD : DispatchFD
72- do {
73- newFD = try fileDescriptor. _duplicate ( )
74- } catch {
75- continuation. finish ( throwing: error)
76- return
77- }
78-
79- let io = SWBDispatchIO . stream ( fileDescriptor: newFD, queue: queue) { error in
80- do {
81- try newFD. _close ( )
82- if error != 0 {
83- continuation. finish ( throwing: POSIXError ( error, context: " dataStream(reading: \( fileDescriptor) )#1 " ) )
84- }
85- } catch {
86- continuation. finish ( throwing: error)
71+ let io = SWBDispatchIO . stream ( fileDescriptor: fileDescriptor, queue: queue) { error in
72+ if error != 0 {
73+ continuation. finish ( throwing: POSIXError ( error, context: " dataStream(reading: \( fileDescriptor) )#1 " ) )
8774 }
8875 }
8976 io. setLimit ( lowWater: 0 )
@@ -120,51 +107,15 @@ extension AsyncThrowingStream where Element == UInt8, Failure == any Error {
120107extension AsyncSequence where Element == UInt8 , Failure == any Error {
121108 /// Returns an async stream which reads bytes from the specified file descriptor. Unlike `FileHandle.bytes`, it does not block the caller.
122109 public static func dataStream( reading fileDescriptor: DispatchFD , on queue: SWBQueue ) -> any AsyncSequence < Element , any Error > {
123- AsyncThrowingStream < SWBDispatchData , any Error > { continuation in
124- let newFD : DispatchFD
125- do {
126- newFD = try fileDescriptor. _duplicate ( )
127- } catch {
128- continuation. finish ( throwing: error)
129- return
130- }
131-
132- let io = SWBDispatchIO . stream ( fileDescriptor: newFD, queue: queue) { error in
133- do {
134- try newFD. _close ( )
135- if error != 0 {
136- let context = " dataStream(reading: \( fileDescriptor) \" \( Result { try fileDescriptor. _filePath ( ) } ) \" )#1 "
137- continuation. finish ( throwing: POSIXError ( error, context: context) )
138- }
139- } catch {
140- continuation. finish ( throwing: error)
141- }
142- }
143- io. setLimit ( lowWater: 0 )
144- io. setLimit ( highWater: 4096 )
145-
146- continuation. onTermination = { termination in
147- if case . cancelled = termination {
148- io. close ( flags: . stop)
149- } else {
150- io. close ( )
151- }
152- }
153-
154- io. read ( offset: 0 , length: . max, queue: queue) { done, data, error in
155- guard error == 0 else {
156- let context = " dataStream(reading: \( fileDescriptor) \" \( Result { try fileDescriptor. _filePath ( ) } ) \" )#2 "
157- continuation. finish ( throwing: POSIXError ( error, context: context) )
158- return
159- }
160-
161- let data = data ?? . empty
162- continuation. yield ( data)
163-
164- if done {
165- continuation. finish ( )
110+ AsyncThrowingStream < SWBDispatchData , any Error > {
111+ while !Task. isCancelled {
112+ let chunk = try await fileDescriptor. readChunk ( upToLength: 4096 )
113+ if chunk. isEmpty {
114+ return nil
166115 }
116+ return chunk
167117 }
118+ throw CancellationError ( )
168119 } . flattened
169120 }
170121}
0 commit comments