Skip to content

Allow callers to run a subprocess and provide low and high water marks when using SequenceOutput to emit standard output and standard error as soon as it arrives. #40

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 19 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions Sources/Subprocess/API.swift
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ public func run<Result, Input: InputProtocol, Error: OutputProtocol>(
}

// Body runs in the same isolation
let outputSequence = AsyncBufferSequence(diskIO: outputIOBox.take()!.consumeDiskIO())
let outputSequence = AsyncBufferSequence(diskIO: outputIOBox.take()!.consumeDiskIO(), streamOptions: platformOptions.streamOptions)
let result = try await body(execution, outputSequence)
try await group.waitForAll()
return result
Expand Down Expand Up @@ -250,7 +250,7 @@ public func run<Result, Input: InputProtocol, Output: OutputProtocol>(
}

// Body runs in the same isolation
let errorSequence = AsyncBufferSequence(diskIO: errorIOBox.take()!.consumeDiskIO())
let errorSequence = AsyncBufferSequence(diskIO: errorIOBox.take()!.consumeDiskIO(), streamOptions: platformOptions.streamOptions)
let result = try await body(execution, errorSequence)
try await group.waitForAll()
return result
Expand Down Expand Up @@ -286,7 +286,7 @@ public func run<Result, Error: OutputProtocol>(
error: try error.createPipe()
) { execution, inputIO, outputIO, errorIO in
let writer = StandardInputWriter(diskIO: inputIO!)
let outputSequence = AsyncBufferSequence(diskIO: outputIO!.consumeDiskIO())
let outputSequence = AsyncBufferSequence(diskIO: outputIO!.consumeDiskIO(), streamOptions: platformOptions.streamOptions)
return try await body(execution, writer, outputSequence)
}
}
Expand Down Expand Up @@ -319,7 +319,7 @@ public func run<Result, Output: OutputProtocol>(
error: try error.createPipe()
) { execution, inputIO, outputIO, errorIO in
let writer = StandardInputWriter(diskIO: inputIO!)
let errorSequence = AsyncBufferSequence(diskIO: errorIO!.consumeDiskIO())
let errorSequence = AsyncBufferSequence(diskIO: errorIO!.consumeDiskIO(), streamOptions: platformOptions.streamOptions)
return try await body(execution, writer, errorSequence)
}
}
Expand Down Expand Up @@ -376,8 +376,8 @@ public func run<Result>(
error: try error.createPipe()
) { execution, inputIO, outputIO, errorIO in
let writer = StandardInputWriter(diskIO: inputIO!)
let outputSequence = AsyncBufferSequence(diskIO: outputIO!.consumeDiskIO())
let errorSequence = AsyncBufferSequence(diskIO: errorIO!.consumeDiskIO())
let outputSequence = AsyncBufferSequence(diskIO: outputIO!.consumeDiskIO(), streamOptions: platformOptions.streamOptions)
let errorSequence = AsyncBufferSequence(diskIO: errorIO!.consumeDiskIO(), streamOptions: platformOptions.streamOptions)
return try await body(execution, writer, outputSequence, errorSequence)
}
}
Expand Down Expand Up @@ -503,8 +503,8 @@ public func run<Result>(
error: try error.createPipe()
) { execution, inputIO, outputIO, errorIO in
let writer = StandardInputWriter(diskIO: inputIO!)
let outputSequence = AsyncBufferSequence(diskIO: outputIO!.consumeDiskIO())
let errorSequence = AsyncBufferSequence(diskIO: errorIO!.consumeDiskIO())
let outputSequence = AsyncBufferSequence(diskIO: outputIO!.consumeDiskIO(), streamOptions: configuration.platformOptions.streamOptions)
let errorSequence = AsyncBufferSequence(diskIO: errorIO!.consumeDiskIO(), streamOptions: configuration.platformOptions.streamOptions)
return try await body(execution, writer, outputSequence, errorSequence)
}
}
Expand Down
69 changes: 59 additions & 10 deletions Sources/Subprocess/AsyncBufferSequence.swift
Original file line number Diff line number Diff line change
Expand Up @@ -35,44 +35,93 @@ public struct AsyncBufferSequence: AsyncSequence, Sendable {
@_nonSendable
public struct Iterator: AsyncIteratorProtocol {
public typealias Element = Buffer
internal typealias Stream = AsyncThrowingStream<StreamStatus, Swift.Error>

private let diskIO: DiskIO
private var buffer: [UInt8]
private var currentPosition: Int
private var finished: Bool
private var streamIterator: Stream.AsyncIterator
private let continuation: Stream.Continuation
private var needsNextChunk: Bool

internal init(diskIO: DiskIO) {
internal init(diskIO: DiskIO, streamOptions: PlatformOptions.StreamOptions) {
self.diskIO = diskIO
self.buffer = []
self.currentPosition = 0
self.finished = false
let (stream, continuation) = AsyncThrowingStream<StreamStatus, Swift.Error>.makeStream()
self.streamIterator = stream.makeAsyncIterator()
self.continuation = continuation
self.needsNextChunk = true

#if !os(Windows)
if let minimumBufferSize = streamOptions.minimumBufferSize {
diskIO.setLimit(lowWater: minimumBufferSize)
}

if let maximumBufferSize = streamOptions.maximumBufferSize {
diskIO.setLimit(highWater: maximumBufferSize)
}
#endif
}

public func next() async throws -> Buffer? {
let data = try await self.diskIO.readChunk(
upToLength: readBufferSize
)
if data == nil {
// We finished reading. Close the file descriptor now
public mutating func next() async throws -> Buffer? {

if needsNextChunk {
diskIO.readChunk(upToLength: readBufferSize, continuation: continuation)
needsNextChunk = false
}

if let status = try await streamIterator.next() {
switch status {
case .data(let data):
return data

case .endOfChunk(let data):
needsNextChunk = true
return data

case .endOfFile:
#if os(Windows)
try self.diskIO.close()
#else
self.diskIO.close()
#endif
return nil
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If streamIterator returned AsyncThrowingStream<Buffer> here you can just return it directly.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@iCharlesHu Yes, simplified this.

} else {
#if os(Windows)
try self.diskIO.close()
#else
self.diskIO.close()
#endif
return nil
}
return data
}
}

private let diskIO: DiskIO
private let streamOptions: PlatformOptions.StreamOptions

internal init(diskIO: DiskIO) {
internal init(diskIO: DiskIO, streamOptions: PlatformOptions.StreamOptions) {
self.diskIO = diskIO
self.streamOptions = streamOptions
}

public func makeAsyncIterator() -> Iterator {
return Iterator(diskIO: self.diskIO)
return Iterator(diskIO: self.diskIO, streamOptions: streamOptions)
}
}

extension AsyncBufferSequence {
/// Status values sent through the iterator's internal stream, one per
/// invocation of the read handler.
#if SubprocessSpan
@available(SubprocessSpan, *)
#endif
internal enum StreamStatus {
/// A non-empty buffer delivered while the current read is not yet done.
case data(AsyncBufferSequence.Buffer)
/// A non-empty buffer delivered together with the read's "done" flag;
/// the iterator requests another chunk on its next call.
case endOfChunk(AsyncBufferSequence.Buffer)
/// The read reported done without any data; the iterator closes its
/// disk I/O and returns `nil`.
case endOfFile
}
}

Expand Down
2 changes: 2 additions & 0 deletions Sources/Subprocess/Execution.swift
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import Musl
import WinSDK
#endif

internal import Dispatch

/// An object that represents a subprocess that has been
/// executed. You can use this object to send signals to the
/// child process as well as stream its output and error.
Expand Down
15 changes: 15 additions & 0 deletions Sources/Subprocess/Platforms/Subprocess+Darwin.swift
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ public struct PlatformOptions: Sendable {
/// Creates a session and sets the process group ID
/// i.e. Detach from the terminal.
public var createSession: Bool = false

public var streamOptions: StreamOptions = .init()

/// An ordered list of steps in order to tear down the child
/// process in case the parent task is cancelled before
/// the child process terminates.
Expand Down Expand Up @@ -126,6 +129,18 @@ extension PlatformOptions {
#endif
}

extension PlatformOptions {
    /// Optional buffering limits used when streaming a subprocess's
    /// output or error through `AsyncBufferSequence`.
    public struct StreamOptions: Sendable {
        /// When non-`nil`, the iterator of an `AsyncBufferSequence` passes
        /// this value to `setLimit(lowWater:)` on its disk I/O at creation.
        let minimumBufferSize: Int?
        /// When non-`nil`, the iterator of an `AsyncBufferSequence` passes
        /// this value to `setLimit(highWater:)` on its disk I/O at creation.
        let maximumBufferSize: Int?

        /// Creates stream options.
        ///
        /// - Parameters:
        ///   - minimumBufferSize: Low-water limit, or `nil` (the default)
        ///     to leave the limit unset.
        ///   - maximumBufferSize: High-water limit, or `nil` (the default)
        ///     to leave the limit unset.
        public init(
            minimumBufferSize: Int? = nil,
            maximumBufferSize: Int? = nil
        ) {
            self.minimumBufferSize = minimumBufferSize
            self.maximumBufferSize = maximumBufferSize
        }
    }
}

extension PlatformOptions: CustomStringConvertible, CustomDebugStringConvertible {
internal func description(withIndent indent: Int) -> String {
let indent = String(repeating: " ", count: indent * 4)
Expand Down
15 changes: 15 additions & 0 deletions Sources/Subprocess/Platforms/Subprocess+Linux.swift
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,9 @@ public struct PlatformOptions: Sendable {
// Creates a session and sets the process group ID
// i.e. Detach from the terminal.
public var createSession: Bool = false

public var streamOptions: StreamOptions = .init()

/// An ordered list of steps in order to tear down the child
/// process in case the parent task is cancelled before
/// the child process terminates.
Expand All @@ -235,6 +238,18 @@ public struct PlatformOptions: Sendable {
public init() {}
}

extension PlatformOptions {
    /// Optional buffering limits used when streaming a subprocess's
    /// output or error through `AsyncBufferSequence`.
    public struct StreamOptions: Sendable {
        /// When non-`nil`, the iterator of an `AsyncBufferSequence` passes
        /// this value to `setLimit(lowWater:)` on its disk I/O at creation.
        let minimumBufferSize: Int?
        /// When non-`nil`, the iterator of an `AsyncBufferSequence` passes
        /// this value to `setLimit(highWater:)` on its disk I/O at creation.
        let maximumBufferSize: Int?

        /// Creates stream options.
        ///
        /// - Parameters:
        ///   - minimumBufferSize: Low-water limit, or `nil` (the default)
        ///     to leave the limit unset.
        ///   - maximumBufferSize: High-water limit, or `nil` (the default)
        ///     to leave the limit unset.
        public init(
            minimumBufferSize: Int? = nil,
            maximumBufferSize: Int? = nil
        ) {
            self.minimumBufferSize = minimumBufferSize
            self.maximumBufferSize = maximumBufferSize
        }
    }
}

extension PlatformOptions: CustomStringConvertible, CustomDebugStringConvertible {
internal func description(withIndent indent: Int) -> String {
let indent = String(repeating: " ", count: indent * 4)
Expand Down
68 changes: 37 additions & 31 deletions Sources/Subprocess/Platforms/Subprocess+Unix.swift
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,7 @@ extension TrackedFileDescriptor {
}
}
)

return .init(dispatchIO, closeWhenDone: self.closeWhenDone)
}
}
Expand All @@ -414,37 +415,42 @@ extension DispatchIO {
#if SubprocessSpan
@available(SubprocessSpan, *)
#endif
internal func readChunk(upToLength maxLength: Int) async throws -> AsyncBufferSequence.Buffer? {
return try await withCheckedThrowingContinuation { continuation in
var buffer: DispatchData = .empty
self.read(
offset: 0,
length: maxLength,
queue: .global()
) { done, data, error in
if error != 0 {
continuation.resume(
throwing: SubprocessError(
code: .init(.failedToReadFromSubprocess),
underlyingError: .init(rawValue: error)
)
)
return
}
if let data = data {
if buffer.isEmpty {
buffer = data
} else {
buffer.append(data)
}
}
if done {
if !buffer.isEmpty {
continuation.resume(returning: AsyncBufferSequence.Buffer(data: buffer))
} else {
continuation.resume(returning: nil)
}
}
internal func readChunk(upToLength maxLength: Int, continuation: AsyncBufferSequence.Iterator.Stream.Continuation) {
self.read(
offset: 0,
length: maxLength,
queue: .global()
) { done, data, error in
if error != 0 {
continuation.finish(throwing: SubprocessError(
code: .init(.failedToReadFromSubprocess),
underlyingError: .init(rawValue: error)
))
return
}

// Treat empty data and nil as the same
let buffer = data.map { $0.isEmpty ? nil : $0 } ?? nil
let status: AsyncBufferSequence.StreamStatus

switch (buffer, done) {
case (.some(let data), false):
status = .data(AsyncBufferSequence.Buffer(data: data))

case (.some(let data), true):
status = .endOfChunk(AsyncBufferSequence.Buffer(data: data))

case (nil, false):
fatalError("Unexpectedly received no data from DispatchIO with it indicating it is not done.")

case (nil, true):
status = .endOfFile
}

continuation.yield(status)

if done {
continuation.finish()
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of using a StreamStatus to model the state, I think just having Iterator.Stream be AsyncThrowingStream<Buffer> is good enough, because when the throwing stream returns nil, it naturally implies "end of stream".

switch (buffer, done) {
case (.some(let data), false):
    continuation.yield(AsyncBufferSequence.Buffer(data: data))
case (.some(let data), true):
    continuation.yield(AsyncBufferSequence.Buffer(data: data))
    continuation.finish() // Finish the stream now
case (nil, false):
    fatalError("Unexpectedly received no data from DispatchIO with it indicating it is not done.")
case (nil, true):
    continuation.finish() // Finish the stream
}

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@iCharlesHu Simplified this.

}
}
Expand Down
Loading