Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 92 additions & 7 deletions Sources/Subprocess/API.swift
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,11 @@ public func run<Result, Input: InputProtocol, Output: OutputProtocol, Error: Out
/// - platformOptions: The platform-specific options to use when running the executable.
/// - input: The input to send to the executable.
/// - error: How to manage executable standard error.
/// - preferredBufferSize: The preferred size in bytes for the buffer used when reading
/// from the subprocess's standard error stream. If `nil`, uses the system page size
/// as the default buffer size. Larger buffer sizes may improve performance for
/// subprocesses that produce large amounts of output, while smaller buffer sizes
/// may reduce memory usage and improve responsiveness for interactive applications.
/// - isolation: the isolation context to run the body closure.
/// - body: The custom execution body to manually control the running process.
/// - Returns: an `ExecutableResult` type containing the return value of the closure.
Expand All @@ -167,6 +172,7 @@ public func run<Result, Input: InputProtocol, Error: OutputProtocol>(
platformOptions: PlatformOptions = PlatformOptions(),
input: Input = .none,
error: Error = .discarded,
preferredBufferSize: Int? = nil,
isolation: isolated (any Actor)? = #isolation,
body: ((Execution, AsyncBufferSequence) async throws -> Result)
) async throws -> ExecutionResult<Result> where Error.OutputType == Void {
Expand All @@ -181,6 +187,7 @@ public func run<Result, Input: InputProtocol, Error: OutputProtocol>(
configuration,
input: input,
error: error,
preferredBufferSize: preferredBufferSize,
isolation: isolation,
body: body
)
Expand All @@ -196,7 +203,12 @@ public func run<Result, Input: InputProtocol, Error: OutputProtocol>(
/// - platformOptions: The platform-specific options to use when running the executable.
/// - input: The input to send to the executable.
/// - output: How to manage executable standard output.
/// - isolation: The isolation context to run the body closure.
/// - preferredBufferSize: The preferred size in bytes for the buffer used when reading
/// from the subprocess's standard error stream. If `nil`, uses the system page size
/// as the default buffer size. Larger buffer sizes may improve performance for
/// subprocesses that produce large amounts of output, while smaller buffer sizes
/// may reduce memory usage and improve responsiveness for interactive applications.
/// - isolation: the isolation context to run the body closure.
/// - body: The custom execution body to manually control the running process
/// - Returns: an `ExecutableResult` type containing the return value of the closure.
public func run<Result, Input: InputProtocol, Output: OutputProtocol>(
Expand All @@ -207,6 +219,7 @@ public func run<Result, Input: InputProtocol, Output: OutputProtocol>(
platformOptions: PlatformOptions = PlatformOptions(),
input: Input = .none,
output: Output,
preferredBufferSize: Int? = nil,
isolation: isolated (any Actor)? = #isolation,
body: ((Execution, AsyncBufferSequence) async throws -> Result)
) async throws -> ExecutionResult<Result> where Output.OutputType == Void {
Expand All @@ -221,6 +234,7 @@ public func run<Result, Input: InputProtocol, Output: OutputProtocol>(
configuration,
input: input,
output: output,
preferredBufferSize: preferredBufferSize,
isolation: isolation,
body: body
)
Expand All @@ -235,6 +249,11 @@ public func run<Result, Input: InputProtocol, Output: OutputProtocol>(
/// - workingDirectory: The working directory in which to run the executable.
/// - platformOptions: The platform-specific options to use when running the executable.
/// - error: How to manage executable standard error.
/// - preferredBufferSize: The preferred size in bytes for the buffer used when reading
/// from the subprocess's standard output stream. If `nil`, uses the system page size
/// as the default buffer size. Larger buffer sizes may improve performance for
/// subprocesses that produce large amounts of output, while smaller buffer sizes
/// may reduce memory usage and improve responsiveness for interactive applications.
/// - isolation: the isolation context to run the body closure.
/// - body: The custom execution body to manually control the running process
/// - Returns: An `ExecutableResult` type containing the return value of the closure.
Expand All @@ -245,6 +264,7 @@ public func run<Result, Error: OutputProtocol>(
workingDirectory: FilePath? = nil,
platformOptions: PlatformOptions = PlatformOptions(),
error: Error = .discarded,
preferredBufferSize: Int? = nil,
isolation: isolated (any Actor)? = #isolation,
body: ((Execution, StandardInputWriter, AsyncBufferSequence) async throws -> Result)
) async throws -> ExecutionResult<Result> where Error.OutputType == Void {
Expand All @@ -258,6 +278,7 @@ public func run<Result, Error: OutputProtocol>(
return try await run(
configuration,
error: error,
preferredBufferSize: preferredBufferSize,
isolation: isolation,
body: body
)
Expand All @@ -272,6 +293,11 @@ public func run<Result, Error: OutputProtocol>(
/// - workingDirectory: The working directory in which to run the executable.
/// - platformOptions: The platform-specific options to use when running the executable.
/// - output: How to manage executable standard output.
/// - preferredBufferSize: The preferred size in bytes for the buffer used when reading
/// from the subprocess's standard error stream. If `nil`, uses the system page size
/// as the default buffer size. Larger buffer sizes may improve performance for
/// subprocesses that produce large amounts of output, while smaller buffer sizes
/// may reduce memory usage and improve responsiveness for interactive applications.
/// - isolation: the isolation context to run the body closure.
/// - body: The custom execution body to manually control the running process
/// - Returns: An `ExecutableResult` type containing the return value of the closure.
Expand All @@ -282,6 +308,7 @@ public func run<Result, Output: OutputProtocol>(
workingDirectory: FilePath? = nil,
platformOptions: PlatformOptions = PlatformOptions(),
output: Output,
preferredBufferSize: Int? = nil,
isolation: isolated (any Actor)? = #isolation,
body: ((Execution, StandardInputWriter, AsyncBufferSequence) async throws -> Result)
) async throws -> ExecutionResult<Result> where Output.OutputType == Void {
Expand All @@ -295,6 +322,7 @@ public func run<Result, Output: OutputProtocol>(
return try await run(
configuration,
output: output,
preferredBufferSize: preferredBufferSize,
isolation: isolation,
body: body
)
Expand All @@ -309,6 +337,11 @@ public func run<Result, Output: OutputProtocol>(
/// - environment: The environment in which to run the executable.
/// - workingDirectory: The working directory in which to run the executable.
/// - platformOptions: The platform-specific options to use when running the executable.
/// - preferredBufferSize: The preferred size in bytes for the buffer used when reading
/// from the subprocess's standard output and error stream. If `nil`, uses the system page size
/// as the default buffer size. Larger buffer sizes may improve performance for
/// subprocesses that produce large amounts of output, while smaller buffer sizes
/// may reduce memory usage and improve responsiveness for interactive applications.
/// - isolation: the isolation context to run the body closure.
/// - body: The custom execution body to manually control the running process
/// - Returns: an `ExecutableResult` type containing the return value of the closure.
Expand All @@ -318,6 +351,7 @@ public func run<Result>(
environment: Environment = .inherit,
workingDirectory: FilePath? = nil,
platformOptions: PlatformOptions = PlatformOptions(),
preferredBufferSize: Int? = nil,
isolation: isolated (any Actor)? = #isolation,
body: (
(
Expand All @@ -337,6 +371,7 @@ public func run<Result>(
)
return try await run(
configuration,
preferredBufferSize: preferredBufferSize,
isolation: isolation,
body: body
)
Expand Down Expand Up @@ -546,6 +581,11 @@ public func run<Result, Input: InputProtocol, Output: OutputProtocol, Error: Out
/// - configuration: The configuration to run.
/// - input: The input to send to the executable.
/// - error: How to manager executable standard error.
/// - preferredBufferSize: The preferred size in bytes for the buffer used when reading
/// from the subprocess's standard output stream. If `nil`, uses the system page size
/// as the default buffer size. Larger buffer sizes may improve performance for
/// subprocesses that produce large amounts of output, while smaller buffer sizes
/// may reduce memory usage and improve responsiveness for interactive applications.
/// - isolation: the isolation context to run the body closure.
/// - body: The custom execution body to manually control the running process
/// - Returns an executableResult type containing the return value
Expand All @@ -554,6 +594,7 @@ public func run<Result, Input: InputProtocol, Error: OutputProtocol>(
_ configuration: Configuration,
input: Input = .none,
error: Error = .discarded,
preferredBufferSize: Int? = nil,
isolation: isolated (any Actor)? = #isolation,
body: ((Execution, AsyncBufferSequence) async throws -> Result)
) async throws -> ExecutionResult<Result> where Error.OutputType == Void {
Expand All @@ -579,7 +620,10 @@ public func run<Result, Input: InputProtocol, Error: OutputProtocol>(
}

// Body runs in the same isolation
let outputSequence = AsyncBufferSequence(diskIO: outputIOBox.take()!.consumeIOChannel())
let outputSequence = AsyncBufferSequence(
diskIO: outputIOBox.take()!.consumeIOChannel(),
preferredBufferSize: preferredBufferSize
)
let result = try await body(execution, outputSequence)
try await group.waitForAll()
return result
Expand All @@ -593,6 +637,11 @@ public func run<Result, Input: InputProtocol, Error: OutputProtocol>(
/// - configuration: The configuration to run.
/// - input: The input to send to the executable.
/// - output: How to manager executable standard output.
/// - preferredBufferSize: The preferred size in bytes for the buffer used when reading
/// from the subprocess's standard error stream. If `nil`, uses the system page size
/// as the default buffer size. Larger buffer sizes may improve performance for
/// subprocesses that produce large amounts of output, while smaller buffer sizes
/// may reduce memory usage and improve responsiveness for interactive applications.
/// - isolation: the isolation context to run the body closure.
/// - body: The custom execution body to manually control the running process
/// - Returns an executableResult type containing the return value
Expand All @@ -601,6 +650,7 @@ public func run<Result, Input: InputProtocol, Output: OutputProtocol>(
_ configuration: Configuration,
input: Input = .none,
output: Output,
preferredBufferSize: Int? = nil,
isolation: isolated (any Actor)? = #isolation,
body: ((Execution, AsyncBufferSequence) async throws -> Result)
) async throws -> ExecutionResult<Result> where Output.OutputType == Void {
Expand All @@ -626,7 +676,10 @@ public func run<Result, Input: InputProtocol, Output: OutputProtocol>(
}

// Body runs in the same isolation
let errorSequence = AsyncBufferSequence(diskIO: errorIOBox.take()!.consumeIOChannel())
let errorSequence = AsyncBufferSequence(
diskIO: errorIOBox.take()!.consumeIOChannel(),
preferredBufferSize: preferredBufferSize
)
let result = try await body(execution, errorSequence)
try await group.waitForAll()
return result
Expand All @@ -640,13 +693,19 @@ public func run<Result, Input: InputProtocol, Output: OutputProtocol>(
/// - Parameters:
/// - configuration: The `Configuration` to run.
/// - error: How to manager executable standard error.
/// - preferredBufferSize: The preferred size in bytes for the buffer used when reading
/// from the subprocess's standard output stream. If `nil`, uses the system page size
/// as the default buffer size. Larger buffer sizes may improve performance for
/// subprocesses that produce large amounts of output, while smaller buffer sizes
/// may reduce memory usage and improve responsiveness for interactive applications.
/// - isolation: the isolation context to run the body closure.
/// - body: The custom execution body to manually control the running process
/// - Returns an executableResult type containing the return value
/// of the closure.
public func run<Result, Error: OutputProtocol>(
_ configuration: Configuration,
error: Error = .discarded,
preferredBufferSize: Int? = nil,
isolation: isolated (any Actor)? = #isolation,
body: ((Execution, StandardInputWriter, AsyncBufferSequence) async throws -> Result)
) async throws -> ExecutionResult<Result> where Error.OutputType == Void {
Expand All @@ -658,7 +717,10 @@ public func run<Result, Error: OutputProtocol>(
error: try error.createPipe()
) { execution, inputIO, outputIO, errorIO in
let writer = StandardInputWriter(diskIO: inputIO!)
let outputSequence = AsyncBufferSequence(diskIO: outputIO!.consumeIOChannel())
let outputSequence = AsyncBufferSequence(
diskIO: outputIO!.consumeIOChannel(),
preferredBufferSize: preferredBufferSize
)
return try await body(execution, writer, outputSequence)
}
}
Expand All @@ -669,13 +731,19 @@ public func run<Result, Error: OutputProtocol>(
/// - Parameters:
/// - configuration: The `Configuration` to run.
/// - output: How to manager executable standard output.
/// - preferredBufferSize: The preferred size in bytes for the buffer used when reading
/// from the subprocess's standard error stream. If `nil`, uses the system page size
/// as the default buffer size. Larger buffer sizes may improve performance for
/// subprocesses that produce large amounts of output, while smaller buffer sizes
/// may reduce memory usage and improve responsiveness for interactive applications.
/// - isolation: the isolation context to run the body closure.
/// - body: The custom execution body to manually control the running process
/// - Returns an executableResult type containing the return value
/// of the closure.
public func run<Result, Output: OutputProtocol>(
_ configuration: Configuration,
output: Output,
preferredBufferSize: Int? = nil,
isolation: isolated (any Actor)? = #isolation,
body: ((Execution, StandardInputWriter, AsyncBufferSequence) async throws -> Result)
) async throws -> ExecutionResult<Result> where Output.OutputType == Void {
Expand All @@ -687,21 +755,32 @@ public func run<Result, Output: OutputProtocol>(
error: try error.createPipe()
) { execution, inputIO, outputIO, errorIO in
let writer = StandardInputWriter(diskIO: inputIO!)
let errorSequence = AsyncBufferSequence(diskIO: errorIO!.consumeIOChannel())
let errorSequence = AsyncBufferSequence(
diskIO: errorIO!.consumeIOChannel(),
preferredBufferSize: preferredBufferSize
)
return try await body(execution, writer, errorSequence)
}
}

/// Run an executable with given parameters specified by a `Configuration`
/// and a custom closure to manage the running subprocess' lifetime, write to its
/// standard input, and stream its standard output and standard error.
/// - Parameters:
/// - configuration: The `Subprocess` configuration to run.
/// - preferredBufferSize: The preferred size in bytes for the buffer used when reading
/// from the subprocess's standard output and error stream. If `nil`, uses the system page size
/// as the default buffer size. Larger buffer sizes may improve performance for
/// subprocesses that produce large amounts of output, while smaller buffer sizes
/// may reduce memory usage and improve responsiveness for interactive applications.
/// - isolation: the isolation context to run the body closure.
/// - body: The custom configuration body to manually control
/// the running process, write to its standard input, stream
/// the standard output and standard error.
/// - Returns: an `ExecutableResult` type containing the return value of the closure.
public func run<Result>(
_ configuration: Configuration,
preferredBufferSize: Int? = nil,
isolation: isolated (any Actor)? = #isolation,
body: ((Execution, StandardInputWriter, AsyncBufferSequence, AsyncBufferSequence) async throws -> Result)
) async throws -> ExecutionResult<Result> {
Expand All @@ -714,8 +793,14 @@ public func run<Result>(
error: try error.createPipe()
) { execution, inputIO, outputIO, errorIO in
let writer = StandardInputWriter(diskIO: inputIO!)
let outputSequence = AsyncBufferSequence(diskIO: outputIO!.consumeIOChannel())
let errorSequence = AsyncBufferSequence(diskIO: errorIO!.consumeIOChannel())
let outputSequence = AsyncBufferSequence(
diskIO: outputIO!.consumeIOChannel(),
preferredBufferSize: preferredBufferSize
)
let errorSequence = AsyncBufferSequence(
diskIO: errorIO!.consumeIOChannel(),
preferredBufferSize: preferredBufferSize
)
return try await body(execution, writer, outputSequence, errorSequence)
}
}
15 changes: 11 additions & 4 deletions Sources/Subprocess/AsyncBufferSequence.swift
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,13 @@ public struct AsyncBufferSequence: AsyncSequence, @unchecked Sendable {
public typealias Element = Buffer

private let diskIO: DiskIO
private let preferredBufferSize: Int
private var buffer: [Buffer]

internal init(diskIO: DiskIO) {
internal init(diskIO: DiskIO, preferredBufferSize: Int?) {
self.diskIO = diskIO
self.buffer = []
self.preferredBufferSize = preferredBufferSize ?? readBufferSize
}

/// Retrieve the next buffer in the sequence, or `nil` if
Expand All @@ -58,7 +60,7 @@ public struct AsyncBufferSequence: AsyncSequence, @unchecked Sendable {
// Read more data
let data = try await AsyncIO.shared.read(
from: self.diskIO,
upTo: readBufferSize
upTo: self.preferredBufferSize
)
guard let data else {
// We finished reading. Close the file descriptor now
Expand All @@ -84,14 +86,19 @@ public struct AsyncBufferSequence: AsyncSequence, @unchecked Sendable {
}

private let diskIO: DiskIO
private let preferredBufferSize: Int?

internal init(diskIO: DiskIO) {
internal init(diskIO: DiskIO, preferredBufferSize: Int?) {
self.diskIO = diskIO
self.preferredBufferSize = preferredBufferSize
}

/// Creates a iterator for this asynchronous sequence.
public func makeAsyncIterator() -> Iterator {
return Iterator(diskIO: self.diskIO)
return Iterator(
diskIO: self.diskIO,
preferredBufferSize: self.preferredBufferSize
)
}

/// Creates a line sequence to iterate through this `AsyncBufferSequence` line by line.
Expand Down
Loading
Loading