@@ -514,15 +514,16 @@ package final class AsyncProcess {
514
514
if self . outputRedirection. redirectsOutput {
515
515
let stdoutPipe = Pipe ( )
516
516
let stderrPipe = Pipe ( )
517
+ let stdoutStream = DispatchFD ( fileHandle: stdoutPipe. fileHandleForReading) . dataStream ( )
518
+ let stderrStream = DispatchFD ( fileHandle: stderrPipe. fileHandleForReading) . dataStream ( )
517
519
518
520
group. enter ( )
519
- stdoutPipe. fileHandleForReading. readabilityHandler = { ( fh: FileHandle ) in
520
- let data = ( try ? fh. read ( upToCount: Int . max) ) ?? Data ( )
521
- if data. count == 0 {
522
- stdoutPipe. fileHandleForReading. readabilityHandler = nil
521
+ Task {
522
+ defer {
523
523
group. leave ( )
524
- } else {
525
- let contents = data. withUnsafeBytes { [ UInt8] ( $0) }
524
+ }
525
+ for try await data in stdoutStream {
526
+ let contents = [ UInt8] ( data)
526
527
self . outputRedirection. outputClosures? . stdoutClosure ( contents)
527
528
stdoutLock. withLock {
528
529
stdout += contents
@@ -531,13 +532,12 @@ package final class AsyncProcess {
531
532
}
532
533
533
534
group. enter ( )
534
- stderrPipe. fileHandleForReading. readabilityHandler = { ( fh: FileHandle ) in
535
- let data = ( try ? fh. read ( upToCount: Int . max) ) ?? Data ( )
536
- if data. count == 0 {
537
- stderrPipe. fileHandleForReading. readabilityHandler = nil
535
+ Task {
536
+ defer {
538
537
group. leave ( )
539
- } else {
540
- let contents = data. withUnsafeBytes { [ UInt8] ( $0) }
538
+ }
539
+ for try await data in stderrStream {
540
+ let contents = [ UInt8] ( data)
541
541
self . outputRedirection. outputClosures? . stderrClosure ( contents)
542
542
stderrLock. withLock {
543
543
stderr += contents
@@ -1354,3 +1354,51 @@ extension FileHandle: WritableByteStream {
1354
1354
}
1355
1355
}
1356
1356
#endif
1357
+
1358
+ extension DispatchFD {
1359
+ public func readChunk( upToLength maxLength: Int ) async throws -> DispatchData {
1360
+ return try await withCheckedThrowingContinuation { continuation in
1361
+ DispatchIO . read ( fromFileDescriptor: numericCast ( self . rawValue) , maxLength: maxLength, runningHandlerOn: DispatchQueue . global ( ) )
1362
+ { data, error in
1363
+ if error != 0 {
1364
+ continuation. resume ( throwing: StringError ( " POSIX error: \( error) " ) )
1365
+ return
1366
+ }
1367
+ continuation. resume ( returning: data)
1368
+ }
1369
+ }
1370
+
1371
+ }
1372
+
1373
+ /// Returns an async stream which reads bytes from the specified file descriptor. Unlike `FileHandle.bytes`, it does not block the caller.
1374
+ @available ( macOS 15 . 0 , iOS 18 . 0 , tvOS 18 . 0 , watchOS 11 . 0 , visionOS 2 . 0 , * )
1375
+ public func dataStream( ) -> some AsyncSequence < DispatchData , any Error > {
1376
+ AsyncThrowingStream < DispatchData , any Error > {
1377
+ while !Task. isCancelled {
1378
+ let chunk = try await readChunk ( upToLength: 4096 )
1379
+ if chunk. isEmpty {
1380
+ return nil
1381
+ }
1382
+ return chunk
1383
+ }
1384
+ throw CancellationError ( )
1385
+ }
1386
+ }
1387
+ }
1388
+
1389
+ public struct DispatchFD {
1390
+ #if os(Windows)
1391
+ fileprivate let rawValue : Int
1392
+ #else
1393
+ fileprivate let rawValue : Int32
1394
+ #endif
1395
+
1396
+ init ( fileHandle: FileHandle ) {
1397
+ #if os(Windows)
1398
+ // This may look unsafe, but is how swift-corelibs-dispatch works. Basically, dispatch_fd_t directly represents either a POSIX file descriptor OR a Windows HANDLE pointer address, meaning that the fileDescriptor parameter of various Dispatch APIs is actually NOT a file descriptor on Windows but rather a HANDLE. This means that the handle should NOT be converted using _open_osfhandle, and the return value of this function should ONLY be passed to Dispatch functions where the fileDescriptor parameter is masquerading as a HANDLE in this manner. Use with extreme caution.
1399
+ rawValue = . init( bitPattern: fileHandle. _handle)
1400
+ #else
1401
+ rawValue = fileHandle. fileDescriptor
1402
+ #endif
1403
+ }
1404
+ }
0 commit comments