Skip to content

Commit e20e387

Browse files
authored
Introduce preferredBufferSize parameter to allow custom buffer size when streaming subprocess output (#168)
1 parent ae75fdf commit e20e387

File tree

3 files changed

+153
-11
lines changed

3 files changed

+153
-11
lines changed

Sources/Subprocess/API.swift

Lines changed: 92 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,11 @@ public func run<Result, Input: InputProtocol, Output: OutputProtocol, Error: Out
156156
/// - platformOptions: The platform-specific options to use when running the executable.
157157
/// - input: The input to send to the executable.
158158
/// - error: How to manage executable standard error.
159+
/// - preferredBufferSize: The preferred size in bytes for the buffer used when reading
160+
/// from the subprocess's standard error stream. If `nil`, uses the system page size
161+
/// as the default buffer size. Larger buffer sizes may improve performance for
162+
/// subprocesses that produce large amounts of output, while smaller buffer sizes
163+
/// may reduce memory usage and improve responsiveness for interactive applications.
159164
/// - isolation: the isolation context to run the body closure.
160165
/// - body: The custom execution body to manually control the running process.
161166
/// - Returns: an `ExecutableResult` type containing the return value of the closure.
@@ -167,6 +172,7 @@ public func run<Result, Input: InputProtocol, Error: OutputProtocol>(
167172
platformOptions: PlatformOptions = PlatformOptions(),
168173
input: Input = .none,
169174
error: Error = .discarded,
175+
preferredBufferSize: Int? = nil,
170176
isolation: isolated (any Actor)? = #isolation,
171177
body: ((Execution, AsyncBufferSequence) async throws -> Result)
172178
) async throws -> ExecutionResult<Result> where Error.OutputType == Void {
@@ -181,6 +187,7 @@ public func run<Result, Input: InputProtocol, Error: OutputProtocol>(
181187
configuration,
182188
input: input,
183189
error: error,
190+
preferredBufferSize: preferredBufferSize,
184191
isolation: isolation,
185192
body: body
186193
)
@@ -196,7 +203,12 @@ public func run<Result, Input: InputProtocol, Error: OutputProtocol>(
196203
/// - platformOptions: The platform-specific options to use when running the executable.
197204
/// - input: The input to send to the executable.
198205
/// - output: How to manage executable standard output.
199-
/// - isolation: The isolation context to run the body closure.
206+
/// - preferredBufferSize: The preferred size in bytes for the buffer used when reading
207+
/// from the subprocess's standard error stream. If `nil`, uses the system page size
208+
/// as the default buffer size. Larger buffer sizes may improve performance for
209+
/// subprocesses that produce large amounts of output, while smaller buffer sizes
210+
/// may reduce memory usage and improve responsiveness for interactive applications.
211+
/// - isolation: the isolation context to run the body closure.
200212
/// - body: The custom execution body to manually control the running process
201213
/// - Returns: an `ExecutableResult` type containing the return value of the closure.
202214
public func run<Result, Input: InputProtocol, Output: OutputProtocol>(
@@ -207,6 +219,7 @@ public func run<Result, Input: InputProtocol, Output: OutputProtocol>(
207219
platformOptions: PlatformOptions = PlatformOptions(),
208220
input: Input = .none,
209221
output: Output,
222+
preferredBufferSize: Int? = nil,
210223
isolation: isolated (any Actor)? = #isolation,
211224
body: ((Execution, AsyncBufferSequence) async throws -> Result)
212225
) async throws -> ExecutionResult<Result> where Output.OutputType == Void {
@@ -221,6 +234,7 @@ public func run<Result, Input: InputProtocol, Output: OutputProtocol>(
221234
configuration,
222235
input: input,
223236
output: output,
237+
preferredBufferSize: preferredBufferSize,
224238
isolation: isolation,
225239
body: body
226240
)
@@ -235,6 +249,11 @@ public func run<Result, Input: InputProtocol, Output: OutputProtocol>(
235249
/// - workingDirectory: The working directory in which to run the executable.
236250
/// - platformOptions: The platform-specific options to use when running the executable.
237251
/// - error: How to manage executable standard error.
252+
/// - preferredBufferSize: The preferred size in bytes for the buffer used when reading
253+
/// from the subprocess's standard output stream. If `nil`, uses the system page size
254+
/// as the default buffer size. Larger buffer sizes may improve performance for
255+
/// subprocesses that produce large amounts of output, while smaller buffer sizes
256+
/// may reduce memory usage and improve responsiveness for interactive applications.
238257
/// - isolation: the isolation context to run the body closure.
239258
/// - body: The custom execution body to manually control the running process
240259
/// - Returns: An `ExecutableResult` type containing the return value of the closure.
@@ -245,6 +264,7 @@ public func run<Result, Error: OutputProtocol>(
245264
workingDirectory: FilePath? = nil,
246265
platformOptions: PlatformOptions = PlatformOptions(),
247266
error: Error = .discarded,
267+
preferredBufferSize: Int? = nil,
248268
isolation: isolated (any Actor)? = #isolation,
249269
body: ((Execution, StandardInputWriter, AsyncBufferSequence) async throws -> Result)
250270
) async throws -> ExecutionResult<Result> where Error.OutputType == Void {
@@ -258,6 +278,7 @@ public func run<Result, Error: OutputProtocol>(
258278
return try await run(
259279
configuration,
260280
error: error,
281+
preferredBufferSize: preferredBufferSize,
261282
isolation: isolation,
262283
body: body
263284
)
@@ -272,6 +293,11 @@ public func run<Result, Error: OutputProtocol>(
272293
/// - workingDirectory: The working directory in which to run the executable.
273294
/// - platformOptions: The platform-specific options to use when running the executable.
274295
/// - output: How to manage executable standard output.
296+
/// - preferredBufferSize: The preferred size in bytes for the buffer used when reading
297+
/// from the subprocess's standard error stream. If `nil`, uses the system page size
298+
/// as the default buffer size. Larger buffer sizes may improve performance for
299+
/// subprocesses that produce large amounts of output, while smaller buffer sizes
300+
/// may reduce memory usage and improve responsiveness for interactive applications.
275301
/// - isolation: the isolation context to run the body closure.
276302
/// - body: The custom execution body to manually control the running process
277303
/// - Returns: An `ExecutableResult` type containing the return value of the closure.
@@ -282,6 +308,7 @@ public func run<Result, Output: OutputProtocol>(
282308
workingDirectory: FilePath? = nil,
283309
platformOptions: PlatformOptions = PlatformOptions(),
284310
output: Output,
311+
preferredBufferSize: Int? = nil,
285312
isolation: isolated (any Actor)? = #isolation,
286313
body: ((Execution, StandardInputWriter, AsyncBufferSequence) async throws -> Result)
287314
) async throws -> ExecutionResult<Result> where Output.OutputType == Void {
@@ -295,6 +322,7 @@ public func run<Result, Output: OutputProtocol>(
295322
return try await run(
296323
configuration,
297324
output: output,
325+
preferredBufferSize: preferredBufferSize,
298326
isolation: isolation,
299327
body: body
300328
)
@@ -309,6 +337,11 @@ public func run<Result, Output: OutputProtocol>(
309337
/// - environment: The environment in which to run the executable.
310338
/// - workingDirectory: The working directory in which to run the executable.
311339
/// - platformOptions: The platform-specific options to use when running the executable.
340+
/// - preferredBufferSize: The preferred size in bytes for the buffer used when reading
341+
/// from the subprocess's standard output and error stream. If `nil`, uses the system page size
342+
/// as the default buffer size. Larger buffer sizes may improve performance for
343+
/// subprocesses that produce large amounts of output, while smaller buffer sizes
344+
/// may reduce memory usage and improve responsiveness for interactive applications.
312345
/// - isolation: the isolation context to run the body closure.
313346
/// - body: The custom execution body to manually control the running process
314347
/// - Returns: an `ExecutableResult` type containing the return value of the closure.
@@ -318,6 +351,7 @@ public func run<Result>(
318351
environment: Environment = .inherit,
319352
workingDirectory: FilePath? = nil,
320353
platformOptions: PlatformOptions = PlatformOptions(),
354+
preferredBufferSize: Int? = nil,
321355
isolation: isolated (any Actor)? = #isolation,
322356
body: (
323357
(
@@ -337,6 +371,7 @@ public func run<Result>(
337371
)
338372
return try await run(
339373
configuration,
374+
preferredBufferSize: preferredBufferSize,
340375
isolation: isolation,
341376
body: body
342377
)
@@ -546,6 +581,11 @@ public func run<Result, Input: InputProtocol, Output: OutputProtocol, Error: Out
546581
/// - configuration: The configuration to run.
547582
/// - input: The input to send to the executable.
548583
/// - error: How to manager executable standard error.
584+
/// - preferredBufferSize: The preferred size in bytes for the buffer used when reading
585+
/// from the subprocess's standard output stream. If `nil`, uses the system page size
586+
/// as the default buffer size. Larger buffer sizes may improve performance for
587+
/// subprocesses that produce large amounts of output, while smaller buffer sizes
588+
/// may reduce memory usage and improve responsiveness for interactive applications.
549589
/// - isolation: the isolation context to run the body closure.
550590
/// - body: The custom execution body to manually control the running process
551591
/// - Returns an executableResult type containing the return value
@@ -554,6 +594,7 @@ public func run<Result, Input: InputProtocol, Error: OutputProtocol>(
554594
_ configuration: Configuration,
555595
input: Input = .none,
556596
error: Error = .discarded,
597+
preferredBufferSize: Int? = nil,
557598
isolation: isolated (any Actor)? = #isolation,
558599
body: ((Execution, AsyncBufferSequence) async throws -> Result)
559600
) async throws -> ExecutionResult<Result> where Error.OutputType == Void {
@@ -579,7 +620,10 @@ public func run<Result, Input: InputProtocol, Error: OutputProtocol>(
579620
}
580621

581622
// Body runs in the same isolation
582-
let outputSequence = AsyncBufferSequence(diskIO: outputIOBox.take()!.consumeIOChannel())
623+
let outputSequence = AsyncBufferSequence(
624+
diskIO: outputIOBox.take()!.consumeIOChannel(),
625+
preferredBufferSize: preferredBufferSize
626+
)
583627
let result = try await body(execution, outputSequence)
584628
try await group.waitForAll()
585629
return result
@@ -593,6 +637,11 @@ public func run<Result, Input: InputProtocol, Error: OutputProtocol>(
593637
/// - configuration: The configuration to run.
594638
/// - input: The input to send to the executable.
595639
/// - output: How to manager executable standard output.
640+
/// - preferredBufferSize: The preferred size in bytes for the buffer used when reading
641+
/// from the subprocess's standard error stream. If `nil`, uses the system page size
642+
/// as the default buffer size. Larger buffer sizes may improve performance for
643+
/// subprocesses that produce large amounts of output, while smaller buffer sizes
644+
/// may reduce memory usage and improve responsiveness for interactive applications.
596645
/// - isolation: the isolation context to run the body closure.
597646
/// - body: The custom execution body to manually control the running process
598647
/// - Returns an executableResult type containing the return value
@@ -601,6 +650,7 @@ public func run<Result, Input: InputProtocol, Output: OutputProtocol>(
601650
_ configuration: Configuration,
602651
input: Input = .none,
603652
output: Output,
653+
preferredBufferSize: Int? = nil,
604654
isolation: isolated (any Actor)? = #isolation,
605655
body: ((Execution, AsyncBufferSequence) async throws -> Result)
606656
) async throws -> ExecutionResult<Result> where Output.OutputType == Void {
@@ -626,7 +676,10 @@ public func run<Result, Input: InputProtocol, Output: OutputProtocol>(
626676
}
627677

628678
// Body runs in the same isolation
629-
let errorSequence = AsyncBufferSequence(diskIO: errorIOBox.take()!.consumeIOChannel())
679+
let errorSequence = AsyncBufferSequence(
680+
diskIO: errorIOBox.take()!.consumeIOChannel(),
681+
preferredBufferSize: preferredBufferSize
682+
)
630683
let result = try await body(execution, errorSequence)
631684
try await group.waitForAll()
632685
return result
@@ -640,13 +693,19 @@ public func run<Result, Input: InputProtocol, Output: OutputProtocol>(
640693
/// - Parameters:
641694
/// - configuration: The `Configuration` to run.
642695
/// - error: How to manager executable standard error.
696+
/// - preferredBufferSize: The preferred size in bytes for the buffer used when reading
697+
/// from the subprocess's standard output stream. If `nil`, uses the system page size
698+
/// as the default buffer size. Larger buffer sizes may improve performance for
699+
/// subprocesses that produce large amounts of output, while smaller buffer sizes
700+
/// may reduce memory usage and improve responsiveness for interactive applications.
643701
/// - isolation: the isolation context to run the body closure.
644702
/// - body: The custom execution body to manually control the running process
645703
/// - Returns an executableResult type containing the return value
646704
/// of the closure.
647705
public func run<Result, Error: OutputProtocol>(
648706
_ configuration: Configuration,
649707
error: Error = .discarded,
708+
preferredBufferSize: Int? = nil,
650709
isolation: isolated (any Actor)? = #isolation,
651710
body: ((Execution, StandardInputWriter, AsyncBufferSequence) async throws -> Result)
652711
) async throws -> ExecutionResult<Result> where Error.OutputType == Void {
@@ -658,7 +717,10 @@ public func run<Result, Error: OutputProtocol>(
658717
error: try error.createPipe()
659718
) { execution, inputIO, outputIO, errorIO in
660719
let writer = StandardInputWriter(diskIO: inputIO!)
661-
let outputSequence = AsyncBufferSequence(diskIO: outputIO!.consumeIOChannel())
720+
let outputSequence = AsyncBufferSequence(
721+
diskIO: outputIO!.consumeIOChannel(),
722+
preferredBufferSize: preferredBufferSize
723+
)
662724
return try await body(execution, writer, outputSequence)
663725
}
664726
}
@@ -669,13 +731,19 @@ public func run<Result, Error: OutputProtocol>(
669731
/// - Parameters:
670732
/// - configuration: The `Configuration` to run.
671733
/// - output: How to manager executable standard output.
734+
/// - preferredBufferSize: The preferred size in bytes for the buffer used when reading
735+
/// from the subprocess's standard error stream. If `nil`, uses the system page size
736+
/// as the default buffer size. Larger buffer sizes may improve performance for
737+
/// subprocesses that produce large amounts of output, while smaller buffer sizes
738+
/// may reduce memory usage and improve responsiveness for interactive applications.
672739
/// - isolation: the isolation context to run the body closure.
673740
/// - body: The custom execution body to manually control the running process
674741
/// - Returns an executableResult type containing the return value
675742
/// of the closure.
676743
public func run<Result, Output: OutputProtocol>(
677744
_ configuration: Configuration,
678745
output: Output,
746+
preferredBufferSize: Int? = nil,
679747
isolation: isolated (any Actor)? = #isolation,
680748
body: ((Execution, StandardInputWriter, AsyncBufferSequence) async throws -> Result)
681749
) async throws -> ExecutionResult<Result> where Output.OutputType == Void {
@@ -687,21 +755,32 @@ public func run<Result, Output: OutputProtocol>(
687755
error: try error.createPipe()
688756
) { execution, inputIO, outputIO, errorIO in
689757
let writer = StandardInputWriter(diskIO: inputIO!)
690-
let errorSequence = AsyncBufferSequence(diskIO: errorIO!.consumeIOChannel())
758+
let errorSequence = AsyncBufferSequence(
759+
diskIO: errorIO!.consumeIOChannel(),
760+
preferredBufferSize: preferredBufferSize
761+
)
691762
return try await body(execution, writer, errorSequence)
692763
}
693764
}
694765

695766
/// Run an executable with given parameters specified by a `Configuration`
767+
/// and a custom closure to manage the running subprocess' lifetime, write to its
768+
/// standard input, and stream its standard output and standard error.
696769
/// - Parameters:
697770
/// - configuration: The `Subprocess` configuration to run.
771+
/// - preferredBufferSize: The preferred size in bytes for the buffer used when reading
772+
/// from the subprocess's standard output and error stream. If `nil`, uses the system page size
773+
/// as the default buffer size. Larger buffer sizes may improve performance for
774+
/// subprocesses that produce large amounts of output, while smaller buffer sizes
775+
/// may reduce memory usage and improve responsiveness for interactive applications.
698776
/// - isolation: the isolation context to run the body closure.
699777
/// - body: The custom configuration body to manually control
700778
/// the running process, write to its standard input, stream
701779
/// the standard output and standard error.
702780
/// - Returns: an `ExecutableResult` type containing the return value of the closure.
703781
public func run<Result>(
704782
_ configuration: Configuration,
783+
preferredBufferSize: Int? = nil,
705784
isolation: isolated (any Actor)? = #isolation,
706785
body: ((Execution, StandardInputWriter, AsyncBufferSequence, AsyncBufferSequence) async throws -> Result)
707786
) async throws -> ExecutionResult<Result> {
@@ -714,8 +793,14 @@ public func run<Result>(
714793
error: try error.createPipe()
715794
) { execution, inputIO, outputIO, errorIO in
716795
let writer = StandardInputWriter(diskIO: inputIO!)
717-
let outputSequence = AsyncBufferSequence(diskIO: outputIO!.consumeIOChannel())
718-
let errorSequence = AsyncBufferSequence(diskIO: errorIO!.consumeIOChannel())
796+
let outputSequence = AsyncBufferSequence(
797+
diskIO: outputIO!.consumeIOChannel(),
798+
preferredBufferSize: preferredBufferSize
799+
)
800+
let errorSequence = AsyncBufferSequence(
801+
diskIO: errorIO!.consumeIOChannel(),
802+
preferredBufferSize: preferredBufferSize
803+
)
719804
return try await body(execution, writer, outputSequence, errorSequence)
720805
}
721806
}

Sources/Subprocess/AsyncBufferSequence.swift

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,13 @@ public struct AsyncBufferSequence: AsyncSequence, @unchecked Sendable {
4141
public typealias Element = Buffer
4242

4343
private let diskIO: DiskIO
44+
private let preferredBufferSize: Int
4445
private var buffer: [Buffer]
4546

46-
internal init(diskIO: DiskIO) {
47+
internal init(diskIO: DiskIO, preferredBufferSize: Int?) {
4748
self.diskIO = diskIO
4849
self.buffer = []
50+
self.preferredBufferSize = preferredBufferSize ?? readBufferSize
4951
}
5052

5153
/// Retrieve the next buffer in the sequence, or `nil` if
@@ -58,7 +60,7 @@ public struct AsyncBufferSequence: AsyncSequence, @unchecked Sendable {
5860
// Read more data
5961
let data = try await AsyncIO.shared.read(
6062
from: self.diskIO,
61-
upTo: readBufferSize
63+
upTo: self.preferredBufferSize
6264
)
6365
guard let data else {
6466
// We finished reading. Close the file descriptor now
@@ -84,14 +86,19 @@ public struct AsyncBufferSequence: AsyncSequence, @unchecked Sendable {
8486
}
8587

8688
private let diskIO: DiskIO
89+
private let preferredBufferSize: Int?
8790

88-
internal init(diskIO: DiskIO) {
91+
internal init(diskIO: DiskIO, preferredBufferSize: Int?) {
8992
self.diskIO = diskIO
93+
self.preferredBufferSize = preferredBufferSize
9094
}
9195

9296
/// Creates a iterator for this asynchronous sequence.
9397
public func makeAsyncIterator() -> Iterator {
94-
return Iterator(diskIO: self.diskIO)
98+
return Iterator(
99+
diskIO: self.diskIO,
100+
preferredBufferSize: self.preferredBufferSize
101+
)
95102
}
96103

97104
/// Creates a line sequence to iterate through this `AsyncBufferSequence` line by line.

0 commit comments

Comments
 (0)