Allow callers to run a subprocess and provide low and high water marks when using SequenceOutput to emit standard output and standard error as soon as it arrives. #40

Open: wants to merge 19 commits into main (changes from 17 commits)
16 changes: 8 additions & 8 deletions Sources/Subprocess/API.swift
@@ -200,7 +200,7 @@ public func run<Result, Input: InputProtocol, Error: OutputProtocol>(
}

// Body runs in the same isolation
-let outputSequence = AsyncBufferSequence(diskIO: outputIOBox.take()!.consumeDiskIO())
+let outputSequence = AsyncBufferSequence(diskIO: outputIOBox.take()!.consumeDiskIO(), streamOptions: platformOptions.streamOptions)
let result = try await body(execution, outputSequence)
try await group.waitForAll()
return result
@@ -250,7 +250,7 @@ public func run<Result, Input: InputProtocol, Output: OutputProtocol>(
}

// Body runs in the same isolation
-let errorSequence = AsyncBufferSequence(diskIO: errorIOBox.take()!.consumeDiskIO())
+let errorSequence = AsyncBufferSequence(diskIO: errorIOBox.take()!.consumeDiskIO(), streamOptions: platformOptions.streamOptions)
let result = try await body(execution, errorSequence)
try await group.waitForAll()
return result
@@ -286,7 +286,7 @@ public func run<Result, Error: OutputProtocol>(
error: try error.createPipe()
) { execution, inputIO, outputIO, errorIO in
let writer = StandardInputWriter(diskIO: inputIO!)
-let outputSequence = AsyncBufferSequence(diskIO: outputIO!.consumeDiskIO())
+let outputSequence = AsyncBufferSequence(diskIO: outputIO!.consumeDiskIO(), streamOptions: platformOptions.streamOptions)
return try await body(execution, writer, outputSequence)
}
}
@@ -319,7 +319,7 @@ public func run<Result, Output: OutputProtocol>(
error: try error.createPipe()
) { execution, inputIO, outputIO, errorIO in
let writer = StandardInputWriter(diskIO: inputIO!)
-let errorSequence = AsyncBufferSequence(diskIO: errorIO!.consumeDiskIO())
+let errorSequence = AsyncBufferSequence(diskIO: errorIO!.consumeDiskIO(), streamOptions: platformOptions.streamOptions)
return try await body(execution, writer, errorSequence)
}
}
@@ -376,8 +376,8 @@ public func run<Result>(
error: try error.createPipe()
) { execution, inputIO, outputIO, errorIO in
let writer = StandardInputWriter(diskIO: inputIO!)
-let outputSequence = AsyncBufferSequence(diskIO: outputIO!.consumeDiskIO())
-let errorSequence = AsyncBufferSequence(diskIO: errorIO!.consumeDiskIO())
+let outputSequence = AsyncBufferSequence(diskIO: outputIO!.consumeDiskIO(), streamOptions: platformOptions.streamOptions)
+let errorSequence = AsyncBufferSequence(diskIO: errorIO!.consumeDiskIO(), streamOptions: platformOptions.streamOptions)
return try await body(execution, writer, outputSequence, errorSequence)
}
}
@@ -503,8 +503,8 @@ public func run<Result>(
error: try error.createPipe()
) { execution, inputIO, outputIO, errorIO in
let writer = StandardInputWriter(diskIO: inputIO!)
-let outputSequence = AsyncBufferSequence(diskIO: outputIO!.consumeDiskIO())
-let errorSequence = AsyncBufferSequence(diskIO: errorIO!.consumeDiskIO())
+let outputSequence = AsyncBufferSequence(diskIO: outputIO!.consumeDiskIO(), streamOptions: configuration.platformOptions.streamOptions)
+let errorSequence = AsyncBufferSequence(diskIO: errorIO!.consumeDiskIO(), streamOptions: configuration.platformOptions.streamOptions)
return try await body(execution, writer, outputSequence, errorSequence)
}
}
44 changes: 34 additions & 10 deletions Sources/Subprocess/AsyncBufferSequence.swift
@@ -35,44 +35,68 @@ public struct AsyncBufferSequence: AsyncSequence, Sendable {
@_nonSendable
public struct Iterator: AsyncIteratorProtocol {
public typealias Element = Buffer
internal typealias Stream = AsyncThrowingStream<Buffer, Swift.Error>

private let diskIO: DiskIO
private var buffer: [UInt8]
private var currentPosition: Int
private var finished: Bool
private var streamIterator: Stream.AsyncIterator
private let continuation: Stream.Continuation
private var bytesRemaining: Int

-internal init(diskIO: DiskIO) {
+internal init(diskIO: DiskIO, streamOptions: PlatformOptions.StreamOptions) {
self.diskIO = diskIO
self.buffer = []
self.currentPosition = 0
self.finished = false
let (stream, continuation) = AsyncThrowingStream<Buffer, Swift.Error>.makeStream()
Review comment (Member):
I'm a bit concerned with the usage of the AsyncStream here for two reasons. The first is that AsyncStream has no way to propagate the internal async sequence back pressure to the external system that is producing the elements.

We had the same issue in swift-nio and created the NIOAsyncSequenceProducer that allowed us to bridge NIO's back pressure into the back pressure of an async sequence. We learned a lot from the APIs and implementation in NIO and have an open PR in swift-async-algorithms that generalizes this concept as a MultiProducerSingleConsumerAsyncChannel.

Now, having said that, it might be fine here back-pressure-wise since DispatchIO is going to call the ioHandler multiple times but only up to readBufferSize. So we do have some maximum limit and our buffer can't balloon indefinitely.

However, this brings me to my second point, which is the performance of this. AsyncStream is not super fast, so I expect this PR to have a performance impact when streaming a lot of data to/from a subprocess. It would be good to understand that impact and whether the MultiProducerSingleConsumerAsyncChannel can improve this.

Lastly, aren't we missing closing the diskIO when the iterator is dropped? We probably want to set up an onTermination callback on the stream or the channel to close the diskIO, right?

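To make the first concern concrete, here is a hypothetical standalone sketch (not project code) of why a plain AsyncStream provides no back pressure: yield never suspends, so a fast producer's elements pile up in the stream's internal buffer regardless of how slowly the consumer drains them.

```swift
import Foundation

// Hypothetical sketch (not project code): AsyncStream's yield never suspends,
// so nothing slows a fast producer down while a slow consumer lags behind.
func demonstrateUnboundedBuffering() async throws {
    let (stream, continuation) = AsyncThrowingStream<[UInt8], Error>.makeStream()

    // Fast producer: all ~1 MiB is buffered inside the stream immediately.
    for _ in 0..<1_024 {
        continuation.yield([UInt8](repeating: 0, count: 1_024))
    }
    continuation.finish()

    // Slow consumer: drains the already-accumulated buffer one element at a time.
    for try await chunk in stream {
        _ = chunk.count
    }
}
```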
Review comment (Contributor):
I'll look into this in a separate PR. I think the AsyncStream-based implementation is good enough for this PR.

Review comment (@iCharlesHu, Contributor, May 23, 2025):
Will investigate under #51

self.streamIterator = stream.makeAsyncIterator()
self.continuation = continuation
self.bytesRemaining = 0

#if !os(Windows)
if let minimumBufferSize = streamOptions.minimumBufferSize {
diskIO.setLimit(lowWater: minimumBufferSize)
}

if let maximumBufferSize = streamOptions.maximumBufferSize {
diskIO.setLimit(highWater: maximumBufferSize)
}
#endif
}
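For readers unfamiliar with these DispatchIO knobs: setLimit(lowWater:) sets the minimum number of bytes DispatchIO accumulates before invoking the I/O handler, and setLimit(highWater:) caps how much it delivers per invocation. A hypothetical standalone sketch (the file descriptor and sizes here are assumptions for illustration, not project code):

```swift
import Dispatch
import Foundation

// Hypothetical standalone example of the DispatchIO limits used above.
let fd: Int32 = 0  // e.g. standard input; an assumption for illustration
let io = DispatchIO(type: .stream, fileDescriptor: fd, queue: .main) { _ in }

// Deliver data to the handler as soon as at least 1 byte is available...
io.setLimit(lowWater: 1)
// ...but never accumulate more than 64 KiB before invoking it.
io.setLimit(highWater: 64 * 1024)

io.read(offset: 0, length: Int.max, queue: .main) { done, data, error in
    // Invoked repeatedly with chunks sized between the low and high water marks.
    if done { io.close() }
}
```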

-public func next() async throws -> Buffer? {
-    let data = try await self.diskIO.readChunk(
-        upToLength: readBufferSize
-    )
-    if data == nil {
-        // We finished reading. Close the file descriptor now
+public mutating func next() async throws -> Buffer? {

if bytesRemaining <= 0 {
bytesRemaining = readBufferSize
diskIO.stream(upToLength: readBufferSize, continuation: continuation)
Review comment (Contributor):
I don't think you need to manually stream only up to readBufferSize here. The idea of streaming is that we want to read until the end of file so we should use .max here to specify DispatchIO should read data until an EOF is reached. You should only need to call stream once in the initializer.

Review comment (@rdingman, Author, May 23, 2025):
@iCharlesHu

Are we sure we want that? That exacerbates the concerns of @weissi and @FranzBusch. With the current approach, the easing of back pressure is limited to buffering up to about readBufferSize for slow consumers. This suggestion opens that up to the entire output, leaving it essentially unbounded.

Review comment (Contributor):

I was trying to unblock this PR so I created #51 to address the AsyncStream issue.

However, after some time to think over and studying @FranzBusch’s suggestion (apple/swift-async-algorithms#305.), I’ve come to the conclusion that we can’t make the read function “push based” (using AsyncStream) if we want to depend on DispatchIO. The main reason is that DispatchIO doesn’t support back pressure, so any back pressure mechanism we implement would be limited by DispatchIO since we can’t instruct it to slow down (blocking the handler block is a no-go).

The current method (AsyncStream + buffer size limit) is better because it prevents the unlimited buffer problem, but it loses the advantage of letting DispatchIO determine the optimal buffer size. Instead of calling .read once with .max, we call it multiple times with a small buffer size. Additionally, as @FranzBusch pointed out, AsyncStream has performance issues, making it no longer worthwhile to use.

Therefore, for this PR, could you please:

  • Remove the use of AsyncStream and .stream() and instead use withCheckedThrowingContinuation and .read() with a simple async function to read data up to a max length (similar to my implementation here https://github.com/swiftlang/swift-subprocess/pull/48/files)
  • This change means we can’t rely on calling .setLimit(lowWater:) / .setLimit(highWater:) since .read() will always read until the buffer size is reached. Therefore, instead of PlatformOptions.StreamOptions, we just need a let preferredStreamBufferSize: Int to set the actual buffer size we instruct .read() to fetch each time.

I believe the above approach should still allow callers to set how frequent they wish to be updated without needing to introduce AsyncStream.

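As a rough illustration of the suggested direction, a pull-based read built on withCheckedThrowingContinuation might look like this (a sketch only; the readUpTo name and its error handling are assumptions, not the final API):

```swift
import Dispatch
import Foundation

extension DispatchIO {
    // Sketch of the suggested pull-based read: one continuation per call,
    // reading up to `maxLength` bytes and returning nil at end-of-file.
    func readUpTo(_ maxLength: Int) async throws -> DispatchData? {
        try await withCheckedThrowingContinuation { continuation in
            var accumulated = DispatchData.empty
            self.read(offset: 0, length: maxLength, queue: .global()) { done, data, error in
                if error != 0 {
                    // Hypothetical error mapping for illustration only.
                    continuation.resume(throwing: POSIXError(POSIXErrorCode(rawValue: error) ?? .EIO))
                    return
                }
                if let data { accumulated.append(data) }
                if done {
                    continuation.resume(returning: accumulated.isEmpty ? nil : accumulated)
                }
            }
        }
    }
}
```

A caller would then loop over readUpTo until it returns nil, which naturally paces reads to the consumer.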
}

if let buffer = try await streamIterator.next() {
bytesRemaining -= buffer.count
return buffer
} else {
#if os(Windows)
try self.diskIO.close()
#else
self.diskIO.close()
#endif
return nil
}
-return data
}
}

private let diskIO: DiskIO
private let streamOptions: PlatformOptions.StreamOptions

-internal init(diskIO: DiskIO) {
+internal init(diskIO: DiskIO, streamOptions: PlatformOptions.StreamOptions) {
self.diskIO = diskIO
self.streamOptions = streamOptions
}

public func makeAsyncIterator() -> Iterator {
-return Iterator(diskIO: self.diskIO)
+return Iterator(diskIO: self.diskIO, streamOptions: streamOptions)
}
}

15 changes: 15 additions & 0 deletions Sources/Subprocess/Platforms/Subprocess+Darwin.swift
@@ -57,6 +57,9 @@ public struct PlatformOptions: Sendable {
/// Creates a session and sets the process group ID
/// i.e. Detach from the terminal.
public var createSession: Bool = false

public var streamOptions: StreamOptions = .init()

/// An ordered list of steps in order to tear down the child
/// process in case the parent task is cancelled before
/// the child process terminates.
@@ -126,6 +129,18 @@ extension PlatformOptions {
#endif
}

extension PlatformOptions {
public struct StreamOptions: Sendable {
let minimumBufferSize: Int?
let maximumBufferSize: Int?

public init(minimumBufferSize: Int? = nil, maximumBufferSize: Int? = nil) {
self.minimumBufferSize = minimumBufferSize
self.maximumBufferSize = maximumBufferSize
}
}
}
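With the Darwin StreamOptions above, a caller might opt into tighter delivery like this (a sketch based on the run overloads touched in this PR; the executable path and buffer sizes are arbitrary illustrations):

```swift
import Subprocess

var platformOptions = PlatformOptions()
// Deliver output once at least 256 bytes are ready, buffering at most 16 KiB.
platformOptions.streamOptions = .init(
    minimumBufferSize: 256,
    maximumBufferSize: 16 * 1024
)

let result = try await Subprocess.run(
    .path("/bin/echo"),
    arguments: ["hello"],
    platformOptions: platformOptions,
    error: .discarded
) { execution, standardOutput in
    for try await chunk in standardOutput {
        // Each chunk arrives as soon as the low water mark is satisfied.
        print(chunk.count)
    }
}
```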

extension PlatformOptions: CustomStringConvertible, CustomDebugStringConvertible {
internal func description(withIndent indent: Int) -> String {
let indent = String(repeating: " ", count: indent * 4)
15 changes: 15 additions & 0 deletions Sources/Subprocess/Platforms/Subprocess+Linux.swift
@@ -214,6 +214,9 @@ public struct PlatformOptions: Sendable {
// Creates a session and sets the process group ID
// i.e. Detach from the terminal.
public var createSession: Bool = false

public var streamOptions: StreamOptions = .init()

/// An ordered list of steps in order to tear down the child
/// process in case the parent task is cancelled before
// the child process terminates.
@@ -235,6 +238,18 @@ public struct PlatformOptions: Sendable {
public init() {}
}

extension PlatformOptions {
public struct StreamOptions: Sendable {
let minimumBufferSize: Int?
let maximumBufferSize: Int?

public init(minimumBufferSize: Int? = nil, maximumBufferSize: Int? = nil) {
self.minimumBufferSize = minimumBufferSize
self.maximumBufferSize = maximumBufferSize
}
}
}

extension PlatformOptions: CustomStringConvertible, CustomDebugStringConvertible {
internal func description(withIndent indent: Int) -> String {
let indent = String(repeating: " ", count: indent * 4)
51 changes: 20 additions & 31 deletions Sources/Subprocess/Platforms/Subprocess+Unix.swift
@@ -405,6 +405,7 @@ extension TrackedFileDescriptor {
}
}
)

return .init(dispatchIO, closeWhenDone: self.closeWhenDone)
}
}
@@ -414,37 +415,25 @@ extension DispatchIO {
#if SubprocessSpan
@available(SubprocessSpan, *)
#endif
internal func readChunk(upToLength maxLength: Int) async throws -> AsyncBufferSequence.Buffer? {
return try await withCheckedThrowingContinuation { continuation in
var buffer: DispatchData = .empty
self.read(
offset: 0,
length: maxLength,
queue: .global()
) { done, data, error in
if error != 0 {
continuation.resume(
throwing: SubprocessError(
code: .init(.failedToReadFromSubprocess),
underlyingError: .init(rawValue: error)
)
)
return
}
if let data = data {
if buffer.isEmpty {
buffer = data
} else {
buffer.append(data)
}
}
if done {
if !buffer.isEmpty {
continuation.resume(returning: AsyncBufferSequence.Buffer(data: buffer))
} else {
continuation.resume(returning: nil)
}
}
internal func stream(upToLength maxLength: Int, continuation: AsyncBufferSequence.Iterator.Stream.Continuation) {
self.read(
offset: 0,
length: maxLength,
queue: .global()
) { done, data, error in
if error != 0 {
continuation.finish(throwing: SubprocessError(
code: .init(.failedToReadFromSubprocess),
underlyingError: .init(rawValue: error)
))
return
}

// Treat empty data and nil as the same
if let data = data.map({ $0.isEmpty ? nil : $0 }) ?? nil {
continuation.yield(AsyncBufferSequence.Buffer(data: data))
Review comment:
Wait, this strips backpressure. If you do that, then a slow consumer and a fast producer will OOM kill you. You won't be able to use AsyncStream here (it's a type that should be avoided unless you use AsyncStream(unfolding: {...}) or set it to drop elements when the buffer is full, but dropping is of course not possible here because it would lose data).

Review comment (Contributor):

I understand the concern, but I don't think this is the right PR to address this issue since it's trying to solve a different problem. I'll address this in a separate PR.

Review comment (Contributor):
Created #51

} else if done {
continuation.finish()
Review comment (Contributor):
I think we should stick with switch just to be explicit that we handle all possible combinations. For example, it's possible that done == true && data != nil, in which case the current implementation won't call finish().

Review comment (@rdingman, Author, May 23, 2025):
I didn't go back to a switch because I re-read the semantics of DispatchIO.read() and restructured this code to handle these semantics. Now all code paths either call fatalError for an invalid combination of handler arguments, yield non-empty data, or call finish() when we've reached the end of the file.

}
}
}
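For reference, the pull-based AsyncStream(unfolding:) shape mentioned in the review discussion could be sketched as follows (hypothetical; readChunk stands in for an async read that returns nil at end-of-file):

```swift
// Hypothetical sketch of the pull-based alternative discussed above.
// `readChunk` stands in for an async read returning nil at EOF.
func makeOutputStream(
    readChunk: @escaping @Sendable () async throws -> [UInt8]?
) -> AsyncThrowingStream<[UInt8], Error> {
    AsyncThrowingStream(unfolding: {
        // Each element is produced only when the consumer asks for the next
        // one, so the consumer's pace naturally throttles reading.
        try await readChunk()
    })
}
```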
66 changes: 56 additions & 10 deletions Sources/Subprocess/Platforms/Subprocess+Windows.swift
@@ -417,6 +417,7 @@ public struct PlatformOptions: Sendable {
/// process in case the parent task is cancelled before
/// the child process terminates.
/// Always ends in forcefully terminate at the end.
internal var streamOptions: StreamOptions = .init()
public var teardownSequence: [TeardownStep] = []
/// A closure to configure platform-specific
/// spawning constructs. This closure enables direct
@@ -441,6 +442,12 @@ public struct PlatformOptions: Sendable {
public init() {}
}

extension PlatformOptions {
internal struct StreamOptions: Sendable {
internal init() {}
}
}

Review comment (Contributor):
I understand this might make code sharing easier, but I don't think it's appropriate to add an empty struct here that will never be used (same with the streamOptions property on Windows).

(Sorry I might have missed this in earlier reviews)

Review comment (Author):
Rearranged how we use StreamOptions so it is only defined and used on platforms that need it.

extension PlatformOptions: CustomStringConvertible, CustomDebugStringConvertible {
internal func description(withIndent indent: Int) -> String {
let indent = String(repeating: " ", count: indent * 4)
@@ -1071,18 +1078,57 @@ extension FileDescriptor {
}

extension FileDescriptor {
internal func readChunk(upToLength maxLength: Int) async throws -> AsyncBufferSequence.Buffer? {
return try await withCheckedThrowingContinuation { continuation in
self.readUntilEOF(
upToLength: maxLength
) { result in
switch result {
case .failure(let error):
continuation.resume(throwing: error)
case .success(let bytes):
continuation.resume(returning: AsyncBufferSequence.Buffer(data: bytes))
internal func stream(upToLength maxLength: Int, continuation: AsyncBufferSequence.Iterator.Stream.Continuation) {
do {
var totalBytesRead: Int = 0

while totalBytesRead < maxLength {
let values = try [UInt8](
unsafeUninitializedCapacity: maxLength - totalBytesRead
) { buffer, initializedCount in
guard let baseAddress = buffer.baseAddress else {
initializedCount = 0
return
}

var bytesRead: DWORD = 0
let readSucceed = ReadFile(
self.platformDescriptor,
UnsafeMutableRawPointer(mutating: baseAddress),
DWORD(maxLength - totalBytesRead),
&bytesRead,
nil
)

if !readSucceed {
// Windows throws ERROR_BROKEN_PIPE when the pipe is closed
let error = GetLastError()
if error == ERROR_BROKEN_PIPE {
// We are done reading
initializedCount = 0
} else {
initializedCount = 0
throw SubprocessError(
code: .init(.failedToReadFromSubprocess),
underlyingError: .init(rawValue: error)
)
}
} else {
// We successfully read the current round
initializedCount += Int(bytesRead)
}
}

if values.count > 0 {
totalBytesRead += values.count
continuation.yield(AsyncBufferSequence.Buffer(data: values))
} else {
continuation.finish()
return
}
}
} catch {
continuation.finish(throwing: error)
}
}

44 changes: 44 additions & 0 deletions Tests/SubprocessTests/SubprocessTests+Unix.swift
@@ -662,6 +662,50 @@ extension SubprocessUnixTests {
#expect(catResult.terminationStatus.isSuccess)
#expect(catResult.standardError == expected)
}

@Test func testSlowDripRedirectedOutputRedirectToSequence() async throws {
guard #available(SubprocessSpan , *) else {
return
}
let threshold: Double = 0.5
Review comment (Contributor):
Unfortunately in tests you'll have to write

guard #available(SubprocessSpan , *) else {
    return
}

at the beginning to work around the same availability issue. See other tests for examples.

Review comment (@rdingman, Author, May 14, 2025):
@iCharlesHu When I first started working on this, I was very confused as to why some of the tests weren't running my new code and it was because of this check. Wouldn't it be better to have them skipped and noted as such in the test output rather than falsely succeeding? I'm thinking something like this:

    @Test(
        .enabled(
            if: {
                if #available(SubprocessSpan , *) {
                    true
                } else {
                    false
                }
            }(),
            "This test requires SubprocessSpan"
        )
    )
    func testSlowDripRedirectedOutputRedirectToSequence() async throws {
    }

Of course, we can have a helper function to make this less verbose.

Thoughts?

Review comment (Author):
@iCharlesHu I went ahead and conditionalized this one test this way as an example. Let me know if you don't like that and would like me to revert to a guard

Review comment (Contributor):
@rdingman unfortunately this won't work because on Swift 6.2 and above the compiler will complain that the code inside of testSlowDripRedirectedOutputRedirectToSequence needs SubprocessSpan and ask you to put @available around the function. That doesn't work either because the macro won't pick it up.

Unfortunately so far this is the only way I found that works. Very ugly... but it'll have to do for now.

Review comment (@rdingman, Author, May 21, 2025):
@iCharlesHu Hmm. This builds just fine for me with the latest Swift 6.2 toolchain, has the intended results, and works with the currently shipping Swift 6.1 toolchain. However, I'll go ahead and revert this and match the existing tests.


let script = """
echo "DONE"
sleep \(threshold)
"""

var platformOptions = PlatformOptions()
platformOptions.streamOptions = .init(minimumBufferSize: 0)

let start = ContinuousClock().now

let catResult = try await Subprocess.run(
.path("/bin/bash"),
arguments: ["-c", script],
platformOptions: platformOptions,
error: .discarded,
body: { (execution, standardOutput) in
for try await chunk in standardOutput {
let string = chunk._withUnsafeBytes { String(decoding: $0, as: UTF8.self) }

if string.hasPrefix("DONE") {
let end = ContinuousClock().now

if (end - start) > .seconds(threshold) {
return "Failure"

} else {
return "Success"
}
}
}

return "Failure"
}
)
#expect(catResult.terminationStatus.isSuccess)
#expect(catResult.value == "Success")
}
}

// MARK: - PlatformOption Tests