@@ -514,15 +514,18 @@ package final class AsyncProcess {
514
514
if self . outputRedirection. redirectsOutput {
515
515
let stdoutPipe = Pipe ( )
516
516
let stderrPipe = Pipe ( )
517
+ let stdoutStream = DispatchFD ( fileHandle: stdoutPipe. fileHandleForReading) . dataStream ( )
518
+ let stderrStream = DispatchFD ( fileHandle: stderrPipe. fileHandleForReading) . dataStream ( )
517
519
518
520
group. enter ( )
519
- stdoutPipe. fileHandleForReading. readabilityHandler = { ( fh: FileHandle ) in
520
- let data = ( try ? fh. read ( upToCount: Int . max) ) ?? Data ( )
521
- if data. count == 0 {
522
- stdoutPipe. fileHandleForReading. readabilityHandler = nil
521
+ Task {
522
+ defer {
523
+ print ( " --- finished consuming stdout --- " )
523
524
group. leave ( )
524
- } else {
525
- let contents = data. withUnsafeBytes { [ UInt8] ( $0) }
525
+ }
526
+ print ( " --- started consuming stdout --- " )
527
+ for try await data in stdoutStream {
528
+ let contents = [ UInt8] ( data)
526
529
self . outputRedirection. outputClosures? . stdoutClosure ( contents)
527
530
stdoutLock. withLock {
528
531
stdout += contents
@@ -531,13 +534,14 @@ package final class AsyncProcess {
531
534
}
532
535
533
536
group. enter ( )
534
- stderrPipe. fileHandleForReading. readabilityHandler = { ( fh: FileHandle ) in
535
- let data = ( try ? fh. read ( upToCount: Int . max) ) ?? Data ( )
536
- if data. count == 0 {
537
- stderrPipe. fileHandleForReading. readabilityHandler = nil
537
+ Task {
538
+ defer {
539
+ print ( " --- finished consuming stderr --- " )
538
540
group. leave ( )
539
- } else {
540
- let contents = data. withUnsafeBytes { [ UInt8] ( $0) }
541
+ }
542
+ print ( " --- started consuming stderr --- " )
543
+ for try await data in stderrStream {
544
+ let contents = [ UInt8] ( data)
541
545
self . outputRedirection. outputClosures? . stderrClosure ( contents)
542
546
stderrLock. withLock {
543
547
stderr += contents
@@ -557,6 +561,7 @@ package final class AsyncProcess {
557
561
}
558
562
559
563
group. notify ( queue: self . completionQueue) {
564
+ print ( " --- notified that output is ready --- " )
560
565
self . stateLock. withLock {
561
566
self . state = . outputReady( stdout: . success( stdout) , stderr: . success( stderr) )
562
567
}
@@ -820,6 +825,7 @@ package final class AsyncProcess {
820
825
/// Executes the process I/O state machine, calling completion block when finished.
821
826
private func waitUntilExit( _ completion: @escaping ( Result < AsyncProcessResult , Swift . Error > ) -> Void ) {
822
827
self . stateLock. lock ( )
828
+ print ( " --- waitUntilExit called: \( self . state) --- " )
823
829
switch self . state {
824
830
case . idle:
825
831
defer { self . stateLock. unlock ( ) }
@@ -832,7 +838,9 @@ package final class AsyncProcess {
832
838
completion ( . failure( error) )
833
839
case . readingOutput( let sync) :
834
840
self . stateLock. unlock ( )
841
+ print ( " --- queing up waitUntilExit block --- " )
835
842
sync. notify ( queue: self . completionQueue) {
843
+ print ( " --- was notified we should enter waitUntilExit again --- " )
836
844
self . waitUntilExit ( completion)
837
845
}
838
846
case . outputReady( let stdoutResult, let stderrResult) :
@@ -1354,3 +1362,51 @@ extension FileHandle: WritableByteStream {
1354
1362
}
1355
1363
}
1356
1364
#endif
1365
+
1366
+ extension DispatchFD {
1367
+ public func readChunk( upToLength maxLength: Int ) async throws -> DispatchData {
1368
+ return try await withCheckedThrowingContinuation { continuation in
1369
+ DispatchIO . read ( fromFileDescriptor: numericCast ( self . rawValue) , maxLength: maxLength, runningHandlerOn: DispatchQueue . global ( ) )
1370
+ { data, error in
1371
+ if error != 0 {
1372
+ continuation. resume ( throwing: StringError ( " POSIX error: \( error) " ) )
1373
+ return
1374
+ }
1375
+ continuation. resume ( returning: data)
1376
+ }
1377
+ }
1378
+
1379
+ }
1380
+
1381
+ /// Returns an async stream which reads bytes from the specified file descriptor. Unlike `FileHandle.bytes`, it does not block the caller.
1382
+ @available ( macOS 15 . 0 , iOS 18 . 0 , tvOS 18 . 0 , watchOS 11 . 0 , visionOS 2 . 0 , * )
1383
+ public func dataStream( ) -> some AsyncSequence < DispatchData , any Error > {
1384
+ AsyncThrowingStream < DispatchData , any Error > {
1385
+ while !Task. isCancelled {
1386
+ let chunk = try await readChunk ( upToLength: 4096 )
1387
+ if chunk. isEmpty {
1388
+ return nil
1389
+ }
1390
+ return chunk
1391
+ }
1392
+ throw CancellationError ( )
1393
+ }
1394
+ }
1395
+ }
1396
+
1397
+ public struct DispatchFD {
1398
+ #if os(Windows)
1399
+ fileprivate let rawValue : Int
1400
+ #else
1401
+ fileprivate let rawValue : Int32
1402
+ #endif
1403
+
1404
+ init ( fileHandle: FileHandle ) {
1405
+ #if os(Windows)
1406
+ // This may look unsafe, but is how swift-corelibs-dispatch works. Basically, dispatch_fd_t directly represents either a POSIX file descriptor OR a Windows HANDLE pointer address, meaning that the fileDescriptor parameter of various Dispatch APIs is actually NOT a file descriptor on Windows but rather a HANDLE. This means that the handle should NOT be converted using _open_osfhandle, and the return value of this function should ONLY be passed to Dispatch functions where the fileDescriptor parameter is masquerading as a HANDLE in this manner. Use with extreme caution.
1407
+ rawValue = . init( bitPattern: fileHandle. _handle)
1408
+ #else
1409
+ rawValue = fileHandle. fileDescriptor
1410
+ #endif
1411
+ }
1412
+ }
0 commit comments