diff --git a/Sources/Basics/Concurrency/AsyncProcess.swift b/Sources/Basics/Concurrency/AsyncProcess.swift index 86222f8fa67..d3517e5e442 100644 --- a/Sources/Basics/Concurrency/AsyncProcess.swift +++ b/Sources/Basics/Concurrency/AsyncProcess.swift @@ -514,15 +514,18 @@ package final class AsyncProcess { if self.outputRedirection.redirectsOutput { let stdoutPipe = Pipe() let stderrPipe = Pipe() + let stdoutStream = DispatchFD(fileHandle: stdoutPipe.fileHandleForReading).dataStream() + let stderrStream = DispatchFD(fileHandle: stderrPipe.fileHandleForReading).dataStream() group.enter() - stdoutPipe.fileHandleForReading.readabilityHandler = { (fh: FileHandle) in - let data = (try? fh.read(upToCount: Int.max)) ?? Data() - if data.count == 0 { - stdoutPipe.fileHandleForReading.readabilityHandler = nil + Task { + defer { + print("--- finished consuming stdout ---") group.leave() - } else { - let contents = data.withUnsafeBytes { [UInt8]($0) } + } + print("--- started consuming stdout ---") + for try await data in stdoutStream { + let contents = [UInt8](data) self.outputRedirection.outputClosures?.stdoutClosure(contents) stdoutLock.withLock { stdout += contents @@ -531,13 +534,14 @@ package final class AsyncProcess { } group.enter() - stderrPipe.fileHandleForReading.readabilityHandler = { (fh: FileHandle) in - let data = (try? fh.read(upToCount: Int.max)) ?? Data() - if data.count == 0 { - stderrPipe.fileHandleForReading.readabilityHandler = nil + Task { + defer { + print("--- finished consuming stderr ---") group.leave() - } else { - let contents = data.withUnsafeBytes { [UInt8]($0) } + } + print("--- started consuming stderr ---") + for try await data in stderrStream { + let contents = [UInt8](data) self.outputRedirection.outputClosures?.stderrClosure(contents) stderrLock.withLock { stderr += contents @@ -557,6 +561,7 @@ package final class AsyncProcess { } group.notify(queue: self.completionQueue) { + print("--- notified that output is ready ---") self.stateLock.withLock { self.state = .outputReady(stdout: .success(stdout), stderr: .success(stderr)) } @@ -820,6 +825,7 @@ package final class AsyncProcess { /// Executes the process I/O state machine, calling completion block when finished. private func waitUntilExit(_ completion: @escaping (Result) -> Void) { self.stateLock.lock() + print("--- waitUntilExit called: \(self.state) ---") switch self.state { case .idle: defer { self.stateLock.unlock() } @@ -832,16 +838,19 @@ package final class AsyncProcess { completion(.failure(error)) case .readingOutput(let sync): self.stateLock.unlock() + print("--- queing up waitUntilExit block ---") sync.notify(queue: self.completionQueue) { + print("--- was notified we should enter waitUntilExit again ---") self.waitUntilExit(completion) } case .outputReady(let stdoutResult, let stderrResult): - defer { self.stateLock.unlock() } // Wait until process finishes execution. #if os(Windows) precondition(self._process != nil, "The process is not yet launched.") let p = self._process! + self.stateLock.unlock() p.waitUntilExit() + self.stateLock.lock() let exitStatusCode = p.terminationStatus let normalExit = p.terminationReason == .exit #else @@ -866,6 +875,7 @@ package final class AsyncProcess { stderrOutput: stderrResult ) self.state = .complete(executionResult) + self.stateLock.unlock() self.completionQueue.async { self.waitUntilExit(completion) } @@ -1354,3 +1364,51 @@ extension FileHandle: WritableByteStream { } } #endif + +extension DispatchFD { + public func readChunk(upToLength maxLength: Int) async throws -> DispatchData { + return try await withCheckedThrowingContinuation { continuation in + DispatchIO.read(fromFileDescriptor: numericCast(self.rawValue), maxLength: maxLength, runningHandlerOn: DispatchQueue.global()) + { data, error in + if error != 0 { + continuation.resume(throwing: StringError("POSIX error: \(error)")) + return + } + continuation.resume(returning: data) + } + } + + } + + /// Returns an async stream which reads bytes from the specified file descriptor. Unlike `FileHandle.bytes`, it does not block the caller. + @available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *) + public func dataStream() -> some AsyncSequence { + AsyncThrowingStream { + while !Task.isCancelled { + let chunk = try await readChunk(upToLength: 4096) + if chunk.isEmpty { + return nil + } + return chunk + } + throw CancellationError() + } + } +} + +public struct DispatchFD { + #if os(Windows) + fileprivate let rawValue: Int + #else + fileprivate let rawValue: Int32 + #endif + + init(fileHandle: FileHandle) { + #if os(Windows) + // This may look unsafe, but is how swift-corelibs-dispatch works. Basically, dispatch_fd_t directly represents either a POSIX file descriptor OR a Windows HANDLE pointer address, meaning that the fileDescriptor parameter of various Dispatch APIs is actually NOT a file descriptor on Windows but rather a HANDLE. This means that the handle should NOT be converted using _open_osfhandle, and the return value of this function should ONLY be passed to Dispatch functions where the fileDescriptor parameter is masquerading as a HANDLE in this manner. Use with extreme caution. + rawValue = .init(bitPattern: fileHandle._handle) + #else + rawValue = fileHandle.fileDescriptor + #endif + } +}