Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 71 additions & 13 deletions Sources/Basics/Concurrency/AsyncProcess.swift
Original file line number Diff line number Diff line change
Expand Up @@ -514,15 +514,18 @@ package final class AsyncProcess {
if self.outputRedirection.redirectsOutput {
let stdoutPipe = Pipe()
let stderrPipe = Pipe()
let stdoutStream = DispatchFD(fileHandle: stdoutPipe.fileHandleForReading).dataStream()
let stderrStream = DispatchFD(fileHandle: stderrPipe.fileHandleForReading).dataStream()

group.enter()
stdoutPipe.fileHandleForReading.readabilityHandler = { (fh: FileHandle) in
let data = (try? fh.read(upToCount: Int.max)) ?? Data()
if data.count == 0 {
stdoutPipe.fileHandleForReading.readabilityHandler = nil
Task {
defer {
print("--- finished consuming stdout ---")
group.leave()
} else {
let contents = data.withUnsafeBytes { [UInt8]($0) }
}
print("--- started consuming stdout ---")
for try await data in stdoutStream {
let contents = [UInt8](data)
self.outputRedirection.outputClosures?.stdoutClosure(contents)
stdoutLock.withLock {
stdout += contents
Expand All @@ -531,13 +534,14 @@ package final class AsyncProcess {
}

group.enter()
stderrPipe.fileHandleForReading.readabilityHandler = { (fh: FileHandle) in
let data = (try? fh.read(upToCount: Int.max)) ?? Data()
if data.count == 0 {
stderrPipe.fileHandleForReading.readabilityHandler = nil
Task {
defer {
print("--- finished consuming stderr ---")
group.leave()
} else {
let contents = data.withUnsafeBytes { [UInt8]($0) }
}
print("--- started consuming stderr ---")
for try await data in stderrStream {
let contents = [UInt8](data)
self.outputRedirection.outputClosures?.stderrClosure(contents)
stderrLock.withLock {
stderr += contents
Expand All @@ -557,6 +561,7 @@ package final class AsyncProcess {
}

group.notify(queue: self.completionQueue) {
print("--- notified that output is ready ---")
self.stateLock.withLock {
self.state = .outputReady(stdout: .success(stdout), stderr: .success(stderr))
}
Expand Down Expand Up @@ -820,6 +825,7 @@ package final class AsyncProcess {
/// Executes the process I/O state machine, calling completion block when finished.
private func waitUntilExit(_ completion: @escaping (Result<AsyncProcessResult, Swift.Error>) -> Void) {
self.stateLock.lock()
print("--- waitUntilExit called: \(self.state) ---")
switch self.state {
case .idle:
defer { self.stateLock.unlock() }
Expand All @@ -832,16 +838,19 @@ package final class AsyncProcess {
completion(.failure(error))
case .readingOutput(let sync):
self.stateLock.unlock()
print("--- queing up waitUntilExit block ---")
sync.notify(queue: self.completionQueue) {
print("--- was notified we should enter waitUntilExit again ---")
self.waitUntilExit(completion)
}
case .outputReady(let stdoutResult, let stderrResult):
defer { self.stateLock.unlock() }
// Wait until process finishes execution.
#if os(Windows)
precondition(self._process != nil, "The process is not yet launched.")
let p = self._process!
self.stateLock.unlock()
p.waitUntilExit()
self.stateLock.lock()
let exitStatusCode = p.terminationStatus
let normalExit = p.terminationReason == .exit
#else
Expand All @@ -866,6 +875,7 @@ package final class AsyncProcess {
stderrOutput: stderrResult
)
self.state = .complete(executionResult)
self.stateLock.unlock()
self.completionQueue.async {
self.waitUntilExit(completion)
}
Expand Down Expand Up @@ -1354,3 +1364,51 @@ extension FileHandle: WritableByteStream {
}
}
#endif

extension DispatchFD {
public func readChunk(upToLength maxLength: Int) async throws -> DispatchData {
return try await withCheckedThrowingContinuation { continuation in
DispatchIO.read(fromFileDescriptor: numericCast(self.rawValue), maxLength: maxLength, runningHandlerOn: DispatchQueue.global())
{ data, error in
if error != 0 {
continuation.resume(throwing: StringError("POSIX error: \(error)"))
return
}
continuation.resume(returning: data)
}
}

}

/// Returns an async stream which reads bytes from the specified file descriptor. Unlike `FileHandle.bytes`, it does not block the caller.
@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *)
public func dataStream() -> some AsyncSequence<DispatchData, any Error> {
AsyncThrowingStream<DispatchData, any Error> {
while !Task.isCancelled {
let chunk = try await readChunk(upToLength: 4096)
if chunk.isEmpty {
return nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be a continuation (e.g. cont.finish() here)? I'm not sure what the implications are of returning nil are for clients of this stream.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

returning nil here is intended to terminate the stream, this initializer doesn't use a continuation

}
return chunk
}
throw CancellationError()
}
}
}

public struct DispatchFD {
#if os(Windows)
fileprivate let rawValue: Int
#else
fileprivate let rawValue: Int32
#endif

init(fileHandle: FileHandle) {
#if os(Windows)
// This may look unsafe, but is how swift-corelibs-dispatch works. Basically, dispatch_fd_t directly represents either a POSIX file descriptor OR a Windows HANDLE pointer address, meaning that the fileDescriptor parameter of various Dispatch APIs is actually NOT a file descriptor on Windows but rather a HANDLE. This means that the handle should NOT be converted using _open_osfhandle, and the return value of this function should ONLY be passed to Dispatch functions where the fileDescriptor parameter is masquerading as a HANDLE in this manner. Use with extreme caution.
rawValue = .init(bitPattern: fileHandle._handle)
#else
rawValue = fileHandle.fileDescriptor
#endif
}
}