diff --git a/Sources/Subprocess/API.swift b/Sources/Subprocess/API.swift index 6ae7e1f..f70c679 100644 --- a/Sources/Subprocess/API.swift +++ b/Sources/Subprocess/API.swift @@ -156,6 +156,11 @@ public func run( platformOptions: PlatformOptions = PlatformOptions(), input: Input = .none, error: Error = .discarded, + preferredBufferSize: Int? = nil, isolation: isolated (any Actor)? = #isolation, body: ((Execution, AsyncBufferSequence) async throws -> Result) ) async throws -> ExecutionResult where Error.OutputType == Void { @@ -181,6 +187,7 @@ public func run( configuration, input: input, error: error, + preferredBufferSize: preferredBufferSize, isolation: isolation, body: body ) @@ -196,7 +203,12 @@ public func run( /// - platformOptions: The platform-specific options to use when running the executable. /// - input: The input to send to the executable. /// - output: How to manage executable standard output. -/// - isolation: The isolation context to run the body closure. +/// - preferredBufferSize: The preferred size in bytes for the buffer used when reading +/// from the subprocess's standard error stream. If `nil`, uses the system page size +/// as the default buffer size. Larger buffer sizes may improve performance for +/// subprocesses that produce large amounts of output, while smaller buffer sizes +/// may reduce memory usage and improve responsiveness for interactive applications. +/// - isolation: the isolation context to run the body closure. /// - body: The custom execution body to manually control the running process /// - Returns: an `ExecutableResult` type containing the return value of the closure. public func run( @@ -207,6 +219,7 @@ public func run( platformOptions: PlatformOptions = PlatformOptions(), input: Input = .none, output: Output, + preferredBufferSize: Int? = nil, isolation: isolated (any Actor)? = #isolation, body: ((Execution, AsyncBufferSequence) async throws -> Result) ) async throws -> ExecutionResult where Output.OutputType == Void { @@ -221,6 +234,7 @@ public func run( configuration, input: input, output: output, + preferredBufferSize: preferredBufferSize, isolation: isolation, body: body ) @@ -235,6 +249,11 @@ public func run( /// - workingDirectory: The working directory in which to run the executable. /// - platformOptions: The platform-specific options to use when running the executable. /// - error: How to manage executable standard error. +/// - preferredBufferSize: The preferred size in bytes for the buffer used when reading +/// from the subprocess's standard output stream. If `nil`, uses the system page size +/// as the default buffer size. Larger buffer sizes may improve performance for +/// subprocesses that produce large amounts of output, while smaller buffer sizes +/// may reduce memory usage and improve responsiveness for interactive applications. /// - isolation: the isolation context to run the body closure. /// - body: The custom execution body to manually control the running process /// - Returns: An `ExecutableResult` type containing the return value of the closure. @@ -245,6 +264,7 @@ public func run( workingDirectory: FilePath? = nil, platformOptions: PlatformOptions = PlatformOptions(), error: Error = .discarded, + preferredBufferSize: Int? = nil, isolation: isolated (any Actor)? = #isolation, body: ((Execution, StandardInputWriter, AsyncBufferSequence) async throws -> Result) ) async throws -> ExecutionResult where Error.OutputType == Void { @@ -258,6 +278,7 @@ public func run( return try await run( configuration, error: error, + preferredBufferSize: preferredBufferSize, isolation: isolation, body: body ) @@ -272,6 +293,11 @@ public func run( /// - workingDirectory: The working directory in which to run the executable. /// - platformOptions: The platform-specific options to use when running the executable. /// - output: How to manage executable standard output. +/// - preferredBufferSize: The preferred size in bytes for the buffer used when reading +/// from the subprocess's standard error stream. If `nil`, uses the system page size +/// as the default buffer size. Larger buffer sizes may improve performance for +/// subprocesses that produce large amounts of output, while smaller buffer sizes +/// may reduce memory usage and improve responsiveness for interactive applications. /// - isolation: the isolation context to run the body closure. /// - body: The custom execution body to manually control the running process /// - Returns: An `ExecutableResult` type containing the return value of the closure. @@ -282,6 +308,7 @@ public func run( workingDirectory: FilePath? = nil, platformOptions: PlatformOptions = PlatformOptions(), output: Output, + preferredBufferSize: Int? = nil, isolation: isolated (any Actor)? = #isolation, body: ((Execution, StandardInputWriter, AsyncBufferSequence) async throws -> Result) ) async throws -> ExecutionResult where Output.OutputType == Void { @@ -295,6 +322,7 @@ public func run( return try await run( configuration, output: output, + preferredBufferSize: preferredBufferSize, isolation: isolation, body: body ) @@ -309,6 +337,11 @@ public func run( /// - environment: The environment in which to run the executable. /// - workingDirectory: The working directory in which to run the executable. /// - platformOptions: The platform-specific options to use when running the executable. +/// - preferredBufferSize: The preferred size in bytes for the buffer used when reading +/// from the subprocess's standard output and error stream. If `nil`, uses the system page size +/// as the default buffer size. Larger buffer sizes may improve performance for +/// subprocesses that produce large amounts of output, while smaller buffer sizes +/// may reduce memory usage and improve responsiveness for interactive applications. /// - isolation: the isolation context to run the body closure. /// - body: The custom execution body to manually control the running process /// - Returns: an `ExecutableResult` type containing the return value of the closure. @@ -318,6 +351,7 @@ public func run( environment: Environment = .inherit, workingDirectory: FilePath? = nil, platformOptions: PlatformOptions = PlatformOptions(), + preferredBufferSize: Int? = nil, isolation: isolated (any Actor)? = #isolation, body: ( ( @@ -337,6 +371,7 @@ public func run( ) return try await run( configuration, + preferredBufferSize: preferredBufferSize, isolation: isolation, body: body ) @@ -546,6 +581,11 @@ public func run( _ configuration: Configuration, input: Input = .none, error: Error = .discarded, + preferredBufferSize: Int? = nil, isolation: isolated (any Actor)? = #isolation, body: ((Execution, AsyncBufferSequence) async throws -> Result) ) async throws -> ExecutionResult where Error.OutputType == Void { @@ -579,7 +620,10 @@ public func run( } // Body runs in the same isolation - let outputSequence = AsyncBufferSequence(diskIO: outputIOBox.take()!.consumeIOChannel()) + let outputSequence = AsyncBufferSequence( + diskIO: outputIOBox.take()!.consumeIOChannel(), + preferredBufferSize: preferredBufferSize + ) let result = try await body(execution, outputSequence) try await group.waitForAll() return result @@ -593,6 +637,11 @@ public func run( /// - configuration: The configuration to run. /// - input: The input to send to the executable. /// - output: How to manager executable standard output. +/// - preferredBufferSize: The preferred size in bytes for the buffer used when reading +/// from the subprocess's standard error stream. If `nil`, uses the system page size +/// as the default buffer size. Larger buffer sizes may improve performance for +/// subprocesses that produce large amounts of output, while smaller buffer sizes +/// may reduce memory usage and improve responsiveness for interactive applications. /// - isolation: the isolation context to run the body closure. /// - body: The custom execution body to manually control the running process /// - Returns an executableResult type containing the return value @@ -601,6 +650,7 @@ public func run( _ configuration: Configuration, input: Input = .none, output: Output, + preferredBufferSize: Int? = nil, isolation: isolated (any Actor)? = #isolation, body: ((Execution, AsyncBufferSequence) async throws -> Result) ) async throws -> ExecutionResult where Output.OutputType == Void { @@ -626,7 +676,10 @@ public func run( } // Body runs in the same isolation - let errorSequence = AsyncBufferSequence(diskIO: errorIOBox.take()!.consumeIOChannel()) + let errorSequence = AsyncBufferSequence( + diskIO: errorIOBox.take()!.consumeIOChannel(), + preferredBufferSize: preferredBufferSize + ) let result = try await body(execution, errorSequence) try await group.waitForAll() return result @@ -640,6 +693,11 @@ public func run( /// - Parameters: /// - configuration: The `Configuration` to run. /// - error: How to manager executable standard error. +/// - preferredBufferSize: The preferred size in bytes for the buffer used when reading +/// from the subprocess's standard output stream. If `nil`, uses the system page size +/// as the default buffer size. Larger buffer sizes may improve performance for +/// subprocesses that produce large amounts of output, while smaller buffer sizes +/// may reduce memory usage and improve responsiveness for interactive applications. /// - isolation: the isolation context to run the body closure. /// - body: The custom execution body to manually control the running process /// - Returns an executableResult type containing the return value @@ -647,6 +705,7 @@ public func run( public func run( _ configuration: Configuration, error: Error = .discarded, + preferredBufferSize: Int? = nil, isolation: isolated (any Actor)? = #isolation, body: ((Execution, StandardInputWriter, AsyncBufferSequence) async throws -> Result) ) async throws -> ExecutionResult where Error.OutputType == Void { @@ -658,7 +717,10 @@ public func run( error: try error.createPipe() ) { execution, inputIO, outputIO, errorIO in let writer = StandardInputWriter(diskIO: inputIO!) - let outputSequence = AsyncBufferSequence(diskIO: outputIO!.consumeIOChannel()) + let outputSequence = AsyncBufferSequence( + diskIO: outputIO!.consumeIOChannel(), + preferredBufferSize: preferredBufferSize + ) return try await body(execution, writer, outputSequence) } } @@ -669,6 +731,11 @@ public func run( /// - Parameters: /// - configuration: The `Configuration` to run. /// - output: How to manager executable standard output. +/// - preferredBufferSize: The preferred size in bytes for the buffer used when reading +/// from the subprocess's standard error stream. If `nil`, uses the system page size +/// as the default buffer size. Larger buffer sizes may improve performance for +/// subprocesses that produce large amounts of output, while smaller buffer sizes +/// may reduce memory usage and improve responsiveness for interactive applications. /// - isolation: the isolation context to run the body closure. /// - body: The custom execution body to manually control the running process /// - Returns an executableResult type containing the return value @@ -676,6 +743,7 @@ public func run( public func run( _ configuration: Configuration, output: Output, + preferredBufferSize: Int? = nil, isolation: isolated (any Actor)? = #isolation, body: ((Execution, StandardInputWriter, AsyncBufferSequence) async throws -> Result) ) async throws -> ExecutionResult where Output.OutputType == Void { @@ -687,14 +755,24 @@ public func run( error: try error.createPipe() ) { execution, inputIO, outputIO, errorIO in let writer = StandardInputWriter(diskIO: inputIO!) - let errorSequence = AsyncBufferSequence(diskIO: errorIO!.consumeIOChannel()) + let errorSequence = AsyncBufferSequence( + diskIO: errorIO!.consumeIOChannel(), + preferredBufferSize: preferredBufferSize + ) return try await body(execution, writer, errorSequence) } } /// Run an executable with given parameters specified by a `Configuration` +/// and a custom closure to manage the running subprocess' lifetime, write to its +/// standard input, and stream its standard output and standard error. /// - Parameters: /// - configuration: The `Subprocess` configuration to run. +/// - preferredBufferSize: The preferred size in bytes for the buffer used when reading +/// from the subprocess's standard output and error stream. If `nil`, uses the system page size +/// as the default buffer size. Larger buffer sizes may improve performance for +/// subprocesses that produce large amounts of output, while smaller buffer sizes +/// may reduce memory usage and improve responsiveness for interactive applications. /// - isolation: the isolation context to run the body closure. /// - body: The custom configuration body to manually control /// the running process, write to its standard input, stream @@ -702,6 +780,7 @@ public func run( /// - Returns: an `ExecutableResult` type containing the return value of the closure. public func run( _ configuration: Configuration, + preferredBufferSize: Int? = nil, isolation: isolated (any Actor)? = #isolation, body: ((Execution, StandardInputWriter, AsyncBufferSequence, AsyncBufferSequence) async throws -> Result) ) async throws -> ExecutionResult { @@ -714,8 +793,14 @@ public func run( error: try error.createPipe() ) { execution, inputIO, outputIO, errorIO in let writer = StandardInputWriter(diskIO: inputIO!) - let outputSequence = AsyncBufferSequence(diskIO: outputIO!.consumeIOChannel()) - let errorSequence = AsyncBufferSequence(diskIO: errorIO!.consumeIOChannel()) + let outputSequence = AsyncBufferSequence( + diskIO: outputIO!.consumeIOChannel(), + preferredBufferSize: preferredBufferSize + ) + let errorSequence = AsyncBufferSequence( + diskIO: errorIO!.consumeIOChannel(), + preferredBufferSize: preferredBufferSize + ) return try await body(execution, writer, outputSequence, errorSequence) } } diff --git a/Sources/Subprocess/AsyncBufferSequence.swift b/Sources/Subprocess/AsyncBufferSequence.swift index 3984d76..b184c3c 100644 --- a/Sources/Subprocess/AsyncBufferSequence.swift +++ b/Sources/Subprocess/AsyncBufferSequence.swift @@ -41,11 +41,13 @@ public struct AsyncBufferSequence: AsyncSequence, @unchecked Sendable { public typealias Element = Buffer private let diskIO: DiskIO + private let preferredBufferSize: Int private var buffer: [Buffer] - internal init(diskIO: DiskIO) { + internal init(diskIO: DiskIO, preferredBufferSize: Int?) { self.diskIO = diskIO self.buffer = [] + self.preferredBufferSize = preferredBufferSize ?? readBufferSize } /// Retrieve the next buffer in the sequence, or `nil` if @@ -58,7 +60,7 @@ public struct AsyncBufferSequence: AsyncSequence, @unchecked Sendable { // Read more data let data = try await AsyncIO.shared.read( from: self.diskIO, - upTo: readBufferSize + upTo: self.preferredBufferSize ) guard let data else { // We finished reading. Close the file descriptor now @@ -84,14 +86,19 @@ public struct AsyncBufferSequence: AsyncSequence, @unchecked Sendable { } private let diskIO: DiskIO + private let preferredBufferSize: Int? - internal init(diskIO: DiskIO) { + internal init(diskIO: DiskIO, preferredBufferSize: Int?) { self.diskIO = diskIO + self.preferredBufferSize = preferredBufferSize } /// Creates a iterator for this asynchronous sequence. public func makeAsyncIterator() -> Iterator { - return Iterator(diskIO: self.diskIO) + return Iterator( + diskIO: self.diskIO, + preferredBufferSize: self.preferredBufferSize + ) } /// Creates a line sequence to iterate through this `AsyncBufferSequence` line by line. diff --git a/Tests/SubprocessTests/IntegrationTests.swift b/Tests/SubprocessTests/IntegrationTests.swift index 18bc7d4..d1f5479 100644 --- a/Tests/SubprocessTests/IntegrationTests.swift +++ b/Tests/SubprocessTests/IntegrationTests.swift @@ -1589,6 +1589,54 @@ extension SubprocessIntegrationTests { #expect(result.standardOutput?.trimmingNewLineAndQuotes() == "") #expect(result.standardError?.trimmingNewLineAndQuotes() == "") } + + @Test func testCustomStreamingBufferSize() async throws { + #if os(Windows) + let setup = TestSetup( + executable: .name("cmd.exe"), + arguments: [ + "/c", + """ + @echo off + echo one + :loop + timeout /t 1 >nul + goto loop + """, + ] + ) + #else + let setup = TestSetup( + executable: .path("/bin/sh"), + arguments: [ + "-c", + """ + echo one; + while true; do sleep 1; done + """, + ] + ) + #endif + _ = try await _run( + setup, + input: .none, + error: .discarded, + preferredBufferSize: 1 + ) { execution, standardOutput in + for try await line in standardOutput.lines() { + // If we use default buffer size this test will hang + // because Subprocess is stuck on waiting 16k worth of + // output when there are only 3. + #expect(line.trimmingNewLineAndQuotes() == "one") + // Kill the child process since it intentionally hang + #if os(Windows) + try execution.terminate(withExitCode: 0) + #else + try execution.send(signal: .terminate) + #endif + } + } + } } // MARK: - Other Tests @@ -2222,6 +2270,7 @@ func _run< _ setup: TestSetup, input: Input, error: Error, + preferredBufferSize: Int? = nil, body: ((Execution, AsyncBufferSequence) async throws -> Result) ) async throws -> ExecutionResult where Error.OutputType == Void { return try await Subprocess.run( @@ -2231,6 +2280,7 @@ func _run< workingDirectory: setup.workingDirectory, input: input, error: error, + preferredBufferSize: preferredBufferSize, body: body ) }